如何从 Sink 发出消息并将它们传递给另一个函数?
How to emit messages from a Sink and pass them to another function?
我目前正在使用 Akka-HTTP 构建客户端 WebSockets 使用者。我不想尝试在 Sink 中进行解析,而是想将代码包装在一个函数中,该函数从 Sink 发出结果,然后稍后使用此输出(来自函数)进行进一步处理(更多解析......等等。 ).
我目前能够打印来自接收器的每条消息;但是,函数的 return 类型仍然是 Unit
。我的 objective 是从函数发出一个 String
,对于落在接收器中的每个项目,然后使用 returned 字符串进行进一步的解析。我有到目前为止的代码(注意:它主要是样板)。
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object client extends App {
def parseData(uri: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {
val defaultSettings = ClientConnectionSettings(system)
val pingCounter = new AtomicInteger()
val customWebsocketSettings = defaultSettings.websocketSettings.withPeriodicKeepAliveData(
() => ByteString(s"debug-${pingCounter.incrementAndGet()}")
)
val customSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings)
val outgoing = Source.maybe[Message]
val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
case message: TextMessage.Strict => message.text // I Want to emit/stream this message as a String from the function (or get a handle on it from the outside)
case _ => println("Other")
}
val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest(uri), settings = customSettings)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(sink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
connected.onComplete {
case Success(value) => value
case Failure(exception) => throw exception
}
closed.onComplete { _ =>
println("Retrying...")
parseData(uri)
}
upgradeResponse.onComplete {
case Success(value) => println(value)
case Failure(exception) => throw exception
}
}
}
在一个单独的对象中,我想进行解析,所以类似于:
import akka.actor.ActorSystem
import akka.stream.Materializer
import api.client.parseData
object Application extends App {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = Materializer(system)
val uri = "ws://localhost:8080/foobar"
val res = parseData(uri) // I want to handle the function output here
// parse(res)
println(res)
有没有办法可以从函数外部获取 Sink 的句柄,或者我是否需要在 Sink 中进行任何解析。我主要是尽量不要让 Sink 过于复杂。
更新:我也在考虑向流中添加另一个 Flow
元素(处理解析)是否比在流外获取值更好。
添加流程元素似乎可以解决您的问题,而且完全符合习惯。
你必须记住的是,sinks 语义是为了描述如何“终止”流,所以虽然它可以描述非常复杂的计算,但它总是 return 一个单一的值,它return仅在流结束后编辑。
换句话说,接收器不是 return 每个流元素的值,它 return 是每个整个流的值。
我目前正在使用 Akka-HTTP 构建客户端 WebSockets 使用者。我不想尝试在 Sink 中进行解析,而是想将代码包装在一个函数中,该函数从 Sink 发出结果,然后稍后使用此输出(来自函数)进行进一步处理(更多解析......等等。 ).
我目前能够打印来自接收器的每条消息;但是,函数的 return 类型仍然是 Unit
。我的 objective 是从函数发出一个 String
,对于落在接收器中的每个项目,然后使用 returned 字符串进行进一步的解析。我有到目前为止的代码(注意:它主要是样板)。
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object client extends App {
def parseData(uri: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {
val defaultSettings = ClientConnectionSettings(system)
val pingCounter = new AtomicInteger()
val customWebsocketSettings = defaultSettings.websocketSettings.withPeriodicKeepAliveData(
() => ByteString(s"debug-${pingCounter.incrementAndGet()}")
)
val customSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings)
val outgoing = Source.maybe[Message]
val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
case message: TextMessage.Strict => message.text // I Want to emit/stream this message as a String from the function (or get a handle on it from the outside)
case _ => println("Other")
}
val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest(uri), settings = customSettings)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(sink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
connected.onComplete {
case Success(value) => value
case Failure(exception) => throw exception
}
closed.onComplete { _ =>
println("Retrying...")
parseData(uri)
}
upgradeResponse.onComplete {
case Success(value) => println(value)
case Failure(exception) => throw exception
}
}
}
在一个单独的对象中,我想进行解析,所以类似于:
import akka.actor.ActorSystem
import akka.stream.Materializer
import api.client.parseData
object Application extends App {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = Materializer(system)
val uri = "ws://localhost:8080/foobar"
val res = parseData(uri) // I want to handle the function output here
// parse(res)
println(res)
有没有办法可以从函数外部获取 Sink 的句柄,或者我是否需要在 Sink 中进行任何解析。我主要是尽量不要让 Sink 过于复杂。
更新:我也在考虑向流中添加另一个 Flow
元素(处理解析)是否比在流外获取值更好。
添加流程元素似乎可以解决您的问题,而且完全符合习惯。
你必须记住的是,sinks 语义是为了描述如何“终止”流,所以虽然它可以描述非常复杂的计算,但它总是 return 一个单一的值,它return仅在流结束后编辑。
换句话说,接收器不是 return 每个流元素的值,它 return 是每个整个流的值。