Apache Kafka И Потоковая Обработка Данных С Помощью Spark Streaming

Привет, Хабр! Сегодня мы построим систему, которая будет обрабатывать потоки сообщений Apache Kafka с помощью Spark Streaming и записывать результаты обработки в облачную базу данных AWS RDS. Представим, что некая кредитная организация ставит перед нами задачу обрабатывать входящие операции «на лету» по всем своим филиалам.

Это можно сделать с целью оперативного расчета открытой валютной позиции казны, лимитов или финансовых результатов по сделкам и т.п.

Как реализовать это дело без использования магии и магических заклинаний – читайте под катом! Идти!

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

(Источник изображения)



Введение

Разумеется, обработка большого количества данных в режиме реального времени предоставляет широкие возможности для использования в современных системах.

Одной из самых популярных комбинаций для этого является тандем Apache Kafka и Spark Streaming, где Kafka создает поток входящих пакетов сообщений, а Spark Streaming обрабатывает эти пакеты через заданный интервал времени.

Для повышения отказоустойчивости приложения мы будем использовать контрольные точки.

Благодаря этому механизму, когда механизму Spark Streaming необходимо восстановить потерянные данные, ему нужно всего лишь вернуться к последней контрольной точке и возобновить вычисления оттуда.



Архитектура разрабатываемой системы



Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Используемые компоненты:
  • Апач Кафка это распределенная система обмена сообщениями публикации и подписки.

    Подходит как для автономного, так и для онлайн-потребления сообщений.

    Чтобы предотвратить потерю данных, сообщения Kafka хранятся на диске и реплицируются внутри кластера.

    Система Kafka построена на базе службы синхронизации ZooKeeper;

  • Потоковая передача Apache Spark — Компонент Spark для обработки потоковых данных.

    Модуль Spark Streaming построен с использованием микропакетной архитектуры, в которой поток данных интерпретируется как непрерывная последовательность небольших пакетов данных.

    Spark Streaming берет данные из разных источников и объединяет их в небольшие пакеты.

    Новые пакеты создаются через регулярные промежутки времени.

    В начале каждого временного интервала создается новый пакет, и все данные, полученные в течение этого интервала, включаются в пакет. В конце интервала рост пакетов прекращается.

    Размер интервала определяется параметром, называемым пакетным интервалом;

  • Apache Spark SQL — сочетает реляционную обработку с функциональным программированием Spark. Структурированные данные — это данные, имеющие схему, то есть единый набор полей для всех записей.

    Spark SQL поддерживает ввод из множества источников структурированных данных и благодаря наличию информации о схеме может эффективно извлекать только необходимые поля записей, а также предоставляет API DataFrame;

  • АВС РДС — это относительно недорогая облачная реляционная база данных, веб-сервис, который упрощает настройку, эксплуатацию и масштабирование и администрируется непосредственно Amazon.


Установка и запуск сервера Kafka

Прежде чем напрямую использовать Kafka, нужно убедиться, что у вас есть Java, потому что.

для работы используется JVM:

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
   

sudo apt-get update sudo apt-get install default-jre java -version

Создадим нового пользователя для работы с Kafka:

sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo

Далее скачиваем дистрибутив с официального сайта Apache Kafka:

wget -P /YOUR_PATH " http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz "

Распакуйте скачанный архив:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Следующий шаг не является обязательным.

Дело в том, что настройки по умолчанию не позволяют полностью использовать все возможности Apache Kafka. Например, удалить тему, категорию, группу, в которую можно публиковать сообщения.

Чтобы изменить это, давайте отредактируем файл конфигурации:

vim ~/kafka/config/server.properties

Добавьте в конец файла следующее:

delete.topic.enable = true

Перед запуском сервера Kafka вам необходимо запустить сервер ZooKeeper; мы будем использовать вспомогательный скрипт, поставляемый с дистрибутивом Kafka:

Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties

После успешного запуска ZooKeeper запустите сервер Kafka в отдельном терминале:

bin/kafka-server-start.sh config/server.properties

Давайте создадим новую тему под названием «Транзакция»:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Убедимся, что тема с необходимым количеством разделов и репликацией создана:

bin/kafka-topics.sh --describe --zookeeper localhost:2181



Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Давайте упустим моменты тестирования производителя и потребителя для вновь созданной темы.

Подробнее о том, как можно протестировать отправку и получение сообщений, написано в официальной документации — Отправить несколько сообщений .

Что ж, переходим к написанию продюсера на Python с использованием API KafkaProducer.

Написание продюсера

Производитель будет генерировать случайные данные — 100 сообщений каждую секунду.

Под случайными данными мы подразумеваем словарь, состоящий из трех полей:

  • Ветвь — наименование точки продаж кредитной организации;
  • Валюта — валюта транзакции;
  • Количество — сумма сделки.

    Сумма будет положительным числом, если это покупка валюты Банком, и отрицательным числом, если это продажа.

Код производителя выглядит следующим образом:

from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict

Далее методом send отправляем на сервер сообщение, в нужную нам тему, в формате JSON:

from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).

encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .

format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.

format(e)) finally: producer.flush()

При запуске скрипта мы получаем в терминале следующие сообщения:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Это значит, что все работает так, как мы хотели — продюсер генерирует и отправляет сообщения в нужную нам тему.

Следующий шаг — установка Spark и обработка этого потока сообщений.



Установка Apache Spark

Апач Спарк — универсальная и высокопроизводительная платформа кластерных вычислений.

Spark работает лучше, чем популярные реализации модели MapReduce, поддерживая при этом более широкий спектр типов вычислений, включая интерактивные запросы и потоковую обработку.

Скорость играет важную роль при обработке больших объемов данных, поскольку именно скорость позволяет работать в интерактивном режиме, не тратя минуты или часы на ожидание.

Одна из самых важных вещей, которая делает Spark таким быстрым, — это его способность выполнять вычисления в памяти.

Этот фреймворк написан на Scala, поэтому сначала его необходимо установить:

sudo apt-get install scala

Загрузите дистрибутив Spark с официального сайта:

wget " http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz "

Распакуйте архив:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Добавьте путь к Spark в файл bash:

vim ~/.

bashrc

Добавьте следующие строки через редактор:

SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH

Запустите команду ниже после внесения изменений в bashrc:

source ~/.

bashrc



Развертывание AWS PostgreSQL

Остается только развернуть базу данных, в которую мы будем загружать обработанную информацию из потоков.

Для этого мы будем использовать сервис AWS RDS. Перейдите в консоль AWS -> AWS RDS -> Базы данных -> Создать базу данных:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Выберите PostgreSQL и нажмите «Далее»:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Потому что этот пример предназначен только для образовательных целей; мы будем использовать бесплатный сервер «как минимум» (Free Tier):

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Далее ставим галочку в блоке Free Tier, и после этого нам автоматически будет предложен экземпляр класса t2.micro — хоть и слабый, но бесплатный и вполне подходящий для нашей задачи:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Дальше идут очень важные вещи: имя экземпляра базы данных, имя главного пользователя и его пароль.

Назовем экземпляр: myHabrTest, главный пользователь: хабр , пароль: хабр12345 и нажмите кнопку «Далее»:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

На следующей странице находятся параметры, отвечающие за доступность нашего сервера базы данных извне (Public accessibility) и доступность портов:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Давайте создадим новую настройку для группы безопасности VPC, которая позволит внешний доступ к нашему серверу базы данных через порт 5432 (PostgreSQL).

Перейдем в консоль AWS в отдельном окне браузера в раздел VPC Dashboard --> Security Groups --> Create Security Group:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Задаем для группы Security имя — PostgreSQL, описание, указываем, с каким VPC должна быть связана эта группа и нажимаем кнопку «Создать»:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Заполните правила входящего трафика для порта 5432 для вновь созданной группы, как показано на рисунке ниже.

Вы можете не указывать порт вручную, а выбрать PostgreSQL из раскрывающегося списка Тип.

Строго говоря, значение ::/0 означает наличие входящего трафика на сервер со всего мира, что канонически не совсем верно, но для анализа примера позволим себе использовать такой подход:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Возвращаемся на страницу браузера, где у нас открыта «Настройка дополнительных параметров» и в разделе Группы безопасности VPC выбираем --> Выбрать существующие группы безопасности VPC --> раздел PostgreSQL:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Далее в параметрах базы данных --> Имя базы данных --> задайте имя - хабрБД .

Остальные параметры, за исключением отключения резервного копирования (срок хранения резервных копий — 0 дней), мониторинга и Performance Insights, можем оставить по умолчанию.

Нажмите на кнопку Создать базу данных :

Apache Kafka и потоковая обработка данных с помощью Spark Streaming



Обработчик потока

Завершающим этапом станет разработка задания Spark, которое будет каждые две секунды обрабатывать новые данные, поступающие от Kafka, и заносить результат в базу данных.

Как отмечалось выше, контрольные точки — это основной механизм в SparkStreaming, который необходимо настроить для обеспечения отказоустойчивости.

Мы будем использовать контрольные точки и в случае сбоя процедуры модулю Spark Streaming достаточно будет вернуться к последней контрольной точке и возобновить вычисления с нее, чтобы восстановить потерянные данные.

Создание контрольных точек можно включить, установив каталог в отказоустойчивой и надежной файловой системе (например, HDFS, S3 и т. д.), в котором будет храниться информация о контрольных точках.

Это делается с помощью, например:

streamingContext.checkpoint(checkpointDirectory)

В нашем примере мы будем использовать следующий подход, а именно, если checkpointDirectory существует, то контекст будет воссоздан из данных контрольной точки.

Если каталог не существует (т.е.

выполняется впервые), то вызывается функцияToCreateContext для создания нового контекста и настройки DStreams:

from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Создадим объект DirectStream для подключения к теме «транзакция», используя метод createDirectStream библиотеки KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list})

Парсинг входящих данных в формате JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream")

Используя Spark SQL, делаем простую группировку и выводим результат в консоль:

select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency

Получение текста запроса и запуск его через Spark SQL:

sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5)

А затем сохраняем полученные агрегированные данные в таблицу в AWS RDS. Чтобы сохранить результаты агрегации в таблицу базы данных, мы воспользуемся методом записи объекта DataFrame:

testResultDataFrame.write \ .

format("jdbc") \ .

mode("append") \ .

option("driver", 'org.postgresql.Driver') \ .

option("url"," jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB ") \ .

option("dbtable", "transaction_flow") \ .

option("user", "habr") \ .

option("password", "habr12345") \ .

save()

Несколько слов о настройке подключения к AWS RDS. Мы создали для него пользователя и пароль на этапе «Развертывание AWS PostgreSQL».

Вам следует использовать Endpoint в качестве URL-адреса сервера базы данных, который отображается в разделе «Подключение и безопасность»:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Чтобы правильно подключить Spark и Kafka, следует запустить задание через smark-submit с использованием артефакта искра-потоковая-кафка-0-8_2.11 .

Дополнительно мы также будем использовать артефакт для взаимодействия с базой данных PostgreSQL; мы передадим их через --packages. Для гибкости скрипта мы также включим в качестве входных параметров имя сервера сообщений и тему, из которой мы хотим получать данные.

Итак, пришло время запустить и проверить работоспособность системы:

spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction

Все получилось! Как вы можете видеть на рисунке ниже, во время работы приложения новые результаты агрегации выводятся каждые 2 секунды, поскольку при создании объекта StreamingContext мы установили интервал пакетной обработки в 2 секунды:

Apache Kafka и потоковая обработка данных с помощью Spark Streaming

Далее делаем простой запрос к базе данных для проверки наличия записей в таблице.

поток_транзакций :

Apache Kafka и потоковая обработка данных с помощью Spark Streaming



Заключение

В этой статье был рассмотрен пример потоковой обработки информации с использованием Spark Streaming совместно с Apache Kafka и PostgreSQL. С ростом данных из различных источников сложно переоценить практическую ценность Spark Streaming для создания потоковых приложений и приложений реального времени.

Полный исходный код вы можете найти в моем репозитории по адресу GitHub .

С удовольствием обсужу эту статью, жду ваших комментариев, а также надеюсь на конструктивную критику со стороны всех неравнодушных читателей.

Желаю тебе успеха! P.S. Изначально планировалось использовать локальную базу данных PostgreSQL, но учитывая мою любовь к AWS, я решил перенести базу данных в облако.

В следующей статье на эту тему я покажу, как реализовать всю описанную выше систему в AWS с помощью AWS Kinesis и AWS EMR. Следите за новостями! Теги: #python #программирование #облачные сервисы #postgresql #Big Data #kafka #spark #amazon #tutorial #aws #Amazon Web Services #rds #kafkastreams #spark Streaming

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.