fluentd aws kinesis插件没有将记录放入kinesis
fluentd aws kinesis plugin not putting records to kinesis
我一直在尝试查看 FluentD 是否是我公司将数据发送到 Kinesis 的可行方法。我在 Amazon EC2 实例上使用 td-agent rpm 包安装了 FLuentD。
没有太多关于如何发送数据的参数,只是 TCP 或 in_forward 插件可能是最合适的。我从 TCP 开始,但我之前从未真正用 TCP 做过任何事情,如果我犯了一些新的错误,我深表歉意。
我一直在用头撞墙,试图发送 TCP 请求并将它们放入 Kinesis。 Kinesis 显示没有写入。起初我以为我没有发送足够的数据来触发写入,所以我尝试向它发送大量数据但没有这样的运气。谷歌搜索这个问题没有给我带来任何答案,因为只有一个结果似乎是可行的,这是某个网站上的一个问题 - netdownload - 没有答案。
FluentD 配置:
<match beacon.test>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
</match>
<source>
type tcp
tag beacon.test
format json
log_level debug
</source>
CONNECT_TO_ADDR 是一个常量,定义为我的 ec2 实例的 Public IP。
发送 TCP 数据的代码:
define('TCP_PORT', 5170);
for($i = 0; $i < 312500000; $i++)
{
Send(array('id' => $i, 'value' => 'testing123'));
}
function Send($obj)
{
try
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if($socket === false)
{
throw new Exception(GetSocketError("Unable to create a socket"));
}
if(socket_connect($socket, CONNECT_TO_ADDR, TCP_PORT))
{
$message = json_encode($obj, JSON_FORCE_OBJECT);
if($message === false)
{
throw new Exception("Unable to create JSON object: " . json_last_error_msg());
}
$totalByteCount = strlen($message);
$wrote = socket_send($socket, $message, $totalByteCount, MSG_EOF);
if($wrote === false)
{
throw new Exception(GetSocketError("Unable to write to socket"));
}
echo "Wrote $wrote/$totalByteCount bytes successfully!\n";
}
else
{
throw new Exception(GetSocketError("Unable to connect to socket"));
}
}
catch(Exception $ex)
{
if(isset($socket) && $socket !== false)
{
socket_shutdown($socket);
socket_close($socket);
}
throw $ex;
}
}
FluentD 日志的输出:
D, [2015-08-11T21:17:00.244426 #9848] DEBUG -- : [Aws::Kinesis::Client 200 >0.130176 0 retries] describe_stream(stream_name:"test_stream")
2015-08-11 21:17:00 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2015-08-11 21:17:00 +0000 [info]: listening dRuby uri="druby://127.0.0.1:24230" >object="Engine"
2015-08-11 21:17:00 +0000 [debug]: listening tcp socket on 0.0.0.0:5170
2015-08-12 03:28:01 +0000 [info]: force flushing buffered events
添加copy + stdout调试怎么样?通过这种方式,您可以确认您的 PHP 应用是否确实将数据发送到您的 Fluentd。
<match>
type copy
<store>
type stdout
</store>
<store>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
flush_interval 10s
</store>
</match>
此外,添加 "flush_interval 10s" 选项将使缓冲区刷新更频繁。
我一直在尝试查看 FluentD 是否是我公司将数据发送到 Kinesis 的可行方法。我在 Amazon EC2 实例上使用 td-agent rpm 包安装了 FLuentD。 没有太多关于如何发送数据的参数,只是 TCP 或 in_forward 插件可能是最合适的。我从 TCP 开始,但我之前从未真正用 TCP 做过任何事情,如果我犯了一些新的错误,我深表歉意。
我一直在用头撞墙,试图发送 TCP 请求并将它们放入 Kinesis。 Kinesis 显示没有写入。起初我以为我没有发送足够的数据来触发写入,所以我尝试向它发送大量数据但没有这样的运气。谷歌搜索这个问题没有给我带来任何答案,因为只有一个结果似乎是可行的,这是某个网站上的一个问题 - netdownload - 没有答案。
FluentD 配置:
<match beacon.test>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
</match>
<source>
type tcp
tag beacon.test
format json
log_level debug
</source>
CONNECT_TO_ADDR 是一个常量,定义为我的 ec2 实例的 Public IP。 发送 TCP 数据的代码:
define('TCP_PORT', 5170);
for($i = 0; $i < 312500000; $i++)
{
Send(array('id' => $i, 'value' => 'testing123'));
}
function Send($obj)
{
try
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if($socket === false)
{
throw new Exception(GetSocketError("Unable to create a socket"));
}
if(socket_connect($socket, CONNECT_TO_ADDR, TCP_PORT))
{
$message = json_encode($obj, JSON_FORCE_OBJECT);
if($message === false)
{
throw new Exception("Unable to create JSON object: " . json_last_error_msg());
}
$totalByteCount = strlen($message);
$wrote = socket_send($socket, $message, $totalByteCount, MSG_EOF);
if($wrote === false)
{
throw new Exception(GetSocketError("Unable to write to socket"));
}
echo "Wrote $wrote/$totalByteCount bytes successfully!\n";
}
else
{
throw new Exception(GetSocketError("Unable to connect to socket"));
}
}
catch(Exception $ex)
{
if(isset($socket) && $socket !== false)
{
socket_shutdown($socket);
socket_close($socket);
}
throw $ex;
}
}
FluentD 日志的输出:
D, [2015-08-11T21:17:00.244426 #9848] DEBUG -- : [Aws::Kinesis::Client 200 >0.130176 0 retries] describe_stream(stream_name:"test_stream")
2015-08-11 21:17:00 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2015-08-11 21:17:00 +0000 [info]: listening dRuby uri="druby://127.0.0.1:24230" >object="Engine" 2015-08-11 21:17:00 +0000 [debug]: listening tcp socket on 0.0.0.0:5170
2015-08-12 03:28:01 +0000 [info]: force flushing buffered events
添加copy + stdout调试怎么样?通过这种方式,您可以确认您的 PHP 应用是否确实将数据发送到您的 Fluentd。
<match>
type copy
<store>
type stdout
</store>
<store>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
flush_interval 10s
</store>
</match>
此外,添加 "flush_interval 10s" 选项将使缓冲区刷新更频繁。