groupBy 的子流是否可以依赖于它们生成的键?
Can the subflows of groupBy depend on the keys they were generated from ?
我有一个包含与用户关联的数据的流程。我也有每个用户的状态,我可以从数据库异步获取。
我想将我的流程与每个用户一个子流程分开,并在具体化子流程时为每个用户加载状态,以便可以根据该状态处理子流程的元素。
如果我不想合并下游的子流,我可以用 groupBy
和 Sink.lazyInit
做一些事情:
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...
val treatByUser: Sink[Element] = Flow[Element].groupBy(
Int.MaxValue,
getUserId
).to(
Sink.lazyInit(
elt => getState(getUserId(elt)).map(treatUser),
??? // this is never called, since the subflow is created when an element comes
)
)
但是,如果 treatUser
变成 Flow
,这将不起作用,因为 Sink.lazyInit
没有等价物。
由于 groupBy
的子流仅在推送新元素时才具体化,因此应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码,所以这项工作始终如一。同样,Sink.lazyInit
似乎不容易翻译成 Flow
的情况。
知道如何解决这个问题吗?
您必须查看的相关 Akka 问题是 #20129: add Sink.dynamic and Flow.dynamic。
在相关的 PR #20579 中,他们实际上实现了 LazySink
个内容。
他们计划LazyFlow
接下来:
Will do next lazyFlow with similar signature.
不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。
我有一个包含与用户关联的数据的流程。我也有每个用户的状态,我可以从数据库异步获取。
我想将我的流程与每个用户一个子流程分开,并在具体化子流程时为每个用户加载状态,以便可以根据该状态处理子流程的元素。
如果我不想合并下游的子流,我可以用 groupBy
和 Sink.lazyInit
做一些事情:
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...
val treatByUser: Sink[Element] = Flow[Element].groupBy(
Int.MaxValue,
getUserId
).to(
Sink.lazyInit(
elt => getState(getUserId(elt)).map(treatUser),
??? // this is never called, since the subflow is created when an element comes
)
)
但是,如果 treatUser
变成 Flow
,这将不起作用,因为 Sink.lazyInit
没有等价物。
由于 groupBy
的子流仅在推送新元素时才具体化,因此应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码,所以这项工作始终如一。同样,Sink.lazyInit
似乎不容易翻译成 Flow
的情况。
知道如何解决这个问题吗?
您必须查看的相关 Akka 问题是 #20129: add Sink.dynamic and Flow.dynamic。
在相关的 PR #20579 中,他们实际上实现了 LazySink
个内容。
他们计划LazyFlow
接下来:
Will do next lazyFlow with similar signature.
不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑向 Akka 提交 PR)。