Практика Использования Spark Sql, Или Как Не Наступить На Грабли

Если вы работаете с SQL, то это вам очень скоро понадобится.

Apache Spark — это один из инструментов, входящих в экосистему Hadoop, который обрабатывает данные в памяти.

Одним из его расширений является Spark SQL, которое позволяет выполнять SQL-запросы к данным.

Spark SQL полезен для работы с SQL-запросами к большим объемам данных и в высоконагруженных системах.

Ниже вы найдете несколько простых приемов работы со Spark SQL:

  • Как оптимизировать план выполнения запроса путем сбора статистики и использования подсказок.

  • Как эффективно обрабатывать искаженные соединения, оставаясь в рамках SQL.
  • Как организовать широковещательное соединение таблицы, если ее размер слишком велик.

  • Как использовать Spark SQL, чтобы понять, насколько приложение Spark фактически использовало память и ядра кластера с течением времени.

Сейчас на рынке много аналитиков и разработчиков, владеющих SQL. Это язык, понятный всем сторонам процесса разработки.

Аналитик формулирует, какие данные из каких таблиц необходимо извлечь и объединить тем или иным образом.

Разработчик дорабатывает, при необходимости разбивает на этапы и оптимизирует полученные SQL-запросы, затем формирует из них ETL-потоки для регулярной загрузки данных.

По мере роста потребностей бизнеса эра хранилищ данных с миллионами строк сменяется эрой озер данных с миллиардами строк.

Компании развивают свои экосистемы и ставят цели по переносу объектов хранения в облако.

Там процессоры машинного обучения и обработки данных ждут данных.

Разработчики ETL переучиваются на инженеров данных, и самый простой способ для них — использовать знакомый язык SQL, используя возможности Spark, для загрузки миллиардов строк в озеро данных.

Если разработчик имеет на входе сотни SQL-запросов, которые необходимо материализовать в облачные сущности, и время разработки ограничено, то самый простой способ — использовать Spark SQL, настраивать запросы с помощью самого SQL и только в том случае, если скорость загрузки неприемлема.

прибегнуть к кодированию в Spark. Spark SQL помогает многим компаниям из списка Global 100 в обработке данных.

Несмотря на простоту разработки, Spark SQL обеспечивает высокую производительность при правильном использовании.

Spark SQL можно использовать в различных процессах ETL при обработке и загрузке данных.

Альтернативы Spark SQL при работе с данными включают использование Impala SQL или Hive (с обработкой SQL-запросов в Mapreduce).

Если вы выходите за рамки SQL, альтернативой является использование языка DSL в Spark. Нет никаких сомнений в перспективности языка SQL, который существует уже несколько десятилетий и имеет миллионы пользователей.

Spark SQL также имеет значительные перспективы развития.

Например, с новыми версиями Spark становится возможным использовать новые подсказки по оптимизации, а методы обработки данных становятся более эффективными.

Spark развивается от версии к версии в целях повышения эффективности обработки данных и интеграции с DeepLearning. Подходы меняются в сторону улучшения: RDD, DataFrame, DataSet. Таким образом, изучение и применение Spark SQL является актуальным и перспективным.

Для компаний использование Spark SQL в конечном итоге приводит к накоплению знаний о клиентах, их обработке и построению нового бизнеса на основе новых знаний.

Чтобы настроить загрузку данных с помощью Spark, вам нужно будет выйти за рамки Spark SQL и использовать, например, DSL. Уточним, что статья посвящена работе с версией Apache Spark 2.3, данные загружаются из Hadoop в Hadoop, управление ресурсами осуществляется с помощью Yarn, используется операционная система Linux, а также СУБД Hive с сохранением данных в hdfs в формате паркета.

.

Мы не будем приводить полный код приложения, запускающего файл с переданной ему в качестве параметра командой SQL; напишем лишь, что в приложении нужно открыть переданный файл, загрузить его содержимое в переменную sqlStr и вызвать:

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
   

SparkSession spark = SparkSession .

builder() .

appName("") .

enableHiveSupport() .

getOrCreate(); Dataset<Row> sql = spark.sql(sqlStr);

В результате получается *.

jar-приложение, которому можно передать файл с командой SQL в качестве параметра и выполнить его с помощью spark-submit:

spark-submit --class <…sqlrunner> --name <taskname> --queue <yarnqueue> --executor-cores 1 --executor-memory 1g --driver-cores 1 --driver-memory 1g --num-executors 1 --master yarn --deploy-mode cluster < hdfs://…sqlrunner.jar > sqlFile=<…sql>;

В этом случае в передаваемом файле sqlFile можно указать команды SQL, например:

insert overwrite table target_scheme.target_table select s.* from source_scheme.source_table s;



Сбор и просмотр статистики для Spark, подсказка BROADCAST

Как вы знаете, одна из самых сложных операций в Spark — объединение таблиц или наборов данных.

При этом в Spark существуют различные алгоритмы реализации соединений: SortMergeJoin, BroadcastHashJoin, CartesianProduct и т. д. Чтобы помочь Spark автоматически понять, соединяет ли он большие или маленькие таблицы и какой тип соединения является оптимальным, необходимо собирать статистику по этим соединениям.

столы.

Для этого используйте spark-submit для вызова такой команды, как:

ANALYZE TABLE scheme_name.table_name COMPUTE STATISTICS;

Мы можем передать эту команду сбора статистики описанному выше модулю sqlrunner.jar в качестве параметра вместо команды SQL. Для выполнения этой команды необходимо иметь разрешение на чтение и запись в таблицу, для которой собирается статистика.

Проверить, собрана ли статистика в среде куста, можно с помощью такой команды:

show create table scheme_name.table_name;

Вам нужно посмотреть, появляются ли свойства «spark.sql.statistics.numRows» и «spark.sql.statistics.totalSize» в конце описания в блоке TBLPROPERTIES:

CREATE EXTERNAL TABLE `scheme_name.table_name`( … TBLPROPERTIES ( … 'spark.sql.statistics.numRows'='363852167', 'spark.sql.statistics.totalSize'='82589603650', …

Таким образом, желательно иметь собранную статистику по исходным таблицам, по промежуточным таблицам, используемым в расчетах, а также по витринным таблицам, которые также можно использовать при объединениях в пользовательских запросах.

Теперь скажем пару слов о том, что такое трансляция в Spark. При присоединении небольшой таблицы к большому набору данных часто оказывается целесообразным не распределять маленькую таблицу по разным узлам с помощью ключей подключения в виде порций данных, а разместить ее целиком на всех узлах кластера, используемых в приложении.

В этом случае говорят, что трансляция небольшой таблицы осуществляется на все узлы и тип присоединения небольшой таблицы к основному массиву данных — широковещательное соединение.

Часто широковещательное соединение требуется в Spark SQL в тех случаях, когда в реляционных базах данных требуется соединение вложенного цикла.

Трансляция особенно необходима при неравномерности статистики распределения данных по ключам, участвующим в подключении, например, при привязке небольшого справочника валют к большой таблице транзакций, в которой 99% строк относятся к рублям.

Чтобы указать Spark транслировать небольшую таблицу или набор данных (до ~ 1 ГБ), вы можете указать подсказку /*+ BROADCAST(t)*/ в запросе SQL, где t — псевдоним таблицы или набора данных.



insert overwrite table target_scheme.target_table select /*+ BROADCAST(t) */ big.field1, big.field2, t.field3 from source_scheme.big_table as big left join source_scheme.small_table as t on big.field1 = t.field1;

В частности, если для какой-то таблицы не собирается статистика или мы имеем дело с подзапросом, результат которого формируется «на лету» и Spark заранее не знает, велик ли он, то функция /*+ BROADCAST(t )*/ подсказка особенно уместна, если в таблице или подзапросе недостаточно данных.

Если трансляция подзапроса завершается позже, чем через 5 минут от начала запроса, то следует вызвать spark-submit с параметром ниже (здесь 36000 — время в секундах), поскольку 5 минут по умолчанию может оказаться недостаточно:

--conf spark.sql.broadcastTimeout=36000

Например, такой параметр полезен в запросе ниже, где производится трансляция результата подзапроса /*+ BROADCAST(small) */, который становится готовым не с самого начала запроса, а на более поздних этапах :

select /*+ BROADCAST(small) */ big.field3, small.field1, small.field2 from source_scheme.big_table as big inner join (select t1.field1, t2.field2 from source_scheme.table1 as t1 inner join source_scheme.table2 as t2 on t1.field1 = t2.field1) small on big.field2 = small.field2;

Убедиться, что подсказка учтена оптимизатором и трансляция действительно осуществляется, можно, посмотрев в Yarn ссылку в столбце Tracking UI соответствующего приложения Spark на план выполнения запроса на вкладке SQL:

Практика использования Spark SQL, или Как не наступить на грабли

Обратите внимание, что в Spark SQL есть и другие подсказки, в т.ч.

Начиная с версии 2.4 появились подсказки /*+ COALESCE(n) */, где n — количество разделов, на которые будет разбит результат, и /* + REPARTITION(n) */, где n — количество разделов во время передела.



Обход SKEWED JOIN путем явного указания ключей SKEWED

Рассмотрим две таблицы:
  • Таблица A, имеющая поле AID
  • Таблица B, имеющая поле BID
Требуется объединить эти таблицы с помощью ключей A.AID = B.BID:

select A.*, B.* from A inner join B on A.AID = B.BID;

Рассмотрим случай, когда значения ключей, фигурирующие в одной из таблиц, распределены неравномерно.

Допустим, в таблице B значения ключей BID 4089, 4107, 4468 и 6802 составляют в общей сложности 70% строк, в то время как миллион других значений ключей BID составляют только оставшиеся 30% строк.

В данном случае будем считать, что в таблице А значения ключей AID распределены более или менее равномерно, причем значения 4089, 4107, 4468 и 6802 встречаются по одному разу.

В этом случае такое соединение таблиц называется skewed join, т.е.

«перекошенным соединением», а слишком частые ключи называются перекошенными ключами.

По умолчанию задача объединения двух таблиц, выполняемая с помощью Spark-submit, будет разбита на 200 секций.

Вы можете изменить этот параметр по умолчанию, задав другое значение конфигурации spark.sql.shuffle.partitions, например:

--conf spark.sql.shuffle.partitions=1000

Как Spark будет обрабатывать перекошенные соединения? Spark без подсказки не знает, что данные распределены неравномерно, и распределит задачу объединения таблиц следующим образом: большая часть значений ключей попадет в «легкие» секции, которые быстро обработают небольшой объем данных ; в этом случае обработка перекошенных ключей попадет в «тяжелые» разделы.

На картинках ниже показан пример такого случая: 999 разделов из 1000 работали быстро, выдавая небольшое количество строк, а один раздел, включавший почти все данные, работал несколько часов.



Практика использования Spark SQL, или Как не наступить на грабли



Практика использования Spark SQL, или Как не наступить на грабли

Как сделать распределение подключенных данных по разделам более равномерным? Как вариант, нужно соединить таблицы не по одному ключу A.AID=B.BID, а по паре ключей, и сделать распределение данных по паре ключей более равномерным.

Предположим, что в таблице B также есть числовое поле BID2, данные в котором распределены равномерно (в отличие от BID).

Если такого равномерно распределенного числового поля нет, то его можно сгенерировать функцией hash(), работающей в hive и spark, из конкатенации других полей.

В этом случае функция hash() будет генерировать равномерно распределенные данные.

Приведенный ниже запрос, выполняемый под Spark, решает проблему.

Вместо этого желательно использовать этот запрос «выберите A.*, B.* из внутреннего соединения B на A.AID = B.BID;» В разделе with приведен пример описанных выше данных для таблиц A и B. В ключе BID имеются искаженные данные.

Формируется подзапрос ANTISKEW, в котором перекошенные значения ключей 4089, 4107, 4468 и 6802 умножаются на 10 копий каждый.

Данные из таблицы А объединяются подзапросом ANTISKEW, тем самым формируется 10 копий с разными значениями ANTISKEWKEY от 0 до 9 в массиве A_ для ключей AID, равных 4089, 4107, 4468 и 6802. В то же время для остальных Значения AID в массиве A_ будут по одному экземпляру с ANTISKEWKEY = 0. Далее массив A_ подключается к таблице B не только условием A_.AID = B.BID, но и второй парой ключей .

А именно, для перекошенных значений ключа BID несколько записей из таблицы B будут объединены с одним из значений ANTISKEWKEY от 0 до 9 в зависимости от остатка от деления BID2 на 10. Неперекошенные значения BID будут соединены один к одному.

один с ANTISKEWKEY=0. Это позволит добиться более равномерного распределения перекошенных BID-ключей по партициям — они будут идти не в одну, а в 10 партиций.



with A as (select 4089 AID union all select 4468 AID union all select 6802 AID union all select 5 AID union all select 8 AID union all select 14 AID), B as (select 4089 BID, 1 BID2 union all select 4089 BID, 2 BID2 union all select 4089 BID, 3 BID2 union all select 4107 BID, 4 BID2 union all select 4107 BID, 5 BID2 union all select 4107 BID, 6 BID2 union all select 4468 BID, 7 BID2 union all select 4468 BID, 8 BID2 union all select 4468 BID, 9 BID2 union all select 6802 BID, 10 BID2 union all select 6802 BID, 11 BID2 union all select 6802 BID, 12 BID2 union all select 1 BID, 13 BID2 union all select 2 BID, 14 BID2 union all select 3 BID, 15 BID2 union all select 4 BID, 16 BID2 union all select 5 BID, 17 BID2 union all select 6 BID, 18 BID2 union all select 7 BID, 19 BID2 union all select 8 BID, 20 BID2 union all select 9 BID, 21 BID2 union all select 10 BID, 22 BID2 union all select 11 BID, 23 BID2 union all select 12 BID, 24 BID2 union all select 13 BID, 25 BID2 union all select 14 BID, 26 BID2 union all select 15 BID, 27 BID2) select A_.*, B.* from (select A.*, coalesce(ANTISKEW.ANTISKEWKEY, 0) ANTISKEWKEY from A left join (select Y.ID, Z.ANTISKEWKEY from (select 4089 ID union all select 4107 ID union all select 4468 ID union all select 6802 ID) Y cross join (select 0 ANTISKEWKEY union all select 1 ANTISKEWKEY union all select 2 ANTISKEWKEY union all select 3 ANTISKEWKEY union all select 4 ANTISKEWKEY union all select 5 ANTISKEWKEY union all select 6 ANTISKEWKEY union all select 7 ANTISKEWKEY union all select 8 ANTISKEWKEY union all select 9 ANTISKEWKEY) Z) ANTISKEW on A.AID = ANTISKEW.ID) A_ inner join B on A_.AID = B.BID and A_.ANTISKEWKEY = case when A_.AID in (4089, 4107, 4468, 6802) then B.BID2 else 0 end;

Для еще более равномерного распределения данных нужно в подзапросе ANTISKEW выбрать, на сколько именно разделов вы хотите разбить каждый ключ.

Описываемый метод требует знания статистики — для каких перекошенных ключей какой процент строк следует ожидать.



Разделение помогает сделать BROADCAST

Ниже приведен способ выполнения широковещательного соединения вместо случайного соединения в Spark SQL, если размер меньшей таблицы слишком велик для широковещательной рассылки.

Допустим, вам нужно присоединиться к таблице big_table (100 ГБ).



create table big_table( id decimal(18,0), key1 decimal(18,0) ) stored as parquet;

с таблицей small_table (4Гб)

create table small_table( id decimal(18,0), key2 decimal(18,0) ) stored as parquet;

В этом случае предполагается, что размер таблицы small_table слишком велик для ее широковещательной передачи, что приводит к ошибке.

Цель состоит в том, чтобы заполнить результирующую таблицу:

truncate table result_table; insert into table result_table select big_table.id, big_table.key1, small_table.key2 from big_table left join small_table on big_table.id = small_table.id;

Предполагается, что соединение с использованием ключа id является нормально распределенным.

Сделаем обе объединенные таблицы секционированными:

create table big_table( id decimal(18,0), key1 decimal(18,0) ) partitioned by (part_mod decimal(6,0)) stored as parquet; create table small_table( id decimal(18,0), key2 decimal(18,0) ) partitioned by (part_mod decimal(6,0)) stored as parquet;

При заполнении обеих секционированных таблиц big_table и small_table установите ключ секционирования part_mod равный остатку от деления id на 4: part_mod = идентификатор%4; Давайте создадим псевдотаблицу с ее расположением на основе одного раздела big_table (part_mod = 0):

create table big_table_0( id decimal(18,0), key1 decimal(18,0) ) stored as parquet location '…/big_table/part_mod=0';

Давайте также создадим псевдотаблицу с ее расположением на одном разделе small_table (part_mod = 0):

create table small_table_0( id decimal(18,0), key1 decimal(18,0) ) stored as parquet location '…/small_table/part_mod=0';

Теперь мы можем записать в результирующую таблицу четверть необходимых данных из комбинации вышеописанных псевдотаблиц.

При этом проверяется, что подсказка BROADCAST работает успешно:

truncate table result_table; insert into table result_table select /*+ BROADCAST(small_table_0)*/ big_table_0.id, big_table_0.key1, small_table_0.key2 from big_table_0 left join small_table_0 on big_table.id = small_table.id;

В следующих трёх шагах мы можем аналогичным образом добавить оставшиеся три четверти данных в результирующую таблицу с part_mod = 1, part_mod = 2, part_mod = 3. Общая продолжительность загрузки результирующей таблицы при использовании «метода поэтапного широковещательного разделения» обычно получается на идентичных ресурсах меньше, чем при использовании метода shuffle join. Однако этот метод требует, чтобы таблицы с исходными данными были заранее секционированы таким же образом, т.е.

по остатку от деления id на любое число, по которому мы подключаемся.

Если id не является целым числом, вы можете взять остаток хеша (id), разделенный на любое число.

Особенно легко заранее организовать секционирование промежуточных (промежуточных) таблиц, дизайн которых остается на усмотрение разработчика.



Мониторинг ресурсов

Как известно, при работе с озером данных важно правильно распределять ресурсы для задач загрузки данных.

В противном случае возникают риски сбоя задачи из-за нехватки ресурсов, слишком долгого выполнения задачи или неоправданно высокого выделения ресурсов в ущерб другим задачам.

Как правильно распределять ресурсы – это отдельная тема.

В этой статье мы поговорим о том, как понять:

  1. Сколько ресурсов Yarn выделил задаче или группе задач с течением времени.

  2. Сколько ресурсов Spark фактически использовал для выполнения задачи из числа ресурсов, выделенных ему менеджером Yarn на основе времени.

Сразу отметим, что существуют специализированные методы мониторинга ресурсов, например Graphite/Grafana, но они требуют настройки.

Ниже мы покажем способ получения результатов «на коленке», используя командную строку, Spark SQL и логи Spark. Сначала опишем первый тип мониторинга («мониторинг пряжи»).

Тип команды

while [ ! -f complete.flg ]; do for app in $(yarn application -list -appStates RUNNING | grep <filter_for_app_names> | awk '{print $1}'); do ( command1 () { date +"%Y-%m-%d %H:%M:%S"; }; command2 () { yarn application -status $app; }; x=<loading_date>"|"<loading_id>"|$(command1) $(command2)"; echo $x >> app.txt; ) & done; wait; done

осуществляется следующее.

В период мониторинга при установке флага в виде файла Complete.flg, в цикле по списку приложений Yarn в статусе RUNNING, с шаблоном в названии в текстовый файл app.txt добавляется результат вызова команды Yarn application –status $app. Таким образом, текстовый файл app.txt для каждого момента мониторинга содержит информацию о совокупной сумме мегабайт*секунд (МБ-секунд) и ядер*секунд (vcore-секунд), затраченных при работе каждого приложения.

По окончании работы по мониторингу устанавливается флаг завершения Complete.flg и окончательный файл app.txt переносится в текстовую таблицу Yarn_monitoring в hdfs:

CREATE EXTERNAL TABLE IF NOT EXISTS scheme_name.yarn_monitoring( loading_date string, loading_id string, txt string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'field.delim'='|', 'serialization.format'='|') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '…/yarn_monitoring';

Строки в этой таблице выглядят так:

2020-02-18|2235599|2020-02-18 01:00:44 Application Report : Application-Id : application_1580486634374_3632578 Application-Name : <app_name> Application-Type : SPARK User : <user> Queue : <yarnqueue> Start-Time : 1581976835176 Finish-Time : 0 Progress : 10% State : RUNNING Final-State : UNDEFINED Tracking-URL : <url> RPC Port : 0 AM Host : <host> Aggregate Resource Allocation : 18663 MB-seconds, 9 vcore-seconds Log Aggregation Status : NOT_START Diagnostics :

Затем с помощью того же Spark SQL можно написать запрос к этой текстовой таблице, в котором можно разобрать значения показателей используемых ресурсов (МБ-секунды, vcore-секунды) для каждого момента мониторинга.

Далее в этом запросе вы можете визуализировать эти показатели в виде графиков в формате векторной графики *.

svg:

… <text x="101" y="21" font-size="10" fill="rgb(0,150,255)">13:47:48</text> <line x1="948" x2="949" y1="2041" y2="2041" style="stroke:rgb(255,0,0); stroke-width:1px"/> …

Результат запроса можно сохранить в файл *.

svg и просмотреть в браузере.

Ниже приведен пример изображения с результатами мониторинга ресурсов, используемых группой отслеживаемых приложений Yarn. Красный цвет показывает динамику используемой памяти, синий цвет показывает динамику используемых ядер процессора:

Практика использования Spark SQL, или Как не наступить на грабли

На основе такой визуализации вы можете принимать решения об изменении графика выполнения тех или иных приложений Yarn, чтобы более равномерно использовать ресурсы в течение дня.

Теперь опишем второй тип мониторинга («искровой мониторинг»).

Итак, интересная ситуация, когда Yarn выделяет под задачу Spark, например, 200 ядер и не отдает эти ресурсы другим задачам.

Однако на самом деле в приложении Spark уже не используются 199 ядер из 200, и фактически продолжает работать только одно ядро.

Данный вид мониторинга, в том числе, направлен на выявление подобных ситуаций.

Мы предполагаем, что отслеживаемые приложения Spark вызываются из какого-то механизма управления.

В случае со Сбербанком это, например, Oozie или Informatica BDM. Таким образом, механизм управления может по завершению работы приложения Spark скопировать его лог в hdfs и запустить небольшое Spark SQL-приложение для разбора этого лога (или группы логов) и визуализации динамики реально используемых ядер Spark в формате *.

svg. .

Итак, используя команду ниже, мы:

  1. Сохраняем идентификатор приложения в пряже в переменную $app, например, «application_1576046768499_16691»
  2. Используя команду Command1, мы записываем строку заголовка со статусом завершенного приложения Spark в файл $app.txt.
  3. С помощью команды «yarn logs -applicationId $app» добавляем лог завершенного приложения Spark, полученный от Yarn, в файл $app».

    txt

  4. Перенесите сгенерированный файл $app.txt со статусом и журналом в hdfs.


'app=$(grep "(state: FINISHED)" /tmp/'<spark_log_file>' | grep -o "application_[^ ]*" | tee /tmp/applicationid_'<yarn_task_name>'.

txt); command1 () { yarn application -status $app; }; x="'<loading_date>'|'<loading_id>'|$app|0|$(command1)"; echo $x > "/tmp/"$app".

txt"; yarn logs -applicationId $app | nl -ba -s"|" | sed "s/^/'<loading_date>'|'<loading_id>'|"$app"|/" >> "/tmp/"$app".

txt"; hdfs dfs -copyFromLocal -f "/tmp/"$app".

txt" …/spark_monitoring; rm "/tmp/"$app".

txt"'

В результате в hdfs получаем строки типа:

2019-12-13|1227125|application_1576046768499_16691|0|Application Report : Application-Id : application_1576046768499_16691 Application-Name : <app_name> Application-Type : SPARK User : <user> Queue : <yarnqueue> Start-Time : 1576229137128 Finish-Time : 1576229189406 Progress : 100% State : FINISHED Final-State : SUCCEEDED Tracking-URL : http://.

sbrf.ru:18089/history/application_1576046768499_16691/1 RPC Port : 0 AM Host : … Aggregate Resource Allocation : 93763306 MB-seconds, 9957 vcore-seconds Log Aggregation Status : NOT_START Diagnostics : 2019-12-13|1227125|application_1576046768499_16691| 1| 2019-12-13|1227125|application_1576046768499_16691| 2| 2019-12-13|1227125|application_1576046768499_16691| 3|Container: container_1576046768499_16691_01_000162 on …sbrf.ru_8041 2019-12-13|1227125|application_1576046768499_16691| 4|=========================================================================================== 2019-12-13|1227125|application_1576046768499_16691| 5| LogType:container-localizer-syslog 2019-12-13|1227125|application_1576046768499_16691| 6|Log Upload Time:Fri Dec 13 12:26:32 +0300 2019 2019-12-13|1227125|application_1576046768499_16691| 7|LogLength:0 2019-12-13|1227125|application_1576046768499_16691| 8|Log Contents: 2019-12-13|1227125|application_1576046768499_16691| 9| … 2019-12-13|1227125|application_1576046768499_16691| 53|19/12/13 12:26:02 INFO executor.Executor: Running task 26.0 in stage 0.0 (TID 19) … 2019-12-13|1227125|application_1576046768499_16691| 82|19/12/13 12:26:07 INFO executor.Executor: Finished task 26.0 in stage 0.0 (TID 19).

5735 bytes result sent to driver … 2019-12-13|1227125|application_1576046768499_16691| 84|19/12/13 12:26:07 INFO executor.Executor: Running task 441.0 in stage 0.0 (TID 364) … 2019-12-13|1227125|application_1576046768499_16691| 102|19/12/13 12:26:08 INFO executor.Executor: Finished task 441.0 in stage 0.0 (TID 364).

5543 bytes result sent to driver …

При разборе журнала приложения Spark осуществляется поиск событий начала использования и освобождения ядер кластера тем или иным контейнером, т.е.

ищем время записи в журнале событий по следующим шаблонам:

INFO executor.Executor: Running task INFO executor.Executor: Finished task

Таким образом, в любой момент времени известно, сколько ядер процессора были активны, когда приложение Spark работало, т.е.

запускалось и еще не завершило свою работу.

Для разбора логов, собранных в hdfs, формируется таблица spark_monitoring в текстовом формате:

CREATE EXTERNAL TABLE IF NOT EXISTS scheme_name.spark_monitoring( loading_date string, loading_id string, applicationid string, rn string, txt string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'field.delim'='|', 'serialization.format'='|') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '…/spark_monitoring';

Как написано выше, парсинг логов осуществляется с помощью Spark SQL. Те.

Для этой таблицы был написан запрос spark_monitoring, который выводит картинку в формате *.

svg с динамикой фактического использования ядер кластера приложением Spark. Результат запроса можно сохранить в файл *.

svg и просмотреть в браузере.

Пример изображения показан ниже:

Практика использования Spark SQL, или Как не наступить на грабли

По таким графикам можно понять, эффективно ли приложение Spark использует выделенные ему процессорные ядра кластера.

Вы можете смотреть приложения оптом.

Резкие скачки вверх и вниз, видимые на графиках, — это переходы между этапами приложений Spark. Когда какой-то этап чтения или подключения массивов данных заканчивается и все ядра освобождаются одно за другим, график уменьшается до нуля.

А в начале реализации новых этапов наблюдается резкий рост используемых ядер.

Ситуация, когда расписание подтянуто к максимуму выделенных ядер и скачки между этапами резкие, является признаком оптимального использования ресурсов.

Медленное уменьшение может указывать, например, на перекосы соединений, о которых говорилось выше в этой статье.

Автор: Михаил Гричик, эксперт профессионального сообщества Сбербанка SberProfi DWH/BigData. Профессиональное сообщество СберПрофи СХД/BigData отвечает за развитие компетенций в таких областях, как экосистема Hadoop, Teradata, Oracle DB, GreenPlum, а также BI-инструменты Qlik, SAP BO, Tableau и др.

Теги: #программирование #Администрирование баз данных #мониторинг #аналитик #sql #разработчик #код #ресурсы #apache spark #apache spark #spark sql #table
Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.