我的 Akka-http 流的每一部分都必须生成消息类型的输出吗?
Does every part of my Akka-http stream have to be producing output with the type Message?
我正在尝试通过 WebSockets 使用流,我想解析接收到的消息并在它们到达接收器之前对其进行转换。但是,每当我使用不接受 Message
作为输入的 Source 或 Sink 时,我都会不断收到错误消息。
Therefore a WebSocket connection is modelled as either something you
connect a Flow[Message, Message, Mat] to or a Flow[Message, Message,
Mat] that you connect a Source[Message, Mat] and a Sink[Message, Mat]
to.
我仍然不确定我是否理解正确。我的困惑是:使用 Akka-http websockets 的 Sources、Flows & Sinks 是否必须始终传递 Message
类型?有办法解决吗?最重要的是,这里的最佳做法是什么?
我制定了一个简化的代码片段(不能运行),但应该有助于概念化我的问题。
val outgoing = Source.maybe[Message]
val decoder = Flow[Message] map {x => TextMessage("Hello from decode")}
// Do I need to pass a Message here?
val wrongDecoder = Flow[String] map {x => "Help :( I can't Sink! Maybe because I'm String?"}
val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {case message: TextMessage.Strict => message.text}
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.viaMat(wrongDecoder)(Keep.left) // IDE compiler tells me it expected a Graph but found a Flow?
.toMat(sink)(Keep.both)
.run()
是的,流必须接受并发出 Message
s。
val source: Source[Message, SoMat] = ???
val flow: Flow[Message, Message, FMat] = ???
val sink: Sink[Message, SiMat] = ???
source.via(flow).runWith(sink) // Ignoring which materializations you'd actually want to keep
这似乎是限制性的,但请注意一些签名:
// in Source (with types at least partially expanded)
def map[T](f: Message => T): Source[T, SoMat]
// in Flow (with types at least partially expanded)
def map[T](f: Message => T): Flow[Message, T, FMat]
// in Sink (with types at least partially expanded)
def contramap[T](f: T => Message): Sink[T, SiMap]
即如果您有从 Message
到该类型的函数,则可以将 Source[Message]
映射到您选择的任何类型的 Source
中。同样,您可以将 Sink[Message]
转换为任何类型的 Sink
。对于 Flow
,您可以将其映射到任何类型并将其映射回 Message
.
例如,您可以
val outgoing = Source.maybe[Message]
val decoder: Flow[Message, String, NotUsed] = Flow[Message].mapConcat { m =>
m match {
case TextMessage.Strict(msg) => List(msg)
case _ => Nil
}
}
val stringProcessor: Flow[String, String, NotUsed] = Flow[String].map { s => s.replace(':', ';') }
val sink: Sink[String, Future[Done]] = Sink.foreach[Message] {
case TextMessage.Strict(text) => println(text)
}.contramap[String] { s => TextMessage.Strict(s) }
val websocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))
val (upgradeResponse, closed) =
outgoing
.viaMat(websocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.viaMat(stringProcessor)(Keep.left)
.toMat(sink)(Keep.both)
.run()
我可能会将 websocketFlow
和 decoder
组合成一个从 Message
到 String
的流程(或者任何适用于业务逻辑的域类型(stringProcessor
在这种情况下)),使用上面的 contramapped Sink
。
val stringsFromWebsocket: Flow[Message, String, Future[WebsocketUpgradeResponse]] =
Http()
.webSocketClientFlow(WebSocketRequest(uri))
.viaMat(decoder)(Keep.left)
val (upgradeResponse, closed) =
outgoing
.viaMat(stringsFromWebsocket)(Keep.right)
.viaMat(stringProcessor)(Keep.left)
.toMat(sink)(Keep.both)
.run()
我正在尝试通过 WebSockets 使用流,我想解析接收到的消息并在它们到达接收器之前对其进行转换。但是,每当我使用不接受 Message
作为输入的 Source 或 Sink 时,我都会不断收到错误消息。
Therefore a WebSocket connection is modelled as either something you connect a Flow[Message, Message, Mat] to or a Flow[Message, Message, Mat] that you connect a Source[Message, Mat] and a Sink[Message, Mat] to.
我仍然不确定我是否理解正确。我的困惑是:使用 Akka-http websockets 的 Sources、Flows & Sinks 是否必须始终传递 Message
类型?有办法解决吗?最重要的是,这里的最佳做法是什么?
我制定了一个简化的代码片段(不能运行),但应该有助于概念化我的问题。
val outgoing = Source.maybe[Message]
val decoder = Flow[Message] map {x => TextMessage("Hello from decode")}
// Do I need to pass a Message here?
val wrongDecoder = Flow[String] map {x => "Help :( I can't Sink! Maybe because I'm String?"}
val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {case message: TextMessage.Strict => message.text}
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.viaMat(wrongDecoder)(Keep.left) // IDE compiler tells me it expected a Graph but found a Flow?
.toMat(sink)(Keep.both)
.run()
是的,流必须接受并发出 Message
s。
val source: Source[Message, SoMat] = ???
val flow: Flow[Message, Message, FMat] = ???
val sink: Sink[Message, SiMat] = ???
source.via(flow).runWith(sink) // Ignoring which materializations you'd actually want to keep
这似乎是限制性的,但请注意一些签名:
// in Source (with types at least partially expanded)
def map[T](f: Message => T): Source[T, SoMat]
// in Flow (with types at least partially expanded)
def map[T](f: Message => T): Flow[Message, T, FMat]
// in Sink (with types at least partially expanded)
def contramap[T](f: T => Message): Sink[T, SiMap]
即如果您有从 Message
到该类型的函数,则可以将 Source[Message]
映射到您选择的任何类型的 Source
中。同样,您可以将 Sink[Message]
转换为任何类型的 Sink
。对于 Flow
,您可以将其映射到任何类型并将其映射回 Message
.
例如,您可以
val outgoing = Source.maybe[Message]
val decoder: Flow[Message, String, NotUsed] = Flow[Message].mapConcat { m =>
m match {
case TextMessage.Strict(msg) => List(msg)
case _ => Nil
}
}
val stringProcessor: Flow[String, String, NotUsed] = Flow[String].map { s => s.replace(':', ';') }
val sink: Sink[String, Future[Done]] = Sink.foreach[Message] {
case TextMessage.Strict(text) => println(text)
}.contramap[String] { s => TextMessage.Strict(s) }
val websocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))
val (upgradeResponse, closed) =
outgoing
.viaMat(websocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.viaMat(stringProcessor)(Keep.left)
.toMat(sink)(Keep.both)
.run()
我可能会将 websocketFlow
和 decoder
组合成一个从 Message
到 String
的流程(或者任何适用于业务逻辑的域类型(stringProcessor
在这种情况下)),使用上面的 contramapped Sink
。
val stringsFromWebsocket: Flow[Message, String, Future[WebsocketUpgradeResponse]] =
Http()
.webSocketClientFlow(WebSocketRequest(uri))
.viaMat(decoder)(Keep.left)
val (upgradeResponse, closed) =
outgoing
.viaMat(stringsFromWebsocket)(Keep.right)
.viaMat(stringProcessor)(Keep.left)
.toMat(sink)(Keep.both)
.run()