Apache Spark, Отложенное Вычисление И Многостраничные Sql-Запросы

Известное об известном: Spark работает с «dataframes», которые представляют собой алгоритмы преобразования.

Алгоритм запускается в самый последний момент, чтобы «дать больше места» оптимизации и за счет оптимизации выполнить ее максимально эффективно.

Под катом мы рассмотрим, как можно разложить многостраничный SQL-запрос на атомы (без потери эффективности) и как это может существенно сократить время выполнения ETL-конвейера.

Ленивая оценка Интересная функциональная особенность Spark — ленивые вычисления: преобразования выполняются только при выполнении действий.

Как это работает (примерно): алгоритмы построения датафреймов, предшествующих действию, «склеиваются», оптимизатор строит наиболее эффективный с его точки зрения конечный алгоритм, который запускается и выдает результат (тот, который запрошен действием) .

Что интересно здесь в контексте нашего изложения: любой сложный запрос можно разложить на «атомы» без потери эффективности.

Давайте посмотрим на пример немного дальше.

Многостраничный SQL Есть много причин, по которым мы пишем «многостраничные» SQL-запросы, одна из основных, вероятно, нежелание создавать промежуточные объекты (нежелание, подкрепленное требованиями эффективности).

Ниже приведен пример относительно сложного запроса (он, конечно, достаточно простой, но для целей дальнейшего изложения нам его будет достаточно).

  
  
  
   

qSel = """ select con.contract_id as con_contract_id, con.begin_date as con_begin_date, con.product_id as con_product_id, cst.contract_status_type_id as cst_status_type_id, sbj.subject_id as sbj_subject_id, sbj.subject_name as sbj_subject_name, pp.birth_date as pp_birth_date from kasko.contract con join kasko.contract_status cst on cst.contract_status_id = con.contract_status_id join kasko.subject sbj on sbj.subject_id = con.owner_subject_id left join kasko.physical_person pp on pp.subject_id = con.owner_subject_id """ dfSel = sp.sql(qSel)

Что мы видим:
  • данные выбираются из нескольких таблиц
  • используются разные типы соединений
  • выделенные столбцы распределяются по select части, join части (и где часть, но ее здесь нет — для простоты убрал)
Этот запрос можно разбить на простые (например, сначала объединить таблицы контракт и контракт_статус, сохранить результат во временную таблицу, затем объединить его с темой, также сохранить результат во временную таблицу и т. д.).

Наверняка, при создании действительно сложных запросов мы так поступаем, но потом — после отладки — собираем все это в многостраничный комок.

Что в этом плохого? Ничего страшного, ведь все так работают и привыкли.

А вот недостатки есть, а точнее, что улучшить - читайте дальше.

Тот же запрос в искре При использовании искры для трансформации, конечно, можно просто взять и выполнить этот запрос (и это будет хорошо, ведь мы его тоже выполним), но можно пойти и другим путем, попробуем.

Давайте разобьем этот «сложный» запрос на «атомы» — элементарные кадры данных.

Их у нас будет столько, сколько таблиц задействовано в запросе (в данном случае 4).

Это «атомы»:

dfCon = sp.sql("""select contract_id as con_contract_id, begin_date as con_begin_date, product_id as con_product_id, owner_subject_id as con_owner_subject_id, contract_status_id as con_contract_status_id from kasko.contract""") dfCStat = sp.sql("""select contract_status_id as cst_status_id, contract_status_type_id as cst_status_type_id from kasko.contract_status""") dfSubj = sp.sql("""select subject_id as sbj_subject_id, subject_type_id as sbj_subject_type_id, subject_name as sbj_subject_name from kasko.subject""") dfPPers = sp.sql("""select subject_id as pp_subject_id, birth_date as pp_birth_date from kasko.physical_person""")

Spark позволяет вам объединять их с помощью выражений, отделенных от самих «атомов», так что давайте сделаем это:

con_stat = f.col("cst_status_id")==f.col("con_contract_status_id") con_subj_own = f.col("con_owner_subject_id")==f.col("sbj_subject_id") con_ppers_own = f.col("con_owner_subject_id")==f.col("pp_subject_id")

Тогда наш «сложный запрос» будет выглядеть так:

dfAtom = dfCon.join(dfCStat,con_stat, "inner")\ .

join(dfSubj,con_subj_own,"inner") \ .

join(dfPPers,con_ppers_own, "left") \ .

drop("con_contract_status_id","sbj_subject_type_id", "pp_subject_id","con_owner_subject_id","cst_status_id")

Что в этом хорошего? На первый взгляд ничего, совсем наоборот: из «сложного» SQL можно понять, что происходит, из нашего «атомарного» запроса понять сложнее, нужно смотреть на «атомы» и выражения.

Давайте сначала убедимся, что эти запросы эквивалентны - в книге Юпитера на связь Я предоставил планы выполнения обоих запросов (любопытные могут найти 10 отличий, но суть — эквивалентность — очевидна).

Это, конечно, не чудо, так и должно быть (про ленивую оценку и оптимизацию см.

выше).

В итоге мы получаем, что «многостраничный» запрос и «атомарный» запрос работают с одинаковой эффективностью (это важно, без этого дальнейшие рассуждения несколько бессмысленны).

Ну а теперь найдем плюсы в «атомарном» способе построения запросов.

Что такое «атом» (элементарный фрейм данных) — это наши знания о подмножестве предметной области (части реляционной таблицы).

Идентифицируя такие «атомы», мы автоматически (и, что немаловажно, алгоритмически и воспроизводимо) выделяем значительную часть огромной вещи, называемой «физической моделью данных».

Какое выражение мы использовали в соединении? Это тоже знания о предметной области – именно так (как указано в выражении) связаны между собой сущности предметной области (таблицы в базе данных).

Повторюсь — это важно — у нас есть эти «знания» (атомы и выражения), материализованные в исполняемом коде (а не в диаграмме или словесном описании), это код, который выполняется каждый раз при выполнении ETL-конвейера (пример взят , кстати, из реальной жизни).

Исполняемый код — как мы с вами знаем из чистого кодера — это один из двух объективно существующих артефактов, претендующих на «титул» документации.

То есть использование «атомов» позволяет сделать шаг вперед в таком важном процессе, как документирование данных.

Что еще хорошего в «атомарности»? Оптимизация конвейера В реальной жизни дата-инженера — кстати, я не представился — конвейер ETL состоит из десятков преобразований, подобных приведенному выше.

Они очень часто повторяют таблицы (я когда-то считал в Excel — некоторые таблицы используются в 40% запросов).

Что происходит с точки зрения эффективности? Какая-то лажа - одна и та же таблица читается из источника несколько раз.

Как это можно улучшить? В Spark есть механизм кэширования фреймов данных — мы можем явно указать, какие фреймы данных и сколько мы хотим хранить в кеше.

Что нам для этого нужно сделать, так это выделить повторяющиеся таблицы и построить запросы таким образом, чтобы минимизировать общий размер кэша (так как все таблицы по определению в него «не поместятся», поэтому и много данные).

Можно ли это сделать при использовании «многостраничных» запросов SSQ? Да, но… немного сложно (у нас там нет датафреймов, только таблицы, их тоже можно кэшировать — над этим работает спарк-сообщество).

Можно ли это сделать с помощью «атомарных» запросов? Да! И это не сложно, нам нужно лишь обобщить «атомы» — добавить к ним столбцы, которые используются во всех запросах нашего конвейера.

Если вдуматься, это тоже «правильно» с точки зрения документации: если столбец используется в каком-то запросе (даже в частиwhere), он является частью интересующих нас данных домена.

А дальше всё просто — кешируем повторяющиеся атомы (датафреймы), строим цепочку преобразований так, чтобы пересечения кешированных датафреймов были минимальными (это нетривиально, но алгоритмизируемо, кстати).

И самый эффективный конвейер мы получаем совершенно «бесплатно».

А кроме него полезным и важным артефактом является «заготовка» для документирования данных по предметной области.

Роботизация и автоматизация Атомы более восприимчивы к автоматической обработке, чем «великий и могучий SQL» — их структура проста и понятна, парсинг за нас делает спарк (за что ему отдельное спасибо), он же строит планы запросов, проанализировав которые мы можем автоматически изменить порядок обработки запросов.

Так что здесь тоже можно что-нибудь «поиграть».

Окончательно Возможно, я слишком оптимистичен — мне кажется, что этот путь (атомизация запроса) более работоспособен, чем попытки описать нулевые данные постфактум.

Кроме того — кстати, какие еще там «добавки» — мы получаем прирост эффективности.

Почему я считаю атомарный подход «работающим»? Это часть регулярного процесса, а это значит, что описанные артефакты имеют реальный шанс оказаться актуальными в долгосрочной перспективе.

Я, наверное, что-то упустил — можете помочь найти (в комментариях)? Теги: #Большие данные #Hadoop #spark #sql #etl

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.