Кластер Elasticsearch Объемом 200 Тб+



Кластер Elasticsearch объемом 200 ТБ+

Многие люди испытывают трудности с Elasticsearch. Но что произойдет, если вы захотите использовать его для хранения логов «в особенно большом объеме»? И также безболезненно пережить выход из строя любого из нескольких дата-центров? Какую архитектуру вам следует сделать и на какие подводные камни вы наткнетесь? Мы в Одноклассниках решили использовать elasticsearch для решения вопроса с управлением логами и теперь делимся опытом с Хабром: и об архитектуре, и о подводных камнях.

Я Петр Зайцев, работаю системным администратором в Одноклассниках.

До этого я тоже был админом, работал с Manticore Search, Sphinx search, Elasticsearch. Возможно, если появится еще один поиск, возможно, я тоже с ним поработаю.

Я также участвую в ряде проектов с открытым исходным кодом на добровольной основе.

Когда я пришел в Одноклассники, на собеседовании я опрометчиво сказал, что могу работать с Elasticsearch. После того, как я освоился и выполнил несколько простых задач, передо мной стояла большая задача по реформированию существовавшей на тот момент системы управления журналами.



Требования

Системные требования были сформулированы следующим образом:
  • В качестве интерфейса предполагалось использовать Graylog. Потому что у компании уже был опыт использования этого продукта, программисты и тестировщики его знали, им он был знаком и удобен.

  • Объем данных: в среднем 50-80 тысяч сообщений в секунду, но если что-то сломается, то трафик ничем не ограничивается, он может составлять 2-3 миллиона строк в секунду.

  • Обсудив с заказчиками требования к скорости обработки поисковых запросов, мы поняли, что типичная схема использования такой системы такова: люди ищут логи своего приложения за последние два дня и не хотят ждать больше суток.

    второй для результата сформулированного запроса.

  • Администраторы настояли на том, чтобы при необходимости систему можно было легко масштабировать, не требуя от них глубоко вникать в то, как она работает.
  • Таким образом, единственная задача обслуживания, которую периодически требуют эти системы, — это замена некоторого оборудования.

  • Кроме того, в «Одноклассниках» есть прекрасная техническая традиция: любой сервис, который мы запускаем, должен пережить сбой дата-центра (внезапный, незапланированный и абсолютно в любой момент).

Больше всего нам обошлось последнее требование при реализации этого проекта, о котором я расскажу подробнее.



Среда

Мы работаем в четырех дата-центрах, а узлы данных Elasticsearch могут располагаться только в трех (по ряду нетехнических причин).

Эти четыре дата-центра содержат примерно 18 тысяч различных источников журналов — оборудование, контейнеры, виртуальные машины.

Важная особенность: кластер запускается в контейнерах.

Подман не на физических машинах, а на собственный облачный продукт one-cloud .

Контейнерам гарантировано 2 ядра, аналогично 2,0 ГГц v4, с возможностью перезапуска остальных ядер, если они простаивают. Другими словами:

Кластер Elasticsearch объемом 200 ТБ+



Топология

Изначально я видел общий вид решения следующим образом:
  • За А-записью домена Graylog стоят 3-4 ВИПа, это адрес, на который отправляются логи.

  • каждый VIP является балансировщиком LVS.
  • После него логи попадают на батарею Graylog, часть данных в формате GELF, часть в формате syslog.
  • Затем все это большими партиями записывается в группу координаторов Elasticsearch.
  • А они, в свою очередь, отправляют запросы на запись и чтение соответствующим узлам данных.



Кластер Elasticsearch объемом 200 ТБ+



Терминология

Возможно, не все подробно разбираются в терминологии, поэтому хотелось бы на ней немного остановиться.

Elasticsearch имеет несколько типов узлов — главный, координатор, узел данных.

Есть еще два типа для разных преобразований журналов и связи между разными кластерами, но мы использовали только перечисленные.

Владелец Он проверяет все узлы, присутствующие в кластере, поддерживает актуальную карту кластера и распределяет ее между узлами, обрабатывает логику событий и выполняет различные виды обслуживания всего кластера.

Координатор Выполняет одну единственную задачу: принимает запросы на чтение или запись от клиентов и маршрутизирует этот трафик.

В случае поступления запроса на запись, скорее всего, он спросит у мастера, в какой шард соответствующего индекса его следует поместить, и перенаправит запрос дальше.

Узел данных Хранит данные, выполняет поступающие извне поисковые запросы и выполняет операции над расположенными на нем шардами.

Грейлог Это что-то вроде слияния Kibana с Logstash в стеке ELK. Graylog сочетает в себе как пользовательский интерфейс, так и конвейер обработки журналов.

Под капотом Graylog работают Kafka и Zookeeper, которые обеспечивают подключение к Graylog как к кластеру.

Graylog умеет кэшировать логи (Kafka) в случае недоступности Elasticsearch и повторять неудачные запросы на чтение и запись, группировать и помечать логи согласно заданным правилам.

Как и Logstash, Graylog имеет функцию изменения строк перед их записью в Elasticsearch. Кроме того, в Graylog имеется встроенный сервис обнаружения, который позволяет на основе одного доступного узла Elasticsearch получить всю карту кластера и отфильтровать ее по определенному тегу, что дает возможность направлять запросы к конкретным контейнерам.

Визуально это выглядит примерно так:

Кластер Elasticsearch объемом 200 ТБ+

Это скриншот конкретного экземпляра.

Здесь мы строим гистограмму на основе поискового запроса и отображаем релевантные строки.



Индексы

Возвращаясь к архитектуре системы, хотелось бы подробнее остановиться на том, как мы построили индексную модель, чтобы все работало корректно.

На диаграмме выше это самый нижний уровень: узлы данных Elasticsearch. Индекс — это большой виртуальный объект, состоящий из фрагментов Elasticsearch. Сам по себе каждый из шардов является не чем иным, как индексом Lucene. А каждый индекс Lucene, в свою очередь, состоит из одного или нескольких сегментов.



Кластер Elasticsearch объемом 200 ТБ+

При проектировании мы посчитали, что для удовлетворения требования к скорости чтения большого объёма данных нам необходимо «распределить» эти данные равномерно по узлам данных.

Это привело к тому, что количество шардов на индекс (с репликами) должно быть строго равно количеству узлов данных.

Во-первых, чтобы обеспечить коэффициент репликации, равный двум (то есть мы можем потерять половину кластера).

А, во-вторых, чтобы обрабатывать запросы на чтение и запись хотя бы на половине кластера.

Сначала мы определили срок хранения как 30 дней.

Распределение шардов можно представить графически следующим образом:

Кластер Elasticsearch объемом 200 ТБ+

Весь темно-серый прямоугольник — это индекс.

Левый красный квадрат в нем — основной шард, первый по индексу.

А синий квадрат — это осколок точной копии.

Они расположены в разных дата-центрах.

Когда мы добавляем еще один осколок, он отправляется в третий дата-центр.

И в итоге получаем вот такую структуру, позволяющую потерять DC без потери целостности данных:

Кластер Elasticsearch объемом 200 ТБ+

Ротацию индексов, т.е.

создание нового индекса и удаление самого старого, мы сделали равным 48 часам (по схеме использования индекса: чаще всего ищутся последние 48 часов).

Такой интервал ротации индекса обусловлен следующими причинами: Когда поисковый запрос поступает на конкретный узел данных, то с точки зрения производительности выгоднее запрашивать один шард, если его размер сопоставим с размером бедра узла.

Это позволяет хранить «горячую» часть индекса в куче и быстро обращаться к ней.

Когда «горячих частей» много, скорость поиска по индексу ухудшается.

Когда узел начинает выполнять поисковый запрос на одном шарде, он выделяет количество потоков, равное количеству ядер гиперпоточности физической машины.

Если поисковый запрос затрагивает большое количество шардов, то количество потоков растет пропорционально.

Это отрицательно влияет на скорость поиска и негативно влияет на индексацию новых данных.

Чтобы обеспечить необходимую задержку поиска, мы решили использовать SSD. Для быстрой обработки запросов машины, на которых размещались эти контейнеры, должны были иметь не менее 56 ядер.

Цифра 56 была выбрана как условно достаточное значение, определяющее количество потоков, которые Elasticsearch будет генерировать во время работы.

В Elasitcsearch многие параметры пула потоков напрямую зависят от количества доступных ядер, что, в свою очередь, напрямую влияет на необходимое количество узлов в кластере по принципу «меньше ядер — больше узлов».

В результате мы выяснили, что в среднем шард весит около 20 гигабайт, а на индекс приходится 360 шардов.

Соответственно, если мы будем вращать их раз в 48 часов, то их у нас будет 15. Каждый индекс содержит данные за 2 дня.



Схемы записи и чтения данных

Давайте разберемся, как записываются данные в этой системе.

Допустим, от Graylog к координатору поступает какой-то запрос.

Например, мы хотим проиндексировать 2-3 тысячи строк.

Координатор, получив запрос от Graylog, задает вопрос мастеру: «В запросе на индексацию мы конкретно указали индекс, но в какой шард его записать, не указано».

Мастер отвечает: «Запишите эту информацию в шард номер 71», после чего она отправляется непосредственно в соответствующий узел данных, где находится Primary-Shard номер 71. После чего журнал транзакций реплицируется на реплику-шард, которая находится в другом дата-центре.



Кластер Elasticsearch объемом 200 ТБ+

От Graylog к координатору поступает запрос на поиск.

Координатор перенаправляет его по индексу, а Elasticsearch распределяет запросы между первичным шардом и репликой-шардом по принципу round-robin.

Кластер Elasticsearch объемом 200 ТБ+

180 узлов отвечают неравномерно, и пока они отвечают, координатор накапливает информацию, которая уже «выплеснута» более быстрыми узлами данных.

После этого, когда либо вся информация пришла, либо запрос достиг таймаута, он отдаёт всё напрямую клиенту.

Вся эта система в среднем обрабатывает поисковые запросы за последние 48 часов за 300-400 мс, исключая запросы с ведущим подстановочным знаком.



Цветы с Elasticsearch: настройка Java



Кластер Elasticsearch объемом 200 ТБ+

Чтобы все работало так, как мы изначально хотели, мы потратили очень много времени на отладку самых разных вещей в кластере.

Первая часть обнаруженных проблем была связана с тем, как Java предварительно настраивается по умолчанию в Elasticsearch. Проблема первая Мы видели очень большое количество отчетов о том, что на уровне Lucene при выполнении фоновых заданий слияние сегментов Lucene завершается с ошибкой.

При этом в логах было ясно, что это ошибка OutOfMemoryError. По телеметрии мы видели, что бедро свободно, и непонятно, почему эта операция не удалась.

Оказалось, что слияния индексов Lucene происходят за пределами бедра.

А контейнеры достаточно жестко ограничены по потребляемым ресурсам.

В эти ресурсы могла поместиться только куча (значение heap.size было примерно равно RAM), а некоторые операции вне кучи завершались с ошибкой выделения памяти, если по каким-то причинам не умещались в оставшиеся до лимита ~500МБ.

Исправление оказалось довольно тривиальным: был увеличен объем доступной для контейнера оперативной памяти, после чего мы забыли, что у нас вообще были подобные проблемы.

Проблема вторая Через 4-5 дней после запуска кластера мы заметили, что узлы данных стали периодически выпадать из кластера и заходить в него через 10-20 секунд. Когда мы начали в этом разбираться, оказалось, что эта внекучная память в Elasticsearch никак не контролируется.

Когда мы давали контейнеру больше памяти, мы смогли заполнить пулы прямых буферов различной информацией, а она очищалась только после явного запуска GC из Elasticsearch. В некоторых случаях эта операция занимала довольно много времени, и за это время кластер успел пометить этот узел как уже вышедший.

Эта проблема хорошо описана здесь .

Решение было следующим: мы ограничили возможность Java использовать для этих операций большую часть памяти вне кучи.

Мы ограничили его до 16 гигабайт (-XX:MaxDirectMemorySize=16g), гарантируя, что явный сбор мусора будет вызываться гораздо чаще и обрабатываться намного быстрее, тем самым больше не дестабилизируя кластер.

Проблема третья Если вы думаете, что проблемы с «узлами, выходящими из кластера в самый неожиданный момент» закончились, вы ошибаетесь.

Когда мы настраивали работу с индексами, мы выбрали mmapfs для сократить время поиска на свежих шардах с отличной сегментацией.

Это была настоящая ошибка, поскольку при использовании mmapfs файл отображается в оперативную память, а затем мы работаем с отображенным файлом.

Из-за этого получается, что когда GC пытается остановить потоки в приложении, мы очень долго идем в точку безопасности, и по пути к ней приложение перестает отвечать на запросы мастера о том, живо ли оно.

.

Соответственно, мастер считает, что узла больше нет в кластере.

После этого через 5-10 секунд срабатывает сборщик мусора, нода оживает, снова входит в кластер и начинает инициализировать шарды.

Все это было очень похоже на «постановку, которую мы заслужили» и не подходило ни для чего серьезного.

Чтобы избавиться от такого поведения, мы сначала перешли на стандартный ниофс, а затем, при переходе с пятой версии Elastic на шестую, попробовали гибридфс, где данная проблема не воспроизводилась.

Подробнее о типах хранения можно прочитать Здесь .

Проблема четвертая Затем была еще одна очень интересная проблема, которую мы решали за рекордное время.

Мы ловили его 2-3 месяца, потому что его закономерность была совершенно непонятна.

Иногда наши координаторы уходили на Full GC, обычно где-то после обеда, и никогда оттуда не возвращались.

При этом при протоколировании задержки GC это выглядело так: все идет хорошо-ну-ну, а потом вдруг все идет очень плохо.

Сначала мы подумали, что перед нами злой пользователь, который запускает какой-то запрос, выбивающий координатор из рабочего режима.

Мы очень долго логировали запросы, пытаясь разобраться, что происходит. В результате оказалось, что в тот момент, когда пользователь запускает огромный запрос, и он попадает к конкретному координатору Elasticsearch, некоторые узлы отвечают дольше, чем другие.

И пока координатор ждет ответа от всех узлов, он аккумулирует результаты, отправленные от уже ответивших узлов.

Для GC это означает, что наши шаблоны использования кучи меняются очень быстро.

И тот GC, который мы использовали, с этой задачей не справился.

Единственное решение, которое мы нашли, чтобы изменить поведение кластера в этой ситуации, — это переход на JDK13 и использование сборщика мусора Shenandoah. Это решило проблему, наши координаторы перестали падать.

На этом проблемы с Java закончились и начались проблемы с пропускной способностью.



«Ягоды» с Elasticsearch: пропускная способность



Кластер Elasticsearch объемом 200 ТБ+

Проблемы с пропускной способностью означают, что наш кластер работает стабильно, но в пики количества проиндексированных документов и во время маневров производительность недостаточна.

Первый встреченный симптом: во время каких-то «взрывов» в продакшене, когда вдруг генерируется очень большое количество логов, в Graylog начинает часто мигать ошибка индексации es_rejected_execution. Это произошло из-за того, что thread_pool.write.queue на одном узле данных до момента, когда Elasticsearch сможет обработать запрос на индексацию и загрузить информацию в шард на диске, по умолчанию способен кэшировать только 200 запросов.

И в Документация Elasticsearch Об этом параметре сказано очень мало.

Указывается только максимальное количество потоков и размер по умолчанию.

Мы, конечно, пошли подкрутить это значение и выяснили следующее: конкретно в нашей настройке достаточно хорошо кэшируется до 300 запросов, а большее значение чревато тем, что мы снова влетим в Full GC. Кроме того, поскольку это пакеты сообщений, которые приходят в рамках одного запроса, пришлось настроить Graylog так, чтобы он писал не часто и небольшими пакетами, а огромными пакетами или раз в 3 секунды, если пакет еще не полный.

В этом случае получается, что информация, которую мы пишем в Elasticsearch, становится доступной не через две секунды, а через пять (что нас вполне устраивает), а количество повторов, которые приходится сделать, чтобы протолкнуть большой стопка информации уменьшается.

Особенно это важно в те моменты, когда где-то что-то упало и яростно сообщает об этом, чтобы не получить полностью заспамленный Elastic, а через некоторое время — узлы Graylog, неработоспособные из-за забитых буферов.

Кроме того, когда у нас были такие же взрывы в продакшене, мы получали жалобы от программистов и тестировщиков: в тот момент, когда им действительно были нужны эти логи, они их выдавали очень медленно.

Они начали разбираться.

С одной стороны, было понятно, что и поисковые запросы, и запросы на индексацию обрабатываются, по сути, на одних и тех же физических машинах, и так или иначе будут определенные просадки.

Но частично это можно было обойти за счет того, что в шестой версии Elasticsearch появился алгоритм, позволяющий распределять запросы между релевантными узлами данных не по принципу случайного round-robin (контейнер, который делает индексацию и хранит первичные данные).

-shard может быть очень занят, быстро ответить не получится), но перенаправить этот запрос на менее загруженный контейнер с репликой-шардом, который ответит гораздо быстрее.

Другими словами, мы пришли к use_adaptive_replica_selection: true. Картина чтения начинает выглядеть так:

Кластер Elasticsearch объемом 200 ТБ+

Переход на этот алгоритм позволил существенно улучшить время запроса в те моменты, когда нам приходилось писать большой поток логов.

Наконец, главной проблемой стал безболезненный демонтаж дата-центра.

Чего мы хотели от кластера сразу после потери связи с одним DC:

  • Если у нас есть текущий мастер в вышедшем из строя дата-центре, то он будет перевыбран и перемещен в качестве роли на другой узел в другом ДЦ.

  • Мастер быстро удалит из кластера все недоступные узлы.

  • По оставшимся он поймет: в потерянном дата-центре у нас были такие-то первичные шарды, он быстро раскрутит комплементарные реплики-шарды в остальных дата-центрах, а мы продолжим индексацию данных.

  • В результате этого пропускная способность кластера на запись и чтение будет постепенно ухудшаться, но в целом все будет работать хоть и медленно, но стабильно.

Как оказалось, нам хотелось чего-то вроде этого:

Кластер Elasticsearch объемом 200 ТБ+

И мы получили следующее:

Кластер Elasticsearch объемом 200 ТБ+

Как это произошло? Когда дата-центр упал, узким местом стал наш мастер.

Почему? Дело в том, что на мастере есть TaskBatcher, который отвечает за распределение определенных задач и событий в кластере.

Любой выход узла, любое продвижение шарда из реплики в первичный, любая задача создать где-то шард — все это поступает сначала в TaskBatcher, где обрабатывается последовательно и в одном потоке.

На момент вывода одного дата-центра выяснилось, что все дата-ноды в уцелевших дата-центрах сочли своим долгом сообщить мастеру «мы потеряли такие-то шарды и такие-то узлы данных».

При этом уцелевшие узлы данных отправляли всю эту информацию текущему мастеру и пытались дождаться подтверждения того, что он ее принял.

Этого они не дождались, так как мастер получал задания быстрее, чем мог ответить.

У узлов истекал таймаут повторения запросов, а мастер в это время даже не пытался на них ответить, а был полностью поглощен задачей сортировки запросов по приоритету.

В терминальном виде получилось, что ноды данных спамили мастер до такой степени, что тот перешел на полный GC. После этого наша главная роль переместилась на какой-то следующий узел, с ним произошло абсолютно то же самое, и в результате кластер полностью рухнул.

Мы проводили измерения, и до версии 6.4.0, где это исправлено, нам достаточно было одновременно выводить только 10 узлов данных из 360, чтобы полностью отключить кластер.

Это выглядело примерно так:

Кластер Elasticsearch объемом 200 ТБ+

После версии 6.4.0, где была исправлена эта ужасная ошибка, узлы данных перестали убивать мастера.

Но это не сделало его «умнее».

А именно: когда мы выводим 2, 3 или 10 (любое число, кроме одного) узлов данных, мастер получает какое-то первое сообщение, в котором говорится, что узел A ушел, и пытается сообщить об этом узлу B, узлу C, узлу D. И на данный момент справиться с этим можно только установив таймаут на попытки рассказать кому-то о чем-то, равный примерно 20-30 секундам, и таким образом контролировать скорость выезда дата-центра за пределы кластера.

В принципе, это укладывается в те требования, которые изначально предъявлялись к конечному продукту в рамках проекта, но с точки зрения «чистой науки» это баг.

Что, кстати, было успешно исправлено разработчиками в версии 7.2. Более того, когда вышел из строя определенный узел данных, оказалось, что распространение информации о его выходе важнее, чем сообщение всему кластеру о том, что на нем есть такие-то первично-шарды (чтобы раскрутить реплику-шард в другом data-узле).

центр в первичке, и на них можно было писать информацию).

Поэтому, когда все уже отгремело, освободившиеся узлы данных не сразу помечаются как устаревшие.

Соответственно, мы вынуждены ждать, пока истечет время ожидания всех пингов к освободившимся узлам данных, и только после этого наш кластер начинает нам сообщать, что там, там и там нужно продолжить запись информации.

Вы можете прочитать об этом больше Здесь .

В результате операция по выводу дата-центра сегодня занимает у нас около 5 минут в час пик.

Для такой большой и неповоротливой махины это довольно хороший результат. В результате мы пришли к следующему решению:

  • У нас есть 360 узлов данных с дисками по 700 гигабайт.
  • 60 координаторов для маршрутизации трафика через эти самые узлы данных.

  • 40 мастеров, которые мы оставили в качестве своеобразного наследства начиная с версий до 6.4.0 - чтобы пережить вывод дата-центра, мы были морально готовы потерять несколько машин, чтобы гарантированно иметь кворум мастеров даже в худший сценарий
  • Любые попытки объединить роли на одном контейнере встречались с тем, что рано или поздно нода ломалась под нагрузкой.

  • Весь кластер использует heap.size в 31 гигабайт: все попытки уменьшить размер приводили либо к уничтожению некоторых узлов на тяжелых поисковых запросах с ведущим wildcard, либо к срабатыванию автоматического выключателя в самом Elasticsearch.
  • Кроме того, для обеспечения производительности поиска мы старались держать количество объектов в кластере как можно меньшим, чтобы обрабатывать как можно меньше событий в узком месте, которое мы получили в мастере.



Наконец о мониторинге

Чтобы все это работало так, как задумано, мы следим за следующим:
  • Каждый узел данных сообщает нашему облаку, что он существует, и на нем есть такие-то шарды.

    Когда мы где-то что-то гасим, кластер через 2-3 секунды сообщает, что в центре А мы потушили узлы 2, 3 и 4 — это значит, что в других дата-центрах мы ни при каких обстоятельствах не можем гасить те узлы, на которых только один шард. левый.

  • Зная природу поведения мастера, мы очень внимательно смотрим на количество невыполненных задач.

    Потому что даже одна зависшая задача, если она не выйдет по таймауту вовремя, теоретически в какой-то аварийной ситуации может стать причиной того, что, например, не работает продвижение шарда реплики в первичный, из-за чего перестанет работать индексация.

  • Также мы очень внимательно следим за задержками сборщика мусора, потому что у нас уже были большие трудности с этим при оптимизации.

  • Отклоняет по ветке, чтобы заранее понять, где узкое место.

  • Ну и стандартные метрики, такие как куча, оперативная память и ввод-вывод.
При построении мониторинга необходимо учитывать особенности Thread Pool в Elasticsearch. Документация Elasticsearch описывает параметры конфигурации и значения по умолчанию для поиска и индексирования, но совершенно ничего не говорит о thread_pool.management. Эти потоки обрабатывают, в частности, запросы типа _cat/shards и другие подобные, которые удобно использовать при написании мониторинга.

Чем больше кластер, тем больше таких запросов выполняется в единицу времени, а вышеупомянутый thread_pool.management не только не представлен в официальной документации, но и по умолчанию ограничен 5 потоками, от которых очень быстро избавляются после какой мониторинг перестает работать корректно.

Что хочу сказать в заключение: мы это сделали! Мы смогли дать нашим программистам и разработчикам инструмент, который практически в любой ситуации сможет быстро и достоверно предоставить информацию о том, что происходит на производстве.

Да, это оказалось довольно сложно, но, тем не менее, нам удалось вписать наши пожелания в существующие продукты, которые нам не пришлось патчить и переписывать под себя.



Кластер Elasticsearch объемом 200 ТБ+

Теги: #Системное администрирование #Высокая производительность #DevOps #одноклассники #highload #Поисковые технологии #elasticsearch #oktech #ok.tech

Вместе с данным постом часто просматривают: