FluentD AWS плагин Kinesis не ставит записи для Kinesis
Я пытался выяснить, является ли FluentD жизнеспособным способом для моей компании отправлять данные в Kinesis через. Я установил FLuentD с помощью rd-пакета td-agent на экземпляре Amazon EC2. Существует не так много параметров для отправки данных, просто плагин TCP или in_forward может подойти лучше всего. Я начал с TCP, но я никогда ничего не делал с TCP, так что извините, если я сделаю несколько новых ошибок.
Я ударился головой о стену, пытаясь отправить TCP-запросы и посмотреть, как они отправляются в Kinesis. Кинезис показывает, что не было никаких записей. Сначала я подумал, что не посылаю достаточно данных, чтобы инициировать запись, поэтому я попытался отправить тонны данных, но не повезло. Погугление проблемы не дало мне ответов, так как был только один результат, который казался жизнеспособным, и это был вопрос на каком-то сайте - netdownload - без ответов.
Конфигурация FluentD:
<match beacon.test>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
</match>
<source>
type tcp
tag beacon.test
format json
log_level debug
</source>
CONNECT_TO_ADDR - это константа, определенная как публичный IP моего экземпляра ec2. Код для отправки данных TCP:
define('TCP_PORT', 5170);
for($i = 0; $i < 312500000; $i++)
{
Send(array('id' => $i, 'value' => 'testing123'));
}
function Send($obj)
{
try
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if($socket === false)
{
throw new Exception(GetSocketError("Unable to create a socket"));
}
if(socket_connect($socket, CONNECT_TO_ADDR, TCP_PORT))
{
$message = json_encode($obj, JSON_FORCE_OBJECT);
if($message === false)
{
throw new Exception("Unable to create JSON object: " . json_last_error_msg());
}
$totalByteCount = strlen($message);
$wrote = socket_send($socket, $message, $totalByteCount, MSG_EOF);
if($wrote === false)
{
throw new Exception(GetSocketError("Unable to write to socket"));
}
echo "Wrote $wrote/$totalByteCount bytes successfully!\n";
}
else
{
throw new Exception(GetSocketError("Unable to connect to socket"));
}
}
catch(Exception $ex)
{
if(isset($socket) && $socket !== false)
{
socket_shutdown($socket);
socket_close($socket);
}
throw $ex;
}
}
Вывод из журнала FluentD:
D, [2015-08-11T21: 17: 00.244426 # 9848] DEBUG -: [Aws::Kinesis::Client 200 >0.130176 0 повторных попыток] description_stream(имя_потока: "test_stream")
2015-08-11 21:17:00 +0000 [info]: прослушивание беглого сокета на 0.0.0.0:24224
2015-08-11 21:17:00 +0000 [info]: listen dRuby uri="druby://127.0.0.1:24230" >object="Engine" 2015-08-11 21:17:00 +0000 [ отладка]: прослушивание сокета TCP на 0.0.0.0:5170
2015-08-12 03:28:01 +0000 [info]: принудительная очистка буферизованных событий
1 ответ
Как насчет добавления copy + stdout для отладки? Таким образом, вы можете подтвердить, что ваше PHP-приложение на самом деле отправляет данные на ваш Fluentd или нет.
<match>
type copy
<store>
type stdout
</store>
<store>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
flush_interval 10s
</store>
</match>
Кроме того, добавление опции "flush_interval 10s" ускорит сброс буфера.