Централизованные Логи Для Приложений, Использующих Heka+Elasticsearch+Kibana

В этой статье описывается настройка централизованного журналирования для разных типов приложений (Python, Java (java.util.logging), Go, bash) с использованием достаточно нового проекта Heka. Heka разработана в Mozilla и написана на Go. Вот почему я использую его вместо logstash, который имеет аналогичные возможности.

Heka основана на плагинах пяти типов:

  1. Ввод - каким-то образом получает данные (слушает порты, читает файлы и т.д.);
  2. Декодеры — обрабатывают входящие запросы и переводят их во внутренние структуры сообщений;
  3. Фильтры – выполнять любые действия с сообщениями;
  4. Кодеры (не понятно как переводить) — кодируют внутренние сообщения в форматы, которые отправляются через плагины вывода;
  5. Выходные - отправьте данные куда-нибудь.

Например, в случае приложений Java входной плагин — LogstreamerInput, который отслеживает изменения в файле журнала.

Новые строки в журнале обрабатываются PayloadRegexDecoder (в соответствии с указанным форматом), а затем отправляются в elasticsearch через плагин вывода ElasticSearchOutput. Выходной плагин, в свою очередь, кодирует сообщение из внутренней структуры в формат elasticsearch через ESJsonEncoder.

установка Хека

Все способы установки описаны на сайте ( http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries ).

Но проще всего скачать готовый бинарный пакет для вашей системы со страницы https://github.com/mozilla-services/heka/releases .

Так как я использую сервера под Ubuntu, то описание будет для этой системы.

В этом случае установка сводится к установке самого deb-пакета, настройке файла конфигурации /etc/hekad.toml и добавлению его в сервисы выскочки.

Базовая конфигурация /etc/hekad.toml включает настройку количества процессов (я установил равным количеству ядер), дашборда (в котором видно, какие плагины включены) и udp-сервера на порту 5565, который прослушивает сообщения через протокол google protobuf (используется для приложений Python и Go):

  
  
  
  
  
  
  
  
  
  
   

maxprocs = 4 [Dashboard] type = "DashboardOutput" address = ":4352" ticker_interval = 15 [UdpInput] address = "127.0.0.1:5565" parser_type = "message.proto" decoder = "ProtobufDecoder"

Конфигурация для выскочки /etc/init/hekad.conf:

start on runlevel [2345] respawn exec /usr/bin/hekad -config=/etc/hekad.toml



Логирование приложений Python

Здесь используется библиотека https://github.com/kalail/heka-py и специальный обработчик для модуля регистрации.

Код:

import logging from traceback import format_exception try: import heka HEKA_LEVELS = { logging.CRITICAL: heka.severity.CRITICAL, logging.ERROR: heka.severity.ERROR, logging.WARNING: heka.severity.WARNING, logging.INFO: heka.severity.INFORMATIONAL, logging.DEBUG: heka.severity.DEBUG, logging.NOTSET: heka.severity.NOTICE, } except ImportError: heka = None class HekaHandler(logging.Handler): _notified = None conn = None host = '127.0.0.1:5565' def __init__(self, name, host=None): if host is not None: self.host = host self.name = name super(HekaHandler, self).

__init__() def emit(self, record): if heka is None: return fields = { 'Message': record.getMessage(), 'LineNo': record.lineno, 'Filename': record.filename, 'Logger': record.name, 'Pid': record.process, 'Exception': '', 'Traceback': '', } if record.exc_info: trace = format_exception(*record.exc_info) fields['Exception'] = trace[-1].

strip() fields['Traceback'] = ''.

join(trace[:-1]).

strip() msg = heka.Message( type=self.name, severity=HEKA_LEVELS[record.levelno], fields=fields, ) try: if self.conn is None: self.__class__.conn = heka.HekaConnection(self.host) self.conn.send_message(msg) except: if self.__class__._notified is None: print "Sending HEKA message failed" self.__class__._notified = True

По умолчанию обработчик ожидает, что Heka будет прослушивать порт 5565.

Логирование приложений Go

Для ведения журнала я создал библиотеку журналирования.

https://github.com/ivpusic/golog и добавили возможность отправлять сообщения напрямую в Heka. Результат находится здесь: https://github.com/ildus/golog Использование:

import "github.com/ildus/golog" import "github.com/ildus/golog/appenders" .

logger := golog.Default logger.Enable(appenders.Heka(golog.Conf{ "addr": "127.0.0.1", "proto": "udp", "env_version": "2", "message_type": "myserver.log", })) .

logger.Debug("some message")



Логирование Java-приложений

Для Java-приложений используется входной плагин типа LogstreamerInput со специальным декодером регулярных выражений.

Он читает журналы приложений из файлов, которые должны быть записаны в определенном формате.

Конфигурация для хеки, отвечающей за чтение и расшифровку логов:

[JLogs] type = "LogstreamerInput" log_directory = "/some/path/to/logs" file_match = 'app_(ЭP<Seq>\d+\.

\d+)\.

log' decoder = "JDecoder" priority = ["Seq"] [JDecoder] type = "PayloadRegexDecoder" #Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started match_regex = '^(ЭP<LoggerName>[\w\.

]+)\[(ЭP<Severity>[A-Z]+)\|(ЭP<Thread>[\w\d\-]+)\|(ЭP<Timestamp>[\d\-\s:]+)\]: (ЭP<Message>.

*)' timestamp_layout = "2006-01-02 15:04:05" timestamp_location = "Europe/Moscow" [JDecoder.severity_map] SEVERE = 3 WARNING = 4 INFO = 6 CONFIG = 6 FINE = 6 FINER = 7 FINEST = 7 [JDecoder.message_fields] Type = "myserver.log" Message = "%Message%" Logger = "%LoggerName%" Thread = "%Thread%" Payload = ""

В приложении нужно изменить Форматер через logging.properties. Пример logging.properties:

handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler java.util.logging.FileHandler.level=ALL java.util.logging.FileHandler.pattern = logs/app_%g.%u.log java.util.logging.FileHandler.limit = 1024000 java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter java.util.logging.FileHandler.append=tru java.util.logging.ConsoleHandler.level=ALL java.util.logging.ConsoleHandler.formatter=com.asdf.BriefLogFormatter

Код BriefLogFormatter:

package com.asdf; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.logging.Formatter; import java.util.logging.LogRecord; public class BriefLogFormatter extends Formatter { private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static final String lineSep = System.getProperty("line.separator"); /** * A Custom format implementation that is designed for brevity. */ public String format(LogRecord record) { String loggerName = record.getLoggerName(); if(loggerName == null) { loggerName = "root"; } StringBuilder output = new StringBuilder() .

append(loggerName) .

append("[") .

append(record.getLevel()).

append('|') .

append(Thread.currentThread().

getName()).

append('|') .

append(format.format(new Date(record.getMillis()))) .

append("]: ") .

append(record.getMessage()).

append(' ') .

append(lineSep); return output.toString(); } }



Скрипты журналирования (bash)

Для bash heka добавляет входной фильтр TcpInput (который прослушивает определенный порт) и PayloadRegexDecoder для декодирования сообщений.

Конфигурация в hekad.toml:

[TcpInput] address = "127.0.0.1:5566" parser_type = "regexp" decoder = "TcpPayloadDecoder" [TcpPayloadDecoder] type = "PayloadRegexDecoder" #Parses space_checker[INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6 match_regex = '^(ЭP<LoggerName>[\w\.

\-]+)\[(ЭP<Hostname>[^\|]+)\|(ЭP<Severity>[A-Z]+)\|(ЭP<Timestamp>[\d\-\s:]+)\]: (ЭP<Message>.

*)' timestamp_layout = "2006-01-02 15:04:05" timestamp_location = "Europe/Moscow" [TcpPayloadDecoder.severity_map] ERROR = 3 WARNING = 4 INFO = 6 ALERT = 1 [TcpPayloadDecoder.message_fields] Type = "scripts" Message = "%Message%" Logger = "%LoggerName%" Hostname = "%Hostname%" Payload = "[%Hostname%|%LoggerName%] %Message%"

Для логирования написана функция, отправляющая сообщения на TCP-порт в заданном формате:

log() { if [ "$1" ]; then echo -e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1" | nc 127.0.0.1 5566 || true echo $1 fi }

Отправка сообщения уровня INFO с типом app1:

log "test test test"



Отправка записей в elasticsearch

В hekad.conf добавлена следующая конфигурация:

[ESJsonEncoder] index = "heka-%{Type}-%{2006.01.02}" es_index_from_timestamp = true type_name = "%{Type}" [ElasticSearchOutput] message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'" server = "http://<elasticsearch_ip>:9200" flush_interval = 5000 flush_count = 10 encoder = "ESJsonEncoder"

Здесь мы указываем, где находится elasticsearch, как должны формироваться индексы и какие типы сообщений туда следует отправлять.



Просмотр журналов

Kibana 4 используется для просмотра логов.

Он еще находится в стадии бета-тестирования, но уже вполне работает. Для установки необходимо скачать архив со страницы http://www.elasticsearch.org/overview/kibana/installation/ , распакуйте его в любую папку на сервере и укажите расположение сервера elasticsearch в файле config/kibana.yml (параметр elasticsearch_url).

При первом запуске вам нужно будет добавить индексы на вкладке «Настройки».

Прежде чем вы сможете добавить шаблон индекса и правильно определить поля, вы должны отправить тестовое сообщение из каждого приложения.

Затем вы можете добавить шаблон индекса, например heka-* (который будет отображать все сообщения) или heka-scripts-*, тем самым отделив приложения друг от друга.

Затем вы можете перейти на вкладку Discover и посмотреть сами записи.



Заключение

Я показал лишь часть того, что можно логировать с помощью Heka. Если кому интересно, могу показать часть конфигурации Ansible, которая автоматически устанавливает хеку на все сервера, и на выбранный elasticsearch с кибаной.

Теги: #централизованное журналирование #golang #elasticsearch #java #python #Go

Вместе с данным постом часто просматривают: