Работа С Потоком Журналов В Реальном Времени С Использованием Heka. Опыт Яндекс.денег



Работа с потоком журналов в реальном времени с использованием Heka. Опыт Яндекс.
</p><p>
Денег

В этой статье я расскажу о том, как в Яндекс.

Деньгах организована система сбора и доставки логов серверов платежных сервисов.

Я занимаюсь доставкой логов уже последний год и за это время накопил достаточно нюансов, чтобы поделиться своим опытом с общественностью.

Система построена на основе стека EHK (Elasticsearch/Heka/Kibana) с прицелом на работу практически в реальном времени.

Особый акцент я сделаю на тонкостях и нюансах обработки миллиардов строк текста в день.

Я верю в три вещи: мониторинг, логи и резервное копирование.

Для быстрого доступа к журналам операций в Яндекс.

Деньгах используйте ЭластичныйПоиск , получая данные почти от сотни микросервисов, входящих в состав платежного сервиса.

Под «живыми» я подразумеваю логи, доступные в режиме реального времени: задержка доставки новых записей из файла журнала в кластер Elasticsearch теперь составляет менее секунды.

Служба мониторинга всегда точно знает состояние службы в любой момент, и разработчики могут быстро оценить и скорректировать поведение новой версии каждого микросервиса сразу после выпуска.

Но так было не всегда.

Когда пять месяцев назад я пришел в компанию, передо мной стояла задача организовать доставку логов оперативного сервиса в кластер.

ЭластичныйПоиск (далее просто ЭС).

На тот момент использовались четыре схемы их разбора и доставки в ES:

  • Heka → Выход TCP → Heka → ES.
  • Heka → Apache Kafka → Apache Flume → ES.
  • Системный журнал → ES.
  • nxlog → nxlog → ES.
Почти все они работали неидеально: кластер Kafka был ненадежен, Flume периодически зависал, из-за чего ES впадал в ступор.

Логи на самом деле очень простая вещь: множество файлов, быстро разрастающихся в объёме на боевых серверах.

Поэтому простые решения в этом случае самые надежные.

Поскольку «чем больше соединений в зонтике, тем выше вероятность его поломки», я отбросил все нерабочие и неоправданно сложные схемы и остановился на той, в которой роль процессора и транспорта будет выполнять Heka.

Работа с потоком журналов в реальном времени с использованием Heka. Опыт Яндекс.
</p><p>
Денег

Гека собирает логи и отправляет их по TCP на другую Heka для дальнейшей пересылки в ElasticSearch Cluster .

Идея следующая: на каждом сервере устанавливается Heka, логи всех сервисов на этом сервере упаковываются в бинарный протокол Protobuf и отправляются на прием Heka для парсинга, фильтрации и отправки в ES. Почему не классический стек ELK и почему устарел Heka, а не Logstash С точки зрения конфигурации и программного состава наш ES-кластер выглядит так:

  • ЭластичныйПоиск 2.2.
  • Кибана 4.4.2.
  • Два мастер-узла: Intel Xeon 2x E5-2660, 64 ГБ ОЗУ, 2x 146 ГБ RAID-10.
  • Клиентский узел с установленной Kibana: Intel Xeon 2xE5-2660, 64 ГБ ОЗУ, 2x146 ГБ RAID-10.
  • Четыре узла данных: Intel Xeon 2x E5-2640 v3; 512 ГБ ОЗУ, 3x16 ТБ RAID-10.
Все ES-узлы расположены в одной локальной сети и подключены к одному маршрутизатору, что позволяет осуществлять связь внутри кластера по транспортному протоколу на максимальной скорости.

Такая организация существенно ускоряет размещение индексов и ребалансировку кластера.

В современном мире стек Elasticsearch/Logstash/Kibana стал практически стандартом де-факто для работы с логами.

И если к Elasticsearch и Kibana возражений нет, то с Logstash есть один нюанс — он создается на jRuby (интерпретатор языка Ruby, написанный на Java) и требует наличия в системе JVM. Учитывая, что Яндекс.

Деньги — это набор микросервисов, размещенных на сотнях физических серверов и виртуальных контейнерах, было бы неправильно устанавливать тяжеловесные Logstash и JAVA в каждый из них.

Выбор пал на Heka из-за ее простоты, надежности, легкости, возможности фильтрации проходящих сообщений и отличной буферизации данных.

Что касается статуса продукта (deprecated), то для нас это не аргумент. Лук и стрелы для военных целей также не рекомендуются, но это никоим образом не мешает вам выстрелить кому-нибудь в голову с гарантированным результатом.

Если, например, вам нужен нестандартный плагин или процессор, то доработать продукт вам поможет целый штат опытных разработчиков.

Но это всё была теория, а при переходе к практике начались проблемы.

Дьявол скрывался в объёме данных Учитывая финансовую специфику нашей работы, практически все сервисы записывают в логи много разной информации.

Например и для понимания масштаба: объем логов некоторых компонентов системы достигает до 250 тысяч строк в минуту .

Никакая Heka, на каком бы мощном оборудовании она ни была, не справится с таким объёмом в одиночку — деградация производительности и потеря данных неизбежны.

Ни то, ни другое, конечно, совершенно неприемлемо, поэтому на помощь приходит HAProxy .

Итоговая схема оказалась следующей:

Работа с потоком журналов в реальном времени с использованием Heka. Опыт Яндекс.
</p><p>
Денег

На диаграмме показано общее направление трафика логов от источников Heka к комбинации HAProxy + Heka. На каждом сервере есть одна Heka, которая собирает логи микросервисов этого сервера.

Данные собираются, упаковываются в Protobuf и отправляются по TCP в балансировщик нагрузки, обслуживающий центр обработки данных.

Бэкенды — это HAProxy, расположенные непосредственно на узлах данных ES, за которыми находятся пулы.

Гека .

В свою очередь они получают данные, перепаковывают их в ESJson и отправляют на локальный узел данных по протоколу HTTP. .

и в разных форматах файлов журналов Несмотря на то, что основной язык в компании — Java и логи выводятся через стандартную библиотеку log4j, единого принятого формата на момент построения «кластера мечты» не было.

Каждый микросервис записывал свой собственный тип журналов, включая варианты набора полей вывода и форматов даты и времени.

Ждать, пока разработка приведёт логи к общепринятым форматам, мне не хотелось, поэтому движение к цели было параллельным.

Одновременно с анализом существующих форматов журналов создавались задания на модификацию, а по мере выхода новых версий с правильными форматами менялись настройки сборщиков.

Перейдем к самому соку – нюансам настройки.

Поведение Геки описано .

toml файлы, которые при запуске собираются в единую конфигурацию, и в зависимости от описанных блоков Heka строит модель обработки данных.

Каждый блок проходит несколько этапов:

  • Ввод — входной поток (это может быть файл, ввод TCP/UDP, данные, считанные из Kafka, события контейнера Docker и т. д.).

  • Сплиттер — здесь мы указываем начало и конец каждого блока данных в потоке.

    Обязательный шаг для многострочных данных, таких как Java Stacktrace.

  • Декодер – описывает правила декодирования входящих данных.

    В основном мы используем декодеры Regex.

  • Фильтры – этап фильтрации и изменения данных.

  • Кодировщики — кодирование потока данных в соответствии с форматом получателя.

  • Вывод — здесь мы описываем, как и куда должны идти данные.

Все эти этапы — всего лишь плагины.

Идти И Луа .

При необходимости можно написать что-то свое.

Например, плагин Lua-фильтра, который будет блокировать отправку записей о запросах мониторинга к сервису в ES; или вырезать конфиденциальные данные из журналов.

Гека в древнеегипетской мифологии — бог магии.

А то, что Heka позволяет делать с бревнами, просто волшебно.



Параметры сервера-источника журналов

Рассмотрим конфигурацию Heka на примере исходного сервера для логов и файла сервис.

toml .

  
  
  
  
  
  
  
  
  
   

[money-service-log] type = "LogstreamerInput" log_directory = "/var/log/tomcat" file_match = "money-service.log" splitter = "RegexSplitter" decoder = "service_decoder"

Самый простой случай — один файл, ротация происходит средствами системы.

Если файлов много и они различаются по формату, то каждый из них следует описать парой вход/декодер.

За более подробным описанием лучше обратиться к официалам.

документация .

Если что-то осталось неясным, обязательно спрашивайте в комментариях.



[RegexSplitter] delimiter = '\n(\[\d\d\d\d-\d\d-\d\d)' delimiter_eol = false

Поскольку логи могут быть многострочными (те же stacktraces), не забывайте про RegexSplitter, который позволяет Heka понять, где заканчивается один блок текста и начинается другой.



[service_decoder] type = "PayloadRegexDecoder" match_regex = '^\[(ЭP<timestamp>\d{4}-\d{2}-\d{2}T[\d:.

\+]+\])\s+(ЭP<level>[A-Z]+)\s+\[(ЭP<thread>.

*)\]\s+\[(ЭP<context>\S*)\]\s+\[(ЭP<traceid>\S*)\]\s+\[(ЭP<unilabel>\S*)\]\s\[(ЭP<class>\S+)\]\s-\s(ЭP<msg>.

*)' log_errors = true [service_decoder.message_fields] @timestamp = "%timestamp%" level = "%level%" thread = "%thread%" context = "%context%" traceid = "%traceid%" unilabel = "%unilabel%" class = "%class%" msg = "%msg%"

В match_regex Строки журнала описываем с помощью регулярного выражения в стандарте языка Go. Регулярные выражения в Go практически совпадают со стандартным PCRE, но есть ряд нюансов, из-за которых Heka может отказаться запускаться.

Например, некоторые реализации PCRE принимают такой синтаксис:

(?<groupname>.

*)

Но ГОЛАНГ не простит. Использование параметра log_errors Все ошибки собираем в отдельный лог — они понадобятся позже.



[ServiceOutput] type = "TcpOutput" address = "loadbalancer.server.address:port" keep_alive = true message_matcher = "Logger == 'money-appname'" use_buffering = true [ServiceOutput.buffering] max_file_size = 100857600 max_buffer_size = 1073741824 full_action = "block"

Буферизация вывода — одна из замечательных особенностей Heka. По умолчанию он сохраняет выходной буфер по следующему пути:

/var/cache/hekad/output_queue/OutputName

В настройках мы ограничиваем размер каждого буферного файла 100 МБ, а также устанавливаем общий размер кэша для каждого модуля вывода 1 ГБ.

Параметр полное_действие может принимать три значения:

  • неисправность — при переполнении буфера Heka останавливается;
  • уронить — при переполнении буфера он начинает работать как стек, удаляя старые сообщения в очереди;
  • блокировать — когда буфер заполнен, Heka приостанавливает все операции и ждет, пока можно будет отправить данные.

С блокировкой вы гарантированно не потеряете ни одной строчки журнала.

Единственным минусом является то, что в случае потери соединения или ухудшения качества канала вы получите резкий скачок трафика при возобновлении соединения.

Это связано с тем, что Heka отправляет накопленный буфер, пытаясь вернуться к обработке в реальном времени.

Приемный пул необходимо проектировать с запасом, учитывая возможность возникновения подобных ситуаций, иначе вы легко можете провести DDoS-атаку самостоятельно.

Кстати, по поводу использования двойных и одинарных кавычек в конфигурации Heka подразумевается следующее:

  • Значения переменных в одинарных кавычках по умолчанию рассматриваются как таковые.

    необработанная строка .

    Нет необходимости экранировать специальные символы.

  • Значения переменных в двойных кавычках обрабатываются как обычный текст и требуют двойного экранирования при использовании в них регулярных выражений.

Этот нюанс испортил мне в свое время много крови.



Конфигурация серверной части

Бэкэнд для балансировщика представляет собой комбинацию HAProxy и трех экземпляров Heka на каждом узле данных ES. В HAProxy все достаточно просто и, как мне кажется, не требует пояснений:

listen pool_502 bind 0.0.0.0:502 balance roundrobin default-server fall 5 inter 5000 weight 10 server heka_1502 127.0.0.1:1502 check server heka_2502 127.0.0.1:2502 check server heka_3502 127.0.0.1:3502 check

На каждом сервере работают три экземпляра Heka, отличающиеся только портами:

[Input_502_1] type = "TcpInput" address = "0.0.0.0:1502" keep_alive = true keep_alive_period = 180 decoder = "MultiDecoder_502" [MultiDecoder_502] type = "MultiDecoder" subs = ['Service1Decoder', 'Service2Decoder'] cascade_strategy = "first-wins" log_sub_errors = true

В конфигурации используется MultiDecoder, поскольку логи многих сервисов проходят через один порт. Политика «первый победитель» означает, что после первого совпадения дальнейший перебор декодеров прекращается.



[Service1Decoder] type = "ProtobufDecoder" [Service2Decoder] type = "ProtobufDecoder" [Service1Encoder] type = "ESJsonEncoder" index = "service1-logs-%{%Y.%m.%d}" es_index_from_timestamp = false type_name = "%{Logger}" fields = [ "DynamicFields", "Payload", "Hostname" ] dynamic_fields = ["@timestamp", "level", "thread", "context", "traceid", "unilabel", "class", "msg"] [Service1Encoder.field_mappings] Payload = "message" Hostname = "MessageSourceAddress"

Параметр es_index_from_timestamp нужен для того, чтобы указать, что дата и время формирования имени индекса берутся не из входящих данных, а из местного времени сервера.

Позволяет избежать беспорядка, когда серверы работают в разное время с x зоны и кто-то пишет время в логах в UTC, а кто-то в MSK. В параметре индекс Реализован принцип «один сервис – один индекс», каждый день создается новый индекс.



[Service1Output] type = "ElasticSearchOutput" message_matcher = "Logger == 'money-service1'" server = " http://localhost:9200 " encoder = "Service1Encoder" use_buffering = true

Плагины вывода анализируют поток данных на основе параметра message_matcher , соответствующий имени файла журнала.

Чтобы избежать перегрузки сети, Heka отправляет данные на локальный узел данных, на котором он установлен.

А затем ES сама рассылает индексированные данные по транспортному протоколу между узлами данных кластера.

Заключение Описанная выше схема успешно работает и индексирует 25-30 тысяч записей в секунду.

Запас прочности приемных пулов Heka позволяет им выдерживать пики нагрузки до 100 тысяч записей/сек:

Работа с потоком журналов в реальном времени с использованием Heka. Опыт Яндекс.
</p><p>
Денег

Статистика из Заббикс .

В ElasticSearch мы храним логи за последний 21 день.

Опыт показывает, что онлайн-доступ к старым данным требуется крайне редко.

Но при необходимости вы всегда можете запросить их у лог-серверов, которые хранят данные практически вечно.



Работа с потоком журналов в реальном времени с использованием Heka. Опыт Яндекс.
</p><p>
Денег

Текущее состояние кластера по Копфу.

Я описал только ту часть системы, которая касается сбора и доставки логов, поэтому в следующей статье я расскажу о самом кластере ElasticSearch и его настройке.

Думаю, я вам расскажу, как мы это виртуализировали, как перешли с версии 2.2 на 5.3 и перевезли с собой 24 миллиарда записей, не теряя при этом веры в человечество.

Может быть, добавить в историю еще что-то, какие-то упущенные детали? Поделитесь своим мнением в комментариях.

Теги: #Оптимизация серверов #ИТ-инфраструктура #Системное администрирование #DevOps #elasticsearch #kibana #heka

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.