В нашем предыдущая статья Мы создали очередь задач.
Предполагается, что одна задача в виде сообщения доставляется одному получателю.
В этой статье мы поступим по-другому: отправим одно сообщение нескольким получателям одновременно.
Мы создадим систему логирования, которая будет состоять из двух программ: первая будет отправлять сообщения, а вторая — получать и отображать их.
В нашей системе все работающие получатели получат сообщение, отправленное отправителем.
Оглавление всех частей: Часть 1 Часть 2 Часть 3 Часть 4 Часть 5
Обмены
В предыдущих статьях мы отправляли и получали сообщения из очередей, теперь пришло время представить полную модель пересылки сообщений в RabbitMQ. Давайте быстро пробежимся по тому, что мы рассмотрели в предыдущих статьях:- Producer — приложение, которое отправляет сообщения
- Очередь — буфер, в котором хранятся сообщения
- Потребитель — приложение, которое получает и обрабатывает сообщения.
Более того, он даже не знает, попало ли оно в очередь.
Вместо этого отправитель отправляет сообщения обменникам (Exchange).
Обменник — очень простая штука, он делает две вещи: принимает сообщения от отправителей и перенаправляет их в очередь.
Обменники бывают разных типов: одни отправляют сообщения в определенную очередь (прямой тип), другие отправляют одни и те же сообщения сразу в несколько очередей (тип разветвления), третьи перенаправляют сообщения в очереди на основе определенных, заданных правил перенаправления (тип темы).
.
(изображение взято с Официальный сайт RabbitMQ )
Давайте рассмотрим такой обменник, как fanout. Для его создания напишем следующий код:
Обменник Fanout работает по очень простой схеме; он просто отправляет полученные сообщения во все связанные с ним очереди.$producer->newFanoutExchange('logs');
Чтобы получить список всех обменников, существующих на сервере, позвоните по телефону кроликmqctl sudo rabbitmqctl list_exchanges
В этот список войдут amq.* обменники и стандартный безымянный (пустая строка) обменник.
Не обращайте на них внимания; они нам пока не нужны.
В прошлой статье мы ничего не знали об обменниках, но тем не менее смогли отправлять сообщения.
Это произошло потому, что если в MonsterMQ явно не указать обменник в качестве третьего аргумента метода Продюсер::публикация() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отправляет сообщения в очереди, имена которых передаются в качестве второго аргумента метода Продюсер::публикация() .
Теперь мы можем отправлять сообщения нашему новому обменнику, указав его имя в качестве третьего аргумента метода.
Продюсер::публикация() : $producer->publish($message, '', 'logs');
Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока активны объявившие их воркеры.
Вы можете сделать это следующим образом: //Worker 1
$producer->queue('queue-1')->setExclusive()->declare();
//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();
Теперь, когда воркер, анонсировавший очередь, завершает сессию и отключается, она автоматически удаляется.
Если оба работника завершают работу, обе очереди удаляются.
Привязка очереди к обменнику
Мы уже создали обменник и очередь.Теперь нам остается только указать обменнику отправлять полученные сообщения в наши очереди.
Нам нужно связывать обменник и очереди.
Для этого напишем следующий код: //Worker 1
$producer->queue('queue-1')->bind('logs');
//Worker 2
$producer->queue('queue-2')->bind('logs');
С этого момента наш обменник журналы подключен к нашим очередям и будет пересылать в них полученные сообщения.
Вы можете перечислить все существующие соединения с помощью следующей команды в Linux rabbitmqctl list_bindings
Собираем код вместе
Наш скрипт отправителя не претерпел особых изменений по сравнению с предыдущим уроком.Основное отличие в том, что теперь он отправляет сообщения на заранее анонсированный обменник, а не на стандартный (доступен из коробки).
Вот код send.php try {
$producer = \MonsterMQ\Client\Producer();
$producer->connect('127.0.0.1', 5672);
$producer->logIn('guest', 'guest');
$producer->newFanoutExchange('logs');
$message = implode(' ', array_slice($argv, 1));
$message = empty($message) ? 'Hello world!' : $message;
$producer->publish($message, '', 'logs');
echo "\n Sent {$message} \n";
} catch(\Exception $e) {
var_dump($e);
}
Стоит отметить, что при отправке сообщения на обменник, не привязанный ни к одной очереди, сообщение будет потеряно.
Но пока нас это устраивает, поскольку мы еще не запустили наших получателей.
Вот код нашего первого работника рабочий-1.php try {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-1')->setExclusive()->declare();
$producer->queue('queue-1')->bind('logs');
$consumer->consume('queue-1');
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
Вот код второго рабочего рабочий-2.php try {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-2')->setExclusive()->declare();
$producer->queue('queue-2')->bind('logs');
$consumer->consume('queue-2');
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
Если вы хотите сохранить сообщения, отправленные первому работнику, в файл журнала, просто вызовите в консоли следующую команду: php worker-1.php > logs_from_rabbit.log
Чтобы запустить оба работника и отобразить сообщения в окнах терминала, вызовите следующие команды, каждую в своем окне: php worker-1.php
php worker-2.php
А для отправки сообщений вызовите следующую команду: php send.php
В следующий урок мы рассмотрим, как получать только определенное подмножество сообщений из всех отправленных.
Теги: #php
-
Алгоритм Поисковой Системы Svlab Search
19 Oct, 24 -
Роскосмос Опубликовал Видеоэкскурсию По Мкс
19 Oct, 24 -
Два «Гурмана» Столкнулись Интересами
19 Oct, 24