Apache Kafka Для Чайников

Эта статья будет полезна тем, кто только начал знакомиться с микросервисной архитектурой и сервисом Apache Kafka. Материал не претендует на подробное руководство, но поможет быстро освоить данную технологию.

Я расскажу о том, как установить и настроить Kafka в Windows 10. Также мы создадим проект с использованием Intellij IDEA и Spring Boot.



За что?

Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться.

С Кафкой все точно так же.

Опишем ситуацию, в которой эта технология будет полезна.

Если у вас монолитная архитектура приложения, то никакой Kafka вам, конечно, не нужен.

Все меняется с переходом на микросервисы.

По сути, каждый микросервис — это отдельная программа, выполняющая определенную функцию и которую можно запускать независимо от других микросервисов.

Микросервисы можно сравнить с сотрудниками офиса, которые сидят за отдельными столами и решают свои проблемы независимо от коллег.

Работа такой распределенной команды немыслима без централизованной координации.

Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы друг с другом.

Это именно та проблема, которую стремится решить Apache Kafka для микросервисов.

Apache Kafka — брокер сообщений.

С его помощью микросервисы могут взаимодействовать друг с другом, отправляя и получая важную информацию.

Возникает вопрос, а почему бы не использовать для этих целей обычный POST — запрос, в теле которого можно передать необходимые данные и таким же образом получить ответ? Этот подход имеет ряд очевидных недостатков.

Например, производитель (служба, отправляющая сообщение) может отправлять данные только в качестве ответа на запрос потребителя (служба, получающая данные).

Допустим, потребитель отправляет запрос POST, а производитель отвечает на него.

В это время потребитель по каким-то причинам не может принять полученный ответ. Что произойдет с данными? Они будут потеряны.

Потребителю снова придется отправить запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, а производитель все еще готов принять запрос.

Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами.

Не лишним будет напомнить, что бесперебойный и удобный обмен данными — одна из ключевых проблем, которую необходимо решить для обеспечения стабильной работы микросервисной архитектуры.



Установка и настройка ZooKeeper и Apache Kafka в Windows 10

Первое, что вам нужно знать, чтобы начать, это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это служба распределенной настройки и синхронизации, и это все, что нам нужно знать о ней в данном контексте.

Мы должны загрузить, настроить и запустить его, прежде чем мы сможем начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлена и настроена JRE. Вы можете скачать последнюю версию ZooKeeper с официального сайта.

.

Извлекаем файлы из скачанного архива ZooKeeper в какую-нибудь папку на диске.

В папке Zookeeper с номером версии находим папку conf и в ней файл «zoo_sample.cfg».



Apache Kafka для чайников

Скопируйте его и измените имя копии на «zoo.cfg».

Откройте файл копии и найдите строку dataDir=/tmp/zookeeper. Пишем в этой строке полный путь к нашей папке Zookeeper-x.x.x. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0

Apache Kafka для чайников

Теперь давайте добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin; Запускаем командную строку и пишем команду:

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
   

zkserver

Если все сделано правильно, вы увидите что-то вроде этого.



Apache Kafka для чайников

Это означает, что ZooKeeper запустился нормально.

Перейдем непосредственно к установке и настройке сервера Apache Kafka. Загрузите последнюю версию с официального сайта и извлеките содержимое архива: kafka.apache.org/downloads В папке с Кафкой находим папку config, в ней находим файл server.properties и открываем его.



Apache Kafka для чайников

Найдите строку log.dirs=/tmp/kafka-logs и укажите в ней путь, по которому Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.

Apache Kafka для чайников

В этой же папке редактируем файл Zookeeper.properties. Измените строку dataDir=/tmp/zookeeper на dataDir=c:/kafka/zookeeper-data, не забыв после имени диска указать путь к вашей папке с Kafka. Если вы все сделали правильно, вы можете запустить ZooKeeper и Kafka.

Apache Kafka для чайников

Для некоторых может стать неприятным сюрпризом отсутствие графического интерфейса для управления Kafka. Возможно, это потому, что сервис рассчитан на крутых зануд, работающих исключительно с консолью.

Так или иначе, для запуска Kafka нам понадобится командная строка.

Сначала вам нужно запустить ZooKeeper. В папке с Кафкой находим папку bin/windows, в ней находим файл для запуска службы Zookeeper-server-start.bat, нажимаем на него.

Ничего не произошло? Так и должно быть.

Откройте консоль в этой папке и напишите:

start zookeeper-server-start.bat

Опять не работаете? Это норма.

Это связано с тем, что для Zookeeper-server-start.bat требуются параметры, указанные в файле Zookeeper.properties, который, как мы помним, находится в папке config. Пишем в консоль:

start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties

Теперь все должно запуститься нормально.



Apache Kafka для чайников

Еще раз откройте консоль в этой папке (не закрывайте ZooKeeper!) и запустите kafka:

start kafka-server-start.bat c:\kafka\config\server.properties

Чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник следующего содержания:

start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties timeout 10 start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties

Таймаут строки 10 нужен для установки паузы между запуском Zookeeper и Kafka. Если вы все сделали правильно, то при нажатии на батник должны открыться две консоли с запущенными Zookeeper и Kafka. Теперь мы можем создать производителя и потребителя сообщения с необходимыми параметрами прямо из командной строки.

Но на практике это может понадобиться только для тестирования сервиса.

Нас гораздо больше будет интересовать, как работать с кафкой из IDEA.

Работа с Кафкой из IDEA

Мы напишем максимально простое приложение, которое будет одновременно и производителем, и потребителем сообщений, а затем добавим в него полезные функции.

Давайте создадим новый весенний проект. Самый удобный способ сделать это — использовать инициализатор Spring. Добавьте зависимости org.springframework.kafka и Spring-boot-starter-web.

Apache Kafka для чайников



Apache Kafka для чайников

В результате файл pom.xml должен выглядеть так:

Apache Kafka для чайников

Для отправки сообщений нам нужен KafkaTemplate. объект. Как мы видим, объект типизирован.

Первый параметр — тип ключа, второй — само сообщение.

Сейчас мы укажем оба параметра как String. Мы создадим объект в классе restcontroller. Давайте объявим KafkaTemplate и попросим Spring инициализировать его, добавив аннотацию.

Автопроводка .



@Autowired private KafkaTemplate<String, String> kafkaTemplate;

В принципе, наш производитель готов.

Все, что осталось сделать, — это вызвать его метод send().

Существует несколько перегруженных версий этого метода.

В нашем проекте мы используем вариант с 3 параметрами — send(тема String, ключ K, данные V).

Поскольку KafkaTemplate имеет тип String, ключ и данные в методе отправки будут строкой.

Первый параметр указывает тему, то есть тему, в которую будут отправляться сообщения и на которую потребители могут подписаться, чтобы получать их.

Если тема, указанная в методе отправки, не существует, она будет создана автоматически.

Полный текст класса выглядит так.



@RestController @RequestMapping("msg") public class MsgController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping public void sendOrder(String msgId, String msg){ kafkaTemplate.send("msg", msgId, msg); } }

Контроллер отображает локальный хост :8080/msg, тело запроса содержит ключ и само сообщение.

Отправитель сообщения готов, теперь давайте создадим слушателя.

Spring также позволяет сделать это без особых усилий.

Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которого можно указать только тему, которая будет прослушиваться.

В нашем случае это выглядит так.



@KafkaListener(topics="msg")

Сам метод, отмеченный аннотацией, может указать один параметр для получения, который имеет тип сообщения, отправленного производителем.

Класс, в котором будет создан потребитель, должен быть помечен аннотацией @EnableKafka.

@EnableKafka @SpringBootApplication public class SimpleKafkaExampleApplication { @KafkaListener(topics="msg") public void msgListener(String msg){ System.out.println(msg); } public static void main(String[] args) { SpringApplication.run(SimpleKafkaExampleApplication.class, args); } }

Также в файле настроек application.property необходимо указать потребительский параметр groupe-id. Если этого не сделать, приложение не запустится.

Параметр имеет тип String и может быть любым.



spring.kafka.consumer.group-id=app.1

Наш простейший проект Kafka готов.

У нас есть отправитель и получатель сообщений.

Остается только запустить его.

Сначала мы запускаем ZooKeeper и Kafka с помощью написанного ранее командного файла, затем запускаем наше приложение.

Самый удобный способ отправить запрос — с помощью Postman. В теле запроса не забудьте указать параметры msgId и msg.

Apache Kafka для чайников

Если мы видим такую картинку в IDEA, значит всё работает: производитель отправил сообщение, потребитель его получил и отобразил в консоли.



Apache Kafka для чайников



Усложнение проекта

Реальные проекты с использованием Kafka, конечно, сложнее, чем тот, который мы создали.

Теперь, когда мы разобрались с основными функциями сервиса, давайте рассмотрим, какие дополнительные возможности он предоставляет. Для начала давайте улучшим продюсера.

Если вы открыли метод send(), возможно, вы заметили, что все его варианты имеют ListenableFuture. > возвращаемое значение.

Сейчас мы не будем подробно рассматривать возможности этого интерфейса.

Здесь достаточно будет сказать, что необходимо просмотреть результат отправки сообщения.



@PostMapping public void sendMsg(String msgId, String msg){ ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", msgId, msg); future.addCallback(System.out::println, System.err::println); kafkaTemplate.flush(); }

Метод addCallback() принимает два параметра — SuccessCallback и FailbackCallback. Оба они являются функциональными интерфейсами.

Из названия можно понять, что первый метод будет вызван в результате успешной отправки сообщения, второй — в результате ошибки.

Теперь, если мы запустим проект, то увидим в консоли что-то вроде следующего:

SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]

Давайте еще раз присмотримся к нашему производителю.

Интересно, что будет, если ключ будет не String, а, скажем, Long, а передаваемое сообщение, что еще хуже, будет каким-то сложным DTO? Давайте сначала попробуем изменить ключ на числовое значение.



Apache Kafka для чайников

Если мы укажем Long в качестве ключа в продюсере, то приложение запустится нормально, но при попытке отправить сообщение будет выброшено ClassCastException и будет сообщено, что класс Long невозможно привести к классу String.

Apache Kafka для чайников

Если мы попытаемся вручную создать объект KafkaTemplate, мы увидим, что ProducerFactory объект интерфейса передается конструктору в качестве параметра, например DefaultKafkaProducerFactory<> .

Чтобы создать DefaultKafkaProducerFactory, нам нужно передать карту, содержащую настройки производителя, в ее конструктор.

Весь код настройки и создания производителя мы вынесем в отдельный класс.

Для этого мы создадим конфигурационный пакет и в нем класс KafkaProducerConfig.

@Configuration public class KafkaProducerConfig { private String kafkaServer="localhost:9092"; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<Long, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<Long, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

В методе ProduceConfigs() мы создаем карту с конфигурациями и указываем LongSerializer.class в качестве сериализатора ключа.

Запускаем его, отправляем запрос от Postman и видим, что теперь все работает как надо: производитель отправляет сообщение, а потребитель его получает. Теперь давайте изменим тип передаваемого значения.

Что делать, если у нас не стандартный класс из библиотеки Java, а какой-то собственный DTO. Скажем так.



@Data public class UserDto { private Long age; private String name; private Address address; } @Data @AllArgsConstructor public class Address { private String country; private String city; private String street; private Long homeNumber; private Long flatNumber; }

Чтобы отправить DTO в виде сообщения, вам необходимо внести некоторые изменения в конфигурацию производителя.

Давайте укажем JsonSerializer.class в качестве сериализатора значения сообщения и не забудем везде изменить тип String на UserDto.

@Configuration public class KafkaProducerConfig { private String kafkaServer="localhost:9092"; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory<Long, UserDto> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<Long, UserDto> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

Давайте отправим сообщение.

На консоль будет выведена следующая строка:

Apache Kafka для чайников

Теперь перейдем к усложнению потребителя.

Ранее наш общедоступный метод void msgListener(String msg), отмеченный аннотацией @KafkaListener(topics="msg"), принимал строку в качестве параметра и выводил ее на консоль.

Что делать, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или раздел? В этом случае необходимо изменить тип передаваемого значения.



@KafkaListener(topics="msg") public void orderListener(ConsumerRecord<Long, UserDto> record){ System.out.println(record.partition()); System.out.println(record.key()); System.out.println(record.value()); }

Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.



Apache Kafka для чайников

Видим, что вместо ключа на консоли отображается какая-то тарабарщина.

Это связано с тем, что для десериализации ключа используется StringDeserializer по умолчанию, и если мы хотим, чтобы ключ в целочисленном формате отображался правильно, мы должны изменить его на LongDeserializer. Чтобы настроить потребителя, мы создадим класс KafkaConsumerConfig в пакете конфигурации.



@Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String kafkaServer; @Value("${spring.kafka.consumer.group-id}") private String kafkaGroupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); return props; } @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Long, UserDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<Long, UserDto> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } }

Класс KafkaConsumerConfig очень похож на созданный нами ранее KafkaProducerConfig. Также есть Map, содержащий необходимые конфигурации, например такие как десериализатор ключа и значения.

Созданная карта используется для создания ConsumerFactory.<> , что, в свою очередь, необходимо для создания KafkaListenerContainerFactory должен вызываться kafkaListenerContainerFactory(), иначе Spring не сможет найти необходимый bean-компонент и проект не скомпилируется.

Запустим.



Apache Kafka для чайников

Видим, что теперь ключ отображается как надо, а это значит, что все работает. Конечно, возможности Apache Kafka выходят далеко за рамки описанных в этой статье, однако я надеюсь, что, прочитав ее, вы получите представление об этом сервисе и, самое главное, сможете начать с ним работать.

Чаще мойте руки, носите маски, не выходите на улицу без необходимости и будьте здоровы.

Теги: #windows 10 #kafka #java #Apache #spring boot

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.