由 websocket 消息控制的动态源和流的 Akka 流最佳实践
Akka stream best practice for dynamic Source and Flow controlled by websocket messages
我试图了解使用 akka 流和 alpakka 实现以下场景(简化)的最佳方式是什么:
- 前端打开与后端的websocket连接
- 后端应该等待带有一些参数的初始化消息(例如
bootstrapServers
、topicName
和一个 transformationMethod
是一个字符串参数)
- 一旦这些信息到位,后端就可以启动 alpakka 消费者从
bootstrapServers
的主题 topicName
中消费,并对基于 transformationMethod
的数据应用一些转换,推送这些websocket 中的结果
- 前端可以周期性地通过websocket发送改变
transformationMethod
字段的消息,这样从Kafka消费的消息的转换算法可以根据提供的transformationMethod
值动态改变进入网络套接字。
我不明白是否有可能在图内的 akka 流上实现这一点,尤其是动态部分,既用于 alpakka 消费者的初始化,也用于动态更改 transformationMethod
参数.
示例:
前端建立连接,10 秒后通过套接字发送以下内容:
{"bootstrapServers": "localhost:9092", "topicName": "topic", "transformationMethod": "PLUS_ONE"}
因此,Alpakka 消费者被实例化并开始从 Kafka 读取消息。
消息在 Kafka 中流动,因此它到达 1
并且在 websocket 中前端将收到 2
(由于 PLUS_ONE
转换方法,它可能被放置在map
或带有 Flow 的 via
),然后 2
等前端接收 3
等等。
然后,前端发送:
{"transformationMethod": "SQUARE"}
所以现在,从 Kafka 到达 3
,前端将接收 9
,然后 4
,因此输出将是 16
ecc...
这大概就是我想获得的流量了
我能够创建一个与 Alpakka 消费者的 websocket 连接,执行某种“静态”转换并将结果推回 websocket,这很简单,我想念的是这个动态部分,但我不确定如果我可以在同一个图中实现它,或者如果我需要更多层(也许有一些管理流程的 Actor 并且 activate/change Alpakka 消费者的行为会实时发送消息?)
谢谢
我可能倾向于通过为每个 websocket 生成一个 actor 来实现这一点,预先实现一个 Source
来接收来自 actor 的消息(可能使用 ActorSource.actorRefWithBackpressure
),构建一个 Sink
(可能使用 ActorSink.actorRefWithBackpressure
)将传入的 websocket 消息调整为 control-plane 消息(初始化(包括与预物化源关联的 ActorRef
和转换更改)并将它们发送给参与者,以及然后使用 WebsocketUpgrade
.
上的 handleMessagesWithSinkSource
将它们捆绑在一起
您生成的 actor 将在收到初始化消息后启动一个流,该流从 Kafka 向其提供消息;通过等待 ack 的请求协议让流馈送消息,可以将一些背压反馈给 Kafka;为了使该流保持活动状态,actor 需要在特定时间段内进行确认,而不管下游做了什么,因此需要决定是让 actor 缓冲消息还是丢弃它们。
我试图了解使用 akka 流和 alpakka 实现以下场景(简化)的最佳方式是什么:
- 前端打开与后端的websocket连接
- 后端应该等待带有一些参数的初始化消息(例如
bootstrapServers
、topicName
和一个transformationMethod
是一个字符串参数) - 一旦这些信息到位,后端就可以启动 alpakka 消费者从
bootstrapServers
的主题topicName
中消费,并对基于transformationMethod
的数据应用一些转换,推送这些websocket 中的结果 - 前端可以周期性地通过websocket发送改变
transformationMethod
字段的消息,这样从Kafka消费的消息的转换算法可以根据提供的transformationMethod
值动态改变进入网络套接字。
我不明白是否有可能在图内的 akka 流上实现这一点,尤其是动态部分,既用于 alpakka 消费者的初始化,也用于动态更改 transformationMethod
参数.
示例:
前端建立连接,10 秒后通过套接字发送以下内容:
{"bootstrapServers": "localhost:9092", "topicName": "topic", "transformationMethod": "PLUS_ONE"}
因此,Alpakka 消费者被实例化并开始从 Kafka 读取消息。
消息在 Kafka 中流动,因此它到达 1
并且在 websocket 中前端将收到 2
(由于 PLUS_ONE
转换方法,它可能被放置在map
或带有 Flow 的 via
),然后 2
等前端接收 3
等等。
然后,前端发送:
{"transformationMethod": "SQUARE"}
所以现在,从 Kafka 到达 3
,前端将接收 9
,然后 4
,因此输出将是 16
ecc...
这大概就是我想获得的流量了
我能够创建一个与 Alpakka 消费者的 websocket 连接,执行某种“静态”转换并将结果推回 websocket,这很简单,我想念的是这个动态部分,但我不确定如果我可以在同一个图中实现它,或者如果我需要更多层(也许有一些管理流程的 Actor 并且 activate/change Alpakka 消费者的行为会实时发送消息?)
谢谢
我可能倾向于通过为每个 websocket 生成一个 actor 来实现这一点,预先实现一个 Source
来接收来自 actor 的消息(可能使用 ActorSource.actorRefWithBackpressure
),构建一个 Sink
(可能使用 ActorSink.actorRefWithBackpressure
)将传入的 websocket 消息调整为 control-plane 消息(初始化(包括与预物化源关联的 ActorRef
和转换更改)并将它们发送给参与者,以及然后使用 WebsocketUpgrade
.
handleMessagesWithSinkSource
将它们捆绑在一起
您生成的 actor 将在收到初始化消息后启动一个流,该流从 Kafka 向其提供消息;通过等待 ack 的请求协议让流馈送消息,可以将一些背压反馈给 Kafka;为了使该流保持活动状态,actor 需要在特定时间段内进行确认,而不管下游做了什么,因此需要决定是让 actor 缓冲消息还是丢弃它们。