JSON data exceeds aws kinesis firehose put_record limit, is there a work around?

I am pulling streaming data from an API and then sending the raw data to an S3 bucket through Kinesis Firehose. Sometimes the data size exceeds the limit I can send through Firehose, and I get the following error:

botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'record.data' failed to satisfy constraint: Member must have length less than or equal to 1024000

What is the best way to work around this so that I end up with something resembling the original structure? I was thinking along the lines of buffering/chunking, or should I write to a file and push it straight to S3 instead?

I figured it out. I found the following statement in the API documentation:

Kinesis Data Firehose buffers records before delivering them to the destination. To disambiguate the data blobs at the destination, a common solution is to use delimiters in the data, such as a newline (\n) or some other character unique within the data. This allows the consumer application to parse individual data items when reading the data from the destination.

So I realized I could send the JSON string in chunks of 1,000 kB or less, with only the final chunk terminated by '\n', so that the delimiter closes out the record and the original complete data structure stays intact at the destination.
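In other words, once all the chunks of one payload land in the same delivery object, the trailing newline lets a reader split that object back into complete JSON documents. A rough sketch of the consumer side, assuming the delivery stream writes uncompressed text to S3 and using placeholder bucket/key names:

import json
import boto3

s3 = boto3.client('s3')

def read_firehose_output(bucket: str, key: str) -> list:
    # bucket/key are placeholders; Firehose chooses the actual object names.
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
    # Each complete JSON string was terminated with '\n' before sending,
    # so splitting on newlines recovers the original documents.
    return [json.loads(doc) for doc in body.split('\n') if doc]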

I then implemented the function below, which checks the size of the JSON string and, if it is within the size limit, sends the whole thing as a single record. If not, it is sent in chunks via put_record_batch():

def send_to_firehose(json_data: str, data_name: str, verbose=False):
    if len(json_data) > 1024000:
        # Too large for a single PutRecord call: send json_data in chunks
        # of 1000000 bytes or less via PutRecordBatch.
        start = 0
        end = 1000000
        chunk_batch = list()
        while True:
            chunk_batch.append({'Data': json_data[start:end]})
            start = end
            end += 1000000
            if end >= len(json_data):
                # Last chunk: append the trailing '\n' delimiter so the
                # consumer can recover the complete JSON string.
                end = len(json_data) + 1
                chunk_batch.append({'Data': json_data[start:end] + '\n'})
                firehose_batch(
                    client=AWS_FIREHOSE_CLIENT, data_name=data_name,
                    records=chunk_batch, verbose=verbose
                )
                break
    else:
        # Small enough for a single record; terminate with '\n'.
        record = {'Data': json_data + '\n'}
        firehose_put(
            client=AWS_FIREHOSE_CLIENT, data_name=data_name,
            record=record, verbose=verbose
        )
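
The firehose_put and firehose_batch helpers are not shown above. A minimal sketch of what they might look like, assuming data_name is the delivery stream name and AWS_FIREHOSE_CLIENT is a boto3 Firehose client:

import boto3

AWS_FIREHOSE_CLIENT = boto3.client('firehose')

def firehose_put(client, data_name: str, record: dict, verbose=False):
    # Single record, up to 1,024,000 bytes of data.
    response = client.put_record(DeliveryStreamName=data_name, Record=record)
    if verbose:
        print(f'put_record response for {data_name}: {response}')
    return response

def firehose_batch(client, data_name: str, records: list, verbose=False):
    # Sends the list of chunk records in one PutRecordBatch call.
    response = client.put_record_batch(
        DeliveryStreamName=data_name, Records=records
    )
    if verbose:
        print(f'put_record_batch response for {data_name}: {response}')
    return response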