В этой статье я расскажу о том, как в Яндекс.
Деньгах организована система сбора и доставки логов серверов платежных сервисов.
Я занимаюсь доставкой логов уже последний год и за это время накопил достаточно нюансов, чтобы поделиться своим опытом с общественностью.
Система построена на основе стека EHK (Elasticsearch/Heka/Kibana) с прицелом на работу практически в реальном времени.
Особый акцент я сделаю на тонкостях и нюансах обработки миллиардов строк текста в день.
Я верю в три вещи: мониторинг, логи и резервное копирование.Для быстрого доступа к журналам операций в Яндекс.
Деньгах используйте ЭластичныйПоиск , получая данные почти от сотни микросервисов, входящих в состав платежного сервиса.
Под «живыми» я подразумеваю логи, доступные в режиме реального времени: задержка доставки новых записей из файла журнала в кластер Elasticsearch теперь составляет менее секунды.
Служба мониторинга всегда точно знает состояние службы в любой момент, и разработчики могут быстро оценить и скорректировать поведение новой версии каждого микросервиса сразу после выпуска.
Но так было не всегда.
Когда пять месяцев назад я пришел в компанию, передо мной стояла задача организовать доставку логов оперативного сервиса в кластер.
ЭластичныйПоиск (далее просто ЭС).
На тот момент использовались четыре схемы их разбора и доставки в ES:
- Heka → Выход TCP → Heka → ES.
- Heka → Apache Kafka → Apache Flume → ES.
- Системный журнал → ES.
- nxlog → nxlog → ES.
Логи на самом деле очень простая вещь: множество файлов, быстро разрастающихся в объёме на боевых серверах.Поскольку «чем больше соединений в зонтике, тем выше вероятность его поломки», я отбросил все нерабочие и неоправданно сложные схемы и остановился на той, в которой роль процессора и транспорта будет выполнять Heka.Поэтому простые решения в этом случае самые надежные.
Гека собирает логи и отправляет их по TCP на другую Heka для дальнейшей пересылки в ElasticSearch Cluster .
Идея следующая: на каждом сервере устанавливается Heka, логи всех сервисов на этом сервере упаковываются в бинарный протокол Protobuf и отправляются на прием Heka для парсинга, фильтрации и отправки в ES. Почему не классический стек ELK и почему устарел Heka, а не Logstash С точки зрения конфигурации и программного состава наш ES-кластер выглядит так:
- ЭластичныйПоиск 2.2.
- Кибана 4.4.2.
- Два мастер-узла: Intel Xeon 2x E5-2660, 64 ГБ ОЗУ, 2x 146 ГБ RAID-10.
- Клиентский узел с установленной Kibana: Intel Xeon 2xE5-2660, 64 ГБ ОЗУ, 2x146 ГБ RAID-10.
- Четыре узла данных: Intel Xeon 2x E5-2640 v3; 512 ГБ ОЗУ, 3x16 ТБ RAID-10.
Такая организация существенно ускоряет размещение индексов и ребалансировку кластера.
В современном мире стек Elasticsearch/Logstash/Kibana стал практически стандартом де-факто для работы с логами.
И если к Elasticsearch и Kibana возражений нет, то с Logstash есть один нюанс — он создается на jRuby (интерпретатор языка Ruby, написанный на Java) и требует наличия в системе JVM. Учитывая, что Яндекс.
Деньги — это набор микросервисов, размещенных на сотнях физических серверов и виртуальных контейнерах, было бы неправильно устанавливать тяжеловесные Logstash и JAVA в каждый из них.
Выбор пал на Heka из-за ее простоты, надежности, легкости, возможности фильтрации проходящих сообщений и отличной буферизации данных.
Что касается статуса продукта (deprecated), то для нас это не аргумент. Лук и стрелы для военных целей также не рекомендуются, но это никоим образом не мешает вам выстрелить кому-нибудь в голову с гарантированным результатом.
Если, например, вам нужен нестандартный плагин или процессор, то доработать продукт вам поможет целый штат опытных разработчиков.
Но это всё была теория, а при переходе к практике начались проблемы.
Дьявол скрывался в объёме данных Учитывая финансовую специфику нашей работы, практически все сервисы записывают в логи много разной информации.
Например и для понимания масштаба: объем логов некоторых компонентов системы достигает до 250 тысяч строк в минуту .
Никакая Heka, на каком бы мощном оборудовании она ни была, не справится с таким объёмом в одиночку — деградация производительности и потеря данных неизбежны.
Ни то, ни другое, конечно, совершенно неприемлемо, поэтому на помощь приходит HAProxy .
Итоговая схема оказалась следующей:
На диаграмме показано общее направление трафика логов от источников Heka к комбинации HAProxy + Heka. На каждом сервере есть одна Heka, которая собирает логи микросервисов этого сервера.
Данные собираются, упаковываются в Protobuf и отправляются по TCP в балансировщик нагрузки, обслуживающий центр обработки данных.
Бэкенды — это HAProxy, расположенные непосредственно на узлах данных ES, за которыми находятся пулы.
Гека .
В свою очередь они получают данные, перепаковывают их в ESJson и отправляют на локальный узел данных по протоколу HTTP. .
и в разных форматах файлов журналов Несмотря на то, что основной язык в компании — Java и логи выводятся через стандартную библиотеку log4j, единого принятого формата на момент построения «кластера мечты» не было.
Каждый микросервис записывал свой собственный тип журналов, включая варианты набора полей вывода и форматов даты и времени.
Ждать, пока разработка приведёт логи к общепринятым форматам, мне не хотелось, поэтому движение к цели было параллельным.
Одновременно с анализом существующих форматов журналов создавались задания на модификацию, а по мере выхода новых версий с правильными форматами менялись настройки сборщиков.
Перейдем к самому соку – нюансам настройки.
Поведение Геки описано .
toml файлы, которые при запуске собираются в единую конфигурацию, и в зависимости от описанных блоков Heka строит модель обработки данных.
Каждый блок проходит несколько этапов:
- Ввод — входной поток (это может быть файл, ввод TCP/UDP, данные, считанные из Kafka, события контейнера Docker и т. д.).
- Сплиттер — здесь мы указываем начало и конец каждого блока данных в потоке.
Обязательный шаг для многострочных данных, таких как Java Stacktrace.
- Декодер – описывает правила декодирования входящих данных.
В основном мы используем декодеры Regex.
- Фильтры – этап фильтрации и изменения данных.
- Кодировщики — кодирование потока данных в соответствии с форматом получателя.
- Вывод — здесь мы описываем, как и куда должны идти данные.
Идти И Луа .
При необходимости можно написать что-то свое.
Например, плагин Lua-фильтра, который будет блокировать отправку записей о запросах мониторинга к сервису в ES; или вырезать конфиденциальные данные из журналов.
Гека в древнеегипетской мифологии — бог магии.А то, что Heka позволяет делать с бревнами, просто волшебно.
Параметры сервера-источника журналов
Рассмотрим конфигурацию Heka на примере исходного сервера для логов и файла сервис.toml
.
Самый простой случай — один файл, ротация происходит средствами системы.[money-service-log] type = "LogstreamerInput" log_directory = "/var/log/tomcat" file_match = "money-service.log" splitter = "RegexSplitter" decoder = "service_decoder"
Если файлов много и они различаются по формату, то каждый из них следует описать парой вход/декодер.
За более подробным описанием лучше обратиться к официалам.
Если что-то осталось неясным, обязательно спрашивайте в комментариях.
[RegexSplitter]
delimiter = '\n(\[\d\d\d\d-\d\d-\d\d)'
delimiter_eol = false
Поскольку логи могут быть многострочными (те же stacktraces), не забывайте про RegexSplitter, который позволяет Heka понять, где заканчивается один блок текста и начинается другой.
[service_decoder]
type = "PayloadRegexDecoder"
match_regex = '^\[(ЭP<timestamp>\d{4}-\d{2}-\d{2}T[\d:.
\+]+\])\s+(ЭP<level>[A-Z]+)\s+\[(ЭP<thread>.
*)\]\s+\[(ЭP<context>\S*)\]\s+\[(ЭP<traceid>\S*)\]\s+\[(ЭP<unilabel>\S*)\]\s\[(ЭP<class>\S+)\]\s-\s(ЭP<msg>.
*)'
log_errors = true
[service_decoder.message_fields]
@timestamp = "%timestamp%"
level = "%level%"
thread = "%thread%"
context = "%context%"
traceid = "%traceid%"
unilabel = "%unilabel%"
class = "%class%"
msg = "%msg%"
В match_regex Строки журнала описываем с помощью регулярного выражения в стандарте языка Go. Регулярные выражения в Go практически совпадают со стандартным PCRE, но есть ряд нюансов, из-за которых Heka может отказаться запускаться.
Например, некоторые реализации PCRE принимают такой синтаксис: (?<groupname>.
*)
Но ГОЛАНГ не простит. Использование параметра log_errors Все ошибки собираем в отдельный лог — они понадобятся позже.
[ServiceOutput]
type = "TcpOutput"
address = "loadbalancer.server.address:port"
keep_alive = true
message_matcher = "Logger == 'money-appname'"
use_buffering = true
[ServiceOutput.buffering]
max_file_size = 100857600
max_buffer_size = 1073741824
full_action = "block"
Буферизация вывода — одна из замечательных особенностей Heka. По умолчанию он сохраняет выходной буфер по следующему пути: /var/cache/hekad/output_queue/OutputName
В настройках мы ограничиваем размер каждого буферного файла 100 МБ, а также устанавливаем общий размер кэша для каждого модуля вывода 1 ГБ.
Параметр полное_действие может принимать три значения:
- неисправность — при переполнении буфера Heka останавливается;
- уронить — при переполнении буфера он начинает работать как стек, удаляя старые сообщения в очереди;
- блокировать — когда буфер заполнен, Heka приостанавливает все операции и ждет, пока можно будет отправить данные.
Единственным минусом является то, что в случае потери соединения или ухудшения качества канала вы получите резкий скачок трафика при возобновлении соединения.
Это связано с тем, что Heka отправляет накопленный буфер, пытаясь вернуться к обработке в реальном времени.
Приемный пул необходимо проектировать с запасом, учитывая возможность возникновения подобных ситуаций, иначе вы легко можете провести DDoS-атаку самостоятельно.
Кстати, по поводу использования двойных и одинарных кавычек в конфигурации Heka подразумевается следующее:
- Значения переменных в одинарных кавычках по умолчанию рассматриваются как таковые.
необработанная строка .
Нет необходимости экранировать специальные символы.
- Значения переменных в двойных кавычках обрабатываются как обычный текст и требуют двойного экранирования при использовании в них регулярных выражений.
Конфигурация серверной части
Бэкэнд для балансировщика представляет собой комбинацию HAProxy и трех экземпляров Heka на каждом узле данных ES. В HAProxy все достаточно просто и, как мне кажется, не требует пояснений: listen pool_502
bind 0.0.0.0:502
balance roundrobin
default-server fall 5 inter 5000 weight 10
server heka_1502 127.0.0.1:1502 check
server heka_2502 127.0.0.1:2502 check
server heka_3502 127.0.0.1:3502 check
На каждом сервере работают три экземпляра Heka, отличающиеся только портами:
[Input_502_1]
type = "TcpInput"
address = "0.0.0.0:1502"
keep_alive = true
keep_alive_period = 180
decoder = "MultiDecoder_502"
[MultiDecoder_502]
type = "MultiDecoder"
subs = ['Service1Decoder', 'Service2Decoder']
cascade_strategy = "first-wins"
log_sub_errors = true
В конфигурации используется MultiDecoder, поскольку логи многих сервисов проходят через один порт. Политика «первый победитель» означает, что после первого совпадения дальнейший перебор декодеров прекращается.
[Service1Decoder]
type = "ProtobufDecoder"
[Service2Decoder]
type = "ProtobufDecoder"
[Service1Encoder]
type = "ESJsonEncoder"
index = "service1-logs-%{%Y.%m.%d}"
es_index_from_timestamp = false
type_name = "%{Logger}"
fields = [ "DynamicFields", "Payload", "Hostname" ]
dynamic_fields = ["@timestamp", "level", "thread", "context", "traceid", "unilabel", "class", "msg"]
[Service1Encoder.field_mappings]
Payload = "message"
Hostname = "MessageSourceAddress"
Параметр es_index_from_timestamp нужен для того, чтобы указать, что дата и время формирования имени индекса берутся не из входящих данных, а из местного времени сервера.
Позволяет избежать беспорядка, когда серверы работают в разное время с x зоны и кто-то пишет время в логах в UTC, а кто-то в MSK. В параметре индекс Реализован принцип «один сервис – один индекс», каждый день создается новый индекс.
[Service1Output]
type = "ElasticSearchOutput"
message_matcher = "Logger == 'money-service1'"
server = " http://localhost:9200 "
encoder = "Service1Encoder"
use_buffering = true
Плагины вывода анализируют поток данных на основе параметра message_matcher , соответствующий имени файла журнала.
Чтобы избежать перегрузки сети, Heka отправляет данные на локальный узел данных, на котором он установлен.
А затем ES сама рассылает индексированные данные по транспортному протоколу между узлами данных кластера.
Заключение Описанная выше схема успешно работает и индексирует 25-30 тысяч записей в секунду.
Запас прочности приемных пулов Heka позволяет им выдерживать пики нагрузки до 100 тысяч записей/сек:
Статистика из Заббикс .
В ElasticSearch мы храним логи за последний 21 день.
Опыт показывает, что онлайн-доступ к старым данным требуется крайне редко.
Но при необходимости вы всегда можете запросить их у лог-серверов, которые хранят данные практически вечно.
Текущее состояние кластера по Копфу.
Я описал только ту часть системы, которая касается сбора и доставки логов, поэтому в следующей статье я расскажу о самом кластере ElasticSearch и его настройке.
Думаю, я вам расскажу, как мы это виртуализировали, как перешли с версии 2.2 на 5.3 и перевезли с собой 24 миллиарда записей, не теряя при этом веры в человечество.
Может быть, добавить в историю еще что-то, какие-то упущенные детали? Поделитесь своим мнением в комментариях.
Теги: #Оптимизация серверов #ИТ-инфраструктура #Системное администрирование #DevOps #elasticsearch #kibana #heka
-
Дата-Центр Ржд. Системы Сигнализации
19 Oct, 24 -
Гугл Переводчик
19 Oct, 24 -
Зумер
19 Oct, 24 -
Практическое Задание С Анализом Решения
19 Oct, 24