在 Akka 流中连接流

Concatenating Flows in Akka Streams

我们正在使用 Kafka,并希望使用 interactive queries 来访问我们的状态存储中的数据。我们现有的服务使用 Akka HTTP 来提供 REST API,我们希望将交互式查询集成到流程中。

似乎 kafka-streams-query 非常适合这个。但是,它通过公开使用低级 API 的 route 属性 集成到 Akka HTTP 中,映射到 Flow[HttpRequest, HttpResponse, Any]。我们之前的所有代码都使用 Akka HTTP 的路由 DSL 连接代码。

我希望像下面这样的代码可以工作,但它没有:

implicit val system:ActorSystem = ActorSystem("web")
implicit val materializer:ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

val firstRoutes:Route = ... //a route object populated
val lastRoutes:Route = ... //other route object populad

val iqServiceFlow:Flow[HttpRequest, HttpResponse, Any] = ...// code that returns the interactive query service

val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = Route.handlerFlow(lastRoutes)

// The following code doesn't work though everything I've seen online suggests it should
val handler = firstFlow via iqServiceFlow via lastFlow

Http().bindAndHandle(handler, "0.0.0.0", 8000)

如何在 Akka Streams 中合并流?

编辑: 更正了处理程序分配语句。

为清楚起见,让我们首先明确显示所有 return 类型:

val iqServiceFlow: Flow[HttpRequest, HttpResponse, Any] = ...
val firstFlow: Flow[HttpRequest, HttpResponse, NotUsed] = Route.handlerFlow(firstRoutes)
val lastFlow: Flow[HttpRequest, HttpResponse, NotUsed]  = Route.handlerFlow(lastRoutes)

此外,而不是...

val handler = firstRoutes via iqServiceFlow via lastFlow

...您的意思可能是:

val handler = firstFlow via iqServiceFlow via lastFlow

为了将流程与via链接在一起,输入和输出类型必须匹配:即第一个流程的输出类型必须与第二个流程的输入类型相同,并且很快。您尝试使用处理程序执行的操作如下:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpRequest, HttpResponse] // iqServiceFlow
                                |
                                v
                          [HttpRequest, HttpResponse] // lastFlow

你所有流的输出类型都是HttpResponse,但它们各自的输入类型都是HttpRequest,所以你不能将它们与via链接在一起。

要链接您的流程,您需要一个函数以某种方式将 HttpResponse 转换为 HttpRequest:

val respToReq: HttpResponse => HttpRequest = ...

您可以从上述函数创建流程:

val convertingFlow: Flow[HttpResponse, HttpRequest] = Flow.fromFunction(respToReq)

现在您可以链接您的流程:

val handler = firstFlow via convertingFlow via iqServiceFlow via convertingFlow via lastFlow

类型排列如下:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpResponse, HttpRequest] // convertingFlow
                                |
                                v
                           [HttpRequest, HttpResponse] // iqServiceFlow
                                              |
                                              v
                                        [HttpResponse, HttpRequest] // convertingFlow
                                                            |
                                                            v                              
                                                      [HttpRequest, HttpResponse] // lastFlow

以上假定您可以重复使用相同的转换 function/flow。如果这个假设不成立,您显然可以创建不同的转换 functions/flows.