JSON data exceeds AWS Kinesis Firehose put_record limit, is there a workaround?
I am pulling streaming data from an API and then sending the raw data to an S3 bucket through Kinesis Firehose. Sometimes the data size exceeds the limit of what I can send through Firehose, and I get the following error:
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'record.data' failed to satisfy constraint: Member must have length less than or equal to 1024000
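For context, the call that raises this looks roughly like the following (a minimal sketch, not the exact code; the client setup and the stream name 'my-delivery-stream' are placeholders):

import boto3

firehose = boto3.client('firehose')

# json_data is the serialized payload pulled from the API
firehose.put_record(
    DeliveryStreamName='my-delivery-stream',  # placeholder stream name
    Record={'Data': json_data}                # rejected once the blob exceeds 1,024,000 bytes
)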
What is the best way to handle this so that I still end up with something close to the original structure? I'm thinking of some sort of buffering/chunking, or should I just write the data to a file and push it to S3 directly?
I figured it out: I found the following statement in the API documentation:
Kinesis Data Firehose buffers records before delivering them to the destination. To disambiguate the data blobs at the destination, a common solution is to use delimiters in the data, such as a newline (\n) or some other character unique within the data. This allows the consumer application to parse individual data items when reading the data from the destination.
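In other words, as long as each complete payload ends with a newline, whatever reads the delivered objects can split them back apart. A rough sketch of that consumer side (assuming standard boto3, that a full payload lands in a single delivery object, and placeholder bucket/key names):

import json
import boto3

s3 = boto3.client('s3')

# placeholder bucket and key for an object written by the delivery stream
obj = s3.get_object(Bucket='my-firehose-bucket', Key='raw/2023/01/01/delivery-object')
body = obj['Body'].read().decode('utf-8')

# every '\n'-terminated line is one original JSON payload
payloads = [json.loads(line) for line in body.split('\n') if line]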
So I realized I can send the JSON string in chunks of <= 1000 kB and terminate the last chunk with '\n' to close out the record, which keeps the original complete data structure intact.
I then implemented the function below: it checks the size of the JSON string and, if it is within the limit, sends the whole thing as a single record. Otherwise it sends the data in chunks via put_record_batch().
def send_to_firehose(json_data: str, data_name: str, verbose=False):
    if len(json_data) > 1024000:
        # send json_data in chunks of 1000000 bytes or less
        start = 0
        end = 1000000
        chunk_batch = list()
        while True:
            chunk_batch.append({'Data': json_data[start:end]})
            start = end
            end += 1000000
            if end >= len(json_data):
                end = len(json_data) + 1
                # the final chunk carries the '\n' delimiter so the pieces
                # reassemble into one record at the destination
                chunk_batch.append({'Data': json_data[start:end] + '\n'})
                firehose_batch(
                    client=AWS_FIREHOSE_CLIENT, data_name=data_name,
                    records=chunk_batch, verbose=verbose
                )
                break
    else:
        # within the single-record limit, send it in one put_record call
        record = {'Data': json_data + '\n'}
        firehose_put(
            client=AWS_FIREHOSE_CLIENT, data_name=data_name,
            record=record, verbose=verbose
        )
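firehose_put and firehose_batch here are helper wrappers around the boto3 put_record / put_record_batch calls (not shown above). A minimal sketch of what they might look like (hypothetical, not the exact helpers; treating data_name as the delivery stream name is an assumption):

import boto3

AWS_FIREHOSE_CLIENT = boto3.client('firehose')

def firehose_put(client, data_name: str, record: dict, verbose=False):
    # hypothetical wrapper: data_name is assumed to double as the delivery stream name
    response = client.put_record(DeliveryStreamName=data_name, Record=record)
    if verbose:
        print(f"put_record to {data_name}: RecordId {response['RecordId']}")

def firehose_batch(client, data_name: str, records: list, verbose=False):
    # hypothetical wrapper around PutRecordBatch
    response = client.put_record_batch(DeliveryStreamName=data_name, Records=records)
    if verbose:
        print(f"put_record_batch to {data_name}: {response['FailedPutCount']} failed")

Note that PutRecordBatch is itself limited to 500 records and 4 MB per call, so a payload much larger than 4 MB would need to be split across multiple batch calls.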