如何在 Akka-Stream 2.0 流程开始时向 ActorRef 发送消息?
How do I send message to ActorRef at start of Akka-Stream 2.0 flow?
目标是在客户端连接且流开始后发送 WSConnectEvent
。使用 akka-streams 1.0,我能够通过以下方式完成此操作:
Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
implicit builder =>
sdpSource =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
(fromWebsocket.inlet, toWebsocket.outlet)
}
在尝试将其升级为与 Akka-Streams 2.0.1 一起使用时,我更改为以下代码,但我并不是唯一收到 WSConnectEvent
消息的人。我不确定这是不是因为我的源设置不正确,或者我没有正确实现 ActorRef
。
val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(
GraphDSL.create() { implicit builder =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
FlowShape(fromWebsocket.in, toWebsocket.out)
}
)
对 sdpSource.mapMaterializedValue(...)
的调用仅将物化值从一种类型(ActorRef
转换为 WSConnectEvent
),它不会以任何方式将其作为元素从 Source
.
然而,Source.materializedValue
提供了一个源,该源将在图具体化后发出具体化值。
那么,你要做的是:
fromWebsocket ~> merge.in(0)
actorAsSource.materializedValue ~> merge.in(1)
感谢 johanandren 的帮助,mapMaterializedValue
不是正确的方法,相反我需要构建一个流来发送 WSConnectEvent
并通过它连接 builder.materializeValue
的输出通过端口中的 'merge',像这样:
// Join events, also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
builder.materializedValue ~> actorConnected ~> merge.in(1)
完整的工作示例:
val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(GraphDSL.create(sdpSource) {
implicit builder =>
{ (responseSource) =>
import GraphDSL.Implicits._
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Future.successful(None)
}).collect {
case Some(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes);
val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
builder.materializedValue ~> actorConnected ~> merge.in(1)
merge ~> toCallActor
responseSource ~> toWebsocket
FlowShape.of(fromWebsocket.in, toWebsocket.out)
}
})
目标是在客户端连接且流开始后发送 WSConnectEvent
。使用 akka-streams 1.0,我能够通过以下方式完成此操作:
Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
implicit builder =>
sdpSource =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
(fromWebsocket.inlet, toWebsocket.outlet)
}
在尝试将其升级为与 Akka-Streams 2.0.1 一起使用时,我更改为以下代码,但我并不是唯一收到 WSConnectEvent
消息的人。我不确定这是不是因为我的源设置不正确,或者我没有正确实现 ActorRef
。
val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(
GraphDSL.create() { implicit builder =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
FlowShape(fromWebsocket.in, toWebsocket.out)
}
)
对 sdpSource.mapMaterializedValue(...)
的调用仅将物化值从一种类型(ActorRef
转换为 WSConnectEvent
),它不会以任何方式将其作为元素从 Source
.
Source.materializedValue
提供了一个源,该源将在图具体化后发出具体化值。
那么,你要做的是:
fromWebsocket ~> merge.in(0)
actorAsSource.materializedValue ~> merge.in(1)
感谢 johanandren 的帮助,mapMaterializedValue
不是正确的方法,相反我需要构建一个流来发送 WSConnectEvent
并通过它连接 builder.materializeValue
的输出通过端口中的 'merge',像这样:
// Join events, also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
builder.materializedValue ~> actorConnected ~> merge.in(1)
完整的工作示例:
val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(GraphDSL.create(sdpSource) {
implicit builder =>
{ (responseSource) =>
import GraphDSL.Implicits._
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Future.successful(None)
}).collect {
case Some(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes);
val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
builder.materializedValue ~> actorConnected ~> merge.in(1)
merge ~> toCallActor
responseSource ~> toWebsocket
FlowShape.of(fromWebsocket.in, toWebsocket.out)
}
})