由 websocket 消息控制的动态源和流的 Akka 流最佳实践

Akka stream best practice for dynamic Source and Flow controlled by websocket messages

我试图了解使用 akka 流和 alpakka 实现以下场景(简化)的最佳方式是什么:

  1. 前端打开与后端的websocket连接
  2. 后端应该等待带有一些参数的初始化消息(例如 bootstrapServerstopicName 和一个 transformationMethod 是一个字符串参数)
  3. 一旦这些信息到位,后端就可以启动 alpakka 消费者从 bootstrapServers 的主题 topicName 中消费,并对基于 transformationMethod 的数据应用一些转换,推送这些websocket 中的结果
  4. 前端可以周期性地通过websocket发送改变transformationMethod字段的消息,这样从Kafka消费的消息的转换算法可以根据提供的transformationMethod值动态改变进入网络套接字。

我不明白是否有可能在图内的 akka 流上实现这一点,尤其是动态部分,既用于 alpakka 消费者的初始化,也用于动态更改 transformationMethod 参数.

示例:

前端建立连接,10 秒后通过套接字发送以下内容:

{"bootstrapServers": "localhost:9092", "topicName": "topic", "transformationMethod": "PLUS_ONE"}

因此,Alpakka 消费者被实例化并开始从 Kafka 读取消息。

消息在 Kafka 中流动,因此它到达 1 并且在 websocket 中前端将收到 2(由于 PLUS_ONE 转换方法,它可能被放置在map 或带有 Flow 的 via),然后 2 等前端接收 3 等等。

然后,前端发送:

{"transformationMethod": "SQUARE"}

所以现在,从 Kafka 到达 3,前端将接收 9,然后 4,因此输出将是 16 ecc...

这大概就是我想获得的流量了

我能够创建一个与 Alpakka 消费者的 websocket 连接,执行某种“静态”转换并将结果推回 websocket,这很简单,我想念的是这个动态部分,但我不确定如果我可以在同一个图中实现它,或者如果我需要更多层(也许有一些管理流程的 Actor 并且 activate/change Alpakka 消费者的行为会实时发送消息?)

谢谢

我可能倾向于通过为每个 websocket 生成一个 actor 来实现这一点,预先实现一个 Source 来接收来自 actor 的消息(可能使用 ActorSource.actorRefWithBackpressure),构建一个 Sink(可能使用 ActorSink.actorRefWithBackpressure)将传入的 websocket 消息调整为 control-plane 消息(初始化(包括与预物化源关联的 ActorRef 和转换更改)并将它们发送给参与者,以及然后使用 WebsocketUpgrade.

上的 handleMessagesWithSinkSource 将它们捆绑在一起

您生成的 actor 将在收到初始化消息后启动一个流,该流从 Kafka 向其提供消息;通过等待 ack 的请求协议让流馈送消息,可以将一些背压反馈给 Kafka;为了使该流保持活动状态,actor 需要在特定时间段内进行确认,而不管下游做了什么,因此需要决定是让 actor 缓冲消息还是丢弃它们。