将嵌套 for 循环转换为响应式 Flux
Converting nested for loops to reactive Flux
我有一个代码逻辑需要在一系列嵌套循环中。
这是示例:
String q = null;
for(C c in getC()){
if(Valid(c)){
String Id = c.Id;
List<M> mList = getM(Id);
for(M m in mList){
q = Message.Format("query:{}",m);
List<D> dList = getD(Id, q);
sendDToKafka(dList);
}
}
}
我正在尝试使用项目反应器将上述逻辑转换为反应式。
我到目前为止的代码:
Flux.fromIterable(getC())
.map(c -> c.getId)
.doOnNext(cId -> getM(cId))
.map(m -> m.trim())
.doOnNext(m -> getD()) // need to pass in query and Id to getD()
.subscribe();
我面临的问题很少:
- 如何将 IsValid() 方法合并到查询中。
- 我需要在两个地方重用我在第一张地图 - .map(c -> c.getId) 上获得的 cId 值。如果在下一步中不立即使用,我如何跟踪该值。
- 有没有办法在反应式查询中形成 q 变量以作为参数传递给 getD()
- 如果代码是一种有效的方法,我将非常感谢任何反馈。
首先,doOnNext
方法是为了副作用而不是为了改变事件流。此外,如果您要将某些内容转换为反应式,则整个管道需要是非阻塞的,您应该避免调用任何阻塞的代码。如果您有无法更改的阻止代码,you can follow the advice here:
对于过滤,您可以使用 filter
,对于在多个位置使用 cId
,可以将其作为元组传递到链中(那里有很多库),或者您可以创建自己的 class 为此目的。
- How can I incorporate the IsValid() method into the query.
有一个运算符:
Flux.fromIterable(getC())
.filter(c -> valid(c))
- I need to reuse the cId value I get on the first map - .map(c -> c.getId), in two places. How can I keep track of that value if not
used immediately in the next step.
在特定的简单情况下,您可以像这样简单地使用嵌套 flatmap
:
.flatMap(id ->
Flux.fromIterable(getM(id))
.flatMap(m -> {
String q = Message.Format("query: {}", m);
List<D> dList = getD(id, q);
return sendDToKafka(dList);
})
)
见
- Is there a way to form the q variable within the reactive query to pass as argument in getD()
您不能在流中使用非最终 q
变量。看看 AtomicReference<String>
.
正如另一个答案中已经提到的,doOnNext
用于副作用,map
用于将某些东西从一种类型映射到另一种类型。
我有一个代码逻辑需要在一系列嵌套循环中。 这是示例:
String q = null;
for(C c in getC()){
if(Valid(c)){
String Id = c.Id;
List<M> mList = getM(Id);
for(M m in mList){
q = Message.Format("query:{}",m);
List<D> dList = getD(Id, q);
sendDToKafka(dList);
}
}
}
我正在尝试使用项目反应器将上述逻辑转换为反应式。 我到目前为止的代码:
Flux.fromIterable(getC())
.map(c -> c.getId)
.doOnNext(cId -> getM(cId))
.map(m -> m.trim())
.doOnNext(m -> getD()) // need to pass in query and Id to getD()
.subscribe();
我面临的问题很少:
- 如何将 IsValid() 方法合并到查询中。
- 我需要在两个地方重用我在第一张地图 - .map(c -> c.getId) 上获得的 cId 值。如果在下一步中不立即使用,我如何跟踪该值。
- 有没有办法在反应式查询中形成 q 变量以作为参数传递给 getD()
- 如果代码是一种有效的方法,我将非常感谢任何反馈。
首先,doOnNext
方法是为了副作用而不是为了改变事件流。此外,如果您要将某些内容转换为反应式,则整个管道需要是非阻塞的,您应该避免调用任何阻塞的代码。如果您有无法更改的阻止代码,you can follow the advice here:
对于过滤,您可以使用 filter
,对于在多个位置使用 cId
,可以将其作为元组传递到链中(那里有很多库),或者您可以创建自己的 class 为此目的。
- How can I incorporate the IsValid() method into the query.
有一个运算符:
Flux.fromIterable(getC())
.filter(c -> valid(c))
- I need to reuse the cId value I get on the first map - .map(c -> c.getId), in two places. How can I keep track of that value if not used immediately in the next step.
在特定的简单情况下,您可以像这样简单地使用嵌套 flatmap
:
.flatMap(id ->
Flux.fromIterable(getM(id))
.flatMap(m -> {
String q = Message.Format("query: {}", m);
List<D> dList = getD(id, q);
return sendDToKafka(dList);
})
)
见
- Is there a way to form the q variable within the reactive query to pass as argument in getD()
您不能在流中使用非最终 q
变量。看看 AtomicReference<String>
.
正如另一个答案中已经提到的,doOnNext
用于副作用,map
用于将某些东西从一种类型映射到另一种类型。