如何在 Spark 流应用程序中处理 DynamoDB Stream

How to process DynamoDB Stream in a Spark streaming application

我想使用来自 Spark Streaming 应用程序的 DynamoDB 流。

Spark Streaming 使用 KCL 从 Kinesis 读取。有一个库可以使 KCL 能够从 DynamoDB 流中读取:dynamodb-streams-kinesis-adapter.

但是可以将这个库插入到 spark 中吗?有人这样做过吗?

我正在使用 Spark 2.1.0。

我的备份计划是让另一个应用程序从 DynamoDB 流读取到 Kinesis 流。

谢谢

执行此操作的方法是实现 KinesisInputDStream 以使用 dynamodb-streams-kinesis-adapter 提供的工作程序 official guidelines 建议是这样的:

final Worker worker = StreamsWorkerFactory .createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);

从Spark的角度来看,它是在KinesisInputDStream.scala

中的kinesis-asl模块下实现的

我已经在 Spark 2.4.0 上试过了。这是我的回购协议。它几乎不需要改进,但可以完成工作

https://github.com/ravi72munde/spark-dynamo-stream-asl

修改好KinesisInputDStream后,我们就可以使用了,如下图。 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName("sample-tablename-2") .regionName("us-east-1") .initialPosition(new Latest()) .checkpointAppName("sample-app") .checkpointInterval(Milliseconds(100)) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()