Insert newlines between AWS Kinesis Firehose records when using the AWS SDK for Ruby
I have an AWS Kinesis Firehose delivery stream that sends data to Redshift via S3.
I would like newlines to appear between the records sent with put_record_batch. Currently my code looks like this:
records = [{ id: 1, value: "foo" }, { id: 2, value: "bar" }]
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch({
  delivery_stream_name: "my_firehose",
  records: records
})
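The records end up in S3 looking like this:
{"id":1,"value":"foo"}{"id":2,"value":"bar"}
I would like the S3 file to look like this:
{"id":1,"value":"foo"}
{"id":2,"value":"bar"}
This would make the files much easier to parse by hand when necessary (for example, when we need to debug why data is not making it from S3 to Redshift).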
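To illustrate why the newline matters for manual debugging, here is a minimal sketch of reading such a file back in Ruby. The `s3_body` string is a hypothetical stand-in for the downloaded S3 object, not real output from my stream:

```ruby
require "json"

# Hypothetical contents of an S3 object written by Firehose when every
# record ends with "\n" (an assumption for illustration).
s3_body = %({"id":1,"value":"foo"}\n{"id":2,"value":"bar"}\n)

# One JSON document per line; blank lines are skipped defensively.
parsed = s3_body.each_line.map(&:strip).reject(&:empty?).map do |line|
  JSON.parse(line)
end

parsed.each { |row| puts row.inspect }
```

Without the delimiter the two objects run together on one line, and this line-by-line approach no longer works.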
The solution for put_record is simple: convert the data to JSON and append a newline:
record = { id: 1, value: "foo" }
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record({
  delivery_stream_name: "my_firehose",
  data: record.to_json << "\n"
})
I tried to do something similar with put_record_batch:
records = [{ id: 1, value: "foo" }, { id: 2, value: "bar" }]
json_records = records.map { |record| record.to_json << "\n" }
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch({
  delivery_stream_name: "my_firehose",
  records: json_records
})
But this results in the following error:
ArgumentError: parameter validator found 2 errors:
- expected params[:records][0] to be a hash, got value "{\"id\":1,\"value\":\"foo\"}\n" (class: String) instead.
- expected params[:records][1] to be a hash, got value "{\"id\":2,\"value\":\"bar\"}\n" (class: String) instead.
from /mnt/istore/apps/my_app/shared/bundle/ruby/2.7.0/gems/aws-sdk-core-3.89.1/lib/aws-sdk-core/param_validator.rb:33:in `validate!'
So it seems each record has to be sent as a hash.
The documentation for put_record_batch says:
Kinesis Data Firehose buffers records before delivering them to the destination. To disambiguate the data blobs at the destination, a common solution is to use delimiters in the data, such as a newline (\n) or some other character unique within the data. This allows the consumer application to parse individual data items when reading the data from the destination.
How do I do this?
I am using version 1.26.0 of the aws-sdk-firehose gem.
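I think the problem was that I left out the data key when calling put_record_batch. Each entry in records has to be a hash of the form { data: "..." }, not a bare string. This seems to work: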
require "json"
require "aws-sdk-firehose"

records = [{ id: 1, value: "foo" }, { id: 2, value: "bar" }]
json_records = records.map do |record|
  # Previously this line was `record.to_json << "\n"`
  { data: record.to_json << "\n" }
end
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch({
  delivery_stream_name: "my_firehose",
  records: json_records
})
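For larger inputs, the same idea could be wrapped in a small helper. This is only a sketch under some assumptions: `to_firehose_records` and `put_rows` are hypothetical names of my own, the slice size reflects the documented limit of 500 records per PutRecordBatch call, and the per-request size limit is not handled at all. The client is passed in, so the code below does not itself load the AWS gem:

```ruby
require "json"

# Firehose accepts at most 500 records per PutRecordBatch call.
MAX_BATCH_SIZE = 500

# Wraps each row as { data: "<json>\n" }, the shape put_record_batch expects.
def to_firehose_records(rows)
  rows.map { |row| { data: "#{row.to_json}\n" } }
end

# Hypothetical helper: sends rows in slices of MAX_BATCH_SIZE and returns the
# total number of records that Firehose reported as failed.
def put_rows(client, stream_name, rows)
  to_firehose_records(rows).each_slice(MAX_BATCH_SIZE).sum do |batch|
    response = client.put_record_batch(
      delivery_stream_name: stream_name,
      records: batch
    )
    response.failed_put_count
  end
end

# Usage, assuming the aws-sdk-firehose gem is loaded:
# client = Aws::Firehose::Client.new(region: "us-east-1")
# failed = put_rows(client, "my_firehose", [{ id: 1, value: "foo" }])
```

A non-zero return value means some records were rejected. put_record_batch can succeed as a call while individual records fail, so checking failed_put_count (and retrying the failed entries) is worth doing.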