是否可以允许(或扩展)IO 连接器以便它们可以读取以前的 PCollection 项目?
Is it possible to allow (or extend) IO connectors so they can read previous PCollection items?
问题可能不像我想的那么清楚,但一开始解释我想在这里实现的目标很复杂。
有一个 Window,是否可以从一个全局的 windows 中“缩小”windows 并分别触发每个 windows?一点伪代码来澄清一下。
pipeline
.apply("InputStream", stream)
.apply("3600s windowDuration",
Window.into(FixedWindows.of(Duration.standardSeconds(3600)))
)
.apply("/// groupBy 'timestamp, store_id and collection_name'? ///", ...)
.apply("Write to MongoDB",
MongoDbIO
.write()
.withCollection(msg -> msg.getCollection()) // Admits a String but doesn't admit reading from the previous typed PCollection.
);
是否可以根据消息数据应用 MongoDB 写入?或者至少配置 class 这样做?一开始似乎不可能,所以我不知道是否有任何其他与 Beam 相关的解决方法可以允许多次插入。
我的想法是在全局 3600s 内部有较小的内部 windows,然后再应用插入。 FileIO
用它的 .by
方法做的事情。
withCollection 函数中的“集合”是一个 MongoDB 术语,与 Beam 的 PCollections 无关。
MongoDbIO.write 转换只会将其输入 PCollection 的内容写入 MongoDB。您可以将它应用于管道中的任何 PCollection,并且您也可以自由更改管道内的窗口。
如果我没理解错的话,您想根据对象上列出的集合写入 Mongo 集合,对吗?
很遗憾,目前看来是不可能的(参见Javadoc for MongoDBIO)。
如果集合列表相对较小,并且已知先验,那么您可以将事件路由到多个MongoDBIO.write
转换,但这可能是不可能的。有点像这样:
myelements.apply(
Filter.by(msg -> msg.getCollection().equals(COLLECTION1))
.apply("Write to MongoDB - coll1",
MongoDbIO
.write()
.withCollection(COLLECTION1));
myelements.apply(
Filter.by(msg -> msg.getCollection().equals(COLLECTION2))
.apply("Write to MongoDB - coll2",
MongoDbIO
.write()
.withCollection(COLLECTION2));
如果这对您不起作用,那么您可能需要编写自己的逻辑来写入 MongoDB。如果您需要自己编写,我建议您使用 GroupIntoBatches
为每个 MongoDB 集合创建一批元素,然后将它们写出来。
问题可能不像我想的那么清楚,但一开始解释我想在这里实现的目标很复杂。
有一个 Window,是否可以从一个全局的 windows 中“缩小”windows 并分别触发每个 windows?一点伪代码来澄清一下。
pipeline
.apply("InputStream", stream)
.apply("3600s windowDuration",
Window.into(FixedWindows.of(Duration.standardSeconds(3600)))
)
.apply("/// groupBy 'timestamp, store_id and collection_name'? ///", ...)
.apply("Write to MongoDB",
MongoDbIO
.write()
.withCollection(msg -> msg.getCollection()) // Admits a String but doesn't admit reading from the previous typed PCollection.
);
是否可以根据消息数据应用 MongoDB 写入?或者至少配置 class 这样做?一开始似乎不可能,所以我不知道是否有任何其他与 Beam 相关的解决方法可以允许多次插入。
我的想法是在全局 3600s 内部有较小的内部 windows,然后再应用插入。 FileIO
用它的 .by
方法做的事情。
withCollection 函数中的“集合”是一个 MongoDB 术语,与 Beam 的 PCollections 无关。
MongoDbIO.write 转换只会将其输入 PCollection 的内容写入 MongoDB。您可以将它应用于管道中的任何 PCollection,并且您也可以自由更改管道内的窗口。
如果我没理解错的话,您想根据对象上列出的集合写入 Mongo 集合,对吗?
很遗憾,目前看来是不可能的(参见Javadoc for MongoDBIO)。
如果集合列表相对较小,并且已知先验,那么您可以将事件路由到多个MongoDBIO.write
转换,但这可能是不可能的。有点像这样:
myelements.apply(
Filter.by(msg -> msg.getCollection().equals(COLLECTION1))
.apply("Write to MongoDB - coll1",
MongoDbIO
.write()
.withCollection(COLLECTION1));
myelements.apply(
Filter.by(msg -> msg.getCollection().equals(COLLECTION2))
.apply("Write to MongoDB - coll2",
MongoDbIO
.write()
.withCollection(COLLECTION2));
如果这对您不起作用,那么您可能需要编写自己的逻辑来写入 MongoDB。如果您需要自己编写,我建议您使用 GroupIntoBatches
为每个 MongoDB 集合创建一批元素,然后将它们写出来。