Книга «Потоки Кафки В Действии. Приложения И Микросервисы Для Работы В Реальном Времени»



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Здравствуйте, жители Хабро! Эта книга подойдет любому разработчику, желающему разобраться в обработке потоков.

Понимание распределенного программирования поможет вам лучше понять Kafka и Kafka Streams. Было бы неплохо узнать сам фреймворк Kafka, но это не обязательно: я расскажу вам все, что вам нужно.

В этой книге как опытные разработчики Kafka, так и новички узнают, как создавать интересные приложения для потоковой обработки с использованием библиотеки Kafka Streams. Разработчики Java среднего и продвинутого уровня, уже знакомые с такими понятиями, как сериализация, научатся применять свои навыки для создания приложений Kafka Streams. Исходный код книги написан на Java 8 и в значительной степени использует синтаксис лямбда-выражений Java 8, поэтому знание того, как работать с лямбда-функциями (даже на другом языке программирования), будет полезно.



Отрывок.

5.3. Агрегация и оконные операции

В этом разделе мы продолжим исследовать наиболее перспективные части Kafka Streams. На данный момент мы рассмотрели следующие аспекты Kafka Streams:
  • создание топологии обработки;
  • использование состояния в потоковых приложениях;
  • выполнение соединений потоков данных;
  • различия между потоками событий (KStream) и потоками обновлений (KTable).

В следующих примерах мы объединим все эти элементы.

Вы также узнаете об оконном режиме — еще одной замечательной особенности потоковых приложений.

Нашим первым примером будет простая агрегация.



5.3.1. Агрегирование продаж акций по отраслям промышленности

Агрегация и группировка — жизненно важные инструменты при работе с потоковыми данными.

Изучение отдельных записей по мере их поступления зачастую оказывается недостаточным.

Чтобы извлечь из данных дополнительную информацию, необходимо их сгруппировать и объединить.

В этом примере вы наденете костюм дневного трейдера, которому необходимо отслеживать объем продаж акций компаний в нескольких отраслях.

В частности, вас интересуют пять компаний с крупнейшими продажами акций в каждой отрасли.

Такое агрегирование потребует следующих нескольких шагов для перевода данных в желаемую форму (говоря в общих чертах).

  1. Создайте тематический источник, который будет публиковать необработанную информацию о торговле акциями.

    Нам нужно будет сопоставить объект типа StockTransaction с объектом типа ShareVolume. Дело в том, что объект StockTransaction содержит метаданные о продажах, а нам нужны только данные о количестве продаваемых акций.

  2. Группируйте данные ShareVolume по символам акций.

    После группировки по символу вы можете свернуть эти данные в промежуточные итоги объемов продаж акций.

    Стоит отметить, что метод KStream.groupBy возвращает экземпляр типа KGroupedStream. И вы можете получить экземпляр KTable, вызвав метод KGroupedStream.reduce.

Что такое интерфейс KGroupedStream Методы KStream.groupBy и KStream.groupByKey возвращают экземпляр KGroupedStream. KGroupedStream — промежуточное представление потока событий после группировки по ключам.

Он совершенно не предназначен для непосредственной работы с ним.

Вместо этого KGroupedStream используется для операций агрегации, результатом которых всегда является KTable. А поскольку результатом операций агрегации является KTable и они используют хранилище состояний, то возможно, что не все обновления в результате отправляются дальше по конвейеру.

Метод KTable.groupBy возвращает аналогичный KGroupedTable — промежуточное представление потока обновлений, перегруппированного по ключу.

Давайте сделаем небольшую паузу и посмотрим на рис.

5.9, на котором показано, чего мы достигли.

Эта топология должна быть вам уже хорошо знакома.



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Давайте теперь посмотрим на код этой топологии (его можно найти в файле src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (листинг 5.2).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Данный код отличается краткостью и большим объемом действий, выполняемых в несколько строк.

Вы можете заметить кое-что новое в первом параметре метода builder.stream: значение перечислимого типа AutoOffsetReset.EARLIEST (есть еще LATEST), заданное с помощью метода Consumed.withOffsetResetPolicy. Этот тип перечисления может использоваться для указания стратегии сброса смещения для каждого KStream или KTable и имеет приоритет над опцией сброса смещения из конфигурации.

GroupByKey и GroupBy Интерфейс KStream имеет два метода группировки записей: GroupByKey и GroupBy. Оба возвращают KGroupedTable, поэтому вам может быть интересно, в чем разница между ними и когда какой из них использовать? Метод GroupByKey используется, когда ключи в KStream уже не пусты.

И самое главное, флаг «требуется переразметка» ни разу не был установлен.

Метод GroupBy предполагает, что вы изменили ключи группировки, поэтому для флага перераспределения установлено значение true. Выполнение соединений, агрегаций и т. д. после метода GroupBy приведет к автоматическому переразбиению.

Резюме: По возможности следует использовать GroupByKey, а не GroupBy.

Что делают методы mapValues и groupBy, понятно, поэтому давайте взглянем на метод sum() (находится в src/main/java/bbejeck/model/ShareVolume.java) (листинг 5.3).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Метод ShareVolume.sum возвращает промежуточную сумму объема продаж акций, а результатом всей цепочки вычислений является таблица KTable. объект. Теперь вы понимаете роль, которую играет KTable. Когда поступают объекты ShareVolume, соответствующий объект KTable сохраняет последнее текущее обновление.

Важно помнить, что все обновления отражаются в предыдущей таблице ShareVolumeKTable, но не все отправляются дальше.

Затем мы используем эту таблицу KTable для агрегирования (по количеству проданных акций) и получения пяти компаний с наибольшим объемом торгов акций в каждой отрасли.

Наши действия в этом случае будут аналогичны действиям для первой агрегации.

  1. Выполните еще одну операцию groupBy, чтобы сгруппировать отдельные объекты ShareVolume по отраслям.

  2. Начните суммировать объекты ShareVolume. На этот раз объектом агрегации является приоритетная очередь фиксированного размера.

    В этой очереди фиксированного размера остаются только пять компаний с наибольшим количеством проданных акций.

  3. Сопоставьте очереди из предыдущего абзаца со строковым значением и верните пять самых торгуемых акций по количеству по отраслям.

  4. Запишите результаты в строковой форме в тему.

На рис.

5.10 показан граф топологии потока данных.

Как видите, второй этап обработки довольно прост.

Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Теперь, когда у нас есть четкое представление о структуре второго раунда обработки, мы можем обратиться к его исходному коду (вы найдете его в файле src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (листинг 5.4).

.

Этот инициализатор содержит переменную фиксированной очереди.

Это пользовательский объект, который является адаптером для java.util.TreeSet, который используется для отслеживания N первых результатов в порядке убывания торгуемых акций.



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Вы уже видели вызовы groupBy и mapValues, поэтому мы не будем в них углубляться (мы вызываем метод KTable.toStream, поскольку метод KTable.print устарел).

Но вы еще не видели версию агрегата() KTable, поэтому мы уделим этому немного времени.

Как вы помните, отличие KTable заключается в том, что записи с одинаковыми ключами считаются обновлениями.

KTable заменяет старую запись новой.

Агрегация происходит аналогичным образом: агрегируются самые последние записи с одинаковым ключом.

При поступлении записи она добавляется в экземпляр класса FixSizePriorityQueue с помощью сумматора (второй параметр в вызове агрегатного метода), но если уже существует другая запись с таким же ключом, то старая запись удаляется с помощью вычитателя (третий параметр в вызов совокупного метода).

Это все означает, что наш агрегатор, фиксированныйSizePriorityQueue, не агрегирует все значения одним ключом, а хранит скользящую сумму количеств N наиболее торгуемых типов акций.

Каждая входящая запись содержит общее количество проданных на данный момент акций.

KTable предоставит вам информацию о том, акции каких компаний в настоящее время наиболее торгуются, не требуя скользящего агрегирования каждого обновления.

Мы научились делать две важные вещи:

  • группировать значения в KTable по общему ключу;
  • выполнять полезные операции, такие как сведение и агрегирование этих сгруппированных значений.

Знание того, как выполнять эти операции, важно для понимания значения данных, передаваемых через приложение Kafka Streams, и понимания того, какую информацию они несут. Мы также собрали воедино некоторые ключевые концепции, обсуждавшиеся ранее в этой книге.

В главе 4 мы обсуждали, насколько отказоустойчивое локальное состояние важно для потокового приложения.

Первый пример в этой главе продемонстрировал, почему локальное состояние так важно — оно дает вам возможность отслеживать ту информацию, которую вы уже видели.

Локальный доступ позволяет избежать задержек в сети, что делает приложение более производительным и устойчивым к ошибкам.

При выполнении любой операции объединения или агрегации необходимо указать имя хранилища состояний.

Операции свертки и агрегации возвращают экземпляр KTable, а KTable использует хранилище состояний для замены старых результатов новыми.

Как вы видели, не все обновления передаются по конвейеру, и это важно, поскольку операции агрегирования предназначены для получения сводной информации.

Если вы не примените локальное состояние, KTable пересылает все результаты агрегации и сведения.

Далее мы рассмотрим выполнение таких операций, как агрегирование, в течение определенного периода времени — так называемых оконных операций.



5.3.2. Оконные операции

В предыдущем разделе мы представили скользящую свертку и агрегацию.

Приложение выполняло непрерывный анализ объема продаж акций с последующим агрегированием пяти наиболее торгуемых акций на бирже.

Иногда такое непрерывное агрегирование и сведение результатов необходимо.

А иногда нужно выполнять операции только за заданный период времени.

Например, подсчитайте, сколько биржевых сделок было совершено с акциями конкретной компании за последние 10 минут. Или сколько пользователей кликнули по новому рекламному баннеру за последние 15 минут. Приложение может выполнять такие операции несколько раз, но результаты будут применимы только к указанным периодам времени (временным окнам).



Учет биржевых операций по покупателю

В следующем примере мы будем отслеживать операции с акциями нескольких трейдеров — либо крупных организаций, либо умных отдельных финансистов.

Есть две возможные причины такого отслеживания.

Один из них – необходимость знать, что покупают/продают лидеры рынка.

Если эти крупные игроки и опытные инвесторы увидят возможности, имеет смысл следовать их стратегии.

Вторая причина – желание выявить любые возможные признаки незаконной инсайдерской торговли.

Для этого вам нужно будет проанализировать корреляцию больших всплесков продаж с важными пресс-релизами.

Такое отслеживание состоит из следующих этапов:

  • создание потока для чтения из темы биржевых сделок;
  • группировка входящих записей по идентификатору покупателя и биржевому символу.

    Вызов метода groupBy возвращает экземпляр класса KGroupedStream;

  • Метод KGroupedStream.windowedBy возвращает поток данных, ограниченный временным окном, что позволяет выполнять оконную агрегацию.

    В зависимости от типа окна возвращается либо TimeWindowedKStream, либо SessionWindowedKStream;

  • количество транзакций для операции агрегации.

    Поток оконных данных определяет, учитывается ли при этом подсчет конкретная запись;

  • запись результатов в тему или вывод их в консоль во время разработки.

Топология этого приложения проста, но ее четкое представление было бы полезно.

Давайте посмотрим на рис.

5.11. Далее мы рассмотрим функциональность оконных операций и соответствующий код.

Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»



Типы окон

В Kafka Streams есть три типа окон:
  • сессионный;
  • «кувыркающийся» (кувыркающийся);
  • скольжение/прыжки.

Какой из них выбрать, зависит от требований вашего бизнеса.

Окна переворачивания и прыжков ограничены по времени, а окна сеанса ограничены активностью пользователя — продолжительность сеанса(ов) определяется исключительно тем, насколько активен пользователь.

Главное, что следует помнить, это то, что все типы окон основаны на отметках даты и времени записей, а не на системном времени.

Далее мы реализуем нашу топологию с каждым из типов окон.

Полный код будет приведен только в первом примере; для других типов окон ничего не изменится, кроме типа работы окна.



Окна сеанса

Окна сеансов сильно отличаются от всех других типов окон.

Они ограничены не столько временем, сколько активностью пользователя (или активностью сущности, которую вы хотели бы отслеживать).

Окна сеансов разделены периодами бездействия.

Рисунок 5.12 иллюстрирует концепцию окон сеанса.

Меньший сеанс объединится с сеансом слева от него.

А сеанс справа будет отдельным, поскольку он следует за длительным периодом бездействия.

Окна сеансов основаны на активности пользователя, но для определения того, к какому сеансу принадлежит запись, используются отметки даты и времени из записей.



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»



Использование окон сеансов для отслеживания операций с акциями

Давайте использовать окна сеанса для сбора информации об обменных транзакциях.

Реализация окон сеанса показана в листинге 5.5 (который можно найти в src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Вы уже видели большинство операций в этой топологии, поэтому нет необходимости рассматривать их здесь снова.

Но здесь есть и несколько новых элементов, о которых мы сейчас поговорим.

Любая операция groupBy обычно выполняет какую-либо операцию агрегирования (агрегирование, сведение или подсчет).

Вы можете выполнить либо накопительную агрегацию с промежуточным итогом, либо оконную агрегацию, при которой учитываются записи в пределах указанного временного окна.

Код в листинге 5.5 подсчитывает количество транзакций в окнах сеанса.

На рис.

5.13 эти действия проанализированы поэтапно.

Вызвав windowedBy(SessionWindows.with(twentySeconds).

until(fifteenMinutes)) мы создаем окно сеанса с интервалом неактивности 20 секунд и интервалом сохранения 15 минут. Интервал простоя в 20 секунд означает, что приложение включит любую запись, поступившую в течение 20 секунд после окончания или начала текущего сеанса, в текущий (активный) сеанс.



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Далее мы указываем, какую операцию агрегации необходимо выполнить в окне сеанса — в данном случае считать.

Если входящая запись выходит за пределы окна неактивности (по обе стороны от отметки даты/времени), приложение создает новый сеанс.

Интервал хранения означает поддержание сеанса в течение определенного периода времени и допускает поздние данные, которые выходят за рамки периода неактивности сеанса, но все же могут быть прикреплены.

Кроме того, начало и конец нового сеанса, возникшего в результате слияния, соответствуют самой ранней и самой последней отметке даты/времени.

Давайте посмотрим на несколько записей метода count, чтобы понять, как работают сеансы (таблица 5.1).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

При поступлении записей мы ищем существующие сеансы с тем же ключом, временем окончания, меньшим, чем текущая отметка даты/времени — интервал неактивности, и временем начала, большим, чем текущая отметка даты/времени + интервал неактивности.

С учетом этого четыре записи из табл.

5.1 объединяются в один сеанс следующим образом.

1. Запись 1 поступает первой, поэтому время начала равно времени окончания и равно 00:00:00. 2. Далее поступает запись 2, и мы ищем сессии, которые заканчиваются не раньше 23:59:55 и начинаются не позднее 00:00:35. Находим запись 1 и объединяем сеансы 1 и 2. Берем время начала сеанса 1 (раньше) и время окончания сеанса 2 (позже), чтобы наш новый сеанс начинался в 00:00:00 и заканчивался в 00: 00:15. 3. Приходит запись 3, ищем сессии между 00:00:30 и 00:01:10 и не находим.

Добавьте второй сеанс для ключа 123-345-654,FFBE, начинающийся и заканчивающийся в 00:00:50. 4. Приходит запись 4 и ищем сессии между 23:59:45 и 00:00:25. На этот раз найдены оба сеанса 1 и 2. Все три сеанса объединяются в один со временем начала 00:00:00 и временем окончания 00:00:15. Из того, что описано в этом разделе, стоит запомнить следующие важные нюансы:

  • сеансы не являются окнами фиксированного размера.

    Продолжительность сеанса определяется активностью в течение заданного периода времени;

  • Отметки даты и времени в данных определяют, приходится ли событие на существующий сеанс или на период простоя.

Далее мы поговорим о следующем типе окон – «кувыркающихся» окнах.



«кувыркающиеся» окна

Переворачивающиеся окна фиксируют события, происходящие в определенный период времени.

Представьте, что вам нужно фиксировать все операции с акциями определенной компании каждые 20 секунд, поэтому вы собираете все события за этот период времени.

В конце 20-секундного интервала окно переворачивается и переходит к новому 20-секундному интервалу наблюдения.

Рисунок 5.14 иллюстрирует эту ситуацию.



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Как видите, в окно включены все события, полученные за последние 20 секунд. По истечении этого периода времени создается новое окно.

В листинге 5.6 показан код, демонстрирующий использование переключающихся окон для захвата транзакций с акциями каждые 20 секунд (находится в src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

С этим небольшим изменением в вызове метода TimeWindows.of вы можете использовать переворачивающееся окно.

В этом примере не вызывается метод до(), поэтому будет использоваться интервал хранения по умолчанию, равный 24 часам.

Наконец, пришло время перейти к последней из опций окон – «прыгающим» окнам.



Раздвижные («прыгающие») окна

Скользящие/прыгающие окна аналогичны переворачивающимся окнам, но с небольшой разницей.

Скользящие окна не ждут окончания временного интервала, прежде чем создавать новое окно для обработки последних событий.

Они начинают новые вычисления после интервала ожидания, меньшего, чем длительность окна.

Чтобы проиллюстрировать разницу между падающими и прыгающими окнами, вернемся к примеру подсчета биржевых сделок.

Наша цель по-прежнему состоит в подсчете количества транзакций, но мы не хотим ждать все время, прежде чем обновить счетчик.

Вместо этого мы будем обновлять счетчик через более короткие промежутки времени.

Например, мы по-прежнему будем считать количество транзакций каждые 20 секунд, но обновлять счетчик каждые 5 секунд, как показано на рис.

5.15. В этом случае мы получаем три окна результатов с перекрывающимися данными.



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

В листинге 5.7 показан код для определения скользящих окон (находится в src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Переворачивающееся окно можно преобразовать в прыгающее окно, добавив вызов метода заранееBy().

В показанном примере интервал сохранения составляет 15 минут. В этом разделе вы видели, как ограничить результаты агрегирования временными окнами.

В частности, я хочу, чтобы вы запомнили следующие три вещи из этого раздела:

  • размер окон сеансов ограничен не периодом времени, а активностью пользователя;
  • «переворачивающиеся» окна дают обзор событий за заданный период времени;
  • Продолжительность переходов между окнами фиксирована, но они часто обновляются и могут содержать перекрывающиеся записи во всех окнах.

Далее мы узнаем, как преобразовать KTable обратно в KStream для подключения.



5.3.3. Соединение объектов KStream и KTable

В главе 4 мы обсуждали соединение двух объектов KStream. Теперь нам предстоит научиться соединять KTable и KStream. Это может понадобиться по следующей простой причине.

KStream — это поток записей, а KTable — это поток обновлений записей, но иногда вам может потребоваться добавить дополнительный контекст к потоку записей, используя обновления из KTable. Возьмем данные о количестве биржевых сделок и объединим их с биржевыми новостями по соответствующим отраслям.

Вот что вам нужно сделать, чтобы добиться этого, учитывая уже имеющийся у вас код.

  1. Преобразовать объект KTable с данными о количестве биржевых транзакций в KStream с последующей заменой ключа на ключ, указывающий отрасль промышленности, соответствующую этому биржевому символу.

  2. Создайте объект KTable, который считывает данные из темы с новостями фондовой биржи.

    Эта новая таблица KTable будет классифицирована по отраслям промышленности.

  3. Связывайте обновления новостей с информацией о количестве биржевых сделок по отраслям.

Теперь посмотрим, как реализовать этот план действий.



Преобразование KTable в KStream

Чтобы преобразовать KTable в KStream, вам необходимо сделать следующее.

  1. Вызовите метод KTable.toStream().

  2. Вызвав метод KStream.map, замените ключ названием отрасли, а затем получите объект TransactionSummary из экземпляра Windowed.
Мы объединим эти операции следующим образом (код можно найти в файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (листинг 5.8).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Поскольку мы выполняем операцию KStream.map, возвращаемый экземпляр KStream автоматически перераспределяется при использовании в соединении.

Мы завершили процесс преобразования, теперь нам нужно создать объект KTable для чтения биржевых новостей.



Создание KTable для биржевых новостей

К счастью, для создания объекта KTable требуется всего одна строка кода (код можно найти в src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (листинг 5.9).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Стоит отметить, что никакие объекты Serde указывать не требуется, так как в настройках используются строковые Serdes. Кроме того, при использовании перечисления EARLIEST таблица заполняется записями в самом начале.

Теперь можно перейти к заключительному этапу – подключению.



Соединение обновлений новостей с данными о количестве транзакций

Создать соединение не сложно.

Мы будем использовать левое соединение в случае отсутствия биржевых новостей по соответствующей отрасли (необходимый код можно найти в файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (листинг 5.10).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Этот оператор leftJoin довольно прост. В отличие от соединений, описанных в главе 4, метод JoinWindow не используется, поскольку при выполнении соединения KStream-KTable в KTable имеется только одна запись для каждого ключа.

Такое соединение не ограничено во времени: запись либо есть в KTable, либо отсутствует. Основной вывод: используя объекты KTable, вы можете обогатить KStream менее часто обновляемыми справочными данными.

Теперь мы рассмотрим более эффективный способ обогащения событий из KStream.

5.3.4. Объекты GlobalKTable

Как видите, существует необходимость обогатить потоки событий или добавить к ним контекст. В главе 4 вы видели связи между двумя объектами KStream, а в предыдущем разделе вы видели связь между KStream и KTable. Во всех этих случаях необходимо переразбить поток данных при сопоставлении ключей с новым типом или значением.

Иногда перераспределение выполняется явно, а иногда Kafka Streams делает это автоматически.

Переразметка необходима, поскольку ключи изменились и записи должны попасть в новые разделы, иначе подключение будет невозможно (об этом говорилось в Главе 4, в разделе «Переразбивка данных» подраздела 4.2.4).



Переразметка имеет свою цену

Переразметка требует затрат — дополнительных затрат ресурсов на создание промежуточных тем, хранение дублирующихся данных в другой теме; это также означает увеличение задержки из-за записи и чтения из этой темы.

Кроме того, если вам необходимо объединить несколько аспектов или измерений, вам необходимо объединить объединения, сопоставить записи с новыми ключами и снова запустить процесс переразбиения.



Подключение к меньшим наборам данных

В некоторых случаях объем подключаемых справочных данных относительно невелик, поэтому их полные копии легко помещаются локально на каждом узле.

Для подобных ситуаций Kafka Streams предоставляет класс GlobalKTable. Экземпляры GlobalKTable уникальны, поскольку приложение реплицирует все данные на каждый узел.

А поскольку все данные присутствуют на каждом узле, нет необходимости разбивать поток событий по ключу ссылочных данных, чтобы он был доступен всем разделам.

Вы также можете выполнять соединения без ключа, используя объекты GlobalKTable. Давайте вернемся к одному из предыдущих примеров, чтобы продемонстрировать эту функцию.



Соединение объектов KStream с объектами GlobalKTable

В подразделе 5.3.2 мы выполнили оконную агрегацию биржевых операций по покупателям.

Результаты этого агрегирования выглядели примерно так:

   

{customerId='074-09-3705', stockTicker='GUTM'}, 17 {customerId='037-34-5184', stockTicker='CORK'}, 16

Хотя эти результаты послужили цели, было бы более полезно, если бы также отображалось имя клиента и полное название компании.

Чтобы добавить имя клиента и название компании, вы можете выполнить обычные соединения, но вам потребуется выполнить два сопоставления ключей и переразметку.

С GlobalKTable вы можете избежать затрат на такие операции.

Для этого мы воспользуемся объектом countStream из листинга 5.11 (соответствующий код можно найти в src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) и подключим его к двум объектам GlobalKTable.

Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Мы уже обсуждали это раньше, поэтому я не буду повторяться.

Но я отмечу, что код в функции toStream().

map абстрагируется в объект функции вместо встроенного лямбда-выражения ради удобства чтения.

Следующий шаг — объявить два экземпляра GlobalKTable (показанный код можно найти в файле src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (листинг 5.12).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Обратите внимание, что названия тем описываются с использованием перечислимых типов.

Теперь, когда у нас готовы все компоненты, осталось только написать код для подключения (который можно найти в файле src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (листинг 5.13).



Книга «Потоки Кафки в действии.
</p><p>
 Приложения и микросервисы для работы в реальном времени»

Хотя в этом коде есть два соединения, они объединены в цепочку, поскольку ни один из их результатов не используется отдельно.

Результаты отображаются в конце всей операции.

При выполнении вышеуказанной операции подключите Теги: #Высокая производительность #Большие данные #Apache #Профессиональная литература #книги #книги

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.