Writing data from Kinesis to S3
I have a Java application that publishes data to a Kinesis stream using the AWS SDK. This is done in batches of 10 records at a time, using the following code:
// Convert to JSON object, and then to bytes...
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(transaction);
// Add byte array to PutRecordsRequestEntry
PutRecordsRequestEntry record = new PutRecordsRequestEntry();
record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID()));
record.setData(ByteBuffer.wrap(json.getBytes()));
// Add to list...
batch.add(record);
// Check and send batches
if (counter >= batchLimit) {
    logger.info("Sending batch of " + batchLimit + " rows.");
    putRecordsRequest.setRecords(batch);
    PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
    batch = new ArrayList<>();
    counter = 0;
} else {
    counter++;
}
I then have a Node.js Lambda function that is triggered for each transaction received on Kinesis. The idea is for it to take the transactions coming off Kinesis and put them onto a Firehose delivery stream, which saves them to S3.
var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
exports.handler = function(event, context) {
    console.log(event);
    var params = {
        DeliveryStreamName: "transaction-postings",
        Record: {
            Data: decodeURIComponent(event)
        }
    };
    firehose.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
        else {
            console.log(data); // successful response
        }
        context.done();
    });
};
However, when I look at the data on S3, instead of the list of JSON objects I was expecting, I see the following...
[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object]
Can anyone point out what I'm missing to stream the data from Kinesis to S3 as JSON objects?
Data: decodeURIComponent(event)
You need to serialize the event, because Lambda deserializes the parameters automatically. That is:
Data: JSON.stringify(decodeURIComponent(event))
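For completeness, a minimal sketch of the handler with that change applied might look like the following. Note that JSON.stringify(event) by itself already yields a JSON string, so the decodeURIComponent wrapper may not be needed; the trailing newline is an assumption, added so consecutive records remain separable in the resulting S3 object:

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

exports.handler = function(event, context) {
    var params = {
        DeliveryStreamName: "transaction-postings",
        Record: {
            // Serialize explicitly; passing the object through unserialized
            // is what coerces it to the string "[object Object]"
            Data: JSON.stringify(event) + "\n"
        }
    };
    firehose.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack);
        else console.log(data);
        context.done();
    });
};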
For those wondering what code changes are needed... to write the actual message sent by the producer to S3, the data property of the PutRecordsRequestEntry needs to be decoded. In other words, these code blocks show the dependency used and a Lambda that parses the data coming off the Kinesis stream...
var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
var firehoseStreamName = "transaction-postings";
exports.handler = function(event, context) {
    // This is the actual transaction, encapsulated with Kinesis Put properties
    var transaction = event;

    // Decode the base64-encoded data property; that is all we need
    var buf = Buffer.from(transaction.data, "base64");

    // Convert to an actual, readable string
    var jsonString = buf.toString("utf8");

    // Prepare storage to the postings firehose stream...
    var params = {
        DeliveryStreamName: firehoseStreamName,
        Record: {
            Data: jsonString
        }
    };

    // Store data!
    firehose.putRecord(params, function(err, data) {
        if (err) {
            // This needs to be fired to Kinesis in the future...
            console.log(err, err.stack);
        } else {
            console.log(data);
        }
        context.done();
    });
};
This is because a record sent with the AWS producer dependency below
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.12.3</version>
</dependency>
looks like this:
{
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "cb3ff3cd-769e-4d48-969d-918b5378e81b",
    "sequenceNumber": "49571132156681255058105982949134963643939775644952428546",
    "data": "[base64 string]",
    "approximateArrivalTimestamp": 1490191017.614
}
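As an aside, when the Lambda is attached through a standard Kinesis event source mapping, the handler receives a batch of such records nested under event.Records[i].kinesis rather than a single bare record. A sketch under that assumption, decoding each record and forwarding the whole batch with putRecordBatch (the trailing newline delimiter is likewise an assumption about how records should be separated in the S3 objects):

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
var firehoseStreamName = "transaction-postings";

exports.handler = function(event, context) {
    // Each record arrives base64-encoded under event.Records[i].kinesis.data
    var records = event.Records.map(function(record) {
        var json = Buffer.from(record.kinesis.data, "base64").toString("utf8");
        return { Data: json + "\n" }; // newline-delimit records in the S3 object
    });
    firehose.putRecordBatch({
        DeliveryStreamName: firehoseStreamName,
        Records: records
    }, function(err, data) {
        if (err) console.log(err, err.stack);
        else console.log("Delivered " + records.length + " records, " +
                         data.FailedPutCount + " failed.");
        context.done();
    });
};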