Использование Rabbitmq С Monstermq, Часть 3

В нашем предыдущая статья Мы создали очередь задач.

Предполагается, что одна задача в виде сообщения доставляется одному получателю.

В этой статье мы поступим по-другому: отправим одно сообщение нескольким получателям одновременно.

Мы создадим систему логирования, которая будет состоять из двух программ: первая будет отправлять сообщения, а вторая — получать и отображать их.

В нашей системе все работающие получатели получат сообщение, отправленное отправителем.

Оглавление всех частей: Часть 1 Часть 2 Часть 3 Часть 4 Часть 5



Обмены

В предыдущих статьях мы отправляли и получали сообщения из очередей, теперь пришло время представить полную модель пересылки сообщений в RabbitMQ. Давайте быстро пробежимся по тому, что мы рассмотрели в предыдущих статьях:
  • Producer — приложение, которое отправляет сообщения
  • Очередь — буфер, в котором хранятся сообщения
  • Потребитель — приложение, которое получает и обрабатывает сообщения.

Ключевая идея RabbitMQ (точнее AMQP) заключается в том, что Производитель никогда не отправляет сообщение напрямую в очередь.

Более того, он даже не знает, попало ли оно в очередь.

Вместо этого отправитель отправляет сообщения обменникам (Exchange).

Обменник — очень простая штука, он делает две вещи: принимает сообщения от отправителей и перенаправляет их в очередь.

Обменники бывают разных типов: одни отправляют сообщения в определенную очередь (прямой тип), другие отправляют одни и те же сообщения сразу в несколько очередей (тип разветвления), третьи перенаправляют сообщения в очереди на основе определенных, заданных правил перенаправления (тип темы).

.



Использование RabbitMQ с MonsterMQ, часть 3

(изображение взято с Официальный сайт RabbitMQ ) Давайте рассмотрим такой обменник, как fanout. Для его создания напишем следующий код:

  
  
  
  
  
  
  
  
  
  
  
  
  
  
   

$producer->newFanoutExchange('logs');

Обменник Fanout работает по очень простой схеме; он просто отправляет полученные сообщения во все связанные с ним очереди.

Чтобы получить список всех обменников, существующих на сервере, позвоните по телефону кроликmqctl

sudo rabbitmqctl list_exchanges

В этот список войдут amq.* обменники и стандартный безымянный (пустая строка) обменник.

Не обращайте на них внимания; они нам пока не нужны.

В прошлой статье мы ничего не знали об обменниках, но тем не менее смогли отправлять сообщения.

Это произошло потому, что если в MonsterMQ явно не указать обменник в качестве третьего аргумента метода Продюсер::публикация() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отправляет сообщения в очереди, имена которых передаются в качестве второго аргумента метода Продюсер::публикация() .

Теперь мы можем отправлять сообщения нашему новому обменнику, указав его имя в качестве третьего аргумента метода.

Продюсер::публикация() :

$producer->publish($message, '', 'logs');

Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока активны объявившие их воркеры.

Вы можете сделать это следующим образом:

//Worker 1 $producer->queue('queue-1')->setExclusive()->declare();



//Worker 2 $producer->queue('queue-2')->setExclusive()->declare();

Теперь, когда воркер, анонсировавший очередь, завершает сессию и отключается, она автоматически удаляется.

Если оба работника завершают работу, обе очереди удаляются.



Привязка очереди к обменнику

Мы уже создали обменник и очередь.

Теперь нам остается только указать обменнику отправлять полученные сообщения в наши очереди.

Нам нужно связывать обменник и очереди.

Для этого напишем следующий код:

//Worker 1 $producer->queue('queue-1')->bind('logs');



//Worker 2 $producer->queue('queue-2')->bind('logs');

С этого момента наш обменник журналы подключен к нашим очередям и будет пересылать в них полученные сообщения.

Вы можете перечислить все существующие соединения с помощью следующей команды в Linux

rabbitmqctl list_bindings



Собираем код вместе

Наш скрипт отправителя не претерпел особых изменений по сравнению с предыдущим уроком.

Основное отличие в том, что теперь он отправляет сообщения на заранее анонсированный обменник, а не на стандартный (доступен из коробки).

Вот код send.php

try { $producer = \MonsterMQ\Client\Producer(); $producer->connect('127.0.0.1', 5672); $producer->logIn('guest', 'guest'); $producer->newFanoutExchange('logs'); $message = implode(' ', array_slice($argv, 1)); $message = empty($message) ? 'Hello world!' : $message; $producer->publish($message, '', 'logs'); echo "\n Sent {$message} \n"; } catch(\Exception $e) { var_dump($e); }

Стоит отметить, что при отправке сообщения на обменник, не привязанный ни к одной очереди, сообщение будет потеряно.

Но пока нас это устраивает, поскольку мы еще не запустили наших получателей.

Вот код нашего первого работника рабочий-1.php

try { $consumer = \MonsterMQ\Client\Consumer(); $consumer->connect('127.0.0.1', 5672); $consumer->logIn('guest', 'guest'); $producer->queue('queue-1')->setExclusive()->declare(); $producer->queue('queue-1')->bind('logs'); $consumer->consume('queue-1'); $consumer->wait(function ($message, $channelNumber) use ($consumer){ echo "\n $message \n"; }); } catch(\Exception $e) { var_dump($e); }

Вот код второго рабочего рабочий-2.php

try { $consumer = \MonsterMQ\Client\Consumer(); $consumer->connect('127.0.0.1', 5672); $consumer->logIn('guest', 'guest'); $producer->queue('queue-2')->setExclusive()->declare(); $producer->queue('queue-2')->bind('logs'); $consumer->consume('queue-2'); $consumer->wait(function ($message, $channelNumber) use ($consumer){ echo "\n $message \n"; }); } catch(\Exception $e) { var_dump($e); }

Если вы хотите сохранить сообщения, отправленные первому работнику, в файл журнала, просто вызовите в консоли следующую команду:

php worker-1.php > logs_from_rabbit.log

Чтобы запустить оба работника и отобразить сообщения в окнах терминала, вызовите следующие команды, каждую в своем окне:

php worker-1.php



php worker-2.php

А для отправки сообщений вызовите следующую команду:

php send.php

В следующий урок мы рассмотрим, как получать только определенное подмножество сообщений из всех отправленных.

Теги: #php

Вместе с данным постом часто просматривают: