Автор статьи Александр Романов, разработчик интеграционных решений.Теги: #java #spring; Джава; интеграцияВ процессе системной интеграции мы часто сталкиваемся с необходимостью гарантированной доставки сообщений между системами.
В таких случаях нам на помощь приходят очереди.
Но не все задачи так просты, как доставка сообщений из системы А в систему Б.
Бывают случаи, когда необходимо дополнить доставляемые сообщения данными из соседних систем, участвующих в интеграции.
Которые не всегда могут быть интегрированы через очереди, а имеют только синхронные сервисы.
И вот теперь в нашей интеграции возникают такие явления, как недоступность, сбои и прочие «приятные» особенности использования «синхронистов».
Можно было бы переложить обработку промежуточных сбоев на исходную систему, но это некультурно и невозможно, если мы будем публиковать события сразу для нескольких систем (в тему).
Удобным и работающим решением проблемы, с нашей точки зрения, является асинхронная пошаговая обработка сообщений через внутреннюю очередь с вызовом внешнего сервиса на каждом этапе.Если сервис выходит из строя по ошибке или из-за его временной неработоспособности, сообщение попадает во внутреннюю очередь отказов и отправляется повторно после устранения проблем, возникших в сервисе.
Данное решение также устраняет проблему невозможности отката транзакции при работе с внешними сервисами.Ни один вызов не пройдет дважды — обработка начинается ровно с того шага, на котором произошел сбой.
Все вышеперечисленное очень легко реализовать на интеграционной шине, в которой из коробки выходит асинхронное взаимодействие между компонентами посредством внутренних очередей.
Но слишком высокие цены за «коробку» могут очень затруднить использование интеграционной шины.
Приведем пример реализации простого приложения с использованием Spring Integration (далее SI) + Rabbit MQ. Оговоримся, что мы не используем Rabbit MQ в продакшене, потому что с XA невозможно работать.
Сердцем всего приложения является весна-интеграция-context.xml .
Он описывает компонентную модель, инициализирует ресурсные bean-компоненты и менеджер транзакций для работы с MQ. Опишем это подробнее.
Подключаем встроенный в SI драйвер и регистрируем ресурсы:
Нам нужен низкоуровневый бин amqpTemplate, посредством которого осуществляется взаимодействие с ресурсами.<rabbit:queue name="si.test.queue.to"/> <rabbit:queue name="si.test.queue.from"/>
Мы используем этот компонент непосредственно в тестах, и он необходим для SI-компонентов, работающих с Rabbit MQ. ConnectionFactory, необходимый для подключения к ресурсам, настраивает Spring Boot, используя настройки из application.yml ( см.
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration. ).
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/>
Для транзакционной работы с Rabbit MQ необходим TransactionManager (нужен для отката сообщения обратно в очередь, если во время работы возникает ошибка).К сожалению, Rabbit MQ не поддерживает транзакции XA, иначе менеджер транзакций настроил бы Spring Boot. Давайте настроим то, что предоставляет Spring, вручную.
<bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/> </bean>
А теперь самое приятное.«Рисующий» поток! В кавычках, потому что пишем в формате xml, что менее приятно.
Поток
Нам понадобится:Для организации гарантированной доставки в этом приложении мы используем асинхронные вызовы через внутреннюю очередь.
- inbound-адаптер для чтения из входной очереди;
- асинхронные компоненты для вызовов служб REST;
- адаптер исходящего канала для записи в очередь вывода.
Так как компонентов много, требуется несколько очередей, что неудобно как с точки зрения разработчика, так и администратора.
Попробуем обойтись одним.
Рассмотрим сценарий взаимодействия двух компонентов.
SomeComponentOne получает сообщение из канала, вызывает какой-нибудь синхронный REST-сервис (работает с базой данных, записывает в файл и т. д.) и отправляет сообщение на дальнейшую обработку, которой должен заниматься SomeComponentTwo. Если SomeComponentOne не смог завершить порученную ему часть работы, он должен откатить транзакцию и вернуть полученное сообщение туда, откуда он его получил.
Если все в порядке, отправьте сообщение во внутреннюю очередь и завершите транзакцию.
SomeComponentOne берет сообщение из внутренней очереди и отправляет в нее сообщение, причем не обязательно в той же форме, в которой оно было получено.
Послание может быть дополнено или изменено, для нас это не имеет значения.
Он предназначен для работы с компонентом SomeComponentTwo. Есть проблема с маршрутизацией.
Сообщение попадает во внутреннюю очередь и должно быть забрано оттуда компонентом, необходимым в данный момент. Другими словами, необходима маршрутизация.
Это приложение демонстрирует маршрутизацию на основе заголовков сообщений.
Сообщение заполняется настраиваемым заголовком PartnerComponent, указывающим компонент, который должен работать с сообщением.
Опишем технические детали представленного потока.
Адаптер для чтения из входной очереди.
Получает сообщение и в транзакции бросает его прямо во внутреннюю очередь.
<int-amqp:inbound-channel-adapter channel="innerChannel" queue-names="si.test.queue.to" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/> <int-amqp:channel id="innerChannel" queue-name="si.test.queue.inner" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/>
Мы использовали асинхронный канал, специализированный для работы с очередями, предоставляемый Spring. Мы получили интерфейс SI-канала, а сообщения хранили прямо в очереди, в нашем случае во внутренней mq-очереди приложения.При получении сообщения из этого канала очереди будет открыта транзакция, потому что мы подключили наш менеджер транзакций.
К этому каналу очереди мы подключаем SI-маршрутизатор, который работает с заголовками сообщений.
<int:header-value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent" default-output-channel="component1Channel"> <int:mapping value="ComponentTwo" channel="component2Channel"/> <int:mapping value="ComponentThree" channel="component3Channel"/> <int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/> </int:header-value-router>
Новое сообщение не имеет технического заголовка.ПартнерКомпонент , поэтому по умолчанию он будет обработан компонентом someComponentOne , в чьи обязанности входит указание в заголовке сообщения ПартнерКомпонент следующий компонент и отправка сообщения во внутреннюю очередь.
Маршрутизатор снова забирает сообщение из внутренней очереди и отправляет его на обработку указанному компоненту.
Описание компонентов, на которые отправляются сообщения от маршрутизатора.
<int:channel id="component1Channel"/> <int:service-activator input-channel="component1Channel" ref="someComponentOne" method="process"/> <int:channel id="component2Channel"/> <int:service-activator input-channel="component2Channel" ref="someComponentTwo" method="process"/> <int:channel id="component3Channel"/> <int:service-activator input-channel="component3Channel" ref="someComponentThree" method="process"/> <int:channel id="outboundRabbitChannel"/> <int:service-activator input-channel="outboundRabbitChannel" ref="outboundRabbitComponent" method="process"/
Адаптер для отправки в очередь вывода.
<int:channel id="toRabbit"/> <int-amqp:outbound-channel-adapter channel="toRabbit" amqp-template="amqpTemplate" routing-key="si.test.queue.from"/>
Сборка (pom.xml)
Старый добрый Мейвен.Стандартная сборка из Spring Boot. Зависимости от SI и AMQP предоставляют все необходимые библиотеки.
Мы также подключаем Spring-boot-starter-test для реализации тестовых примеров с помощью JUnit.
Рабочий SomeComponent*.
java Транзакционные компоненты, подключенные как сервис-активатор к потоку SI. Вызов REST через RestTemplate и отправка во внутреннюю очередь через внутренний канал .
Достаточно продемонстрировать, как пользоваться сервисом и удобно помакать его в тестах.
Тестирование
В тесте тестHappyPath мы проверили работоспособность потока, когда нет сбоев при вызове REST. Мокаем все обращения к REST-сервисам без сбоев, кидаем сообщение во входную очередь, ждем выходных, проверяем прохождение всех компонентов по содержимому тела полученного сообщения.В тесте testГарантированная доставка мы протестировали гарантированную доставку при сбое в одном из REST. — моделируем разовый сбой при вызове сервиса из третьего компонента, ждем доставки сообщения в очередь вывода и проверяем тело полученного сообщения.
Заключение
Мы оформили заявку с гарантированной доставкой.Есть ряд мелких нерешенных проблем: сообщения отправляются бесконечно и бесконтрольно, кто поддерживает это решение - администратор, или можно ли эту поддержку автоматизировать? Мы решаем эти вопросы с помощью кастомных приложений и индивидуальных настроек для каждого типа сообщений.
Возможно, мы опишем эти решения в будущем.
Все приложение находится на Git-хаб
-
Обучающие Видео По Работе В Axure
19 Oct, 24 -
Mail.ru Перешёл На Поиск Google Без Логотипа
19 Oct, 24 -
Хитрость
19 Oct, 24