Kafka -> Flink 数据流 -> MongoDB
Kafka -> Flink DataStream -> MongoDB
我想设置 Flink,以便它将数据流从 Apache Kafka 转换并重定向到 MongoDB。出于测试目的,我在 flink-streaming-connectors.kafka 示例 (https://github.com/apache/flink).
之上构建
Kafka 流被 Flink 正确地标记为红色,我可以将它们映射等,但是当我想将每个收到和转换的消息保存到 MongoDB 时,问题就出现了。我发现的关于 MongoDB 集成的唯一示例是来自 github 的 flink-mongodb-test。不幸的是它使用静态数据源(数据库),而不是数据流。
我认为 MongoDB 应该有一些 DataStream.addSink 实现,但显然没有。
实现它的最佳方法是什么?我是否需要编写自定义接收器功能,或者我可能遗漏了什么?也许应该以不同的方式完成?
我不依赖于任何解决方案,因此我们将不胜感激。
下面是一个示例,说明我究竟得到了什么作为输入以及我需要存储什么作为输出。
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
正如你在这个例子中看到的,我主要使用 Flink 来进行 Kafka 的消息流缓冲和一些基本的解析。
目前 Flink 中没有可用的 Streaming MongoDB sink。
但是,有两种方法可以将数据写入MongoDB:
使用Flink的DataStream.write()
调用。它允许您将任何 OutputFormat(来自 Batch API)用于流式传输。使用Flink的HadoopOutputFormatWrapper,可以使用官方的MongoDB Hadoop connector
自己实现接收器。使用 Streaming API 实现接收器非常容易,我确信 MongoDB 有一个很好的 Java 客户端库。
这两种方法都不提供任何复杂的处理保证。但是,当您将 Flink 与 Kafka(并启用检查点)一起使用时,您将拥有至少一次语义:在错误情况下,数据将再次流式传输到 MongoDB 接收器。
如果您正在进行幂等更新,重做这些更新应该不会导致任何不一致。
如果您真的需要 MongoDB 的精确一次语义,您应该提交 JIRA in Flink 并与社区讨论如何实现它。
作为 Robert Metzger 回答的替代方法,您可以将结果再次写入 Kafka,然后使用维护的 Kafka 连接器之一将主题内容放入 MongoDB 数据库。
Kafka -> Flink -> Kafka -> Mongo/Anything
通过这种方法,您可以保持 "at-least-once semantics" 行为。
我想设置 Flink,以便它将数据流从 Apache Kafka 转换并重定向到 MongoDB。出于测试目的,我在 flink-streaming-connectors.kafka 示例 (https://github.com/apache/flink).
之上构建Kafka 流被 Flink 正确地标记为红色,我可以将它们映射等,但是当我想将每个收到和转换的消息保存到 MongoDB 时,问题就出现了。我发现的关于 MongoDB 集成的唯一示例是来自 github 的 flink-mongodb-test。不幸的是它使用静态数据源(数据库),而不是数据流。
我认为 MongoDB 应该有一些 DataStream.addSink 实现,但显然没有。
实现它的最佳方法是什么?我是否需要编写自定义接收器功能,或者我可能遗漏了什么?也许应该以不同的方式完成?
我不依赖于任何解决方案,因此我们将不胜感激。
下面是一个示例,说明我究竟得到了什么作为输入以及我需要存储什么作为输出。
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
正如你在这个例子中看到的,我主要使用 Flink 来进行 Kafka 的消息流缓冲和一些基本的解析。
目前 Flink 中没有可用的 Streaming MongoDB sink。
但是,有两种方法可以将数据写入MongoDB:
使用Flink的
DataStream.write()
调用。它允许您将任何 OutputFormat(来自 Batch API)用于流式传输。使用Flink的HadoopOutputFormatWrapper,可以使用官方的MongoDB Hadoop connector自己实现接收器。使用 Streaming API 实现接收器非常容易,我确信 MongoDB 有一个很好的 Java 客户端库。
这两种方法都不提供任何复杂的处理保证。但是,当您将 Flink 与 Kafka(并启用检查点)一起使用时,您将拥有至少一次语义:在错误情况下,数据将再次流式传输到 MongoDB 接收器。 如果您正在进行幂等更新,重做这些更新应该不会导致任何不一致。
如果您真的需要 MongoDB 的精确一次语义,您应该提交 JIRA in Flink 并与社区讨论如何实现它。
作为 Robert Metzger 回答的替代方法,您可以将结果再次写入 Kafka,然后使用维护的 Kafka 连接器之一将主题内容放入 MongoDB 数据库。
Kafka -> Flink -> Kafka -> Mongo/Anything
通过这种方法,您可以保持 "at-least-once semantics" 行为。