Дружим ELK и Exchange. Часть 2

Первая часть эпопеи находится тут.

Logstash в стеке ELK используется для интеллектуальной обработки логов и подготовки получаемых логов для размещения в elastic в виде документов, на базе которых будет удобно строить различные визуализации в Kibana.

Установка

Установка включает два шага:

  • Установка и настройка пакета OpenJDK
  • Установка и настройка пакета Logstash

Установка и настройка пакета OpenJDK

Пакет OpenJDK необходимо скачать и распаковать в определённую директорию.

Затем путь до этой директории необходимо внести в переменные $env:Path и $env:JAVA_HOME операционной системы Windows:

Проверка версии java:

PS C:\> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Установка и настройка пакета Logstash

Файл-архив с дистрибутивом Logstash качается отсюда.

Архив нужно распаковать в корень диска. Распаковывать в папку “c:\program files” не стоит – Logstash при этом отказывается нормально запускаться.

Затем необходимо внести правки в файл jvm.options, которые отвечают за выделение оперативной памяти для процесса java. Рекомендуется указать половину оперативной памяти сервера. Если сервер содержит на борту 16Гб оперативки, то дефолтные ключи:

-Xms1g
-Xmx1g

Необходимо заменить на:

-Xms8g
-Xmx8g

Кроме этого имеет смысл закомментировать строку “-XX:+UseConcMarkSweepGC”. Подробнее об этот тут.

Следующий шаг – создание дефолтной конфигурации в файле logstash.conf:

input {
  stdin{}
}
 
filter {
}
 
output {
  stdout {
    codec => "rubydebug"
  }
}

При использовании этой конфигурации Logstash считывает данные из консоли, пропускает через пустой фильтр и выводит обратно в консоль. Применение этой конфигурации позволит проверить работоспособность Logstash. Для этого запускаем Logstash в интерактивном режиме следующим образом:

PS C:\...\bin> .\logstash.bat -f .\logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Видно, что Logstash успешно запустился на порту 9600.

Финальный шаг установки – запуск Logstash в виде сервиса Windows. Это можно сделать, например, с помощью пакета NSSM:

PS C:\...\bin> .\nssm.exe install logstash
Service "logstash" installed successfully!

Отказоустойчивость

Сохранность логов в процессе передачи с исходного сервера обеспечивается механизмом Persistent Queues.

Как работает

Схема расположения очередей в процессе обработки логов:

input queue filter + output

Input plugin получает данные с источника логов, записывает их в очередь и отправляет источнику логов подтверждение получения логов.

Сообщения из очереди обрабатываются Logstash, проходят фильтр и output plugin. При получении подтверждения отправки лога от output plugin Logstash удаляет обработанный лог из очереди.

Если Logstash останавливается, то все необработанные сообщения и сообщения, по которым не получено подтверждение об отправке остаются в очереди и Logstash продолжит их обработку при следующем запуске.

Настройка

Регулируется следующими ключами в файле C:\Logstash\config\logstash.yml:

  • queue.type: (возможные значения – persisted и memory (default)).
  • path.queue: (пусть до папки с файлами очередей, по умолчанию хранятся тут – C:\Logstash\queue).
  • queue.page_capacity: (максимальный размер страницы очереди, значение по умолчанию – 64mb).
  • queue.drain: (true/false – включает/выключает остановку обработки очереди перед выключением Logstash, не рекомендуется использовать, так как прямо скажется на скорости выключения сервера).
  • queue.max_events: (максимально число событий в очереди, по умолчанию – 0 (не ограничено)).
  • queue.max_bytes: (максимальный размер очереди в байтах, по умолчанию – 1024mb (1gb)).

Если настроены queue.max_events и queue.max_bytes сообщения перестают приниматься в очередь при наступлении любой настройки, которая сработала первой.

Подробнее про Persistent Queues тут.

Пример части logstash.yml, отвечающей за настройку очереди:

queue.type: persisted
queue.max_bytes: 10gb

Настройка

Конфигурация Logstash обычно состоит из трёх частей, отвечающих за разные фазы обработки входящий логов: приём (секция input), парсинг (секция filter) и отправка в elastic (секция output). Ниже мы подробнее рассмотрим каждую из них.

Input

Входящий поток с сырыми логами мы принимаем с агентов filebeat. Именно этот плагин мы и указываем в секции input:

input {
  beats {
    port => 5044
  }
}

После такой настройки Logstash начинает прослушивать порт 5044 и при получении логов обрабатывает их согласно настройках секции filter. При необходимости, можно канал получения логов с filebit завернуть в ssl. Подробнее о настройках плагина beats тут.

Filter

Все интересные для обработки текстовые логи, которые генерирует Exchange имеют csv-формат с описанными в самом файле логов полями. Для парсинга csv-записей Logstash предлагает нам три плагина: dissect, csv и grok. Первый – самый быстрый, но справляется с парсингом только самых простых логов.

Например, следующую запись он разобьёт на две (из-за наличия внутри поля запятой), из-за чего лог будет разобран неправильно:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Его можно использовать при парсинге логов IIS, например. В этом случае секция filter может выглядеть следующим образом:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Конфигурация Logstash позволяет использовать условные операторы, поэтому мы в плагин dissect можем направить только логи, которые были помечены filebeat тэгом “IIS”. Внутри плагина мы сопоставляем значения полей с их названиями, удаляем исходное поле message, которое содержало запись из лога и можем добавить произвольное поле, которое у нас будет, например, содержать имя приложения, из которого мы собираем логи.

В случае с логами трэкинга лучше использовать плагин csv – он корректно умеет обрабатывать сложные поля:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Внутри плагина мы сопоставляем значения полей с их названиями, удаляем исходное поле message (а также поля tenant-id и schema-version), которое содержало запись из лога и можем добавить произвольное поле, которое у нас будет, например, содержать имя приложения, из которого мы собираем логи.

На выходе из стадии фильтрации мы получим документы в первом приближении готовые к визуализации в kibana. Не хватать нам будет следующего:

  • Числовые поля будут распознаны как текст, что делает невозможным операции с ними. А именно, поля time-taken лога IIS, а также поля recipient-count и total-bites лога Tracking
  • Стандартный временной штамп документа будет содержать время обработки лога, а не время записи его на стороне сервера
  • Поле recipient-address будет выглядеть одной стройкой, что сделает невозможным анализ, связанный с подсчётом получателей писем

Настало время добавить немного магии в процесс обработки логов.

Конвертация числовых полей

Плагин dissect имеет опцию convert_datatype, которую можно использовать для конвертации текстового поля в цифровой формат. Например, так:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Стоит помнить, что этот метод подходит только в том случае, если поле точно будет содержать строку. Null-значения из полей опция не обрабатывает и вываливается в исключение.

Для логов трэкинга аналогичный метод convert лучше не использовать, так как поля recipient-count и total-bites могут быть пустыми. Для конвертации этих полей лучше использовать плагин mutate:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}
Разбиение recipient_address на отдельных получателей

Эту задачу можно так же решить с помощью плагина mutate:

mutate {
  split => ["recipient_address", ";"]
}
Изменяем timestamp

В случае с логами трэкинга задача очень просто решается плагином date, который поможет прописать в поле timestamp дату и время в нужном формате из поля date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

В случае с логами IIS нам будет необходимо объединить данные полей date и time с помощью плагина mutate, прописать нужную нам временную зону и поместить этот временной штамп в timestamp с помощью плагина date:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Output

Секция output используется для отправки обработанных логов в приёмник логов. В случае отправки напрямую в elastic используется плагин elasticsearch, в котором указывается адрес сервера и шаблон имени индекса для отправки сформированного документа:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Итоговая конфигурация

Итоговая конфигурация будет выглядеть следующим образом:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Полезные ссылки:

1 thought on “Дружим ELK и Exchange. Часть 2

Leave a Reply

Your email address will not be published. Required fields are marked *