Полезные Методы Apache Camel

Если вы когда-либо создавали интеграционные решения на Java, вы, вероятно, знакомы с замечательной средой Java под названием Апач Верблюд .

Он может легко подключить несколько сервисов, импортировать данные из файлов, баз данных и других источников, уведомить вас о различных событиях в Jabber-клиенте или по электронной почте, а также стать основой составного приложения на основе большого количества других приложений.



Введение

Модель Apache Camel основана на концепции маршрутов ( маршруты ), который можно настроить как статически (например, в файле контекста Spring), так и во время работы приложения.

Караваны сообщений путешествуют по маршрутам, одновременно попадая в различные процессоры, конвертеры, агрегаторы и другие преобразователи, что в конечном итоге позволяет обрабатывать данные из множества разных источников в одном приложении и передавать эти данные в другие сервисы или сохранять их в каком-либо виде.

хранения.

В целом Camel — полностью самодостаточный фреймворк.

Используя его, зачастую даже не придется писать собственный код — достаточно лишь набрать правильный маршрут, который решит проблему.

Однако для создания собственной модели обработки данных вам все равно может потребоваться написать код. То же самое было и с нами.

Мы используем Camel для реализации конвейеров для обработки нескольких сообщений из разных источников.

Такой подход позволяет, например, отслеживать состояние сервисов, оперативно оповещать о проблемах, получать агрегированные аналитические разрезы, готовить данные для отправки в другие системы и т.д. Поток обработанных и «переваренных» сообщений в систему может быть достаточно большим.

(тысячи сообщений в минуту), поэтому мы стараемся использовать горизонтально масштабируемые решения там, где это возможно.

Например, у нас есть система отслеживания статуса запуска тестов и мониторинга сервисов.

Ежедневно выполняются миллионы таких тестов, и мы получаем в разы больше сообщений для отслеживания хода их выполнения.

Чтобы «выучить» такой объем сообщений, необходимо четко определить стратегию агрегации — от большего параллелизма к меньшему.

Кроме того, необходимо иметь хотя бы базовую горизонтальную масштабируемость и отказоустойчивость сервиса.

В качестве очереди сообщений мы используем ActiveMQ , как оперативное хранилище - Хейзелкаст .



Масштабирование

Для организации параллельной обработки организуется кластер из нескольких равноправных серверов.

На каждом из них живет брокер ActiveMQ , в очередь которого добавляются сообщения, поступающие по протоколу HTTP. Дескрипторы HTTP находятся за балансировщиком, который распределяет сообщения на работающие серверы.

Очередь входных сообщений на каждом сервере анализируется приложением Camel с использованием кластера.

Хейзелкаст для хранения состояний, а также при необходимости синхронизации обработки.

ActiveMQ также кластеризуются с помощью NetworkConnectors и могут «делиться» сообщениями друг с другом.

В целом схема выглядит так:

Полезные методы Apache Camel

Как видно из схемы, выход из строя одного из компонентов системы не нарушает ее работоспособность, учитывая равенство элементов.

Например, если обработчик сообщений на одном из серверов выходит из строя, ActiveMQ начинает отправлять сообщения из своих очередей другим.

Если один из брокеров ActiveMQ выходит из строя, то обработчик «перехватывает» соседнего.

И наконец, если выйдет из строя весь сервер, остальные серверы продолжат усердно работать, как ни в чем не бывало.

Для повышения сохранности данных узлы Hazelcast хранят резервные копии данных своих соседей (копии выполняются асинхронно, их количество на каждом узле настраивается дополнительно).

Данная схема также позволяет без особых затрат масштабировать сервис за счет добавления дополнительных серверов, тем самым увеличивая вычислительный ресурс.



Распределенные агрегаторы

При использовании агрегации Apache Camel включает в себя концепции « хранилище агрегации " И " ключ корреляции ".

Первый — это репозиторий, в котором хранятся агрегированные состояния (например, количество неудачных тестов за день).

Второй — это ключ, используемый для распределения потока сообщений между состояниями.

Другими словами, ключ корреляции — это ключ в хранилище агрегации (например, текущая дата).

Для агрегаторов в такой схеме нам нужно было реализовать собственный репозиторий агрегации, который сможет хранить состояния в Hazelcast и синхронизировать обработку идентичных ключей внутри кластера.

К сожалению, в стандартной комплектации Camel мы не нашли такой возможности.

К счастью, создать его оказалось довольно легко — достаточно реализовать интерфейс.

Репозиторий агрегации : Скрытый текст

  
  
  
  
  
  
  
  
   

public class HazelcastAggregatorRepository implements AggregationRepository { private final Logger logger = LoggerFactory.getLogger(getClass()); // maximum time of waiting for the lock from hz public static final long WAIT_FOR_LOCK_SEC = 20; private final HazelcastInstance hazelcastInstance; private final String repositoryName; private IMap<String, DefaultExchangeHolder> map; public HazelcastAggregatorRepository(HazelcastInstance hazelcastInstance, String repositoryName){ this.hazelcastInstance = hazelcastInstance; this.repositoryName = repositoryName; } @Override protected void doStart() throws Exception { map = hazelcastInstance.getMap(repositoryName); } @Override protected void doStop() throws Exception { /* Nothing to do */ } @Override public Exchange add(CamelContext camelContext, String key, Exchange exchange) { try { DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange); map.tryPut(key, holder, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS); return toExchange(camelContext, holder); } catch (Exception e) { logger.error("Failed to add new exchange", e); } finally { map.unlock(key); } return null; } @Override public Exchange get(CamelContext camelContext, String key) { try { map.tryLock(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS); return toExchange(camelContext, map.get(key)); } catch (Exception e) { logger.error("Failed to get the exchange", e); } return null; } @Override public void remove(CamelContext camelContext, String key, Exchange exchange) { try { logger.debug("Removing '" + key + "' tryRemove."); map.tryRemove(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS); } catch (Exception e) { logger.error("Failed to remove the exchange", e); } finally { map.unlock(key); } } @Override public void confirm(CamelContext camelContext, String exchangeId) { /* Nothing to do */ } @Override public Set<String> getKeys() { return Collections.unmodifiableSet(map.keySet()); } private Exchange toExchange(CamelContext camelContext, DefaultExchangeHolder holder) { Exchange exchange = null; if (holder != null) { exchange = new DefaultExchange(camelContext); DefaultExchangeHolder.unmarshal(exchange, holder); } return exchange; } }

Чтобы использовать такой репозиторий, теперь вам просто нужно подключить Hazelcast к проекту и объявить его в контексте, а затем добавить набор репозиториев, указывающих на экземпляр Hazelcast. Важно помнить, что каждый агрегатор должен иметь собственное пространство ключей и, следовательно, ему также должно передаваться имя репозитория.

В настройках Hazelcast нужно прописать все серверы, входящие в состав кластера.

Таким образом, мы получаем возможность использовать агрегаторы в распределенной среде, не задумываясь о том, на каком сервере будет происходить агрегация.



Распределенные таймеры

Количество состояний, хранящихся в кластере, довольно велико.

Но не все они нужны постоянно.

Кроме того, некоторые состояния (например, состояния тестов, которые давно не использовались, и поэтому по ним давно не было сообщений) хранить вообще не нужно.

Хотелось бы избавиться от таких условий и дополнительно уведомить об этом другие системы.

Для этого необходимо с заданной периодичностью проверять состояния агрегаторов на устаревание и удалять их.

Самый простой способ сделать это — добавить периодическую задачу, например, с помощью Quartz. Более того, Camel позволяет это сделать.

Однако следует помнить, что выполнение происходит в кластере со множеством одноранговых серверов.

И мне не очень хочется, чтобы периодические задания Кварца работали на всех одновременно.

Чтобы этого избежать, достаточно выполнить повторную синхронизацию с помощью блокировок Hazelcast. А как заставить Кварц инициализироваться только на одном сервере, точнее в какой момент синхронизироваться? Для инициализации контекста Camel и всех остальных компонентов системы мы используем Spring, а чтобы заставить Quartz запускать планировщик только на одном сервере из кластера, во-первых, нужно отключить его автоматический запуск, явно объявив в контексте:

<bean id="quartz" class="org.apache.camel.component.quartz.QuartzComponent"> <property name="autoStartScheduler" value="false"/> </bean>

Во-вторых, нужно где-то синхронизироваться и запускать планировщик только в том случае, если удалось захватить блокировку, а затем ждать следующего момента ее захвата (на случай, если предыдущий сервер, захвативший блокировку, вышел из строя или по какой-то причине освободил ее).

В Spring это можно реализовать несколькими способами, например, через ApplicationListener, который позволяет обрабатывать события запуска контекста:

<bean class="com.my.hazelcast.HazelcastQuartzSchedulerStartupListener"> <property name="hazelcastInstance" ref="hazelcastInstance"/> <property name="quartzComponent" ref="quartz"/> </bean>

Получаем следующую реализацию класса инициализации планировщика: Скрытый текст

public class HazelcastQuartzSchedulerStartupListener implements ShutdownPrepared, ApplicationListener { public static final String DEFAULT_QUARTZ_LOCK = "defaultQuartzLock"; protected volatile boolean initialized = false; Logger log = LoggerFactory.getLogger(getClass()); Lock lock; protected volatile boolean initialized = false; protected String lockName; protected HazelcastInstance hazelcastInstance; protected QuartzComponent quartzComponent; public HazelcastQuartzSchedulerStartupListener() { super(); log.info("HazelcastQuartzSchedulerStartupListener created"); } public void setLockName(final String lockName) { this.lockName = lockName; } public synchronized Lock getLock() { if (lock == null) { lock = hazelcastInstance.getLock(lockName != null ? lockName : DEFAULT_QUARTZ_LOCK); } return lock; } @Override public void prepareShutdown(boolean forced) { unlock(); } @Required public void setQuartzComponent(QuartzComponent quartzComponent) { this.quartzComponent = quartzComponent; } @Required public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.hazelcastInstance = hazelcastInstance; } @Override public synchronized void onApplicationEvent(ApplicationEvent event) { if (initialized) { return; } try { while (true) { try { getLock().

lock(); log.warn("This node is now the master Quartz!"); try { quartzComponent.startScheduler(); } catch (Exception e) { unlock(); throw new RuntimeException(e); } return; } catch (OperationTimeoutException e) { log.warn("This node is not the master Quartz and failed to wait for the lock!"); } } } catch (Exception e) { log.error("Error while trying to wait for the lock from Hazelcast!", e); } } private synchronized void unlock() { try { getLock().

unlock(); } catch (IllegalStateException e) { log.warn("Exception while trying to unlock quartz lock: Hazelcast instance is already inactive!"); } catch (Exception e) { log.warn("Exception during the unlock of the master Quartz!", e); } } }

Таким образом, мы сможем использовать периодические задачи.

рекомендуемый способ в Camel и с учетом распределенной среды выполнения.

Например, вот так:

<route id="quartz-route"> <from uri=" quartz://quartz-test/testЭcron=*+*+*+*+*+?"/ > <log message="Quartz each second message caught ${in.body.class}!"/> <to uri=" direct:queue:done-quartz"/ > </route>



Конечный автомат

Помимо простых методов агрегации (например, подсчета сумм), нам также часто приходилось переключать состояния агрегаторов в зависимости от входящих сообщений, например, чтобы всегда помнить текущее состояние завершенного теста.

Они хорошо подходят для реализации этой функции.

конечные автоматы .

Давайте представим, что у нас есть некоторое тестовое состояние.

Например, Тестпасседстате.

При получении сообщения TestFailed для данного теста мы должны переключить состояние агрегатора на TestFailedState, а при получении TestPassed снова на TestPassedState. И так до бесконечности.

На основании этих переходов можно сделать некоторые выводы, например, если происходит переход TestPassed -> TestFailed, нужно уведомить всех заинтересованных лиц о том, что тест сломался.

А если произойдет обратный переход, то, наоборот, скажите им, что все в порядке.



Полезные методы Apache Camel

Подбирая варианты реализации такой стратегии агрегации, мы пришли к выводу, что необходима некая модель конечного автомата, адаптированная к реалиям обработки сообщений.

Во-первых, сообщения, поступающие на вход агрегаторов, представляют собой некий набор объектов.

Каждое событие имеет свой тип и поэтому легко вписывается в классы Java. Для описания типов событий мы используем схему xsd, согласно которой генерируем набор классов с помощью xjc. Эти классы легко сериализуются и десериализуются в xml и json с помощью jaxb. Состояния, хранящиеся в Hazelcast, также представлены набором классов, созданных xsd. Таким образом, нам нужно было найти реализацию конечного автомата, которая могла бы легко обрабатывать переходы состояний в зависимости от типа сообщения и типа текущего состояния.

А ещё мне хотелось, чтобы эти переходы определялись декларативно, а не императивно, как во многих подобных библиотеках.

Легковесной реализации такого функционала мы не нашли, поэтому решили написать свою, учитывающую наши потребности и обеспечивающую хорошую основу для обработки сообщений, поступающих по маршруту в Camel. Небольшая библиотека, отвечающая нашим потребностям, называется Ятомата (от слова «Еще одна ауТомата») и доступен на github .

Было решено несколько упростить модель FSM — например, контекст задается объектом текущего состояния, сообщение также хранит некоторые данные.

Однако переходы определяются только типами состояний и сообщений.

Конечный автомат определен для класса, который используется в качестве агрегатора.

Для этого класс помечается аннотацией @ФСМ .

Он имеет начальное состояние (start) и набор переходов, некоторые из которых останавливают агрегацию (stop=true), автоматически отправляя накопленное состояние дальше по маршруту.

Набор переходов объявляется аннотацией @Переходы и массив аннотаций @Транзит , в каждом из которых можно указать набор начальных состояний (от), конечного состояния (до), набор событий, при которых этот переход активируется (вкл), а также указать, является ли это состояние концом работы машины.

операция (остановка).

Аннотации предоставляются для обработки переходов.

@В пути , @BeforeTransit , и @AfterTransit , который можно использовать для обозначения общедоступных методов внутри класса.

Эти методы будут вызываться, если будет найден соответствующий переход, удовлетворяющий его сигнатуре.



@FSM(start = Undefined.class) @Transitions({ @Transit(on = TestPassed.class, to = TestPassedState.class), @Transit(on = TestFailed.class, to = TestFailedState.class), @Transit(stop = true, on = TestExpired.class), }) public class TestStateFSM { @OnTransit public void onTestFailed(State oldState, TestFailedState newState, TestFailed event){} @OnTransit public void onTestPassed(State oldState, TestPassedState newState, TestPassed event){} }

Работа с государственным автоматом осуществляется следующим образом:

Yatomata<TestStateFSM> fsm = new FSMBuilder(TestStateFSM.class).

build(); fsm.getCurrentState(); // returns instance of Undefined fsm.isStopped(); // returns false fsm.getFSM(); // returns instance of TestStateFSM fsm.fire(new TestPassed()); // returns instance of TestPassedState fsm.fire(new TestFailed()); // returns instance of TestFailedState fsm.fire(new TestExpired()); // returns instance of TestFailedState fsm.isStopped(); // returns true

Реализуя интерфейс Стратегия агрегирования мы создали FSMaggregationStrategy, которая объявляется в контексте Spring следующим образом:

<bean id="runnableAggregator" class="com.my.FSMAggregationStrategy"> <constructor-arg value="com.my.TestStateFSM"/> </bean>

Простейшая реализация стратегии агрегации при использовании этого конечного автомата может выглядеть так: Скрытый текст

public class FSMAggregationStrategy<T> implements AggregationStrategy { private final Yatomata<T> fsmEngine; public FSMAggregationStrategy(Class fsmClass) { this.fsmEngine = new FSMBuilder(fsmClass).

build(); } @Override public Exchange aggregate(Exchange state, Exchange message) { Object result = state == null ? null : state.getIn().

getBody(); try { Object event = message.getIn().

getBody(); Object fsm = fsmEngine.getFSM(); result = fsmEngine.fire(event); } catch (Exception e) { logger.error(fsm + " error", e); } if (result != null) { message.getIn().

setBody(result); } return message; } public boolean isCompleted() { return fsmEngine.isCompleted(); } }



выводы

Перечисленные приемы позволили нам реализовать несколько горизонтально масштабируемых сервисов различного назначения.

Apache Camel показал себя с лучшей стороны и оправдал возложенные на него ожидания.

Декларативность сочетается с высокой гибкостью, что в совокупности обеспечивает отличное масштабирование интеграционных приложений с минимумом усилий, необходимых для поддержки и добавления нового функционала.

Теги: #java #camel #highload #интеграция сервисов #Высокая производительность #java

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.