Перевод статьи был подготовлен до начала курса.
В этой статье мы поговорим о том, как создать простое приложение Spring Boot с помощью Kafka и Kotlin.
Введение
Начните с посещения https://start.spring.io и добавьте следующие зависимости: классныйВ нашем примере мы будем использовать Gradle для сборки.implementation("org.springframework.boot:spring-boot-starter-data-rest") implementation("org.springframework.boot:spring-boot-starter-web") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation("org.apache.kafka:kafka-streams") implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("org.springframework.kafka:spring-kafka")
Вы вполне можете выбрать Maven. Создайте и загрузите проект. Затем импортируйте его в IntelliJ IDEA.
Скачать Апач Кафка
Загрузите последнюю версию Apache Kafka с их веб-сайта и извлеките ее в папку.Я использую операционную систему Windows 10. При запуске Kafka вы можете столкнуться с некоторыми проблемами, такими как "обнаружено слишком много строк" .
Это происходит потому, что Kafka добавляет к своему пути большую структуру папок.
Если эта проблема не будет решена автоматически, вам придется переименовать структуру папок на что-то более короткое и запустить приложение из Power Shell. Чтобы запустить Kafka, используйте следующие команды: Оболочка .
\zookeeper-server-start.bat .
\.
\config\zookeeper.properties .
\kafka-server-start.bat .
\.
\config\server.properties
Вы увидите эти две команды в папке /бин/окна .
Чтобы запустить Kafka, сначала нужно запустить Zookeeper. Работник зоопарка — это продукт Apache, предоставляющий службу распределенной настройки.
Беговые весенние ботинки
Первый шаг — создать в вашей IDE класс под названием KafkaDemoApplication.kt .Когда вы создаете проект на веб-сайте Spring, класс будет создан автоматически.
Добавьте следующий код: Котлин import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class KafkaDemoApplication
fun main(args: Array<String>) {
runApplication<KafkaDemoApplication>(*args)
}
Производитель
Мы можем отправлять сообщения в темы двумя способами.Мы рассмотрим их ниже.
Мы разработаем класс контроллера, который необходим для отправки и получения сообщений.
Назовем этот класс KafkaController.kt .
И добавьте следующий метод: Котлин var kafkaTemplate:KafkaTemplate<String, String>? = null;
val topic:String = "test_topic"
@GetMapping("/send")
fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.
send(topic, message)!!
var sendResult: SendResult<String, String> = lf.get()
return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}
Чтобы отправлять сообщения в тему под названием test_topic , мы используем КафкаШаблон .
Он вернет объект СлушаемыйБудущее , из которого мы можем получить результат этого действия.
Этот подход является самым простым, если вы просто хотите опубликовать сообщение в теме.
Второй способ
Следующий способ отправить сообщение в тему Kafka — использовать объект КафкаПродюсер .
Для этого напишем следующий код: Котлин @GetMapping("/produce")
fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
val map = mutableMapOf<String, String>()
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["bootstrap.servers"] = "localhost:9092"
var producer = KafkaProducer<String, String>(map as Map<String, Any>?)
var future:Future<RecordMetadata> = producer?.
send(producerRecord)!! return ResponseEntity.ok(" message sent to " + future.get().
topic());
}
И здесь необходимо сделать небольшое уточнение.
Нам нужно инициализировать объект КафкаПродукт с картой, которая будет содержать ключ и значение для сериализации.
В нашем примере мы говорим о строковом сообщении, поэтому нам нужно только Строковый сериализатор .
По сути, Serializer — это интерфейс Kafka, который преобразует строки в байты.
Apache Kafka имеет другие сериализаторы, такие как БайтмассивСериализатор , БайтСериализатор , FloatСериализатор и так далее.
Для карты мы указываем ключ и значение с помощью Строковый сериализатор .
Котлин map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
Следующее значение — информация о загрузочном сервере, необходимом для связи с кластером Kafka. Котлин map["bootstrap.servers"] = "localhost:9092"
Все эти атрибуты необходимы, если мы используем KafkaProducer. Затем нам нужно создать ПродюсерRecord с названием темы и самим сообщением.
Именно это мы и сделаем в следующей строке: Котлин var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
Теперь мы можем отправить наше сообщение в тему, используя следующий код: Котлин var future:Future<RecordMetadata> = producer?.
send(producerRecord)!!
Эта операция вернет будущее имя темы, которая используется для отправки сообщения.
Потребитель
Мы рассмотрели, как отправлять сообщения в темы.Но нам также необходимо прослушивать входящие сообщения.
Для этого вам нужно создать прослушиватель, который будет потреблять сообщения.
Давайте создадим класс СообщениеConsumer.kt и отметьте его знаком Услуга .
Котлин @KafkaListener(topics= ["test_topic"], groupId = "test_id")
fun consume(message:String) :Unit {
println(" message received from topic : $message");
}
Этот метод можно использовать для прослушивания сообщения с использованием аннотации.
@KafkaListener и выводить сообщение в консоль, как только оно появится в теме.
Просто убедитесь, что вы используете то же название темы, которое использовали для отправки сообщения.
Вы можете увидеть исходный код на мой репозиторий на GitHub .
Узнайте больше о курсе «Бэкенд-разработка на Kotlin»
Теги: #kafka #java #Kotlin #otus #spring boot #spring #kafka apache #kotlin and Spring
-
Подарок Айтишнику
19 Oct, 24