Akka HTTP WebSocket Server 消息处理构建Flow时如何根据Sink内容声明Source?

Akka HTTP WebSocket Server How to declare Source based on Sink content when building Flow for message handlement?

我正在玩 Akka Stream/Http 并试图弄清楚如何在 websocket 服务器中执行以下操作(希望不使用 Actor 模式):

1. 在客户端初始连接请求时,握手后,服务器将以json格式监听客户端的初始Message
2.服务器收到TextMessage.Strict后,如果有效,它将丰富Message以构建"Predicate"(例如过滤器映射),然后服务器将使用该"Predicate"建立一个Source[Message, _]

我尝试使用 handleMessagesWithSinkSource,但在我看来,这个 API 的目的是独立生成 SinkSource(参见路线 /ws)。我也尝试使用 handleWebSocketMessages(参见路线 ws/filter),但对我来说,Flow 中的 Sink(inlet) 和 Source(outlet) 之间没有任何联系。我对 Sink 应该如何工作的理解一定是错误的:

我的路线:

              (path("ws") & get & pathEndOrSingleSlash) {
                extractUpgradeToWebSocket { upgrade =>
                  complete(upgrade.handleMessagesWithSinkSource(Sink.ignore,
                    getSourceAll)
                } // this route is working fine, pushing all event Messages to client once connected 
              } ~
              (path("ws" / "filter") & get & pathEndOrSingleSlash) {
                handleWebSocketMessages(getSourceFiltered) 
              } // this route I cannot figure out a way to build `Flow` dynamically based on `Sink`...


getSourceAllgetSourceFiltered

的想法
val getSourceAll: Source[Message, NotUsed] = ??? // Stream source genereating Messages based on backend event

val getSourceFiltered: Flow[Message, Message, _] = ??? // the outgoing Source should push Server event messages based on client's "Predicate" message 

建立连接后,我希望用户发送像这样的过滤器:

{
 productId: 1,
 city: New York
}

然后服务器应该不断从后端(数据库)实时推送数据流(事件),其中包含更详细的产品信息返回给用户客户端:

{
orderId: 1122,
productId: 1,
productName: Coke,
vendor: ABC,
city: New York
timestamp: 2019-11-13 09:30:00
}
{
orderId: 3322,
productId: 1,
productName: Coke,
vendor: EFG,
city: New York
timestamp: 2019-11-13 09:31:00
}
...

Actor 绝对需要处理这个吗?如果是这样,任何指导都会非常有帮助!

更新 简而言之,如何根据用户的消息发出事件驱动的服务器推送源?

此外,也许我对如何构建新的 Source[Message, _] 感到困惑?我知道 Flow 是不可变的,但应该是一种根据输入进行切换的方法(Flow.fromSinkAndSource?)。因为 Akka 只有指令 api,例如:handleWebSocketMessages(flow: Flow[Message, Message, _]),它只使用输入消息,但不会产生新的 Source,至于 handleMessagesWithSinkSource(sink, source),接收器和源没有逻辑连接到我。我仍在努力思考如何让它发挥作用..

虽然不是很明显,但 Flow[Message, Message, _] 足以实现大多数协议。请记住,Flow 可以通过 statefulMapConcatflatMapConcat 等函数构建几乎任意数量的状态。 Flow 甚至可以在没有直接接收输入以通过 extrapolatemerge 等函数回复输入的情况下开始发射东西 - 使用一些滴答声源。

你的情况:

val getSourceFiltered: Flow[Message, Message, _] = Flow[Message]
  .take(1)  // you only care about the first thing that the client sends 
  .flatMapConcat {
    case TextMessage.Strict(txtMsg: String) =>

      // Here is where you parse and make your filter using the message the client message
      val clientFilter: Message => Boolean = makeFilter(txtMsg)
      getSourceAll.filter(clientFilter)


    case _ => Source.single(TextMessage("Expected a single strict JSON message"))
  }