通过 akka 流链接上下文
Chaining context through akka streams
我正在将一些 C# 代码转换为 scala 和 akka 流。
我的 C# 代码如下所示:
Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...
async Task<Result> GetResultAsync(Request request)
{
var result1 = await GetPartialResult1Async(request);
var result2 = await GetPartialResult2Async(request);
return new Result(request, result1, result2);
}
现在是 akka 流。我没有从 Request
到结果的 Task
的函数,而是从请求到结果的流程。
所以我已经有了以下两个流程:
val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...
但是我看不出如何将它们组合成一个完整的流程,因为在第一个流程上调用 via 会丢失原始请求,而在第二个流程上调用 via 我们会丢失第一个流程的结果.
所以我创建了一个 WithState monad,它看起来像这样:
case class WithState[+TState, +TValue](value: TValue, state: TState) {
def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
WithState(func(value), state)
}
... bunch more helper functions go here
}
然后我将我的原始流程重写为如下所示:
def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = ...
def partialResult2Flow: Flow[WithState[TState, Request], WithState[TState, Result2]] = ...
并像这样使用它们:
val flow = Flow[Request]
.map(x => WithState(x, x))
.via(partialResult1Flow)
.map(x => WithState(x.state, (x.state, x.value))
.via(partialResult2Flow)
.map(x => Result(x.state._1, x.state._2, x.value))
现在可以了,但是我当然不能保证流量会如何使用。所以我真的应该让它接受一个状态参数:
def flow[TState] = Flow[WithState[TState, Request]]
.map(x => WithState(x.value, (x.state, x.value)))
.via(partialResult1Flow)
.map(x => WithState(x.state._2, (x.state, x.value))
.via(partialResult2Flow)
.map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))
现在,在这个阶段我的代码变得非常难以阅读。我可以通过命名函数来清理它,并使用 case 类 而不是元组等。但从根本上说,这里有很多附带的复杂性,这是很难避免的。
我错过了什么吗?这不是 Akka 流的好用例吗?是否有一些内置的方法可以做到这一点?
免责声明,我对 C# 的 async/await 并不完全熟悉。
从我快速阅读 C# 文档中了解到的内容,Task<T>
是一个严格的(即急切的,而不是懒惰的)评估计算,如果成功,最终将包含 T
. Scala 的等价物是 Future[T]
,其中 C# 代码的等价物是:
import scala.concurrent.{ ExecutionContext, Future }
def getPartialResult1Async(req: Request): Future[Result1] = ???
def getPartialResult2Async(req: Request): Future[Result2] = ???
def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = {
val result1 = getPartialResult1Async(req)
val result2 = getPartialResult2Async(req)
result1.zipWith(result2) { tup => val (r1, r2) = tup
new Result(req, r1, r2)
}
/* Could also:
* for {
* r1 <- result1
* r2 <- result2
* } yield { new Result(req, r1, r2) }
*
* Note that both the `result1.zipWith(result2)` and the above `for`
* construction may compute the two partial results simultaneously. If you
* want to ensure that the second partial result is computed after the first
* partial result is successfully computed:
* for {
* r1 <- getPartialResult1Async(req)
* r2 <- getPartialResult2Async(req)
* } yield new Result(req, r1, r2)
*/
}
这种特殊情况不需要 Akka Streams,但如果您有其他需要使用 Akka Streams,您可以将其表达为
val actorSystem = ??? // In Akka Streams 2.6, you'd probably have this as an implicit val
val parallelism = ??? // Controls requests in flight
val flow = Flow[Request]
.mapAsync(parallelism) { req =>
import actorSystem.dispatcher
getPartialResult1Async(req).map { r1 => (req, r1) }
}
.mapAsync(parallelism) { tup =>
import actorSystem.dispatcher
getPartialResult2Async(tup._2).map { r2 =>
new Result(tup._1, tup._2, r2)
}
}
/* Given the `getResultAsync` function in the previous snippet, you could also:
* val flow = Flow[Request].mapAsync(parallelism) { req =>
* getResultAsync(req)(actorSystem.dispatcher)
* }
*/
基于 Future
的实现的一个优点是,它很容易与您想要在给定上下文中使用的 concurrency/parallelism 的任何 Scala 抽象集成(例如猫、akka 流、akka ).我对 Akka Streams 集成的一般直觉是我在第二个代码块中的注释中的三行代码方向。
与我在问题中描述的方法相比,我没有任何根本不同的方法。
然而,电流可以显着改善:
第 1 阶段:FlowWithContext
可以使用内置的 FlowWithContext
.
而不是使用自定义 WithState
monad
这样做的好处是您可以在流上使用标准运算符,而无需担心转换 WithState
monad。 Akka 会为您处理这件事。
所以不用
def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] =
Flow[WithState[TState, Request]].mapAsync(_ mapAsync {doRequest(_)})
我们可以这样写:
def partialResult1Flow[TState]: FlowWithContext[Request, TState, Result1, TState, NotUsed] =
FlowWithContext[Request, TState].mapAsync(doRequest(_))
不幸的是,虽然当您不需要更改上下文时 FlowWithContext 很容易编写,但当您需要通过需要移动一些当前数据的流时使用它有点繁琐进入上下文(就像我们的一样)。为此,您需要转换为 Flow
(使用 asFlow
),然后使用 asFlowWithContext
.
转换回 FlowWithContext
我发现在这种情况下,将整个内容写成 Flow
最简单,然后在最后转换为 FlowWithContext
。
例如:
def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] =
Flow[(Request, TState)]
.map(x => (x._1, (x._1, x._2)))
.via(partialResult1Flow)
.map(x => (x._2._1, (x._2._1, x._1, x._2._2))
.via(partialResult2Flow)
.map(x => (Result(x._2._1, x._2._2, x._1), x._2._2))
.asFlowWithContext((a: Request, b: TState) => (a,b))(_._2)
.map(_._1)
这样更好吗?
在这种特殊情况下,情况可能更糟。在其他情况下,您很少需要更改上下文会更好。但是,无论哪种方式,我都建议使用它的内置功能,而不是依赖自定义 monad。
第 2 阶段:通过使用
为了使它更加用户友好,我为 Flow 和 FlowWithContext 创建了一个 viaUsing
扩展方法:
import akka.stream.{FlowShape, Graph}
import akka.stream.scaladsl.{Flow, FlowWithContext}
object FlowExtensions {
implicit class FlowViaUsingOps[In, Out, Mat](val f: Flow[In, Out, Mat]) extends AnyVal {
def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, Out), (Out2, Out)], Mat2]) : Flow[In, (Out2, Out), Mat] =
f.map(x => (func(x), x)).via(flow)
}
implicit class FlowWithContextViaUsingOps[In, CtxIn, Out, CtxOut, Mat](val f: FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends AnyVal {
def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, (Out, CtxOut)), (Out2, (Out, CtxOut))], Mat2]):
FlowWithContext[In, CtxIn, (Out2, Out), CtxOut, Mat] =
f
.asFlow
.map(x => (func(x._1), (x._1, x._2)))
.via(flow)
.asFlowWithContext((a: In, b: CtxIn) => (a,b))(_._2._2)
.map(x => (x._1, x._2._1))
}
}
viaUsing
的目的是从当前输出为 FlowWithContext
创建输入,同时通过上下文传递它来保留当前输出。它导致 Flow
其输出是嵌套流和原始流输出的元组。
使用 viaUsing
我们的示例简化为:
def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] =
FlowWithContext[Request, TState]
.viaUsing(x => x)(partialResult1Flow)
.viaUsing(x => x._2)(partialResult2Flow)
.map(x => Result(x._2._2, x._2._1, x._1))
我认为这是一项重大改进。我已经请求将 viaUsing 添加到 Akka 而不是依赖扩展方法 here.
我同意使用 Akka Streams 进行背压很有用。但是,我不相信将 partialResult
s 的计算建模为流在这里有用。将 'inner' 逻辑基于 Future
并将其包装在流程的 mapAsync
中以将背压作为一个单元应用于整个操作似乎更简单,甚至可能更好。
这基本上是 Levi Ramsey 早期出色回答的精简重构:
import scala.concurrent.{ ExecutionContext, Future }
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
case class Request()
case class Result1()
case class Result2()
case class Response(r: Request, r1: Result1, r2: Result2)
def partialResult1(req: Request): Future[Result1] = ???
def partialResult2(req: Request): Future[Result2] = ???
val system = akka.actor.ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
val flow: Flow[Request, Response, NotUsed] =
Flow[Request]
.mapAsync(parallelism = 12) { req =>
for {
res1 <- partialResult1(req)
res2 <- partialResult2(req)
} yield (Response(req, res1, res2))
}
我会从这个开始,只有当你知道你有理由将 partialResult1
和 partialResult2
分成不同的阶段时,才会在 Flow
中引入一个中间步骤。根据您的要求,mapAsyncUnordered
可能更合适。
我正在将一些 C# 代码转换为 scala 和 akka 流。
我的 C# 代码如下所示:
Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...
async Task<Result> GetResultAsync(Request request)
{
var result1 = await GetPartialResult1Async(request);
var result2 = await GetPartialResult2Async(request);
return new Result(request, result1, result2);
}
现在是 akka 流。我没有从 Request
到结果的 Task
的函数,而是从请求到结果的流程。
所以我已经有了以下两个流程:
val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...
但是我看不出如何将它们组合成一个完整的流程,因为在第一个流程上调用 via 会丢失原始请求,而在第二个流程上调用 via 我们会丢失第一个流程的结果.
所以我创建了一个 WithState monad,它看起来像这样:
case class WithState[+TState, +TValue](value: TValue, state: TState) {
def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
WithState(func(value), state)
}
... bunch more helper functions go here
}
然后我将我的原始流程重写为如下所示:
def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = ...
def partialResult2Flow: Flow[WithState[TState, Request], WithState[TState, Result2]] = ...
并像这样使用它们:
val flow = Flow[Request]
.map(x => WithState(x, x))
.via(partialResult1Flow)
.map(x => WithState(x.state, (x.state, x.value))
.via(partialResult2Flow)
.map(x => Result(x.state._1, x.state._2, x.value))
现在可以了,但是我当然不能保证流量会如何使用。所以我真的应该让它接受一个状态参数:
def flow[TState] = Flow[WithState[TState, Request]]
.map(x => WithState(x.value, (x.state, x.value)))
.via(partialResult1Flow)
.map(x => WithState(x.state._2, (x.state, x.value))
.via(partialResult2Flow)
.map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))
现在,在这个阶段我的代码变得非常难以阅读。我可以通过命名函数来清理它,并使用 case 类 而不是元组等。但从根本上说,这里有很多附带的复杂性,这是很难避免的。
我错过了什么吗?这不是 Akka 流的好用例吗?是否有一些内置的方法可以做到这一点?
免责声明,我对 C# 的 async/await 并不完全熟悉。
从我快速阅读 C# 文档中了解到的内容,Task<T>
是一个严格的(即急切的,而不是懒惰的)评估计算,如果成功,最终将包含 T
. Scala 的等价物是 Future[T]
,其中 C# 代码的等价物是:
import scala.concurrent.{ ExecutionContext, Future }
def getPartialResult1Async(req: Request): Future[Result1] = ???
def getPartialResult2Async(req: Request): Future[Result2] = ???
def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = {
val result1 = getPartialResult1Async(req)
val result2 = getPartialResult2Async(req)
result1.zipWith(result2) { tup => val (r1, r2) = tup
new Result(req, r1, r2)
}
/* Could also:
* for {
* r1 <- result1
* r2 <- result2
* } yield { new Result(req, r1, r2) }
*
* Note that both the `result1.zipWith(result2)` and the above `for`
* construction may compute the two partial results simultaneously. If you
* want to ensure that the second partial result is computed after the first
* partial result is successfully computed:
* for {
* r1 <- getPartialResult1Async(req)
* r2 <- getPartialResult2Async(req)
* } yield new Result(req, r1, r2)
*/
}
这种特殊情况不需要 Akka Streams,但如果您有其他需要使用 Akka Streams,您可以将其表达为
val actorSystem = ??? // In Akka Streams 2.6, you'd probably have this as an implicit val
val parallelism = ??? // Controls requests in flight
val flow = Flow[Request]
.mapAsync(parallelism) { req =>
import actorSystem.dispatcher
getPartialResult1Async(req).map { r1 => (req, r1) }
}
.mapAsync(parallelism) { tup =>
import actorSystem.dispatcher
getPartialResult2Async(tup._2).map { r2 =>
new Result(tup._1, tup._2, r2)
}
}
/* Given the `getResultAsync` function in the previous snippet, you could also:
* val flow = Flow[Request].mapAsync(parallelism) { req =>
* getResultAsync(req)(actorSystem.dispatcher)
* }
*/
基于 Future
的实现的一个优点是,它很容易与您想要在给定上下文中使用的 concurrency/parallelism 的任何 Scala 抽象集成(例如猫、akka 流、akka ).我对 Akka Streams 集成的一般直觉是我在第二个代码块中的注释中的三行代码方向。
与我在问题中描述的方法相比,我没有任何根本不同的方法。
然而,电流可以显着改善:
第 1 阶段:FlowWithContext
可以使用内置的 FlowWithContext
.
WithState
monad
这样做的好处是您可以在流上使用标准运算符,而无需担心转换 WithState
monad。 Akka 会为您处理这件事。
所以不用
def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] =
Flow[WithState[TState, Request]].mapAsync(_ mapAsync {doRequest(_)})
我们可以这样写:
def partialResult1Flow[TState]: FlowWithContext[Request, TState, Result1, TState, NotUsed] =
FlowWithContext[Request, TState].mapAsync(doRequest(_))
不幸的是,虽然当您不需要更改上下文时 FlowWithContext 很容易编写,但当您需要通过需要移动一些当前数据的流时使用它有点繁琐进入上下文(就像我们的一样)。为此,您需要转换为 Flow
(使用 asFlow
),然后使用 asFlowWithContext
.
FlowWithContext
我发现在这种情况下,将整个内容写成 Flow
最简单,然后在最后转换为 FlowWithContext
。
例如:
def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] =
Flow[(Request, TState)]
.map(x => (x._1, (x._1, x._2)))
.via(partialResult1Flow)
.map(x => (x._2._1, (x._2._1, x._1, x._2._2))
.via(partialResult2Flow)
.map(x => (Result(x._2._1, x._2._2, x._1), x._2._2))
.asFlowWithContext((a: Request, b: TState) => (a,b))(_._2)
.map(_._1)
这样更好吗?
在这种特殊情况下,情况可能更糟。在其他情况下,您很少需要更改上下文会更好。但是,无论哪种方式,我都建议使用它的内置功能,而不是依赖自定义 monad。
第 2 阶段:通过使用
为了使它更加用户友好,我为 Flow 和 FlowWithContext 创建了一个 viaUsing
扩展方法:
import akka.stream.{FlowShape, Graph}
import akka.stream.scaladsl.{Flow, FlowWithContext}
object FlowExtensions {
implicit class FlowViaUsingOps[In, Out, Mat](val f: Flow[In, Out, Mat]) extends AnyVal {
def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, Out), (Out2, Out)], Mat2]) : Flow[In, (Out2, Out), Mat] =
f.map(x => (func(x), x)).via(flow)
}
implicit class FlowWithContextViaUsingOps[In, CtxIn, Out, CtxOut, Mat](val f: FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends AnyVal {
def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, (Out, CtxOut)), (Out2, (Out, CtxOut))], Mat2]):
FlowWithContext[In, CtxIn, (Out2, Out), CtxOut, Mat] =
f
.asFlow
.map(x => (func(x._1), (x._1, x._2)))
.via(flow)
.asFlowWithContext((a: In, b: CtxIn) => (a,b))(_._2._2)
.map(x => (x._1, x._2._1))
}
}
viaUsing
的目的是从当前输出为 FlowWithContext
创建输入,同时通过上下文传递它来保留当前输出。它导致 Flow
其输出是嵌套流和原始流输出的元组。
使用 viaUsing
我们的示例简化为:
def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] =
FlowWithContext[Request, TState]
.viaUsing(x => x)(partialResult1Flow)
.viaUsing(x => x._2)(partialResult2Flow)
.map(x => Result(x._2._2, x._2._1, x._1))
我认为这是一项重大改进。我已经请求将 viaUsing 添加到 Akka 而不是依赖扩展方法 here.
我同意使用 Akka Streams 进行背压很有用。但是,我不相信将 partialResult
s 的计算建模为流在这里有用。将 'inner' 逻辑基于 Future
并将其包装在流程的 mapAsync
中以将背压作为一个单元应用于整个操作似乎更简单,甚至可能更好。
这基本上是 Levi Ramsey 早期出色回答的精简重构:
import scala.concurrent.{ ExecutionContext, Future }
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
case class Request()
case class Result1()
case class Result2()
case class Response(r: Request, r1: Result1, r2: Result2)
def partialResult1(req: Request): Future[Result1] = ???
def partialResult2(req: Request): Future[Result2] = ???
val system = akka.actor.ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
val flow: Flow[Request, Response, NotUsed] =
Flow[Request]
.mapAsync(parallelism = 12) { req =>
for {
res1 <- partialResult1(req)
res2 <- partialResult2(req)
} yield (Response(req, res1, res2))
}
我会从这个开始,只有当你知道你有理由将 partialResult1
和 partialResult2
分成不同的阶段时,才会在 Flow
中引入一个中间步骤。根据您的要求,mapAsyncUnordered
可能更合适。