Как Spring-Kafka Обрабатывает Сообщения И Мешает Ли Этому Автофиксация?

В предыдущей статье мы посмотрели, как это работает КафкаПотребитель и как реализован механизм автоматической фиксации.

В этой статье я хочу сосредоточиться на том, как принимаются и обрабатываются сообщения Spring-Kafka. Стоит отметить, что сейчас мы рассматриваем ситуацию с включить.

auto.commit = правда.

Согласно документации начиная с версии 2.3, для параметра auto.commit по умолчанию установлено значение false, хотя ранее это значение было похоже на значение по умолчанию в kafka-clients, т.е.

true. Это связано с тем, что контейнер KafkaMessageListenerContainer имеет собственные механизмы управления фиксациями.

Насколько это удобно и какие плюсы и минусы – пожалуй, тема отдельной статьи.

Поскольку контейнер прослушивателя имеет собственный механизм фиксации смещений, он предпочитает, чтобы Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG имел значение false. Начиная с версии 2.3, он безоговорочно устанавливает для него значение false, если только оно специально не установлено в фабрике-потребителе или в переопределении потребительского свойства контейнера.

Я постараюсь ответить на следующие вопросы:
  1. Как Spring-Kafka работает с KafkaConsumer?
  2. Каковы возможности параллельной обработки сообщений?
  3. Что происходит, если при обработке сообщений возникают ошибки?
Рассмотрим типичный минимальный пример получения сообщений из топика.

Прежде всего нам нужна конфигурация для подключения к теме:

   

@Configuration public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> myListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory());

Теги: #kafka #java #spring #Consumer #commit
Вместе с данным постом часто просматривают: