如何从 Sink 发出消息并将它们传递给另一个函数?

How to emit messages from a Sink and pass them to another function?

我目前正在使用 Akka-HTTP 构建客户端 WebSockets 使用者。我不想尝试在 Sink 中进行解析,而是想将代码包装在一个函数中,该函数从 Sink 发出结果,然后稍后使用此输出(来自函数)进行进一步处理(更多解析......等等。 ).

我目前能够打印来自接收器的每条消息;但是,函数的 return 类型仍然是 Unit。我的 objective 是从函数发出一个 String,对于落在接收器中的每个项目,然后使用 returned 字符串进行进一步的解析。我有到目前为止的代码(注意:它主要是样板)。

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object client extends App {

  def parseData(uri: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {

    val defaultSettings = ClientConnectionSettings(system)
    val pingCounter = new AtomicInteger()
    val customWebsocketSettings = defaultSettings.websocketSettings.withPeriodicKeepAliveData(
      () => ByteString(s"debug-${pingCounter.incrementAndGet()}")
    )

    val customSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings)

    val outgoing = Source.maybe[Message]

    val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
      case message: TextMessage.Strict => message.text // I Want to emit/stream this message as a String from the function (or get a handle on it from the outside)
      case _ => println("Other")
    }

    val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
      Http().webSocketClientFlow(WebSocketRequest(uri), settings = customSettings)

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right)
        .toMat(sink)(Keep.both)
        .run()


    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

    connected.onComplete {
      case Success(value) => value
      case Failure(exception) => throw exception
    }

    closed.onComplete { _ =>
      println("Retrying...")
      parseData(uri)
    }

    upgradeResponse.onComplete {
      case Success(value) => println(value)
      case Failure(exception) => throw exception
    }
  }
}

在一个单独的对象中,我想进行解析,所以类似于:

import akka.actor.ActorSystem
import akka.stream.Materializer
import api.client.parseData



object Application extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: Materializer = Materializer(system)
  val uri = "ws://localhost:8080/foobar"

  val res = parseData(uri) // I want to handle the function output here 
  // parse(res)
  println(res)

有没有办法可以从函数外部获取 Sink 的句柄,或者我是否需要在 Sink 中进行任何解析。我主要是尽量不要让 Sink 过于复杂。

更新:我也在考虑向流中添加另一个 Flow 元素(处理解析)是否比在流外获取值更好。

添加流程元素似乎可以解决您的问题,而且完全符合习惯。

你必须记住的是,sinks 语义是为了描述如何“终止”流,所以虽然它可以描述非常复杂的计算,但它总是 return 一个单一的值,它return仅在流结束后编辑。

换句话说,接收器不是 return 每个流元素的值,它 return 是每个整个流的值。