使用 KCL 2.x 从 KinesisStream 获取 DynamoDBEvents
Using KCL 2.x to get DynamoDBEvents from KinesisStream
从我使用 KCL 客户端获得的 KinesisClientRecord(记录),我能够通过以下操作从流中获取表示 ddb 更新事件的 JSON 对象:
String recordData = StandardCharsets.UTF_8.decode(record.data()).toString();
JSONObject kinesisRecordObject = new JSONObject(recordData);
现在我想访问我从 dynamoDB 获得的字段,但我无法将其反序列化为 DynamoDB JSON 而不是本 post 中解释的标准 post =12=]。
那里给出的解决方案适用于 DynamoDBStreamRecord 但不适用于 KinesisClientRecord(我正在使用它)你能告诉我这一步是如何实现的吗?
对于KCL 1.x 我发现可以使用KCL Adapter,但是我用的是KCL 2.x!
如何反序列化它以获取 DDB 更新中的字段?
使用 KCL 2 时,您可以从 Kinesis 获取 CommittableRecord 对象。您需要从中提取 UTF 8 编码的 records().data()(默认情况下)。解码后得到的JSON字符串可以转换成你想要的POJO
像这样
String recordData =
new StringBuffer(StandardCharsets.UTF_8.decode(committableRecordObject.record().data()))
.toString();
从我使用 KCL 客户端获得的 KinesisClientRecord(记录),我能够通过以下操作从流中获取表示 ddb 更新事件的 JSON 对象:
String recordData = StandardCharsets.UTF_8.decode(record.data()).toString();
JSONObject kinesisRecordObject = new JSONObject(recordData);
现在我想访问我从 dynamoDB 获得的字段,但我无法将其反序列化为 DynamoDB JSON 而不是本 post 中解释的标准 post =12=]。 那里给出的解决方案适用于 DynamoDBStreamRecord 但不适用于 KinesisClientRecord(我正在使用它)你能告诉我这一步是如何实现的吗? 对于KCL 1.x 我发现可以使用KCL Adapter,但是我用的是KCL 2.x!
如何反序列化它以获取 DDB 更新中的字段?
使用 KCL 2 时,您可以从 Kinesis 获取 CommittableRecord 对象。您需要从中提取 UTF 8 编码的 records().data()(默认情况下)。解码后得到的JSON字符串可以转换成你想要的POJO
像这样
String recordData =
new StringBuffer(StandardCharsets.UTF_8.decode(committableRecordObject.record().data()))
.toString();