Реализация Гарантированной Асинхронной Доставки Сообщений.

Автор статьи Александр Романов, разработчик интеграционных решений.

В процессе системной интеграции мы часто сталкиваемся с необходимостью гарантированной доставки сообщений между системами.

В таких случаях нам на помощь приходят очереди.

Но не все задачи так просты, как доставка сообщений из системы А в систему Б.

Бывают случаи, когда необходимо дополнить доставляемые сообщения данными из соседних систем, участвующих в интеграции.

Которые не всегда могут быть интегрированы через очереди, а имеют только синхронные сервисы.

И вот теперь в нашей интеграции возникают такие явления, как недоступность, сбои и прочие «приятные» особенности использования «синхронистов».

Можно было бы переложить обработку промежуточных сбоев на исходную систему, но это некультурно и невозможно, если мы будем публиковать события сразу для нескольких систем (в тему).



Реализация гарантированной асинхронной доставки сообщений.
</p><p>

Удобным и работающим решением проблемы, с нашей точки зрения, является асинхронная пошаговая обработка сообщений через внутреннюю очередь с вызовом внешнего сервиса на каждом этапе.

Если сервис выходит из строя по ошибке или из-за его временной неработоспособности, сообщение попадает во внутреннюю очередь отказов и отправляется повторно после устранения проблем, возникших в сервисе.



Реализация гарантированной асинхронной доставки сообщений.
</p><p>

Данное решение также устраняет проблему невозможности отката транзакции при работе с внешними сервисами.

Ни один вызов не пройдет дважды — обработка начинается ровно с того шага, на котором произошел сбой.

Все вышеперечисленное очень легко реализовать на интеграционной шине, в которой из коробки выходит асинхронное взаимодействие между компонентами посредством внутренних очередей.

Но слишком высокие цены за «коробку» могут очень затруднить использование интеграционной шины.

Приведем пример реализации простого приложения с использованием Spring Integration (далее SI) + Rabbit MQ. Оговоримся, что мы не используем Rabbit MQ в продакшене, потому что с XA невозможно работать.

Сердцем всего приложения является весна-интеграция-context.xml .

Он описывает компонентную модель, инициализирует ресурсные bean-компоненты и менеджер транзакций для работы с MQ. Опишем это подробнее.

Подключаем встроенный в SI драйвер и регистрируем ресурсы:

  
  
  
  
  
  
   

<rabbit:queue name="si.test.queue.to"/> <rabbit:queue name="si.test.queue.from"/>

Нам нужен низкоуровневый бин amqpTemplate, посредством которого осуществляется взаимодействие с ресурсами.

Мы используем этот компонент непосредственно в тестах, и он необходим для SI-компонентов, работающих с Rabbit MQ. ConnectionFactory, необходимый для подключения к ресурсам, настраивает Spring Boot, используя настройки из application.yml ( см.

org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration. ).



<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/>

Для транзакционной работы с Rabbit MQ необходим TransactionManager (нужен для отката сообщения обратно в очередь, если во время работы возникает ошибка).

К сожалению, Rabbit MQ не поддерживает транзакции XA, иначе менеджер транзакций настроил бы Spring Boot. Давайте настроим то, что предоставляет Spring, вручную.



<bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/> </bean>

А теперь самое приятное.

«Рисующий» поток! В кавычках, потому что пишем в формате xml, что менее приятно.



Поток

Нам понадобится:
  • inbound-адаптер для чтения из входной очереди;
  • асинхронные компоненты для вызовов служб REST;
  • адаптер исходящего канала для записи в очередь вывода.

Для организации гарантированной доставки в этом приложении мы используем асинхронные вызовы через внутреннюю очередь.

Так как компонентов много, требуется несколько очередей, что неудобно как с точки зрения разработчика, так и администратора.

Попробуем обойтись одним.

Рассмотрим сценарий взаимодействия двух компонентов.

SomeComponentOne получает сообщение из канала, вызывает какой-нибудь синхронный REST-сервис (работает с базой данных, записывает в файл и т. д.) и отправляет сообщение на дальнейшую обработку, которой должен заниматься SomeComponentTwo. Если SomeComponentOne не смог завершить порученную ему часть работы, он должен откатить транзакцию и вернуть полученное сообщение туда, откуда он его получил.

Если все в порядке, отправьте сообщение во внутреннюю очередь и завершите транзакцию.

SomeComponentOne берет сообщение из внутренней очереди и отправляет в нее сообщение, причем не обязательно в той же форме, в которой оно было получено.

Послание может быть дополнено или изменено, для нас это не имеет значения.

Он предназначен для работы с компонентом SomeComponentTwo. Есть проблема с маршрутизацией.

Сообщение попадает во внутреннюю очередь и должно быть забрано оттуда компонентом, необходимым в данный момент. Другими словами, необходима маршрутизация.

Это приложение демонстрирует маршрутизацию на основе заголовков сообщений.

Сообщение заполняется настраиваемым заголовком PartnerComponent, указывающим компонент, который должен работать с сообщением.

Опишем технические детали представленного потока.

Адаптер для чтения из входной очереди.

Получает сообщение и в транзакции бросает его прямо во внутреннюю очередь.



<int-amqp:inbound-channel-adapter channel="innerChannel" queue-names="si.test.queue.to" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/> <int-amqp:channel id="innerChannel" queue-name="si.test.queue.inner" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/>

Мы использовали асинхронный канал, специализированный для работы с очередями, предоставляемый Spring. Мы получили интерфейс SI-канала, а сообщения хранили прямо в очереди, в нашем случае во внутренней mq-очереди приложения.

При получении сообщения из этого канала очереди будет открыта транзакция, потому что мы подключили наш менеджер транзакций.

К этому каналу очереди мы подключаем SI-маршрутизатор, который работает с заголовками сообщений.



<int:header-value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent" default-output-channel="component1Channel"> <int:mapping value="ComponentTwo" channel="component2Channel"/> <int:mapping value="ComponentThree" channel="component3Channel"/> <int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/> </int:header-value-router>

Новое сообщение не имеет технического заголовка.

ПартнерКомпонент , поэтому по умолчанию он будет обработан компонентом someComponentOne , в чьи обязанности входит указание в заголовке сообщения ПартнерКомпонент следующий компонент и отправка сообщения во внутреннюю очередь.

Маршрутизатор снова забирает сообщение из внутренней очереди и отправляет его на обработку указанному компоненту.

Описание компонентов, на которые отправляются сообщения от маршрутизатора.



<int:channel id="component1Channel"/> <int:service-activator input-channel="component1Channel" ref="someComponentOne" method="process"/> <int:channel id="component2Channel"/> <int:service-activator input-channel="component2Channel" ref="someComponentTwo" method="process"/> <int:channel id="component3Channel"/> <int:service-activator input-channel="component3Channel" ref="someComponentThree" method="process"/> <int:channel id="outboundRabbitChannel"/> <int:service-activator input-channel="outboundRabbitChannel" ref="outboundRabbitComponent" method="process"/

Адаптер для отправки в очередь вывода.



<int:channel id="toRabbit"/> <int-amqp:outbound-channel-adapter channel="toRabbit" amqp-template="amqpTemplate" routing-key="si.test.queue.from"/>



Сборка (pom.xml)

Старый добрый Мейвен.

Стандартная сборка из Spring Boot. Зависимости от SI и AMQP предоставляют все необходимые библиотеки.

Мы также подключаем Spring-boot-starter-test для реализации тестовых примеров с помощью JUnit.

Рабочий SomeComponent*.

java

Транзакционные компоненты, подключенные как сервис-активатор к потоку SI. Вызов REST через RestTemplate и отправка во внутреннюю очередь через внутренний канал .

Достаточно продемонстрировать, как пользоваться сервисом и удобно помакать его в тестах.



Тестирование

В тесте тестHappyPath мы проверили работоспособность потока, когда нет сбоев при вызове REST. Мокаем все обращения к REST-сервисам без сбоев, кидаем сообщение во входную очередь, ждем выходных, проверяем прохождение всех компонентов по содержимому тела полученного сообщения.

В тесте testГарантированная доставка мы протестировали гарантированную доставку при сбое в одном из REST. — моделируем разовый сбой при вызове сервиса из третьего компонента, ждем доставки сообщения в очередь вывода и проверяем тело полученного сообщения.



Заключение

Мы оформили заявку с гарантированной доставкой.

Есть ряд мелких нерешенных проблем: сообщения отправляются бесконечно и бесконтрольно, кто поддерживает это решение - администратор, или можно ли эту поддержку автоматизировать? Мы решаем эти вопросы с помощью кастомных приложений и индивидуальных настроек для каждого типа сообщений.

Возможно, мы опишем эти решения в будущем.

Все приложение находится на Git-хаб

Теги: #java #spring; Джава; интеграция
Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.