Эта статья будет полезна тем, кто только начал знакомиться с микросервисной архитектурой и сервисом Apache Kafka. Материал не претендует на подробное руководство, но поможет быстро освоить данную технологию.
Я расскажу о том, как установить и настроить Kafka в Windows 10. Также мы создадим проект с использованием Intellij IDEA и Spring Boot.
За что?
Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться.С Кафкой все точно так же.
Опишем ситуацию, в которой эта технология будет полезна.
Если у вас монолитная архитектура приложения, то никакой Kafka вам, конечно, не нужен.
Все меняется с переходом на микросервисы.
По сути, каждый микросервис — это отдельная программа, выполняющая определенную функцию и которую можно запускать независимо от других микросервисов.
Микросервисы можно сравнить с сотрудниками офиса, которые сидят за отдельными столами и решают свои проблемы независимо от коллег.
Работа такой распределенной команды немыслима без централизованной координации.
Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы друг с другом.
Это именно та проблема, которую стремится решить Apache Kafka для микросервисов.
Apache Kafka — брокер сообщений.
С его помощью микросервисы могут взаимодействовать друг с другом, отправляя и получая важную информацию.
Возникает вопрос, а почему бы не использовать для этих целей обычный POST — запрос, в теле которого можно передать необходимые данные и таким же образом получить ответ? Этот подход имеет ряд очевидных недостатков.
Например, производитель (служба, отправляющая сообщение) может отправлять данные только в качестве ответа на запрос потребителя (служба, получающая данные).
Допустим, потребитель отправляет запрос POST, а производитель отвечает на него.
В это время потребитель по каким-то причинам не может принять полученный ответ. Что произойдет с данными? Они будут потеряны.
Потребителю снова придется отправить запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, а производитель все еще готов принять запрос.
Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами.
Не лишним будет напомнить, что бесперебойный и удобный обмен данными — одна из ключевых проблем, которую необходимо решить для обеспечения стабильной работы микросервисной архитектуры.
Установка и настройка ZooKeeper и Apache Kafka в Windows 10
Первое, что вам нужно знать, чтобы начать, это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это служба распределенной настройки и синхронизации, и это все, что нам нужно знать о ней в данном контексте.Мы должны загрузить, настроить и запустить его, прежде чем мы сможем начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлена и настроена JRE. Вы можете скачать последнюю версию ZooKeeper с официального сайта.
.
Извлекаем файлы из скачанного архива ZooKeeper в какую-нибудь папку на диске.
В папке Zookeeper с номером версии находим папку conf и в ней файл «zoo_sample.cfg».
Скопируйте его и измените имя копии на «zoo.cfg».
Откройте файл копии и найдите строку dataDir=/tmp/zookeeper. Пишем в этой строке полный путь к нашей папке Zookeeper-x.x.x. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0
Теперь давайте добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;
Запускаем командную строку и пишем команду:
Если все сделано правильно, вы увидите что-то вроде этого.zkserver
Это означает, что ZooKeeper запустился нормально.
Перейдем непосредственно к установке и настройке сервера Apache Kafka. Загрузите последнюю версию с официального сайта и извлеките содержимое архива: kafka.apache.org/downloads В папке с Кафкой находим папку config, в ней находим файл server.properties и открываем его.
Найдите строку log.dirs=/tmp/kafka-logs и укажите в ней путь, по которому Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.
В этой же папке редактируем файл Zookeeper.properties. Измените строку dataDir=/tmp/zookeeper на dataDir=c:/kafka/zookeeper-data, не забыв после имени диска указать путь к вашей папке с Kafka. Если вы все сделали правильно, вы можете запустить ZooKeeper и Kafka.
Для некоторых может стать неприятным сюрпризом отсутствие графического интерфейса для управления Kafka. Возможно, это потому, что сервис рассчитан на крутых зануд, работающих исключительно с консолью.
Так или иначе, для запуска Kafka нам понадобится командная строка.
Сначала вам нужно запустить ZooKeeper. В папке с Кафкой находим папку bin/windows, в ней находим файл для запуска службы Zookeeper-server-start.bat, нажимаем на него.
Ничего не произошло? Так и должно быть.
Откройте консоль в этой папке и напишите: start zookeeper-server-start.bat
Опять не работаете? Это норма.
Это связано с тем, что для Zookeeper-server-start.bat требуются параметры, указанные в файле Zookeeper.properties, который, как мы помним, находится в папке config. Пишем в консоль: start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties
Теперь все должно запуститься нормально.
Еще раз откройте консоль в этой папке (не закрывайте ZooKeeper!) и запустите kafka: start kafka-server-start.bat c:\kafka\config\server.properties
Чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник следующего содержания: start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
timeout 10
start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties
Таймаут строки 10 нужен для установки паузы между запуском Zookeeper и Kafka. Если вы все сделали правильно, то при нажатии на батник должны открыться две консоли с запущенными Zookeeper и Kafka. Теперь мы можем создать производителя и потребителя сообщения с необходимыми параметрами прямо из командной строки.
Но на практике это может понадобиться только для тестирования сервиса.
Нас гораздо больше будет интересовать, как работать с кафкой из IDEA.
Работа с Кафкой из IDEA
Мы напишем максимально простое приложение, которое будет одновременно и производителем, и потребителем сообщений, а затем добавим в него полезные функции.
Давайте создадим новый весенний проект. Самый удобный способ сделать это — использовать инициализатор Spring. Добавьте зависимости org.springframework.kafka и Spring-boot-starter-web.
В результате файл pom.xml должен выглядеть так:
Для отправки сообщений нам нужен KafkaTemplate. объект. Как мы видим, объект типизирован.
Первый параметр — тип ключа, второй — само сообщение.
Сейчас мы укажем оба параметра как String. Мы создадим объект в классе restcontroller. Давайте объявим KafkaTemplate и попросим Spring инициализировать его, добавив аннотацию.
Автопроводка .
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
В принципе, наш производитель готов.
Все, что осталось сделать, — это вызвать его метод send().
Существует несколько перегруженных версий этого метода.
В нашем проекте мы используем вариант с 3 параметрами — send(тема String, ключ K, данные V).
Поскольку KafkaTemplate имеет тип String, ключ и данные в методе отправки будут строкой.
Первый параметр указывает тему, то есть тему, в которую будут отправляться сообщения и на которую потребители могут подписаться, чтобы получать их.
Если тема, указанная в методе отправки, не существует, она будет создана автоматически.
Полный текст класса выглядит так.
@RestController
@RequestMapping("msg")
public class MsgController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping
public void sendOrder(String msgId, String msg){
kafkaTemplate.send("msg", msgId, msg);
}
}
Контроллер отображает локальный хост :8080/msg, тело запроса содержит ключ и само сообщение.
Отправитель сообщения готов, теперь давайте создадим слушателя.
Spring также позволяет сделать это без особых усилий.
Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которого можно указать только тему, которая будет прослушиваться.
В нашем случае это выглядит так.
@KafkaListener(topics="msg")
Сам метод, отмеченный аннотацией, может указать один параметр для получения, который имеет тип сообщения, отправленного производителем.
Класс, в котором будет создан потребитель, должен быть помечен аннотацией @EnableKafka. @EnableKafka
@SpringBootApplication
public class SimpleKafkaExampleApplication {
@KafkaListener(topics="msg")
public void msgListener(String msg){
System.out.println(msg);
}
public static void main(String[] args) {
SpringApplication.run(SimpleKafkaExampleApplication.class, args);
}
}
Также в файле настроек application.property необходимо указать потребительский параметр groupe-id. Если этого не сделать, приложение не запустится.
Параметр имеет тип String и может быть любым.
spring.kafka.consumer.group-id=app.1
Наш простейший проект Kafka готов.
У нас есть отправитель и получатель сообщений.
Остается только запустить его.
Сначала мы запускаем ZooKeeper и Kafka с помощью написанного ранее командного файла, затем запускаем наше приложение.
Самый удобный способ отправить запрос — с помощью Postman. В теле запроса не забудьте указать параметры msgId и msg.
Если мы видим такую картинку в IDEA, значит всё работает: производитель отправил сообщение, потребитель его получил и отобразил в консоли.
Усложнение проекта
Реальные проекты с использованием Kafka, конечно, сложнее, чем тот, который мы создали.Теперь, когда мы разобрались с основными функциями сервиса, давайте рассмотрим, какие дополнительные возможности он предоставляет. Для начала давайте улучшим продюсера.
Если вы открыли метод send(), возможно, вы заметили, что все его варианты имеют ListenableFuture. > возвращаемое значение.
Сейчас мы не будем подробно рассматривать возможности этого интерфейса.
Здесь достаточно будет сказать, что необходимо просмотреть результат отправки сообщения.
@PostMapping
public void sendMsg(String msgId, String msg){
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", msgId, msg);
future.addCallback(System.out::println, System.err::println);
kafkaTemplate.flush();
}
Метод addCallback() принимает два параметра — SuccessCallback и FailbackCallback. Оба они являются функциональными интерфейсами.
Из названия можно понять, что первый метод будет вызван в результате успешной отправки сообщения, второй — в результате ошибки.
Теперь, если мы запустим проект, то увидим в консоли что-то вроде следующего: SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]
Давайте еще раз присмотримся к нашему производителю.
Интересно, что будет, если ключ будет не String, а, скажем, Long, а передаваемое сообщение, что еще хуже, будет каким-то сложным DTO? Давайте сначала попробуем изменить ключ на числовое значение.
Если мы укажем Long в качестве ключа в продюсере, то приложение запустится нормально, но при попытке отправить сообщение будет выброшено ClassCastException и будет сообщено, что класс Long невозможно привести к классу String.
Если мы попытаемся вручную создать объект KafkaTemplate, мы увидим, что ProducerFactory объект интерфейса передается конструктору в качестве параметра, например DefaultKafkaProducerFactory<> .
Чтобы создать DefaultKafkaProducerFactory, нам нужно передать карту, содержащую настройки производителя, в ее конструктор.
Весь код настройки и создания производителя мы вынесем в отдельный класс.
Для этого мы создадим конфигурационный пакет и в нем класс KafkaProducerConfig. @Configuration
public class KafkaProducerConfig {
private String kafkaServer="localhost:9092";
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<Long, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
В методе ProduceConfigs() мы создаем карту с конфигурациями и указываем LongSerializer.class в качестве сериализатора ключа.
Запускаем его, отправляем запрос от Postman и видим, что теперь все работает как надо: производитель отправляет сообщение, а потребитель его получает. Теперь давайте изменим тип передаваемого значения.
Что делать, если у нас не стандартный класс из библиотеки Java, а какой-то собственный DTO. Скажем так.
@Data
public class UserDto {
private Long age;
private String name;
private Address address;
}
@Data
@AllArgsConstructor
public class Address {
private String country;
private String city;
private String street;
private Long homeNumber;
private Long flatNumber;
}
Чтобы отправить DTO в виде сообщения, вам необходимо внести некоторые изменения в конфигурацию производителя.
Давайте укажем JsonSerializer.class в качестве сериализатора значения сообщения и не забудем везде изменить тип String на UserDto. @Configuration
public class KafkaProducerConfig {
private String kafkaServer="localhost:9092";
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<Long, UserDto> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, UserDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Давайте отправим сообщение.
На консоль будет выведена следующая строка:
Теперь перейдем к усложнению потребителя.
Ранее наш общедоступный метод void msgListener(String msg), отмеченный аннотацией @KafkaListener(topics="msg"), принимал строку в качестве параметра и выводил ее на консоль.
Что делать, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или раздел? В этом случае необходимо изменить тип передаваемого значения.
@KafkaListener(topics="msg")
public void orderListener(ConsumerRecord<Long, UserDto> record){
System.out.println(record.partition());
System.out.println(record.key());
System.out.println(record.value());
}
Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.
Видим, что вместо ключа на консоли отображается какая-то тарабарщина.
Это связано с тем, что для десериализации ключа используется StringDeserializer по умолчанию, и если мы хотим, чтобы ключ в целочисленном формате отображался правильно, мы должны изменить его на LongDeserializer. Чтобы настроить потребителя, мы создадим класс KafkaConsumerConfig в пакете конфигурации.
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaServer;
@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
return props;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Long, UserDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Long, UserDto> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
}
Класс KafkaConsumerConfig очень похож на созданный нами ранее KafkaProducerConfig. Также есть Map, содержащий необходимые конфигурации, например такие как десериализатор ключа и значения.
Созданная карта используется для создания ConsumerFactory.<> , что, в свою очередь, необходимо для создания KafkaListenerContainerFactory должен вызываться kafkaListenerContainerFactory(), иначе Spring не сможет найти необходимый bean-компонент и проект не скомпилируется.
Запустим.
Видим, что теперь ключ отображается как надо, а это значит, что все работает. Конечно, возможности Apache Kafka выходят далеко за рамки описанных в этой статье, однако я надеюсь, что, прочитав ее, вы получите представление об этом сервисе и, самое главное, сможете начать с ним работать.
Чаще мойте руки, носите маски, не выходите на улицу без необходимости и будьте здоровы.
Теги: #windows 10 #kafka #java #Apache #spring boot
-
Идеалы Маркетинга В Социальных Сетях
19 Oct, 24 -
Как Поделиться Видео С Другими?
19 Oct, 24 -
Черная Оптимизация На Примере Mail.ru
19 Oct, 24 -
Тормоза В Snow Leopard
19 Oct, 24 -
С Чего Начинаются Онтологии?
19 Oct, 24 -
Files.inbox.lv
19 Oct, 24 -
Блог «Пишу Правильно»
19 Oct, 24 -
Периодическая Таблица Контент-Маркетинга
19 Oct, 24