如何在 Spark 流应用程序中处理 DynamoDB Stream
How to process DynamoDB Stream in a Spark streaming application
我想使用来自 Spark Streaming 应用程序的 DynamoDB 流。
Spark Streaming 使用 KCL 从 Kinesis 读取。有一个库可以使 KCL 能够从 DynamoDB 流中读取:dynamodb-streams-kinesis-adapter.
但是可以将这个库插入到 spark 中吗?有人这样做过吗?
我正在使用 Spark 2.1.0。
我的备份计划是让另一个应用程序从 DynamoDB 流读取到 Kinesis 流。
谢谢
执行此操作的方法是实现 KinesisInputDStream 以使用 dynamodb-streams-kinesis-adapter
提供的工作程序
official guidelines 建议是这样的:
final Worker worker = StreamsWorkerFactory
.createDynamoDbStreamsWorker(
recordProcessorFactory,
workerConfig,
adapterClient,
amazonDynamoDB,
amazonCloudWatchClient);
从Spark的角度来看,它是在KinesisInputDStream.scala
中的kinesis-asl模块下实现的
我已经在 Spark 2.4.0 上试过了。这是我的回购协议。它几乎不需要改进,但可以完成工作
https://github.com/ravi72munde/spark-dynamo-stream-asl
修改好KinesisInputDStream后,我们就可以使用了,如下图。
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("sample-tablename-2")
.regionName("us-east-1")
.initialPosition(new Latest())
.checkpointAppName("sample-app")
.checkpointInterval(Milliseconds(100))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
我想使用来自 Spark Streaming 应用程序的 DynamoDB 流。
Spark Streaming 使用 KCL 从 Kinesis 读取。有一个库可以使 KCL 能够从 DynamoDB 流中读取:dynamodb-streams-kinesis-adapter.
但是可以将这个库插入到 spark 中吗?有人这样做过吗?
我正在使用 Spark 2.1.0。
我的备份计划是让另一个应用程序从 DynamoDB 流读取到 Kinesis 流。
谢谢
执行此操作的方法是实现 KinesisInputDStream 以使用 dynamodb-streams-kinesis-adapter
提供的工作程序
official guidelines 建议是这样的:
final Worker worker = StreamsWorkerFactory
.createDynamoDbStreamsWorker(
recordProcessorFactory,
workerConfig,
adapterClient,
amazonDynamoDB,
amazonCloudWatchClient);
从Spark的角度来看,它是在KinesisInputDStream.scala
中的kinesis-asl模块下实现的我已经在 Spark 2.4.0 上试过了。这是我的回购协议。它几乎不需要改进,但可以完成工作
https://github.com/ravi72munde/spark-dynamo-stream-asl
修改好KinesisInputDStream后,我们就可以使用了,如下图。
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("sample-tablename-2")
.regionName("us-east-1")
.initialPosition(new Latest())
.checkpointAppName("sample-app")
.checkpointInterval(Milliseconds(100))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()