Привет, Хабр! Сегодня мы построим систему, которая будет обрабатывать потоки сообщений Apache Kafka с помощью Spark Streaming и записывать результаты обработки в облачную базу данных AWS RDS. Представим, что некая кредитная организация ставит перед нами задачу обрабатывать входящие операции «на лету» по всем своим филиалам.
Это можно сделать с целью оперативного расчета открытой валютной позиции казны, лимитов или финансовых результатов по сделкам и т.п.
Как реализовать это дело без использования магии и магических заклинаний – читайте под катом! Идти!
(Источник изображения)
Введение
Разумеется, обработка большого количества данных в режиме реального времени предоставляет широкие возможности для использования в современных системах.Одной из самых популярных комбинаций для этого является тандем Apache Kafka и Spark Streaming, где Kafka создает поток входящих пакетов сообщений, а Spark Streaming обрабатывает эти пакеты через заданный интервал времени.
Для повышения отказоустойчивости приложения мы будем использовать контрольные точки.
Благодаря этому механизму, когда механизму Spark Streaming необходимо восстановить потерянные данные, ему нужно всего лишь вернуться к последней контрольной точке и возобновить вычисления оттуда.
Архитектура разрабатываемой системы
Используемые компоненты:
- Апач Кафка это распределенная система обмена сообщениями публикации и подписки.
Подходит как для автономного, так и для онлайн-потребления сообщений.
Чтобы предотвратить потерю данных, сообщения Kafka хранятся на диске и реплицируются внутри кластера.
Система Kafka построена на базе службы синхронизации ZooKeeper;
- Потоковая передача Apache Spark — Компонент Spark для обработки потоковых данных.
Модуль Spark Streaming построен с использованием микропакетной архитектуры, в которой поток данных интерпретируется как непрерывная последовательность небольших пакетов данных.
Spark Streaming берет данные из разных источников и объединяет их в небольшие пакеты.
Новые пакеты создаются через регулярные промежутки времени.
В начале каждого временного интервала создается новый пакет, и все данные, полученные в течение этого интервала, включаются в пакет. В конце интервала рост пакетов прекращается.
Размер интервала определяется параметром, называемым пакетным интервалом;
- Apache Spark SQL — сочетает реляционную обработку с функциональным программированием Spark. Структурированные данные — это данные, имеющие схему, то есть единый набор полей для всех записей.
Spark SQL поддерживает ввод из множества источников структурированных данных и благодаря наличию информации о схеме может эффективно извлекать только необходимые поля записей, а также предоставляет API DataFrame;
- АВС РДС — это относительно недорогая облачная реляционная база данных, веб-сервис, который упрощает настройку, эксплуатацию и масштабирование и администрируется непосредственно Amazon.
Установка и запуск сервера Kafka
Прежде чем напрямую использовать Kafka, нужно убедиться, что у вас есть Java, потому что.для работы используется JVM:
Создадим нового пользователя для работы с Kafka:sudo apt-get update sudo apt-get install default-jre java -version
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Далее скачиваем дистрибутив с официального сайта Apache Kafka:
wget -P /YOUR_PATH " http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz "
Распакуйте скачанный архив:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Следующий шаг не является обязательным.
Дело в том, что настройки по умолчанию не позволяют полностью использовать все возможности Apache Kafka. Например, удалить тему, категорию, группу, в которую можно публиковать сообщения.
Чтобы изменить это, давайте отредактируем файл конфигурации: vim ~/kafka/config/server.properties
Добавьте в конец файла следующее: delete.topic.enable = true
Перед запуском сервера Kafka вам необходимо запустить сервер ZooKeeper; мы будем использовать вспомогательный скрипт, поставляемый с дистрибутивом Kafka: Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
После успешного запуска ZooKeeper запустите сервер Kafka в отдельном терминале: bin/kafka-server-start.sh config/server.properties
Давайте создадим новую тему под названием «Транзакция»: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Убедимся, что тема с необходимым количеством разделов и репликацией создана: bin/kafka-topics.sh --describe --zookeeper localhost:2181
Давайте упустим моменты тестирования производителя и потребителя для вновь созданной темы.
Подробнее о том, как можно протестировать отправку и получение сообщений, написано в официальной документации — Отправить несколько сообщений .
Что ж, переходим к написанию продюсера на Python с использованием API KafkaProducer.
Написание продюсера
Производитель будет генерировать случайные данные — 100 сообщений каждую секунду.Под случайными данными мы подразумеваем словарь, состоящий из трех полей:
- Ветвь — наименование точки продаж кредитной организации;
- Валюта — валюта транзакции;
- Количество — сумма сделки.
Сумма будет положительным числом, если это покупка валюты Банком, и отрицательным числом, если это продажа.
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
Далее методом send отправляем на сервер сообщение, в нужную нам тему, в формате JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).
encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic: \
{}, partition: {}, offset: {}' \
.
format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.
format(e))
finally:
producer.flush()
При запуске скрипта мы получаем в терминале следующие сообщения:
Это значит, что все работает так, как мы хотели — продюсер генерирует и отправляет сообщения в нужную нам тему.
Следующий шаг — установка Spark и обработка этого потока сообщений.
Установка Apache Spark
Апач Спарк — универсальная и высокопроизводительная платформа кластерных вычислений.Spark работает лучше, чем популярные реализации модели MapReduce, поддерживая при этом более широкий спектр типов вычислений, включая интерактивные запросы и потоковую обработку.
Скорость играет важную роль при обработке больших объемов данных, поскольку именно скорость позволяет работать в интерактивном режиме, не тратя минуты или часы на ожидание.
Одна из самых важных вещей, которая делает Spark таким быстрым, — это его способность выполнять вычисления в памяти.
Этот фреймворк написан на Scala, поэтому сначала его необходимо установить: sudo apt-get install scala
Загрузите дистрибутив Spark с официального сайта: wget " http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz "
Распакуйте архив: sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Добавьте путь к Spark в файл bash: vim ~/.
bashrc
Добавьте следующие строки через редактор: SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
Запустите команду ниже после внесения изменений в bashrc: source ~/.
bashrc
Развертывание AWS PostgreSQL
Остается только развернуть базу данных, в которую мы будем загружать обработанную информацию из потоков.
Для этого мы будем использовать сервис AWS RDS.
Перейдите в консоль AWS -> AWS RDS -> Базы данных -> Создать базу данных:
Выберите PostgreSQL и нажмите «Далее»:
Потому что этот пример предназначен только для образовательных целей; мы будем использовать бесплатный сервер «как минимум» (Free Tier):
Далее ставим галочку в блоке Free Tier, и после этого нам автоматически будет предложен экземпляр класса t2.micro — хоть и слабый, но бесплатный и вполне подходящий для нашей задачи:
Дальше идут очень важные вещи: имя экземпляра базы данных, имя главного пользователя и его пароль.
Назовем экземпляр: myHabrTest, главный пользователь: хабр , пароль: хабр12345 и нажмите кнопку «Далее»:
На следующей странице находятся параметры, отвечающие за доступность нашего сервера базы данных извне (Public accessibility) и доступность портов:
Давайте создадим новую настройку для группы безопасности VPC, которая позволит внешний доступ к нашему серверу базы данных через порт 5432 (PostgreSQL).
Перейдем в консоль AWS в отдельном окне браузера в раздел VPC Dashboard --> Security Groups --> Create Security Group:
Задаем для группы Security имя — PostgreSQL, описание, указываем, с каким VPC должна быть связана эта группа и нажимаем кнопку «Создать»:
Заполните правила входящего трафика для порта 5432 для вновь созданной группы, как показано на рисунке ниже.
Вы можете не указывать порт вручную, а выбрать PostgreSQL из раскрывающегося списка Тип.
Строго говоря, значение ::/0 означает наличие входящего трафика на сервер со всего мира, что канонически не совсем верно, но для анализа примера позволим себе использовать такой подход:
Возвращаемся на страницу браузера, где у нас открыта «Настройка дополнительных параметров» и в разделе Группы безопасности VPC выбираем --> Выбрать существующие группы безопасности VPC --> раздел PostgreSQL:
Далее в параметрах базы данных --> Имя базы данных --> задайте имя - хабрБД .
Остальные параметры, за исключением отключения резервного копирования (срок хранения резервных копий — 0 дней), мониторинга и Performance Insights, можем оставить по умолчанию.
Нажмите на кнопку Создать базу данных :
Обработчик потока
Завершающим этапом станет разработка задания Spark, которое будет каждые две секунды обрабатывать новые данные, поступающие от Kafka, и заносить результат в базу данных.Как отмечалось выше, контрольные точки — это основной механизм в SparkStreaming, который необходимо настроить для обеспечения отказоустойчивости.
Мы будем использовать контрольные точки и в случае сбоя процедуры модулю Spark Streaming достаточно будет вернуться к последней контрольной точке и возобновить вычисления с нее, чтобы восстановить потерянные данные.
Создание контрольных точек можно включить, установив каталог в отказоустойчивой и надежной файловой системе (например, HDFS, S3 и т. д.), в котором будет храниться информация о контрольных точках.
Это делается с помощью, например: streamingContext.checkpoint(checkpointDirectory)
В нашем примере мы будем использовать следующий подход, а именно, если checkpointDirectory существует, то контекст будет воссоздан из данных контрольной точки.
Если каталог не существует (т.е.
выполняется впервые), то вызывается функцияToCreateContext для создания нового контекста и настройки DStreams: from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Создадим объект DirectStream для подключения к теме «транзакция», используя метод createDirectStream библиотеки KafkaUtils: from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
Парсинг входящих данных в формате JSON: rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Используя Spark SQL, делаем простую группировку и выводим результат в консоль: select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Получение текста запроса и запуск его через Spark SQL: sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
А затем сохраняем полученные агрегированные данные в таблицу в AWS RDS. Чтобы сохранить результаты агрегации в таблицу базы данных, мы воспользуемся методом записи объекта DataFrame: testResultDataFrame.write \
.
format("jdbc") \ .
mode("append") \ .
option("driver", 'org.postgresql.Driver') \ .
option("url"," jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB ") \ .
option("dbtable", "transaction_flow") \ .
option("user", "habr") \ .
option("password", "habr12345") \ .
save()
Несколько слов о настройке подключения к AWS RDS. Мы создали для него пользователя и пароль на этапе «Развертывание AWS PostgreSQL».Чтобы правильно подключить Spark и Kafka, следует запустить задание через smark-submit с использованием артефакта искра-потоковая-кафка-0-8_2.11 .Вам следует использовать Endpoint в качестве URL-адреса сервера базы данных, который отображается в разделе «Подключение и безопасность»:
Дополнительно мы также будем использовать артефакт для взаимодействия с базой данных PostgreSQL; мы передадим их через --packages. Для гибкости скрипта мы также включим в качестве входных параметров имя сервера сообщений и тему, из которой мы хотим получать данные.
Итак, пришло время запустить и проверить работоспособность системы: spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction
Все получилось! Как вы можете видеть на рисунке ниже, во время работы приложения новые результаты агрегации выводятся каждые 2 секунды, поскольку при создании объекта StreamingContext мы установили интервал пакетной обработки в 2 секунды:
Далее делаем простой запрос к базе данных для проверки наличия записей в таблице.
поток_транзакций :
Заключение
В этой статье был рассмотрен пример потоковой обработки информации с использованием Spark Streaming совместно с Apache Kafka и PostgreSQL. С ростом данных из различных источников сложно переоценить практическую ценность Spark Streaming для создания потоковых приложений и приложений реального времени.Полный исходный код вы можете найти в моем репозитории по адресу GitHub .
С удовольствием обсужу эту статью, жду ваших комментариев, а также надеюсь на конструктивную критику со стороны всех неравнодушных читателей.
Желаю тебе успеха! P.S. Изначально планировалось использовать локальную базу данных PostgreSQL, но учитывая мою любовь к AWS, я решил перенести базу данных в облако.
В следующей статье на эту тему я покажу, как реализовать всю описанную выше систему в AWS с помощью AWS Kinesis и AWS EMR. Следите за новостями! Теги: #python #программирование #облачные сервисы #postgresql #Big Data #kafka #spark #amazon #tutorial #aws #Amazon Web Services #rds #kafkastreams #spark Streaming
-
Зодиак
19 Oct, 24 -
Стратегии Построения Списка
19 Oct, 24 -
Как Заработать На Священных Войнах
19 Oct, 24 -
Полезный Совет: Аватар
19 Oct, 24 -
Как Проверить Пароль На Безопасность?
19 Oct, 24 -
Wfc И Полицейское Расследование
19 Oct, 24