使用折叠的 Akka 流程未完成
Akka flow using fold not completing
我有一些代码大致如下所示:其中 A 是两个映射的元组
def methodName(): Flow[A, B, NotUsed] = {
val filter = Flow[A].map(a => a._2.slice(0, 2))
val split = Flow[A._2]
.mapConcat(identity)
.map(t => {
B.random
})
.fold(B.empty)((a, b) => {
new B(a._1, a._2 ++ Seq(b._1), a._3 ++ Seq(b._2), a._4)
})
val logK = Flow[B].log("K", c => {
log.info("here")
})
filter.via(split).via(logK)
}
但是当我 运行 这样做时,流在折叠阶段停止,我不明白为什么。我可以确认 A._2 中的集合已完全耗尽,当我用不同的操作替换折叠时,流程继续进行并且没有被阻止。据我所知,上游 mapConcat 正在调用 completeStage。所以我不确定为什么弃牌阶段没有接到那个电话并且不知道进入下一阶段。
看来这是我使用的 akka 版本中的错误:akka: "2.5.23", akkaHttp: "10.1.10"
当我升级到 akka 时:“2.6.8”和 akkaHttpV =“10.2.0”
一切都按预期工作
我有一些代码大致如下所示:其中 A 是两个映射的元组
def methodName(): Flow[A, B, NotUsed] = {
val filter = Flow[A].map(a => a._2.slice(0, 2))
val split = Flow[A._2]
.mapConcat(identity)
.map(t => {
B.random
})
.fold(B.empty)((a, b) => {
new B(a._1, a._2 ++ Seq(b._1), a._3 ++ Seq(b._2), a._4)
})
val logK = Flow[B].log("K", c => {
log.info("here")
})
filter.via(split).via(logK)
}
但是当我 运行 这样做时,流在折叠阶段停止,我不明白为什么。我可以确认 A._2 中的集合已完全耗尽,当我用不同的操作替换折叠时,流程继续进行并且没有被阻止。据我所知,上游 mapConcat 正在调用 completeStage。所以我不确定为什么弃牌阶段没有接到那个电话并且不知道进入下一阶段。
看来这是我使用的 akka 版本中的错误:akka: "2.5.23", akkaHttp: "10.1.10"
当我升级到 akka 时:“2.6.8”和 akkaHttpV =“10.2.0” 一切都按预期工作