groupBy 的子流是否可以依赖于它们生成的键?

Can the subflows of groupBy depend on the keys they were generated from ?

我有一个包含与用户关联的数据的流程。我也有每个用户的状态,我可以从数据库异步获取。

我想将我的流程与每个用户一个子流程分开,并在具体化子流程时为每个用户加载状态,以便可以根据该状态处理子流程的元素。

如果我不想合并下游的子流,我可以用 groupBySink.lazyInit 做一些事情:

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue, 
  getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),
    ??? // this is never called, since the subflow is created when an element comes
  )
)

但是,如果 treatUser 变成 Flow,这将不起作用,因为 Sink.lazyInit 没有等价物。

由于 groupBy 的子流仅在推送新元素时才具体化,因此应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码,所以这项工作始终如一。同样,Sink.lazyInit 似乎不容易翻译成 Flow 的情况。

知道如何解决这个问题吗?

您必须查看的相关 Akka 问题是 #20129: add Sink.dynamic and Flow.dynamic

在相关的 PR #20579 中,他们实际上实现了 LazySink 个内容。

他们计划LazyFlow接下来:

Will do next lazyFlow with similar signature.

不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。