Akka-stream 和委托处理给演员

Akka-stream and delegating processing to an actor

我有以下情况,我试图将处理委托给演员。我想要发生的是,每当我的流程处理消息时,它都会将其发送给演员,演员会将其大写并将其作为响应写入流。

所以我应该能够连接到端口 8000,输入 "hello",让流将它发送给演员,然后让演员将它发布回流,这样它就会以大写形式回显给我. Actor 本身非常基础,来自文档中的 ActorPublisher 示例。

我知道这段代码不起作用,我清理了我的实验以使其能够编译。现在,它只是两个独立的流。我尝试尝试合并源或汇,但无济于事。

object Sample  {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("sample")
    implicit val materializer = ActorMaterializer()

    val connections: Source[IncomingConnection, 
        Future[ServerBinding]] = Tcp().bind("localhost", 8000)
    val filter = Source.actorPublisher[ByteString](Props[Filter])

    val filterRef = Flow[ByteString]
      .to(Sink.ignore)
      .runWith(filter)

    connections runForeach { conn =>
      val echo = Flow[ByteString] .map {


        // would like to send 'p' to the actor, 
        // and have it publish to the stream
        case p:ByteString => filterRef ! p
      }
    }
  }
}

// this actor is supposed to simply uppercase all 
// input and write it to the stream
class Filter extends ActorPublisher[ByteString] with Actor
{
  var buf = Vector.empty[ByteString]
  val delay = 0

  def receive = {
    case p: ByteString =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(p)
      else {
        buf :+= ByteString(p.utf8String.toUpperCase)
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }


   @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

我以前也遇到过这个问题,绕弯子解决了,希望你能接受。本质上,它涉及创建一个接收器,该接收器立即将它收到的消息转发给 src actor。

当然,您可以使用直接流(将其注释掉),但我想这不是本练习的重点:)

object Sample  {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("sample")
    implicit val materializer = ActorMaterializer()

    val connections: Source[IncomingConnection,
      Future[ServerBinding]] = Tcp().bind("localhost", 8000)

    def filterProps = Props[Filter]

    connections runForeach { conn =>
      val actorRef = system.actorOf(filterProps)
      val snk = Sink.foreach[ByteString]{s => actorRef ! s}
      val src = Source.fromPublisher(ActorPublisher[ByteString](actorRef))
      conn.handleWith(Flow.fromSinkAndSource(snk, src))

//      conn.handleWith(Flow[ByteString].map(s => ByteString(s.utf8String.toUpperCase())))
    }
  }
}

// this actor is supposed to simply uppercase all
// input and write it to the stream
class Filter extends ActorPublisher[ByteString]
{
  import akka.stream.actor.ActorPublisherMessage._
  var buf = mutable.Queue.empty[String]
  val delay = 0

  def receive = {
    case p: ByteString =>
      buf += p.utf8String.toUpperCase
      deliverBuf()
    case Request(n) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  def deliverBuf(): Unit = {
    while (totalDemand > 0 && buf.nonEmpty) {
      val s = ByteString(buf.dequeue() + "\n")
      onNext(s)
    }
  }