PhpAmqpLib: проблема разрыва соединения

Всем привет 😉

Какое-то время назад при работе с RabbitMQ посредством PhpAmqpLib в Symfony иногда возникали странные ошибки вида

Или схожая

Причина, в принципе, у этих ошибок одна: разрыв соединения между воркером, обрабатывающем сообщение («задачу») от рэббита, и самим рэбитом.

Перед тем, ка предложить решение, объясню, почему это происходит. Причины может быть 2, и обе кроются в том, что PHP — язык не многопоточный, и пока воркер чем-то очень сильно занят (очень объемная задача), соединение простаивает. И тут происходит одно из двух:

  1. У вас настроен Heartbeat (>0). Heartbeat — это своего рода ping в протоколе AMQP. Клиент, то есть воркер, при запуске сообщает серверу свой heartbeat (например, 60). Это означает, что раз в 60 секунд клиент будет слать серверу «сердцебиение». Как только сервер понимает, что было пропущено два таких сердцебиения — он разрывает соединение и возвращает задачу обратно себе. При этом воркер об этом еще не знает и спокойно продолжает выполнять задачу, а исключение он выбросит только при следующей попытке взаимодействия с сервером рэббита.
  2. Казалось бы, есть очевидное решение: поставить heartbeat == 0, тем самым отключив эту проверку на сердцебиение. Частично это может решить проблему, так как сервер RabbitMQ не будет обрубать соединение. Но возникнет другая проблема: сам Linux имеет свойство закрывать неактивные соединения.

Соотв-но, логичное решение: как-то хранить соединение активным. И в этом нам поможет все тот же Heartbeat, поскольку, если я не ошибаюсь, это единственный пакет в протоколе AMQP, который можно безболезненно слать серверу по открытому соединению.

К сожалению, в API PhpAmqpLib нет публичных методов, которые можно «дернуть», чтобы послать принудительно этот самый heartbeat, поэтому остается единственный вариант — скопипастить код отправки heartbeat’а из нутрей библиотеки.

Итак, если речь о Symfony и бандле RabbitMQ-Bundle, мы имеем свои Consumer’ы, которые обрабатывают определенные очереди и реализуют интерфейс ConsumerInterface. У этого интерфейса единственный метод: execute, в который передается объект класса AMQPMessage. Именно через него мы и сможем получить соединение к серверу и слать в него принудительный heartbeat. Надеюсь, все ваши Consumer’ы унаследованы от вашего общего базового класса, называемого, допустим, BaseRabbitConsumer.

Чтобы реализовать описанную выше схему, нужно в базовом execute() сохранить передаваемый AMQPMessage в свойство этого класса, а также добавить метод со следующим содержимым:

Где  $this->messasge  — это сохраненный AMQPMessage.

Остается дело за малым: найти в ваших воркерах «уязвимые» места, то есть те места, которые занимают основные временные затраты, и после каждого такого места вызывать $this->keepAlive() , чтобы держать соединение «на плаву». Это может быть, например, конец каждого цикла, если суть задачи заключается в массовой обработке каких-то данных.

Также, метод keepAlive можно доработать, сделав в нем ограничение на кол-во вызовов в определенный промежуток времени. Чтобы, даже если он будет «дергаться» по 100 раз в секунду, по факту heartbeat слался, скажем, не чаще одного раза в 10 секунд. Надеюсь, такая доработка не вызовет сложностей, приводить не буду.

Кто-то может сказать, что такое решение — костыльное, и я отчасти соглашусь. Но по крайней мере это решило нашу проблему и я не смог придумать что-то лучше, учитывая, что в сети готового решения данной проблемы не нашлось 🙂

Всем спасибо за внимание и пока 🙂 Надеюсь, еще что-нибудь напишу в блоге, а то что-то забросил совсем.

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Лимит времени истёк. Пожалуйста, перезагрузите CAPTCHA.