Как Кафка Стал Реальностью



Как Кафка стал реальностью

Привет, Хабр! Я работаю в команде Тинькофф, которая занимается разработкой собственного центра уведомлений.

В основном я занимаюсь разработкой на Java с использованием Spring boot и решаю различные технические проблемы, возникающие в проекте.

Большинство наших микросервисов взаимодействуют друг с другом асинхронно через брокера сообщений.

Раньше в качестве брокера мы использовали IBM MQ, который уже не справлялся с нагрузкой, но при этом имел высокие гарантии доставки.

В качестве замены нам предложили Apache Kafka, который имеет высокий потенциал масштабирования, но, к сожалению, требует практически индивидуального подхода к настройке для разных сценариев.

Кроме того, механизм доставки хотя бы один раз, работающий в Kafka по умолчанию, не позволял «из коробки» поддерживать необходимый уровень согласованности.

Далее я поделюсь нашим опытом настройки Kafka, в частности расскажу, как настроить и жить ровно с однократной доставкой.



Гарантированная доставка и многое другое.

Рассмотренные ниже настройки помогут предотвратить ряд проблем с настройками подключения по умолчанию.

Но сначала хотелось бы обратить внимание на один параметр, который облегчит возможную отладку.

Это поможет ID клиента для производителя и потребителя.

На первый взгляд, в качестве значения можно использовать имя приложения, и в большинстве случаев это сработает. Хотя ситуация, когда приложение использует несколько потребителей и вы даете им один и тот же client.id, приводит к следующему предупреждению:

  
   

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Если вы хотите использовать JMX в приложении с Kafka, это может стать проблемой.

В этом случае лучше всего использовать в качестве значения client.id комбинацию имени приложения и, например, названия темы.

Результат нашей конфигурации можно увидеть в выводе команды.

группы потребителей Кафки из утилит от Confluent:

Как Кафка стал реальностью

Теперь давайте рассмотрим сценарий гарантированной доставки сообщений.

Kafka Producer имеет параметр атаки , который позволяет настроить, после какого количества подтверждений лидер кластера считает сообщение успешно написанным.

Этот параметр может принимать следующие значения:

  • 0 — подтверждение не будет учитываться.

  • 1 — параметр по умолчанию, для подтверждения требуется только 1 реплика.

  • −1 — требуется подтверждение от всех синхронизированных реплик (настройка кластера мин.

    insync.replicas ).

Из перечисленных значений видно, что acks, равный −1, дает самую сильную гарантию того, что сообщение не будет потеряно.

Как мы все знаем, распределенные системы ненадежны.

Для защиты от временных сбоев Kafka Producer предоставляет возможность повторные попытки , который позволяет вам установить количество попыток повторной отправки в течение доставка.

timeout.ms .

Поскольку параметр повторов имеет значение по умолчанию Integer.MAX_VALUE (2147483647), количество повторных попыток сообщения можно регулировать, изменяя только Delivery.timeout.ms.

Мы движемся к ровно однократной доставке

Перечисленные настройки позволяют нашему Продюсеру доставлять сообщения с высокой гарантией.

Давайте теперь поговорим о том, как гарантировать, что в тему Kafka будет записана только одна копия сообщения? В простейшем случае для этого нужно установить параметр Producer включить.

идемпотентность к истине.

Идемпотентность гарантирует, что в определенный раздел одной темы будет записано только одно сообщение.

Предпосылкой для включения идемпотентности являются значения acks = все, повтор > 0, макс.

in.flight.requests.per.connection ≤ 5 .

Если эти параметры не указаны разработчиком, указанные выше значения будут установлены автоматически.

При настройке идемпотентности необходимо обеспечить, чтобы одни и те же сообщения каждый раз попадали в одни и те же разделы.

Это можно сделать, установив для ключа и параметра Partitioner.class значение Producer. Начнем с ключа.

Оно должно быть одинаковым для каждой заявки.

Этого можно легко добиться, используя любой бизнес-идентификатор из исходного сообщения.

Параметр partitioner.class имеет значение по умолчанию — DefaultPartitioner .

Используя эту стратегию разделения, по умолчанию мы действуем следующим образом:

  • Если раздел указан явно при отправке сообщения, то используем его.

  • Если раздел не указан, но указан ключ, выберите раздел по хешу ключа.

  • Если раздел и ключ не указаны, выберите разделы по одному (циклически).

Также, используя ключ и идемпотентную отправку с параметром max.in.flight.requests.per.connection = 1 обеспечивает упрощенную обработку сообщений на потребителе.

Также стоит помнить, что если на вашем кластере настроен контроль доступа, то вам потребуются права на идемпотентную запись в тему.

Если вдруг вам не хватает возможностей идемпотентной отправки по ключу или логика на стороне Producer требует поддержания согласованности данных между разными партициями, то на помощь придут транзакции.

Кроме того, с помощью цепной транзакции можно условно синхронизировать запись в Kafka, например, с записью в базе данных.

Чтобы включить транзакционную отправку Продюсеру, он должен быть идемпотентным и дополнительно установлен транзакционный.

id .

Если в вашем кластере Kafka настроен контроль доступа, то транзакционной записи, например идемпотентной записи, потребуются права на запись, которые можно предоставить по маске, используя значение, хранящееся в транзакциональном.

id. Формально в качестве идентификатора транзакции можно использовать любую строку, например имя приложения.

Но если вы запустите несколько экземпляров одного и того же приложения с одним и тем же транзакциональным.

id, то первый запущенный экземпляр будет остановлен с ошибкой, так как Kafka посчитает его процессом-зомби.



org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Чтобы решить эту проблему, мы добавляем к имени приложения суффикс в виде имени хоста, которое мы получаем из переменных среды.

Производитель настроен, но транзакции в Kafka контролируют только объем сообщения.

Независимо от статуса транзакции, сообщение сразу попадает в тему, но имеет дополнительные системные атрибуты.

Чтобы такие сообщения не были прочитаны Потребителем раньше времени, ему необходимо установить параметр уровень изоляции для значения read_committed. Такой Потребитель сможет читать нетранзакционные сообщения, как и раньше, а транзакционные сообщения только после фиксации.

Если вы установили все параметры, перечисленные ранее, то вы настроили ровно один раз доставку.

Поздравляем! Но есть еще один нюанс.

Transactional.id, который мы настроили выше, на самом деле является префиксом транзакции.

В диспетчере транзакций к нему добавляется порядковый номер.

Полученный идентификатор выдается транзакция.

id.expiration.ms , который настроен в кластере Kafka и имеет значение по умолчанию «7 дней».

Если за это время приложение не получило ни одного сообщения, то при попытке следующей транзакционной отправки вы получите Инвалидпидмаппингисключение .

Координатор транзакции затем выдаст новый порядковый номер для следующей транзакции.

Однако сообщение может быть потеряно, если исключение InvalidPidMappingException не будет обработано правильно.



Вместо результатов

Как видите, просто отправлять сообщения Кафке недостаточно.

Вам необходимо выбрать комбинацию параметров и быть готовым к быстрым изменениям.

В этой статье я постарался подробно показать настройку ровно однократной доставки и описал несколько проблем с конфигурациями client.id иtransactional.id, с которыми мы столкнулись.

Ниже приведена сводка настроек производителя и потребителя.

Режиссер:

  1. аккс = все
  2. повторов > 0
  3. включить.

    идемпотенс = правда

  4. max.in.flight.requests.per.connection ≤ 5 (1 для упорядоченной отправки)
  5. транзакция.

    id = ${имя-приложения}-${имя хоста}

Потребитель:
  1. изоляция.

    уровень = read_commited

Чтобы минимизировать ошибки в будущих приложениях, мы сделали собственную обертку над конфигурацией пружины, где уже заданы значения некоторых из перечисленных параметров.

Вот пара материалов для самостоятельного изучения:

Теги: #Микросервисы #kafka #java #Распределенные системы
Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.