Hazelcast Jet - 将列表排出到流中

Hazelcast Jet - drain the list to a stream

我的 Jet 作业正在转换 Redis 流数据 - 转换是 - 我为流中的每个项目查找 map - 如果发现它包含一个或多个项目(列表)。我想将排水流中的项目写成单独的项目,而不是列表的一个元素。

我的代码有效,但是它将列表作为单个项目写入另一个 Redis 流 - 我需要的是将列表的每个元素分别写入流(以便其他作业可以独立处理项目)。

代码

pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
        .withIngestionTimestamps()
        .groupingKey(k -> k.get("eventType"))
        .mapUsingContext(lookupService(), (svc, event, item) -> svc.findHooks(event) /*returns list*/) 
        .drainTo(RedisSinks.stream("drain", uri, "hooks"));

因此,服务返回的列表应该作为单独的元素写入输出流中。

我可以用什么 api 来发射每个物品?我在文档中找不到太多内容。

要将一项映射到多项,您需要使用 flat map 转换而不是简单的映射转换。

示例如下:

pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
        .withIngestionTimestamps()
        .groupingKey(k -> k.get("eventType"))
        .flatMapUsingContext(lookupService(), (svc, event, item) -> Traversers.traverseIterable(svc.findHooks(event)) /*returns list*/) 
        .drainTo(RedisSinks.stream("drain", uri, "hooks"));