Hazelcast Jet - 将列表排出到流中
Hazelcast Jet - drain the list to a stream
我的 Jet 作业正在转换 Redis 流数据 - 转换是 - 我为流中的每个项目查找 map
- 如果发现它包含一个或多个项目(列表)。我想将排水流中的项目写成单独的项目,而不是列表的一个元素。
我的代码有效,但是它将列表作为单个项目写入另一个 Redis 流 - 我需要的是将列表的每个元素分别写入流(以便其他作业可以独立处理项目)。
代码
pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
.withIngestionTimestamps()
.groupingKey(k -> k.get("eventType"))
.mapUsingContext(lookupService(), (svc, event, item) -> svc.findHooks(event) /*returns list*/)
.drainTo(RedisSinks.stream("drain", uri, "hooks"));
因此,服务返回的列表应该作为单独的元素写入输出流中。
我可以用什么 api 来发射每个物品?我在文档中找不到太多内容。
要将一项映射到多项,您需要使用 flat map 转换而不是简单的映射转换。
示例如下:
pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
.withIngestionTimestamps()
.groupingKey(k -> k.get("eventType"))
.flatMapUsingContext(lookupService(), (svc, event, item) -> Traversers.traverseIterable(svc.findHooks(event)) /*returns list*/)
.drainTo(RedisSinks.stream("drain", uri, "hooks"));
我的 Jet 作业正在转换 Redis 流数据 - 转换是 - 我为流中的每个项目查找 map
- 如果发现它包含一个或多个项目(列表)。我想将排水流中的项目写成单独的项目,而不是列表的一个元素。
我的代码有效,但是它将列表作为单个项目写入另一个 Redis 流 - 我需要的是将列表的每个元素分别写入流(以便其他作业可以独立处理项目)。
代码
pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
.withIngestionTimestamps()
.groupingKey(k -> k.get("eventType"))
.mapUsingContext(lookupService(), (svc, event, item) -> svc.findHooks(event) /*returns list*/)
.drainTo(RedisSinks.stream("drain", uri, "hooks"));
因此,服务返回的列表应该作为单独的元素写入输出流中。
我可以用什么 api 来发射每个物品?我在文档中找不到太多内容。
要将一项映射到多项,您需要使用 flat map 转换而不是简单的映射转换。
示例如下:
pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
.withIngestionTimestamps()
.groupingKey(k -> k.get("eventType"))
.flatMapUsingContext(lookupService(), (svc, event, item) -> Traversers.traverseIterable(svc.findHooks(event)) /*returns list*/)
.drainTo(RedisSinks.stream("drain", uri, "hooks"));