Производитель/Потребитель В Kafka И Kotlin

Перевод статьи был подготовлен до начала курса.

«Бэкенд-разработка на Kotlin»






Производитель/Потребитель в Kafka и Kotlin

В этой статье мы поговорим о том, как создать простое приложение Spring Boot с помощью Kafka и Kotlin.

Введение

Начните с посещения https://start.spring.io и добавьте следующие зависимости: классный
  
  
  
  
  
  
  
  
  
   

implementation("org.springframework.boot:spring-boot-starter-data-rest") implementation("org.springframework.boot:spring-boot-starter-web") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation("org.apache.kafka:kafka-streams") implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("org.springframework.kafka:spring-kafka")

В нашем примере мы будем использовать Gradle для сборки.

Вы вполне можете выбрать Maven. Создайте и загрузите проект. Затем импортируйте его в IntelliJ IDEA.

Скачать Апач Кафка

Загрузите последнюю версию Apache Kafka с их веб-сайта и извлеките ее в папку.

Я использую операционную систему Windows 10. При запуске Kafka вы можете столкнуться с некоторыми проблемами, такими как "обнаружено слишком много строк" .

Это происходит потому, что Kafka добавляет к своему пути большую структуру папок.

Если эта проблема не будет решена автоматически, вам придется переименовать структуру папок на что-то более короткое и запустить приложение из Power Shell. Чтобы запустить Kafka, используйте следующие команды: Оболочка

.

\zookeeper-server-start.bat .

\.

\config\zookeeper.properties .

\kafka-server-start.bat .

\.

\config\server.properties

Вы увидите эти две команды в папке /бин/окна .

Чтобы запустить Kafka, сначала нужно запустить Zookeeper. Работник зоопарка — это продукт Apache, предоставляющий службу распределенной настройки.



Беговые весенние ботинки

Первый шаг — создать в вашей IDE класс под названием KafkaDemoApplication.kt .

Когда вы создаете проект на веб-сайте Spring, класс будет создан автоматически.

Добавьте следующий код: Котлин

import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication @SpringBootApplication class KafkaDemoApplication fun main(args: Array<String>) { runApplication<KafkaDemoApplication>(*args) }



Производитель

Мы можем отправлять сообщения в темы двумя способами.

Мы рассмотрим их ниже.

Мы разработаем класс контроллера, который необходим для отправки и получения сообщений.

Назовем этот класс KafkaController.kt .

И добавьте следующий метод: Котлин

var kafkaTemplate:KafkaTemplate<String, String>? = null; val topic:String = "test_topic" @GetMapping("/send") fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> { var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.

send(topic, message)!! var sendResult: SendResult<String, String> = lf.get() return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic") }

Чтобы отправлять сообщения в тему под названием test_topic , мы используем КафкаШаблон .

Он вернет объект СлушаемыйБудущее , из которого мы можем получить результат этого действия.

Этот подход является самым простым, если вы просто хотите опубликовать сообщение в теме.



Второй способ

Следующий способ отправить сообщение в тему Kafka — использовать объект КафкаПродюсер .

Для этого напишем следующий код: Котлин

@GetMapping("/produce") fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> { var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message) val map = mutableMapOf<String, String>() map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" map["bootstrap.servers"] = "localhost:9092" var producer = KafkaProducer<String, String>(map as Map<String, Any>?) var future:Future<RecordMetadata> = producer?.

send(producerRecord)!! return ResponseEntity.ok(" message sent to " + future.get().

topic()); }

И здесь необходимо сделать небольшое уточнение.

Нам нужно инициализировать объект КафкаПродукт с картой, которая будет содержать ключ и значение для сериализации.

В нашем примере мы говорим о строковом сообщении, поэтому нам нужно только Строковый сериализатор .

По сути, Serializer — это интерфейс Kafka, который преобразует строки в байты.

Apache Kafka имеет другие сериализаторы, такие как БайтмассивСериализатор , БайтСериализатор , FloatСериализатор и так далее.

Для карты мы указываем ключ и значение с помощью Строковый сериализатор .

Котлин

map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

Следующее значение — информация о загрузочном сервере, необходимом для связи с кластером Kafka. Котлин

map["bootstrap.servers"] = "localhost:9092"

Все эти атрибуты необходимы, если мы используем KafkaProducer. Затем нам нужно создать ПродюсерRecord с названием темы и самим сообщением.

Именно это мы и сделаем в следующей строке: Котлин

var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)

Теперь мы можем отправить наше сообщение в тему, используя следующий код: Котлин

var future:Future<RecordMetadata> = producer?.

send(producerRecord)!!

Эта операция вернет будущее имя темы, которая используется для отправки сообщения.



Потребитель

Мы рассмотрели, как отправлять сообщения в темы.

Но нам также необходимо прослушивать входящие сообщения.

Для этого вам нужно создать прослушиватель, который будет потреблять сообщения.

Давайте создадим класс СообщениеConsumer.kt и отметьте его знаком Услуга .

Котлин

@KafkaListener(topics= ["test_topic"], groupId = "test_id") fun consume(message:String) :Unit { println(" message received from topic : $message"); }

Этот метод можно использовать для прослушивания сообщения с использованием аннотации.

@KafkaListener и выводить сообщение в консоль, как только оно появится в теме.

Просто убедитесь, что вы используете то же название темы, которое использовали для отправки сообщения.

Вы можете увидеть исходный код на мой репозиторий на GitHub .




Узнайте больше о курсе «Бэкенд-разработка на Kotlin»


Теги: #kafka #java #Kotlin #otus #spring boot #spring #kafka apache #kotlin and Spring
Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.