将嵌套 for 循环转换为响应式 Flux

Converting nested for loops to reactive Flux

我有一个代码逻辑需要在一系列嵌套循环中。 这是示例:

String q = null;
for(C c in getC()){
    if(Valid(c)){
        String Id = c.Id;
        List<M> mList = getM(Id);
        for(M m in mList){
            q = Message.Format("query:{}",m);
            List<D> dList = getD(Id, q);
            sendDToKafka(dList);
        }
    }
}

我正在尝试使用项目反应器将上述逻辑转换为反应式。 我到目前为止的代码:

Flux.fromIterable(getC())
    .map(c -> c.getId)
    .doOnNext(cId -> getM(cId))
    .map(m -> m.trim())
    .doOnNext(m -> getD()) // need to pass in query and Id to getD()
    .subscribe();

我面临的问题很少:

  1. 如何将 IsValid() 方法合并到查询中。
  2. 我需要在两个地方重用我在第一张地图 - .map(c -> c.getId) 上获得的 cId 值。如果在下一步中不立即使用,我如何跟踪该值。
  3. 有没有办法在反应式查询中形成 q 变量以作为参数传递给 getD()
  4. 如果代码是一种有效的方法,我将非常感谢任何反馈。

首先,doOnNext 方法是为了副作用而不是为了改变事件流。此外,如果您要将某些内容转换为反应式,则整个管道需要是非阻塞的,您应该避免调用任何阻塞的代码。如果您有无法更改的阻止代码,you can follow the advice here:

对于过滤,您可以使用 filter,对于在多个位置使用 cId,可以将其作为元组传递到链中(那里有很多库),或者您可以创建自己的 class 为此目的。

  1. How can I incorporate the IsValid() method into the query.

有一个运算符:

Flux.fromIterable(getC())
            .filter(c -> valid(c))
  1. I need to reuse the cId value I get on the first map - .map(c -> c.getId), in two places. How can I keep track of that value if not used immediately in the next step.

在特定的简单情况下,您可以像这样简单地使用嵌套 flatmap

.flatMap(id ->
        Flux.fromIterable(getM(id))
            .flatMap(m -> {
                String q = Message.Format("query: {}", m);
                List<D> dList = getD(id, q);
                return sendDToKafka(dList);
            })
    )

  1. Is there a way to form the q variable within the reactive query to pass as argument in getD()

您不能在流中使用非最终 q 变量。看看 AtomicReference<String>.

正如另一个答案中已经提到的,doOnNext 用于副作用,map 用于将某些东西从一种类型映射到另一种类型。