使用 AWS Java DynamoDB 流 Kinesis 适配器处理 DynamoDB 流
Processing DynamoDB streams using the AWS Java DynamoDB streams Kinesis adapter
我正在尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器捕获 DynamoDB table 更改。我在 Scala 应用程序中使用 AWS Java SDK。
我开始关注 AWS guide and by going through the AWS published code example。但是,我在让亚马逊自己发布的代码在我的环境中运行时遇到了问题。我的问题在于 KinesisClientLibConfiguration
对象。
在示例代码中,KinesisClientLibConfiguration
配置了DynamoDB提供的流ARN。
new KinesisClientLibConfiguration("streams-adapter-demo",
streamArn,
streamsCredentials,
"streams-demo-worker")
我在我的 Scala 应用程序中采用了类似的模式,首先从我的 Dynamo table:
中找到当前的 ARN
lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn
然后使用提供的 ARN 创建 KinesisClientLibConfiguration
:
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
"testProcess",
streamArn,
defaultProviderChain,
"testWorker"
).withMaxRecords(1000)
.withRegionName("eu-west-1")
.withMetricsLevel(MetricsLevel.NONE)
.withIdleTimeBetweenReadsInMillis(500)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
我已经验证了提供的流 ARN,一切都与我在 AWS 控制台中看到的相符。
在运行时,我最终收到一个异常,指出提供的 ARN 不是有效的流名称:
com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at
'streamName' failed to satisfy constraint: Member must satisfy regular
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code:
400; Error Code: ValidationException; Request ID: )
查看 KinesisClientLibConfiguration
上提供的文档,这确实有意义,因为第二个参数被列为 streamName,但没有提及 ARN。
我似乎在 KinesisClientLibConfiguration
上找不到任何与 ARN 相关的内容。由于我使用的是 DynamoDB 流而不是 Kinesis 流,所以我也不确定如何找到我的流名称。
此时我不确定我从已发布的 AWS 示例中遗漏了什么,他们似乎使用的是更旧版本的 KCL。我使用的是 amazon-kinesis-client 1.7.0 版。
这个问题实际上超出了我的范围 KinesisClientLibConfiguration
。
通过使用相同的配置并提供 DynamoDB 流适配器库中包含的流适配器以及 DynamoDB 和 CloudWatch 的客户端,我能够解决这个问题。
我的工作解决方案现在看起来像这样。
正在定义 Kinesis 客户端配置。
//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
定义流适配器和 CloudWatch 客户端
val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)
val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)
创建一个 RecordProcessorFactory
的实例,由您定义一个 class 来实现提供的 KCL IRecordProcessorFactory
和返回的 IRecordProcessor
.
val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
还有我遗漏的部分,所有这些都需要提供给你的工人。
val worker :Worker =
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(kinesisConfig)
.kinesisClient(streamAdapterClient)
.dynamoDBClient(dynamoClient)
.cloudWatchClient(cloudWatchClient)
.build()
//this will start record processing
streamExecutorService.submit(worker)
或者,您可以使用 com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker
而不是 com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
它在内部使用 AmazonDynamoDBStreamsAdapterClient
.
即
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)
只是为了回答问题所在 - 当它只需要流名称时您提供了 ARN。
我最近对该项目进行了 PR gfc-aws-kinesis,您现在只需传递适配器并编写 KinesisRecordAdapter 实现即可使用它。
在示例中,我使用 Scanamo 来解析 hashmap
创建客户端
val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
new AmazonDynamoDBStreamsAdapterClient()
在配置中传递:
val streamConfig = KinesisStreamConsumerConfig[Option[A]](
applicationName,
config.stream, //the full dynamodb stream arn
regionName = Some(config.region),
checkPointInterval = config.checkpointInterval,
initialPositionInStream = config.streamPosition,
dynamoDBKinesisAdapterClient = Some(streamAdapterClient)
)
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
创建适合读取dynamodb事件的隐式记录reader:
implicit val kinesisRecordReader
: KinesisRecordReader[Option[A]] =
new KinesisRecordReader[Option[A]] {
override def apply(record: Record): Option[A] = {
record match {
case recordAdapter: RecordAdapter =>
val dynamoRecord: DynamoRecord =
recordAdapter.getInternalObject
dynamoRecord.getEventName match {
case "INSERT" =>
ScanamoFree
.read[A](
dynamoRecord.getDynamodb.getNewImage)
.toOption
case _ => None
}
case _ => None
}
}
}
我正在尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器捕获 DynamoDB table 更改。我在 Scala 应用程序中使用 AWS Java SDK。
我开始关注 AWS guide and by going through the AWS published code example。但是,我在让亚马逊自己发布的代码在我的环境中运行时遇到了问题。我的问题在于 KinesisClientLibConfiguration
对象。
在示例代码中,KinesisClientLibConfiguration
配置了DynamoDB提供的流ARN。
new KinesisClientLibConfiguration("streams-adapter-demo",
streamArn,
streamsCredentials,
"streams-demo-worker")
我在我的 Scala 应用程序中采用了类似的模式,首先从我的 Dynamo table:
中找到当前的 ARNlazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn
然后使用提供的 ARN 创建 KinesisClientLibConfiguration
:
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
"testProcess",
streamArn,
defaultProviderChain,
"testWorker"
).withMaxRecords(1000)
.withRegionName("eu-west-1")
.withMetricsLevel(MetricsLevel.NONE)
.withIdleTimeBetweenReadsInMillis(500)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
我已经验证了提供的流 ARN,一切都与我在 AWS 控制台中看到的相符。
在运行时,我最终收到一个异常,指出提供的 ARN 不是有效的流名称:
com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at
'streamName' failed to satisfy constraint: Member must satisfy regular
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code:
400; Error Code: ValidationException; Request ID: )
查看 KinesisClientLibConfiguration
上提供的文档,这确实有意义,因为第二个参数被列为 streamName,但没有提及 ARN。
我似乎在 KinesisClientLibConfiguration
上找不到任何与 ARN 相关的内容。由于我使用的是 DynamoDB 流而不是 Kinesis 流,所以我也不确定如何找到我的流名称。
此时我不确定我从已发布的 AWS 示例中遗漏了什么,他们似乎使用的是更旧版本的 KCL。我使用的是 amazon-kinesis-client 1.7.0 版。
这个问题实际上超出了我的范围 KinesisClientLibConfiguration
。
通过使用相同的配置并提供 DynamoDB 流适配器库中包含的流适配器以及 DynamoDB 和 CloudWatch 的客户端,我能够解决这个问题。
我的工作解决方案现在看起来像这样。
正在定义 Kinesis 客户端配置。
//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
定义流适配器和 CloudWatch 客户端
val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)
val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)
创建一个 RecordProcessorFactory
的实例,由您定义一个 class 来实现提供的 KCL IRecordProcessorFactory
和返回的 IRecordProcessor
.
val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
还有我遗漏的部分,所有这些都需要提供给你的工人。
val worker :Worker =
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(kinesisConfig)
.kinesisClient(streamAdapterClient)
.dynamoDBClient(dynamoClient)
.cloudWatchClient(cloudWatchClient)
.build()
//this will start record processing
streamExecutorService.submit(worker)
或者,您可以使用 com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker
而不是 com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
它在内部使用 AmazonDynamoDBStreamsAdapterClient
.
即
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)
只是为了回答问题所在 - 当它只需要流名称时您提供了 ARN。
我最近对该项目进行了 PR gfc-aws-kinesis,您现在只需传递适配器并编写 KinesisRecordAdapter 实现即可使用它。
在示例中,我使用 Scanamo 来解析 hashmap
创建客户端
val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
new AmazonDynamoDBStreamsAdapterClient()
在配置中传递:
val streamConfig = KinesisStreamConsumerConfig[Option[A]](
applicationName,
config.stream, //the full dynamodb stream arn
regionName = Some(config.region),
checkPointInterval = config.checkpointInterval,
initialPositionInStream = config.streamPosition,
dynamoDBKinesisAdapterClient = Some(streamAdapterClient)
)
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
创建适合读取dynamodb事件的隐式记录reader:
implicit val kinesisRecordReader
: KinesisRecordReader[Option[A]] =
new KinesisRecordReader[Option[A]] {
override def apply(record: Record): Option[A] = {
record match {
case recordAdapter: RecordAdapter =>
val dynamoRecord: DynamoRecord =
recordAdapter.getInternalObject
dynamoRecord.getEventName match {
case "INSERT" =>
ScanamoFree
.read[A](
dynamoRecord.getDynamodb.getNewImage)
.toOption
case _ => None
}
case _ => None
}
}
}