Writing data from Kinesis to S3

I have an application that publishes data to a Kinesis stream from Java using the AWS SDK. It does this in batches of 10 records at a time, using the following code;

// Convert to JSON object, and then to bytes...
// (batch, counter, batchLimit, putRecordsRequest and amazonKinesisClient
// are declared outside this per-record block)
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(transaction);

// Add byte array to PutRecordsRequestEntry
PutRecordsRequestEntry record = new PutRecordsRequestEntry();
record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID()));
record.setData(ByteBuffer.wrap(json.getBytes()));

// Add to list...
batch.add(record);

// Check and send batches
if (counter >= batchLimit) {

    logger.info("Sending batch of " + batchLimit + " rows.");

    putRecordsRequest.setRecords(batch);
    PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
    batch = new ArrayList<>();
    counter = 0;

} else {
    counter++;
}

I then have a Node.js Lambda function that is triggered for every transaction received on Kinesis. The idea is for it to take the transactions coming from Kinesis and put them onto a Firehose delivery stream, to be saved to S3.

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

exports.handler = function(event, context) {

    console.log(event);

    var params = {
        DeliveryStreamName: "transaction-postings",
        Record: { 
            Data:  decodeURIComponent(event)
        }
    };
    firehose.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
        else    {  
            console.log(data);           // successful response
        }

        context.done();
    });
};

However, when looking at the data on S3, instead of the list of JSON objects I was expecting, all I see is the following...

[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object]

Can anyone point out what I'm missing to get the data streamed from Kinesis to S3 as JSON objects?

Data:  decodeURIComponent(event)

You need to serialize the event, because Lambda automatically deserializes the incoming parameter. That is:

Data: JSON.stringify(decodeURIComponent(event))
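
As a side note on why the S3 objects contained [object Object]: when an object is passed where a string is expected, JavaScript falls back to the default toString conversion. A minimal sketch (not from the original code) illustrating the difference:

// Passing a plain object where a string is expected coerces it via toString().
var event = { Records: [] };              // stand-in for the Lambda event

console.log(String(event));               // "[object Object]"
console.log(decodeURIComponent(event));   // also "[object Object]" - nothing to decode
console.log(JSON.stringify(event));       // '{"Records":[]}' - what Firehose should receive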

For those wondering what code changes were needed... to get the actual message sent by the producer written to S3, the data attribute of the PutRecordsRequestEntry has to be decoded. In other words, the code blocks below show the lambda that parses the data coming off the Kinesis stream, and the dependency that was used...

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
var firehoseStreamName = "transaction-postings";

exports.handler = function(event, context) {

    // This is the actual transaction, encapsulated with Kinesis Put properties
    var transaction = event;

    // Decode the base64-encoded data attribute - this is all we need
    var buf = new Buffer(transaction.data, "base64");

    // Convert to a readable UTF-8 string
    var jsonString = buf.toString("utf8");

    // Prepare the record for the postings Firehose delivery stream...
    var params = { 
        DeliveryStreamName: firehoseStreamName, 
        Record: { 
            Data:  jsonString
        }
    };

    // Store data!
    firehose.putRecord(params, function(err, data) {
        if (err) {

            // This needs to be fired to Kinesis in the future...
            console.log(err, err.stack); 
        }
        else{  
            console.log(data);            
        }

        context.done();
    });
};

This is because a record sent using the AWS producer dependency below

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.12.3</version>
</dependency>

looks like this;

{
  "kinesisSchemaVersion": "1.0",
  "partitionKey": "cb3ff3cd-769e-4d48-969d-918b5378e81b",
  "sequenceNumber": "49571132156681255058105982949134963643939775644952428546",
  "data": "[base64 string]",
  "approximateArrivalTimestamp": 1490191017.614
}
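
To sanity-check the decode step against a record of this shape, here is a minimal round-trip sketch (the transaction payload is a placeholder, not taken from the original post):

// Round-trip check: base64-encode a placeholder transaction the way it appears
// in the record's "data" field, then decode it exactly as the lambda above does.
var placeholderTransaction = { id: 1, amount: 9.99 };   // hypothetical payload

var data = new Buffer(JSON.stringify(placeholderTransaction)).toString("base64");

var jsonString = new Buffer(data, "base64").toString("utf8");
console.log(jsonString);   // {"id":1,"amount":9.99}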