Отправка gzip-журналов Cloudtrail из S3 в ElasticSearch
Я относительно новичок во всей части установки ELK, поэтому, пожалуйста, продолжайте.
То, что я хочу сделать, это отправить журналы Cloudtrail, которые хранятся на S3, в локально настроенную ELK (я имею в виду не AWS). Я не использую Filebeat нигде в настройке. Я считаю, что это не обязательно использовать. Logstash может напрямую доставлять данные в ES.
- Я здесь?
Как только данные в ES, я бы просто хотел визуализировать их в Kibana.
То, что я пробовал до сих пор, учитывая, что мой ELK запущен и работает, а Filebeat не участвует в настройке:
используя плагин S3 logstash
содержимое /etc/logstash/conf.d/aws_ct_s3.conf
input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}
output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}
Когда logstash запускается с вышеупомянутым conf, я вижу, что все работает нормально. Используя плагин head для Google Chrome, я вижу, что документы постоянно добавляются в указанный индекс. Фактически, когда я просматриваю его, я вижу, что есть данные, которые мне нужны. Я могу видеть то же самое на стороне Кибаны.
Данные каждого из этих файлов gzip имеют формат:
{
"Records": [
dictionary_D1,
dictionary_D2,
.
.
.
]
}
И я хочу, чтобы каждый из этих словарей из списка словарей был отдельным событием в Кибане. С некоторой прибылью я понимаю, что могу использовать split
фильтр для достижения того, что я хочу. А теперь мой aws_ct_s3.conf
выглядит примерно так:
input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}
filter {
split {
field => "Records"
}
}
output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}
И с этим я фактически получаю данные, которые мне нужны на Кибане.
Теперь проблема в том,
Без установленного фильтра количество документов, отправляемых Logstash из S3 в Elasticsearch, было в ГБ, а после применения фильтра оно остановилось примерно на 5000 документов.
Я не знаю, что я здесь делаю не так. Может ли кто-нибудь помочь, пожалуйста?
Текущая конфигурация:
java -XshowSettings:vm => Max Heap Size: 8.9 GB
elasticsearch jvm options => max and min heap size: 6GB
logstash jvm options => max and min heap size: 2GB
ES version - 6.6.0
LS version - 6.6.0
Kibana version - 6.6.0