Я продолжаю ряд перевод уроков из Официальный веб-сайт .
Примеры будут на PHP, но их можно реализовать на самых популярных языках.
ЯП .
В предыдущем статья Мы разработали систему журналирования.
Нам удалось отправить сообщения нескольким получателям.
В этой статье мы модернизируем нашу программу - будем отправлять получателю только часть сообщений.
Например, мы сможем сохранять на диске только сообщения с критическими ошибками (экономия дискового пространства), а все сообщения будем отображать в консоли.
Привязки
В предыдущей статье мы создали привязки.Напомним код:
Привязка — это соединение между точкой доступа и очередью.$channel->queue_bind($queue_name, 'logs');
Это можно интерпретировать так: очередь хочет получать сообщения от точки доступа.
Привязка может принимать параметр router_key. Чтобы избежать путаницы с параметром $channel::basic_publish (который также содержит параметр router_key), назовем егоbinding_key. Рассмотрим создание соединения с ключомbinding_key: $binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
Значение этого ключа зависит от типа точки доступа.
Точка доступа с типом разветвления просто проигнорирует его.
Точка прямого доступа
Наша система журналирования из предыдущей статьи отправляла все сообщения всем подписчикам.Мы хотим расширить нашу систему, чтобы фильтровать сообщения по важности.
Например, мы позаботимся о том, чтобы скрипт, записывающий логи на диск, не тратил свое место на сообщения типа предупреждение или инфо.
Раньше мы использовали точку доступа с разветвлением, которая не дает нам полной гибкости — она подходит только для простого вещания.
Вместо этого мы будем использовать прямой тип.
Его алгоритм очень прост — сообщения попадают в очередь, чейbinding_key совпадает с ключом маршрутизации сообщения.
Давайте посмотрим на схему на картинке:
На схеме показана точка доступа X и две связанные с ней очереди.
Первая очередь привязана к ключу привязки = оранжевый, а вторая очередь имеет две ссылки.
Один с ключом привязки = черный, а второй с ключом = зеленый.
Сообщения с ключом маршрутизации = оранжевого цвета будут направляться в очередь Q1, а сообщения с ключом маршрутизации черного или зеленого цвета будут направляться в очередь Q2. Все остальные сообщения будут удалены.
Несколько привязок
Вполне допустимо связать несколько очередей с одним и тем же ключом привязки.
В этом примере мы связываем точку доступа X и очередь Q1 с тем же черным ключом, что и очередь Q2. В этом примере Direct ведет себя так же, как Fanout: он отправляет сообщения во все связанные очереди.
Сообщения с черным ключом попадут в обе очереди Q1 и Q2.
Отправка журналов
Построим алгоритм отправки сообщений.Вместо fanout мы будем использовать для точки доступа прямой тип.
Ключ маршрутизации будет соответствовать имени типа журнала.
Предположим, что скрипт отправки журнала знает тип журнала.
Для начала создадим точку доступа: $channel->exchange_declare('direct_logs', 'direct', false, false, false);
Теперь отправим сообщение: $channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
Журнал будет иметь 3 типа: «информация», «предупреждение», «ошибка».
Подписка
Отправка сообщений будет такой же, как и в предыдущем примере.статьи , с одним условием — для каждого типа журнала необходимо создать свое соединение привязки.
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
Итого мы получаем
Код сценария продюсера emite_log_direct.php:
<Эphp
require_once __DIR__ .
'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = $argv[1];
if(empty($severity)) $severity = "info";
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo " [x] Sent ",$severity,':',$data," \n";
$channel->close();
$connection->close();
?>
Код сценария подписчика получения_logs_direct.php:
<Эphp
require_once __DIR__ .
'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if(empty($severities )) {
file_put_contents(' php://stderr ', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
Если вы хотите сохранить в файл только журналы ошибок и предупреждений, введите в консоли:
$ php receive_logs_direct.php warning error > logs_from_rabbit.log
Если вы хотите отобразить все логи на экране, введите в консоли
$ php receive_logs_direct.php info warning error
[*] Waiting for logs. To exit press CTRL+C
Или вытащить только журналы ошибок:
$ php emit_log_direct.php error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
(источники (исходный файл emit_log_direct.php) И (источник receive_logs_direct.php) ) В следующей статье мы рассмотрим прослушивание сообщений, соответствующих любому шаблону.
Теги: #rabbitmq #php-amqp-lib #php-amqp-lib #php #routing #разработка сайтов #php
-
Легче, Чем Кажется. Глава 12
19 Oct, 24 -
Интервью С Dhh (Создателем Rails)
19 Oct, 24 -
Почему Я Ненавижу Eloquent Orm
19 Oct, 24