Привет, Хабр! В условиях разнообразия распределенных систем наличие проверенной информации в целевом хранилище является важным критерием согласованности данных.
В этом плане существует множество подходов и техник, и мы остановимся на примирении, теоретические аспекты которого были затронуты.
здесь, в этой статье.
Предлагаю рассмотреть практическую реализацию данной системы, масштабируемой и адаптированной к большому объему данных.
Как реализовать это дело на старом добром Python — читайте под катом! Идти!
(Источник изображения)
Введение
Представим, что финансовая организация имеет несколько распределенных систем и перед нами стоит задача сверки транзакций в этих системах и загрузки сверенных данных в целевое хранилище.В качестве источника данных возьмем большой текстовый файл и таблицу в базе данных PostgreSQL. Предположим, что данные, находящиеся в этих источниках, имеют одинаковые транзакции, но могут иметь различия, и поэтому их необходимо сверить, а сверенные данные записать в конечное хранилище для анализа.
Дополнительно необходимо предусмотреть параллельный запуск нескольких сверок на одной базе данных и адаптировать систему к большому объему с использованием многопроцессорности.
Модуль многопроцессорность Отлично подходит для распараллеливания операций в Python и в некотором смысле обхода определенных недостатков GIL. Возможности этой библиотеки мы будем использовать и дальше.
Архитектура разрабатываемой системы
Используемые компоненты:
- Генератор случайных данных — Python-скрипт, формирующий CSV-файл и на его основе заполняющий таблицу в базе данных;
- Источники данных – CSV-файл и таблица в базе данных PostgreSQL;
- Адаптеры — в этом случае мы используем два адаптера, которые будут извлекать данные из их источников (CSV или базы данных) и заносить информацию в промежуточную базу данных;
- База данных — в количестве трех штук: сырые данные, промежуточная база данных, хранящая информацию, захваченную адаптерами, и «чистая» база данных, содержащая сверенные транзакции из обоих источников.
Начальное обучение
Мы будем использовать в качестве инструмента хранения данных База данных PostgreSQL в контейнере Docker и взаимодействовать с нашей базой данных через pgAdmin работает в контейнере :Запускаем pgAdmin:docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres
docker run -p 80:80 -e "[email protected]" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4
После того, как все запустилось, не забудьте в файле конфигурации (conf/db.ini) указать строку подключения к базе данных (для обучающего примера такое возможно!):
[POSTGRESQL]
db_url= postgresql://my_user:[email protected]:5432/my_user
В принципе, использование контейнера не является обязательным, и вы можете использовать собственный сервер базы данных.
Генерация входных данных
Скрипт Python отвечает за создание тестовых данных.генерировать_test_data , который принимает в качестве входных данных желаемое количество записей для создания.
Последовательность операций легко проследить по основной функции класса Генерироватьтестовые данные : @m.timing
def run(self, num_rows):
""" Run the process """
m.info('START!')
self.create_db_schema()
self.create_folder('data')
self.create_csv_file(num_rows)
self.bulk_copy_to_db()
self.random_delete_rows()
self.random_update_rows()
m.info('END!')
Таким образом, функция выполняет следующие шаги:
- Создание схем в базе данных (создаем все основные схемы и таблицы);
- Создание папки для хранения тестового файла;
- Генерация тестового файла с заданным количеством строк;
- Массовая вставка данных в целевую таблицу transaction_db_raw.transaction_log;
- Случайное удаление нескольких строк в этой таблице;
- Случайное обновление нескольких строк в этой таблице.
Важно уметь искать эти несоответствия! @m.timing
@m.wrapper(m.entering, m.exiting)
def random_delete_rows(self):
""" Random deleting some rows from the table """
sql_command = sql.SQL("""
delete from {0}.
{1} where ctid = any(array( select ctid from {0}.
{1} tablesample bernoulli (1) ))""").
format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.
{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.
{1} tablesample bernoulli (1) ))""").
format(sql.Identifier(self.schema_raw),
sql.Identifier(self.raw_table_name))
try:
rows = self.database.execute(sql_command)
m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name))
except psycopg2.Error as err:
m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror)
Формирование набора тестовых данных и последующая запись в текстовый файл формата CSV происходит следующим образом:
- Генерируется случайный UID транзакции;
- Создается случайный номер учетной записи UID (по умолчанию мы берем десять уникальных учетных записей, но это значение можно изменить с помощью файла конфигурации, изменив параметр «random_accounts»);
- Дата транзакции – случайная дата от даты, указанной в конфиге (initial_date);
- Тип транзакции (транзакция/комиссия);
- Сумма транзакции;
- Основную работу по формированию данных выполняет метод генерировать_test_data_by_chunk сорт ТестДанныеСоздатель :
@m.timing
def generate_test_data_by_chunk(self, chunk_start, chunk_end):
""" Generating and saving to the file """
num_rows_mp = chunk_end - chunk_start
new_rows = []
for _ in range(num_rows_mp):
transaction_uid = uuid.uuid4()
account_uid = choice(self.list_acc)
transaction_date = (self.get_random_date(self.date_in, 0)
.
__next__()
.
strftime('%Y-%m-%d %H:%M:%S'))
type_deal = choice(self.list_type_deal)
transaction_amount = randint(-1000, 1000)
new_rows.append([transaction_uid,
account_uid,
transaction_date,
type_deal,
transaction_amount])
self.write_in_file(new_rows, chunk_start, chunk_end)
Особенность этой функции в том, что она выполняется в нескольких распараллеленных асинхронных процессах, каждый из которых генерирует свою порцию по 50К записей.Эта «хитрость» позволит достаточно быстро создать файл в несколько миллионов строк.
def run_csv_writing(self):
""" Writing the test data into csv file """
pool = mp.Pool(mp.cpu_count())
jobs = []
for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows):
jobs.append(pool.apply_async(self.generate_test_data_by_chunk,
(chunk_start, chunk_end)))
# wait for all jobs to finish
for job in jobs:
job.get()
# clean up
pool.close()
pool.join()
После завершения заполнения текстового файла выполняется обработка команды Bulk_insert и все данные из этого файла попадают в таблицу транзакция_db_raw.transaction_log. Далее в двух источниках будут абсолютно одинаковые данные и сверка не обнаружит ничего интересного, поэтому удаляем и изменяем несколько случайных строк в базе данных.
Запускаем скрипт и генерируем тестовый CSV-файл с транзакциями на 10К строк: .
/generate_test_data.py 10000
На скриншоте видно, что был получен файл с 10К строк, в базу было загружено 10К, но затем из базы было удалено 112 строк и изменено еще 108. Результат: файл и таблица в базе данных отличаются на 220 записей.
«Ну и причем здесь многопроцессорностьЭ» ты спрашиваешь.
И увидеть его работу можно, когда сгенерируешь файл большего размера, не 10К записей, а, например, 1М.
Попробуем? .
/generate_test_data.py 1000000
После загрузки данных, удаления и изменения случайных записей мы видим различия между текстовым файлом и таблицей: 19 939 строк (из них 10 022 были случайно удалены, а 9 917 изменены).
На картинке видно, что генерация записей была асинхронной и непоследовательной.Это действительно быстрее? Миллион строк, не на самой быстрой виртуальной машине, «придумали» за 15,5 секунд — и это достойный вариант. Запустив одну и ту же генерацию последовательно, без использования мультипроцессинга, я получил результат: генерация файлов стала более чем в три раза медленнее (более 52 секунд вместо 15,5):Это означает, что следующий процесс может начаться независимо от порядка выполнения, как только завершится предыдущий.
Нет никакой гарантии, что результат будет в том же порядке, что и входные данные.
Адаптер для CSV
Этот адаптер хеширует строку, оставляя неизменным только первый столбец — идентификатор транзакции — и сохраняет полученные данные в файл.данные/транзакция_хешед.csv .
Завершающим шагом его работы является загрузка этого файла с помощью команды COPY во временную таблицу схемы.
примирение_db. Оптимальное чтение файла осуществляется несколькими параллельными процессами.
Читаем его построчно, кусками по 5 мегабайт каждый.
Цифра «5 мегабайт» была получена эмпирическим методом.
Именно при таком размере одного фрагмента текста мне удалось добиться наименьшего времени чтения больших файлов на моей виртуальной машине.
Вы можете поэкспериментировать в своей среде с этим параметром и посмотреть, как изменится время работы: @m.timing
def process_wrapper(self, chunk_start, chunk_size):
""" Read a particular chunk """
with open(self.file_name_raw, newline='\n') as file:
file.seek(chunk_start)
lines = file.read(chunk_size).
splitlines()
for line in lines:
self.process(line)
def chunkify(self, size=1024*1024*5):
""" Return a new chunk """
with open(self.file_name_raw, 'rb') as file:
chunk_end = file.tell()
while True:
chunk_start = chunk_end
file.seek(size, 1)
file.readline()
chunk_end = file.tell()
if chunk_end > self.file_end:
chunk_end = self.file_end
yield chunk_start, chunk_end - chunk_start
break
else:
yield chunk_start, chunk_end - chunk_start
@m.timing
def run_reading(self):
""" The main method for the reading """
# init objects
pool = mp.Pool(mp.cpu_count())
jobs = []
m.info('Run csv reading.')
# create jobs
for chunk_start, chunk_size in self.chunkify():
jobs.append(pool.apply_async(self.process_wrapper,
(chunk_start, chunk_size)))
# wait for all jobs to finish
for job in jobs:
job.get()
# clean up
pool.close()
pool.join()
m.info('CSV file reading has been completed')
Пример чтения ранее созданного файла с 1М записей:
На снимке экрана показано создание временной таблицы с уникальным именем для текущего прогона сверки.
Далее идет асинхронное чтение файла по частям и получение хеша каждой строки.
Вставка данных из адаптера в целевую таблицу завершает работу адаптера.
Использование временной таблицы с уникальным именем для каждого процесса согласования позволяет дополнительно распараллелить процесс согласования в одной базе данных.
Адаптер для PostgreSQL
Адаптер для обработки данных, хранящихся в таблице, работает примерно по той же логике, что и адаптер для файла:- чтение частей таблицы (если она большая, более 100 тыс.
записей) и получение хэша для всех столбцов, кроме идентификатора транзакции;
- затем обработанные данные вставляются в таблицу примирение_db. Storage_$(int(time.time()) .
Исходя из размера таблицы рассчитывается количество необходимых для обработки процессов и внутри каждого процесса разбивается на 10 задач.
def read_data(self):
"""
Read the data from the postgres and shared those records with each
processor to perform their operation using threads
"""
threads_array = self.get_threads(0,
self.max_id_num_row,
self.pid_max)
for pid in range(1, len(threads_array) + 1):
m.info('Process %s' % pid)
# Getting connection from the connection pool
select_conn = self._select_conn_pool.getconn()
select_conn.autocommit = 1
# Creating 10 process to perform the operation
process = Process(target=self.process_data,
args=(self.data_queque,
pid,
threads_array[pid-1][0],
threads_array[pid-1][1],
select_conn))
process.daemon = True
process.start()
process.join()
select_conn.close()
Нахождение несоответствий
Перейдем к сравнению данных, полученных от двух адаптеров.
Согласование (или сообщение о расхождениях) происходит на стороне сервера базы данных с использованием всей мощности языка SQL.
SQL-запрос довольно простой — это просто объединение таблицы с данными адаптеров к себе по идентификатору транзакции: sql_command = sql.SQL("""
select
s1.adapter_name,
count(s1.transaction_uid) as tran_count
from {0}.
{1} s1 full join {0}.
{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").
format(sql.Identifier(self.schema_target),
sql.Identifier(self.storage_table))
Результатом является отчет:
Давайте проверим, все ли правильно на картинке выше.
Мы помним, что из таблицы в базе данных было удалено 9917 строк и изменено 10022. Всего 19939 строк, как видно из отчета.
Финальный стол
Остается только вставить в таблицу хранения «чистые» транзакции, соответствующие всем параметрам (хешу) в разных адаптерах.
Мы выполним этот процесс, используя следующий SQL-запрос: sql_command = sql.SQL("""
with reconcil_data as (
select
s1.transaction_uid
from {0}.
{1} s1 join {0}.
{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.
transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.
transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.
transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).
format(sql.Identifier(self.schema_target),
sql.Identifier(self.storage_table),
sql.Identifier(self.schema_db_clean),
sql.Identifier(self.schema_raw))
Временную таблицу, которую мы использовали как промежуточное хранилище данных адаптеров, можно удалить.
Заключение
В ходе проделанной работы была разработана система сверки данных из разных источников: текстового файла и таблицы в базе данных.Мы использовали минимум дополнительных инструментов.
Возможно, опытный читатель заметит, что использование таких фреймворков, как Apache Spark, вкупе с приведением исходных данных в паркетный формат позволяет значительно ускорить этот процесс, особенно для огромных объемов.
Но основная цель данной работы — написать систему на голом Python и изучить многопроцессорную обработку данных.
Что, на мой взгляд, нам удалось.
Исходный код всего проекта: в моем репозитории GitHub , предлагаю вам с ним ознакомиться.
Буду рад ответить на все вопросы и прочитать ваши комментарии.
Желаю тебе успеха! Теги: #python #Алгоритмы #программирование #postgresql #Big Data #bigdata #sql #multiprocessing #multiprocessing
-
Microsoft Против. Удар По Безопасности Adobe
19 Oct, 24 -
Я Не Могу Поймать Интернет-Сигнал? Помощь!
19 Oct, 24 -
Поповский Николай Никитич.
19 Oct, 24 -
Еще Один Классификатор
19 Oct, 24