Всем привет 😉
Какое-то время назад при работе с RabbitMQ посредством PhpAmqpLib в Symfony иногда возникали странные ошибки вида
1 |
fwrite(): Send of 123 bytes failed with errno-104 - Connection reset by peer |
Или схожая
1 |
fwrite(): Send of 123 bytes failed with errno-32 - Broken pipe |
Причина, в принципе, у этих ошибок одна: разрыв соединения между воркером, обрабатывающем сообщение («задачу») от рэббита, и самим рэбитом.
Перед тем, ка предложить решение, объясню, почему это происходит. Причины может быть 2, и обе кроются в том, что PHP — язык не многопоточный, и пока воркер чем-то очень сильно занят (очень объемная задача), соединение простаивает. И тут происходит одно из двух:
- У вас настроен Heartbeat (>0). Heartbeat — это своего рода ping в протоколе AMQP. Клиент, то есть воркер, при запуске сообщает серверу свой heartbeat (например, 60). Это означает, что раз в 60 секунд клиент будет слать серверу «сердцебиение». Как только сервер понимает, что было пропущено два таких сердцебиения — он разрывает соединение и возвращает задачу обратно себе. При этом воркер об этом еще не знает и спокойно продолжает выполнять задачу, а исключение он выбросит только при следующей попытке взаимодействия с сервером рэббита.
- Казалось бы, есть очевидное решение: поставить heartbeat == 0, тем самым отключив эту проверку на сердцебиение. Частично это может решить проблему, так как сервер RabbitMQ не будет обрубать соединение. Но возникнет другая проблема: сам Linux имеет свойство закрывать неактивные соединения.
Соотв-но, логичное решение: как-то хранить соединение активным. И в этом нам поможет все тот же Heartbeat, поскольку, если я не ошибаюсь, это единственный пакет в протоколе AMQP, который можно безболезненно слать серверу по открытому соединению.
К сожалению, в API PhpAmqpLib нет публичных методов, которые можно «дернуть», чтобы послать принудительно этот самый heartbeat, поэтому остается единственный вариант — скопипастить код отправки heartbeat’а из нутрей библиотеки.
Итак, если речь о Symfony и бандле RabbitMQ-Bundle, мы имеем свои Consumer’ы, которые обрабатывают определенные очереди и реализуют интерфейс ConsumerInterface. У этого интерфейса единственный метод: execute, в который передается объект класса AMQPMessage. Именно через него мы и сможем получить соединение к серверу и слать в него принудительный heartbeat. Надеюсь, все ваши Consumer’ы унаследованы от вашего общего базового класса, называемого, допустим, BaseRabbitConsumer.
Чтобы реализовать описанную выше схему, нужно в базовом execute() сохранить передаваемый AMQPMessage в свойство этого класса, а также добавить метод со следующим содержимым:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
/** * Принудительно посылает в канал Heartbeat-пакет */ public function keepAlive() { if (!isset($this->message->delivery_info['channel'])) { return; } /** @var AMQPChannel $channel */ $channel = $this->message->delivery_info['channel']; $pkt = new AMQPWriter(); $pkt->write_octet(8); $pkt->write_short(0); $pkt->write_long(0); $pkt->write_octet(0xCE); $channel->getConnection()->write($pkt->getvalue()); } |
Где $this->messasge — это сохраненный AMQPMessage.
Остается дело за малым: найти в ваших воркерах «уязвимые» места, то есть те места, которые занимают основные временные затраты, и после каждого такого места вызывать $this->keepAlive() , чтобы держать соединение «на плаву». Это может быть, например, конец каждого цикла, если суть задачи заключается в массовой обработке каких-то данных.
Также, метод keepAlive можно доработать, сделав в нем ограничение на кол-во вызовов в определенный промежуток времени. Чтобы, даже если он будет «дергаться» по 100 раз в секунду, по факту heartbeat слался, скажем, не чаще одного раза в 10 секунд. Надеюсь, такая доработка не вызовет сложностей, приводить не буду.
Кто-то может сказать, что такое решение — костыльное, и я отчасти соглашусь. Но по крайней мере это решило нашу проблему и я не смог придумать что-то лучше, учитывая, что в сети готового решения данной проблемы не нашлось 🙂
Всем спасибо за внимание и пока 🙂 Надеюсь, еще что-нибудь напишу в блоге, а то что-то забросил совсем.