How to resolve akka.stream.Graph sink error in Akka Streams using Akka Typed

I am new to Akka Streams. The code below works fine with classic Akka actors. This is what I am doing:

class FlowActor extends Actor{
  val log = LoggerFactory.getLogger(this.getClass)

  def receive: Receive = {
    case word: String =>
      val replyTo = word.capitalize
      log.info("Actor {}, Capitalizing {}",context.self.path, replyTo)
      sender() ! replyTo
  }
}

val source = Source(List("hello","from","akka","streams!"))
val flowActor = system.actorOf(RoundRobinPool(10).props(Props[FlowActor]), "flowActor")
val flow = Flow[String].mapAsync(parallelism = 5)(elem => (flowActor ? elem).mapTo[String])
// sent from actor to stream to "ack" processing of given element
val AckMessage = AckingReceiver.Ack

// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

val receiver = system.actorOf(Props(new SimpleAckingReceiver(AckMessage)), name = "receiver")

val sink = Sink.actorRefWithBackpressure(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

val stream = source.via(flow).to(sink)
stream.run()

This produces the expected output. Now I am moving from classic Akka to Akka Typed actors, and I wrote something like this:

object FlowActor {
  val log = LoggerFactory.getLogger(this.getClass)
  val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")

  sealed trait Command
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { ctx =>
      // each worker registers themselves with the receptionist
      ctx.log.info("Registering myself with receptionist")
      ctx.system.receptionist ! Receptionist.Register(FlowServiceKey, ctx.self)

      Behaviors.receiveMessage {
        case TransformText(text, replyTo) =>
          replyTo ! TextTransformed(text.capitalize)
          Behaviors.same
      }
    }
}

object SinkActor {

  sealed trait Event
  case object StartWork extends Event
  trait Ack
  object Ack extends Ack

  trait Protocol
  case class Init(ackTo: ActorRef[Ack]) extends Protocol
  case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  case class Fail(ex: Throwable) extends Protocol
  def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
    implicit val mat = actorMaterializer
    implicit val timeout = Timeout(5, SECONDS)

    Behaviors.receiveMessage {
      case StartWork =>
        val ref = ctx.spawn(FlowActor(), "flowActor")
        import FlowActor._
        val askFlow: Flow[String, TextTransformed, NotUsed] =
          ActorFlow.ask(ref)(TransformText.apply)

        def targetActor(): ActorRef[Protocol] = ???

        val actor: ActorRef[Protocol] = targetActor()

        val source = Source(List("hello", "from", "akka", "streams!"))

        val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
          ref = actor,
          onCompleteMessage = Complete,
          onFailureMessage = Fail.apply,
          messageAdapter = Message.apply,
          onInitMessage = Init.apply,
          ackMessage = Ack)

        val stream = source.via(askFlow).runWith(sink)
        Behaviors.same
    }
  }
}

I get a compile-time error on the last statement, the one that runs the stream:

[error]  found   : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]  required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error]         val stream = source.via(askFlow).runWith(sink)

Can someone point out what I am missing here?

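This happens because the code tries to connect a Flow that emits FlowActor.TextTransformed elements to a Sink that expects String elements. The TextTransformed values need to be converted to String before they reach the sink: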

val stream = source.via(askFlow)
    .map(element => element.text) // or equivalently: .map(_.text)
    .runWith(sink)
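
To see the type alignment end to end, here is a minimal self-contained sketch. It assumes Akka 2.6 with akka-stream-typed on the classpath, uses a simplified stand-in for the FlowActor protocol (no receptionist, no CborSerializable), and swaps the actor-backed sink for Sink.seq so the result can be inspected. The object name MapToStringSketch and the run() helper are made up for illustration; the typing requirement is the same as in the question, since both sinks consume String.

```scala
import akka.NotUsed
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapToStringSketch {
  // Simplified stand-in for the FlowActor protocol from the question
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed])
  final case class TextTransformed(text: String)

  val worker: Behavior[TransformText] = Behaviors.receiveMessage {
    case TransformText(text, replyTo) =>
      replyTo ! TextTransformed(text.capitalize)
      Behaviors.same
  }

  def run(): Seq[String] = {
    // The worker is the guardian here; a typed ActorSystem[T] is also an ActorRef[T]
    implicit val system: ActorSystem[TransformText] = ActorSystem(worker, "sketch")
    implicit val timeout: Timeout = 5.seconds
    try {
      // The flow emits TextTransformed, not String
      val askFlow: Flow[String, TextTransformed, NotUsed] =
        ActorFlow.ask[String, TransformText, TextTransformed](system)(TransformText.apply)

      val done: Future[Seq[String]] =
        Source(List("hello", "from", "akka", "streams!"))
          .via(askFlow)
          .map(_.text)               // TextTransformed => String, so the sink type matches
          .runWith(Sink.seq[String]) // stand-in for the ActorSink in the question

      Await.result(done, 10.seconds)
    } finally system.terminate()
  }
}
```

Removing the .map(_.text) line in this sketch should reproduce the same kind of found/required mismatch that the compiler reported in the question.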

There are at least two other ways to do this, but I think the example above is the best approach, at least for more realistic and complex applications.

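  1. FlowActor could reply with a String directly instead of TextTransformed, by changing the type of the replyTo field in TransformText to ActorRef[String]. This is closer to how the original classic example works, but using a wrapper type such as TextTransformed in an actor protocol is good practice: it improves type safety and makes the protocol easier to evolve.
  2. The sink could accept TextTransformed elements instead of String, either by having the messageAdapter function unwrap the String from the TextTransformed object, or by having the Message class carry a TextTransformed instead of a String in its msg field. The drawback of this approach is that it couples the sink tightly to the flow.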
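
For comparison, the second alternative could look roughly like the sketch below, again assuming Akka 2.6 with akka-stream-typed. The sink is typed as Sink[TextTransformed, NotUsed] and the messageAdapter unwraps the text. The receiver behavior, the Promise used to collect results, and the names SinkAdapterSketch and run() are hypothetical helpers added only to make the sketch runnable; they are not part of the question's code.

```scala
import akka.NotUsed
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.typed.scaladsl.{ActorFlow, ActorSink}
import akka.util.Timeout

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object SinkAdapterSketch {
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed])
  final case class TextTransformed(text: String)

  trait Ack
  object Ack extends Ack

  sealed trait Protocol
  final case class Init(ackTo: ActorRef[Ack]) extends Protocol
  final case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  final case class Fail(ex: Throwable) extends Protocol

  val worker: Behavior[TransformText] = Behaviors.receiveMessage {
    case TransformText(text, replyTo) =>
      replyTo ! TextTransformed(text.capitalize)
      Behaviors.same
  }

  // Hypothetical receiver: acks every element and completes `result` when the stream ends
  def receiver(result: Promise[Vector[String]], seen: Vector[String] = Vector.empty): Behavior[Protocol] =
    Behaviors.receiveMessage {
      case Init(ackTo) =>
        ackTo ! Ack
        Behaviors.same
      case Message(ackTo, msg) =>
        ackTo ! Ack
        receiver(result, seen :+ msg)
      case Complete =>
        result.success(seen)
        Behaviors.stopped
      case Fail(ex) =>
        result.failure(ex)
        Behaviors.stopped
    }

  def run(): Vector[String] = {
    val result = Promise[Vector[String]]()
    val system = ActorSystem[Nothing](Behaviors.setup[Nothing] { ctx =>
      implicit val sys: ActorSystem[Nothing] = ctx.system
      implicit val timeout: Timeout = 5.seconds
      val workerRef = ctx.spawn(worker, "worker")
      val receiverRef = ctx.spawn(receiver(result), "receiver")

      val askFlow: Flow[String, TextTransformed, NotUsed] =
        ActorFlow.ask[String, TransformText, TextTransformed](workerRef)(TransformText.apply)

      // The sink consumes TextTransformed; the adapter unwraps the String for the actor
      val sink: Sink[TextTransformed, NotUsed] =
        ActorSink.actorRefWithBackpressure[TextTransformed, Protocol, Ack](
          ref = receiverRef,
          messageAdapter = (ackTo, elem) => Message(ackTo, elem.text),
          onInitMessage = ackTo => Init(ackTo),
          ackMessage = Ack,
          onCompleteMessage = Complete,
          onFailureMessage = ex => Fail(ex))

      Source(List("hello", "from", "akka", "streams!")).via(askFlow).runWith(sink)
      Behaviors.empty
    }, "sketch")

    try Await.result(result.future, 10.seconds)
    finally system.terminate()
  }
}
```

The first alternative would instead change replyTo to ActorRef[String], which makes askFlow a Flow[String, String, NotUsed] and lets the original Sink[String, NotUsed] connect without any mapping step.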