Демистификация Соединения В Apache Spark

Привет, Хабр.

Для будущих студентов курса «Косистема Hadoop, Spark, Hive» подготовил перевод материала.

Также приглашаем всех на вебинар.

«Тестирование приложений Spark» .

В этом открытом уроке мы рассмотрим проблемы тестирования приложений Spark: статистические данные, частичную проверку и запуск/остановку тяжелых систем.

Давайте изучим библиотеки для решения и напишем тесты.



Демистификация соединения в Apache Spark






Эта статья посвящена исключительно операции соединения в Apache Spark и содержит обзор основы, на которой построена технология Spark Join.

Операции соединения часто используются в типичных потоках анализа данных для корреляции двух наборов данных.

Apache Spark, являющийся унифицированной аналитической системой, также обеспечивает прочную основу для запуска широкого спектра сценариев объединения.

На очень высоком уровне Join работает с двумя наборами входных данных, операция выполняется путем сопоставления каждой записи данных, принадлежащей одному из наборов входных данных, с каждой другой записью, принадлежащей другому набору входных данных.

При обнаружении совпадения или несоответствия (на основе указанного условия) операция соединения может либо вывести одну совпадающую запись из любого из двух наборов данных, либо объединенную запись.

Объединенная запись представляет собой комбинацию отдельных совпадающих записей из обоих наборов данных.



Важные аспекты операции соединения:

Теперь давайте разберемся с тремя важными аспектами, влияющими на выполнение операции соединения в Apache Spark. К ним относятся: 1) Размер наборов входных данных: Размер наборов входных данных напрямую влияет на эффективность выполнения и надежность операции соединения.

Кроме того, относительный размер входных наборов данных влияет на выбор механизма соединения, что может еще больше повлиять на эффективность и надежность соединения.

2) Условие соединения: Условие или предложение, на основе которого объединяются наборы входных данных, называется условием соединения.

Условие обычно включает в себя логическое сравнение атрибутов, принадлежащих наборам входных данных.

В зависимости от условия соединения соединения делятся на две большие категории: эквивалентные соединения и неэквивалентные соединения.

«Эквивалентные соединения включают одно или несколько условий равенства, которые должны выполняться одновременно.

Каждое условие равенства применяется к атрибутам двух наборов входных данных.

Например, (A.x == B.x) или ((A.x == B.x) и (A.y == B.y)) — два примера эквивалентных условий соединения для атрибутов x, y двух входных наборов данных A и B, участвующих в операции соединения.

Неэквивалентные соединения не подразумевают условия равенства.

Однако они могут допускать несколько условий равенства, которые не должны выполняться одновременно.

Например, (A.x. < B.x) or ((A.x == B.x) or (A.y == B.y)) are two examples of unequal join conditions for attributes x, y of two input data sets A and B participating in the Join operation. 3) Тип соединения: Тип соединения влияет на результат операции соединения после применения условия соединения между записями во входных наборах данных.

Ниже приведена общая классификация различных типов соединений: Внутреннее соединение: Внутреннее объединение выводит только соответствующие объединенные записи (как определено условием объединения) из входных наборов данных.

Внешнее соединение: Outer Join отображает не только совпадающие записи, но и несовпадающие.

Внешнее соединение далее классифицируется на левое, правое и полное внешнее соединение на основе выбора входных наборов для вывода несовпадающих записей.

Полуприсоединение: Semi Join выводит отдельные записи, принадлежащие только одному из двух входных наборов данных, либо в совпадающем, либо в несовпадающем экземпляре.

Если запись, принадлежащая одному из входных наборов данных, выводится в несовпадающий экземпляр, то полусоединение также называется анти-соединением.

Перекрестное соединение: Cross Join выводит все возможные объединенные записи путем объединения каждой записи из одного входного набора с каждой записью из другого входного набора.

Основываясь на трех вышеупомянутых важных аспектах выполнения соединения, Apache Spark выбирает правильный механизм для выполнения соединения.



Различные механизмы операций соединения

После того, как мы поняли различные аспекты выполнения операции соединения, давайте теперь разберемся в различных механизмах ее выполнения.

Apache Spark предоставляет в общей сложности пять механизмов для выполнения операций соединения.

К ним относятся:

  • Перемешать хеш-соединение
  • Широковещательное хэш-соединение
  • Сортировать Объединить Присоединиться
  • Декартово соединение
  • Широковещательное соединение вложенного цикла
Присоединение к широковещательному хешу: в механизме Broadcast Hash Join один из двух входных наборов данных (участвующих в Join) транслируется всем исполнителям.

Для всех исполнителей в преобразованном наборе данных создается хеш-таблица, а затем каждый раздел нешироковещательного входного набора данных независимо присоединяется к другому набору данных, доступному в виде локальной хеш-таблицы.

«Broadcast Hash Join» не требует этапа перемешивания и является наиболее эффективным.

Единственное требование надежности состоит в том, что у исполнителей должно быть достаточно памяти для размещения широковещательного набора данных.

Поэтому Spark избегает этого механизма, когда оба входных набора данных достаточно велики и превышают настраиваемый порог.

Перемешать хеш-соединение: В механизме «Shuffle Hash Join» первые два набора входных данных выравниваются в соответствии с выбранной схемой разделения вывода (чтобы узнать больше о выбранной схеме разделения вывода, вы можете обратиться к моей недавней книге под названием «Руководство по разбиению Spark» .

В случае, если один или оба набора входных данных не соответствуют выбранной схеме секционирования, перед фактическим выполнением соединения используется операция перемешивания для достижения соответствия.

Как только выбранная схема разделения вывода удовлетворяет обоим наборам входных данных, Shuffle Hash выполняет операцию соединения для каждого выходного раздела, используя стандартный подход Hash Join. То есть для каждого выходного раздела сначала строится хеш-таблица из соответствующего раздела меньшего входного набора данных, а затем соответствующий раздел большего входного набора данных присоединяется к построенной хеш-таблице.

Требование к памяти исполнителя относительно меньше в случае «Shuffle Hash Join» по сравнению с «Broadcast Hash Join».

Это связано с тем, что хеш-таблица строится только на определенном разделе меньшего набора входных данных.

Таким образом, если вы предоставляете большое количество выходных разделов и имеете большое количество исполнителей с подходящей конфигурацией памяти, вы можете добиться более высокой производительности операции соединения, используя «Shuffle Hash Join».

Однако эффективность будет ниже, чем при «широковещательном соединении хэша», если Spark необходимо выполнить дополнительную операцию перемешивания одного или обоих входных наборов данных, чтобы соответствовать выходному разделению.

Сортировка слиянием Начальная часть операции «Сортировка слиянием» аналогична операции «Перемешать хеш-соединение».

Здесь также сначала два набора входных данных выравниваются в соответствии с выбранной схемой разделения выходных данных.

Если один или оба набора входных данных не соответствуют выбранной схеме разделения, то для достижения соответствия перед выполнением операции соединения используется операция перемешивания.

Как только выбранная схема разделения вывода удовлетворяет обоим наборам входных данных, Sort Merge выполняет операцию соединения для каждого выходного раздела, используя стандартный подход Sort Merge Join. «Сортировка объединения слиянием» менее эффективна в вычислительном отношении, чем «Перемешивание хэш-соединения» и «Широковещательное хэш-соединение», однако требования к памяти исполнителя для выполнения «Сортировка объединения слиянием» значительно ниже, чем для «Перемешать хэш» и «Broadcast» хеш».

.

Кроме того, как и в случае с «Shuffle Hash Join», если наборы входных данных не соответствуют желаемому выходному разделению, тогда операция перемешивания один или оба входных набора данных, в зависимости от ситуации, увеличивают нагрузку на выполнение операции «Сортировка слиянием».

Декартово соединение: Декартово соединение используется исключительно для выполнения перекрестного соединения между двумя наборами входных данных.

Количество выходных разделов всегда равно произведению количества разделов во входном наборе данных.

Каждый выходной раздел сопоставляется с уникальной парой разделов, которая состоит из одного раздела одного и другого раздела второго входного набора.

Для каждого из выходных разделов результат вычисляется как декартово произведение данных из двух входных разделов, сопоставленных с выходным разделом.

Недостатком Cartesian Join является увеличение количества выходных разделов.

Но если вам требуется перекрестное соединение, декартовый механизм — единственный подходящий механизм.

Присоединение к вложенному циклу широковещательной рассылки: При «Broadcast Nested Loop Join» один из наборов входных данных транслируется всем исполнителям.

Затем каждый раздел непереведенного набора входных данных присоединяется к переведенному набору с использованием стандартной процедуры вложенного цикла соединения для создания объединенных выходных данных.

Широковещательное соединение вложенного цикла является наименее эффективным с точки зрения вычислений, поскольку оно выполняет вложенный цикл для сравнения двух наборов данных.

Кроме того, он требует большого объема памяти, так как один из наборов входных данных должен транслироваться всем воркерам.



Как Spark выбирает механизм соединения?

Рассмотрев важные аспекты операции соединения и различные механизмы выполнения соединения, давайте теперь посмотрим, как Spark выбирает, какой механизм использовать: Spark выбирает конкретный механизм для выполнения операции соединения на основе следующих факторов:
  • Варианты конфигурации
  • Советы по присоединению
  • Размер наборов входных данных
  • Тип соединения
  • ? Эквивалентные или неэквивалентные соединения (равноправное или неэквивалентное соединение)
Spark обеспечил гибкость в API соединения для указания дополнительных подсказок соединения для завершения механизма соединения.

Подсказки по объединению, такие как «broadcast», «merge», «shuffle_hash» и «shuffle_rescribe_nl», могут предоставляться вместе с наборами данных, участвующими в соединении.

Ниже приведено полное описание того, как Spark выбирает различные механизмы соединения на основе вышеуказанных факторов:

«Присоединение к широковещательному хэшу»

Предварительные условия
  • Применяется только к условию Equi Join.
  • Не применимо к типу соединения «Полное внешнее соединение».

Помимо обязательного условия, должно выполняться одно из следующих условий:
  • Подсказка, предоставляемая для левого набора входных данных, — «Broadcast», а тип соединения — «Right Outer», «Right Semi» или «Inner».

  • Подсказка не предоставлена, но левый входной набор данных транслируется согласно конфигурации.

    '

    spark.sql.autoBroadcastJoinThreshold

    (по умолчанию 10 МБ)' Тип соединения — «Право внешнее», «Право полу» или «Внутреннее».

  • Для правого набора входных данных предоставляется подсказка «Broadcast», а тип соединения — «Left Outer», «Left Semi» или «Inner».

  • Никаких подсказок не предоставляется, но правильный набор входных данных передается в соответствии с конфигурацией.

    '

    spark.sql.autoBroadcastJoinThreshold

    (по умолчанию 10 МБ)' Тип соединения — «Левое внешнее», «Левое полуполу» или «Внутреннее».

  • Подсказка «Трансляция» предоставляется для обоих наборов входов, а тип соединения — «Левый внешний», «Левый полупроводниковый», «Правый внешний», «Правый полупроводниковый» или «Внутренний».

  • Подсказки не предоставлены, но оба набора входных данных передаются в соответствии с конфигурацией.

    '

    spark.sql.autoBroadcastJoinThreshold

    (по умолчанию 10 МБ)' типом соединения является «Левое внешнее», «Левое полушарие», «Правое внешнее», «Правое полушарие» или «Внутреннее».



«Перемешать хэш-соединение»

Предварительные условия
  • Применяется только к условию Equi Join.
  • Не применимо к типу соединения «Полное внешнее соединение».

  • Конфигурация '

    spark.sql.join.prefersortmergeJoin

    (по умолчанию true)' ложно
Помимо обязательного условия, должно выполняться одно из следующих условий:
  • Подсказка «shuffle_hash» предоставляется для левого набора входных данных, а тип соединения — «Правый внешний», «Правый полу» или «Внутренний».

  • Никаких подсказок не предоставляется, но левый входной набор данных значительно меньше правого входного набора данных, а тип соединения — «Правый внешний», «Правый полу» или «Внутренний».

  • Подсказка «shuffle_hash» предоставляется для правого набора входных данных, а тип соединения — «Левый внешний», «Левый полу» или «Внутренний».

  • Подсказки не предоставляются, но правый входной набор данных значительно меньше левого, а тип соединения — «Левый внешний», «Левый полу» или «Внутренний».

  • Подсказка «shuffle_hash» предоставляется для обоих входных наборов данных, а тип соединения — «Левое внешнее», «Левое полушарие», «Правое внешнее», «Правое полушарие» или «Внутреннее».

  • Никаких подсказок не предоставляется, но оба набора данных значительно малы, а тип соединения — «Левое внешнее», «Левое полушарие», «Правое внешнее», «Правое полушарие» или «Внутреннее».



«Сортировать объединение»

Предварительные условия
  • Применяется только к условию Equi Join.
  • Ключи соединения, определенные из условия Equi Join, можно сортировать.

  • Конфигурация 'spark.sql.join.prefersortmergeJoin (по умолчанию true)' правда.

Помимо обязательных условий, должно выполняться одно из следующих условий:
  • Подсказка «слияние» предоставляется для любого набора входных данных, а тип соединения может быть любым.

  • Никаких подсказок не предусмотрено, а тип соединения может быть любым.



«Декартово соединение»

Предварительные условия
  • Тип подключения «Внутренний»
Помимо обязательного условия, должно быть выполнено одно из следующих условий:
  • Подсказка «shuffle_reulate_nl» предоставляется для любого из входных наборов данных, условие соединения может быть равным или неравнозначным.

  • Подсказка не предоставлена.

    Условие соединения может быть равным или неравным.



«Присоединение к вложенному циклу широковещательной рассылки»

«Присоединение вложенного цикла широковещательной рассылки» является механизмом соединения по умолчанию; когда никакие другие механизмы не могут быть выбраны, тогда «Broadcast Nested Loop Join» выбирается в качестве окончательного механизма для выполнения любого типа соединения для любого условия соединения.

Если для выполнения подходит более одного механизма соединения, то предпочтительный из них выбирается в следующем порядке: «Широковещательное хэш-соединение», «Сортировка слиянием», «Перемешать хэш-соединение», «Декартово соединение».

Среди декартовых и широковещательных вложенных циклических соединений широковещательный вложенный цикл предпочтительнее для внутренних, неэквивалентных соединений, чем декартово соединение в случае, когда один из входных наборов данных может транслироваться.

И последнее, но не менее важное: секционирование также играет очень важную роль в эффективности выполнения этого механизма соединения.

Чтобы узнать больше о разбиении на разделы, вы можете обратиться к предыдущей ссылке.

Мы надеемся, что эта статья развеяла все ваши сомнения и путаницу относительно выполнения соединения в Apache Spark. Если у вас еще остались вопросы, напишите в комментариях или отправьте мне сообщение.




Подробнее о курсе «Косистема Hadoop, Spark, Hive» Посмотреть открытый урок «Тестирование приложений Spark»
Теги: #Машинное обучение #программирование #Hadoop #наука о данных #spark #Apache #join
Вместе с данным постом часто просматривают: