如何解决 akka.stream.Graph sink error akka stream using akka typed
how to resolve akka.stream.Graph sink error akka stream using akka typed
我是 Akka Streams 的新手。我有一段代码,在经典 Akka(akka classic)中运行良好。
这就是我正在做的:
/** Classic actor that answers every incoming String with its capitalized form. */
class FlowActor extends Actor {
  val log = LoggerFactory.getLogger(this.getClass)

  def receive: Receive = {
    case word: String =>
      // The capitalized word is both logged and sent back to the asker.
      val capitalized = word.capitalize
      log.info("Actor {}, Capitalizing {}", context.self.path, capitalized)
      sender() ! capitalized
  }
}
// Words that are pushed through the stream.
val source = Source(List("hello", "from", "akka", "streams!"))

// Round-robin pool of ten FlowActor routees.
val flowActor = system.actorOf(RoundRobinPool(10).props(Props[FlowActor]), "flowActor")

// Ask the pool for every element and expect a String reply.
val flow = Flow[String].mapAsync(parallelism = 5) { word =>
  (flowActor ? word).mapTo[String]
}

// Reply the receiving actor sends to the stream to acknowledge an element.
val AckMessage = AckingReceiver.Ack

// Lifecycle messages the stream sends to the receiving actor (start, end, failure).
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

val receiver =
  system.actorOf(Props(new SimpleAckingReceiver(AckMessage)), name = "recevier")

val sink = Sink.actorRefWithBackpressure(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

// Assemble the graph and start it.
val stream = source.via(flow).to(sink)
stream.run()
在输出中我得到了预期的结果
现在,当我从 akka classic 转向 akka typed actors,并做了如下改动时:
/** Typed actor that capitalizes the text of each `TransformText` request and replies to its sender. */
object FlowActor {
  val log = LoggerFactory.getLogger(this.getClass)
  val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")

  sealed trait Command

  /** Request to transform `text`; the result is sent to `replyTo`. */
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed])
      extends Command
      with CborSerializable

  /** Reply that carries the transformed text. */
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { context =>
      // Every worker announces itself to the receptionist under FlowServiceKey.
      context.log.info("Registering myself with receptionist")
      context.system.receptionist ! Receptionist.Register(FlowServiceKey, context.self)

      Behaviors.receiveMessage {
        case request: TransformText =>
          request.replyTo ! TextTransformed(request.text.capitalize)
          Behaviors.same
      }
    }
}
/**
 * Typed actor that, when it receives `StartWork`, spawns a `FlowActor`, builds an
 * ask-based flow around it and runs a small stream into an actor-backed sink.
 */
object SinkActor {
  /** Messages handled by this actor itself. */
  sealed trait Event
  case object StartWork extends Event

  /** Acknowledgement value passed to the sink as `ackMessage`. */
  trait Ack
  object Ack extends Ack

  /** Messages the sink sends to its target actor (init, element, completion, failure). */
  trait Protocol
  case class Init(ackTo: ActorRef[Ack]) extends Protocol
  case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  case class Fail(ex: Throwable) extends Protocol

  /**
   * @param actorMaterializer materializer made implicitly available for running the stream
   * @return a behavior that builds and runs the stream on `StartWork`
   */
  def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
    implicit val mat = actorMaterializer
    // Presumably the ask timeout picked up implicitly by ActorFlow.ask.
    implicit val timeout = Timeout(5, SECONDS)

    Behaviors.receiveMessage {
      // Fixed: the pattern was spelled `STartWork`. An upper-case identifier in a
      // pattern is a stable-identifier reference, so the misspelling does not resolve
      // to the `StartWork` case object.
      case StartWork =>
        val ref = ctx.spawn(FlowActor(), "flowActor")
        import FlowActor._

        // Each String element is wrapped in a TransformText and sent to `ref`;
        // the flow emits the TextTransformed replies.
        val askFlow: Flow[String, TextTransformed, NotUsed] =
          ActorFlow.ask(ref)(TransformText.apply)

        // Placeholder: `???` throws NotImplementedError when evaluated.
        def targetActor(): ActorRef[Protocol] = ???
        val actor: ActorRef[Protocol] = targetActor()

        val source = Source(List("hello", "from", "akka", "streams!"))

        // This sink accepts String elements (Message.msg is a String).
        val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
          ref = actor,
          onCompleteMessage = Complete,
          onFailureMessage = Fail.apply,
          messageAdapter = Message.apply,
          onInitMessage = Init.apply,
          ackMessage = Ack)

        // NOTE: this is the line the reported compile error points at. `askFlow`
        // emits TextTransformed while `sink` accepts String, so the element types
        // do not line up as written.
        val stream = source.via(askFlow).runWith(sink)
        Behaviors.same
    }
  }
}
最后一行出现编译时错误
found : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]
required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error] val stream = source.via(askFlow).runWith(sink)
请指导我,我在这里遗漏了什么?
发生这种情况是因为代码试图将发出 `FlowActor.TextTransformed` 元素的 Flow 连接到需要 `String` 元素的 Sink。需要先把 `TextTransformed` 元素转换为 `String`:
// Unwrap the String from each TextTransformed before the elements reach the sink.
val stream = source
  .via(askFlow)
  .map(_.text) // or equivalently: .map(element => element.text)
  .runWith(sink)
至少还有两种其他可能的方法可以做到这一点,但我认为上面的例子是最好的方法,至少对于更现实和复杂的应用程序来说是这样。
- `FlowActor` 可以直接回复 `String` 而不是 `TextTransformed`,方法是将 `TransformText` 中 `replyTo` 字段的类型更改为 `ActorRef[String]`。这将更接近于原始 classic 示例的工作方式,但在 actor 协议中使用像 `TextTransformed` 这样的包装器类型是一个很好的做法,因为它提高了类型安全性,并且使协议更容易演进。
- 接收器(Sink)可以接收 `TextTransformed` 元素而不是 `String`,方法是让 `messageAdapter` 函数负责从 `TextTransformed` 对象中解包出 `String`,或者让 `Message` 类在其 `msg` 字段中使用 `TextTransformed` 而不是 `String`。这种方法的缺点是它将接收器与流紧密耦合。
我是 Akka Streams 的新手。我有一段代码,在经典 Akka(akka classic)中运行良好。这就是我正在做的:
/** Classic actor that answers every incoming String with its capitalized form. */
class FlowActor extends Actor {
  val log = LoggerFactory.getLogger(this.getClass)

  def receive: Receive = {
    case word: String =>
      // The capitalized word is both logged and sent back to the asker.
      val capitalized = word.capitalize
      log.info("Actor {}, Capitalizing {}", context.self.path, capitalized)
      sender() ! capitalized
  }
}
// Words that are pushed through the stream.
val source = Source(List("hello", "from", "akka", "streams!"))

// Round-robin pool of ten FlowActor routees.
val flowActor = system.actorOf(RoundRobinPool(10).props(Props[FlowActor]), "flowActor")

// Ask the pool for every element and expect a String reply.
val flow = Flow[String].mapAsync(parallelism = 5) { word =>
  (flowActor ? word).mapTo[String]
}

// Reply the receiving actor sends to the stream to acknowledge an element.
val AckMessage = AckingReceiver.Ack

// Lifecycle messages the stream sends to the receiving actor (start, end, failure).
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

val receiver =
  system.actorOf(Props(new SimpleAckingReceiver(AckMessage)), name = "recevier")

val sink = Sink.actorRefWithBackpressure(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

// Assemble the graph and start it.
val stream = source.via(flow).to(sink)
stream.run()
在输出中我得到了预期的结果。现在,当我从 akka classic 转向 akka typed actors,并做了如下改动时:
/** Typed actor that capitalizes the text of each `TransformText` request and replies to its sender. */
object FlowActor {
  val log = LoggerFactory.getLogger(this.getClass)
  val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")

  sealed trait Command

  /** Request to transform `text`; the result is sent to `replyTo`. */
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed])
      extends Command
      with CborSerializable

  /** Reply that carries the transformed text. */
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { context =>
      // Every worker announces itself to the receptionist under FlowServiceKey.
      context.log.info("Registering myself with receptionist")
      context.system.receptionist ! Receptionist.Register(FlowServiceKey, context.self)

      Behaviors.receiveMessage {
        case request: TransformText =>
          request.replyTo ! TextTransformed(request.text.capitalize)
          Behaviors.same
      }
    }
}
/**
 * Typed actor that, when it receives `StartWork`, spawns a `FlowActor`, builds an
 * ask-based flow around it and runs a small stream into an actor-backed sink.
 */
object SinkActor {
  /** Messages handled by this actor itself. */
  sealed trait Event
  case object StartWork extends Event

  /** Acknowledgement value passed to the sink as `ackMessage`. */
  trait Ack
  object Ack extends Ack

  /** Messages the sink sends to its target actor (init, element, completion, failure). */
  trait Protocol
  case class Init(ackTo: ActorRef[Ack]) extends Protocol
  case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  case class Fail(ex: Throwable) extends Protocol

  /**
   * @param actorMaterializer materializer made implicitly available for running the stream
   * @return a behavior that builds and runs the stream on `StartWork`
   */
  def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
    implicit val mat = actorMaterializer
    // Presumably the ask timeout picked up implicitly by ActorFlow.ask.
    implicit val timeout = Timeout(5, SECONDS)

    Behaviors.receiveMessage {
      // Fixed: the pattern was spelled `STartWork`. An upper-case identifier in a
      // pattern is a stable-identifier reference, so the misspelling does not resolve
      // to the `StartWork` case object.
      case StartWork =>
        val ref = ctx.spawn(FlowActor(), "flowActor")
        import FlowActor._

        // Each String element is wrapped in a TransformText and sent to `ref`;
        // the flow emits the TextTransformed replies.
        val askFlow: Flow[String, TextTransformed, NotUsed] =
          ActorFlow.ask(ref)(TransformText.apply)

        // Placeholder: `???` throws NotImplementedError when evaluated.
        def targetActor(): ActorRef[Protocol] = ???
        val actor: ActorRef[Protocol] = targetActor()

        val source = Source(List("hello", "from", "akka", "streams!"))

        // This sink accepts String elements (Message.msg is a String).
        val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
          ref = actor,
          onCompleteMessage = Complete,
          onFailureMessage = Fail.apply,
          messageAdapter = Message.apply,
          onInitMessage = Init.apply,
          ackMessage = Ack)

        // NOTE: this is the line the reported compile error points at. `askFlow`
        // emits TextTransformed while `sink` accepts String, so the element types
        // do not line up as written.
        val stream = source.via(askFlow).runWith(sink)
        Behaviors.same
    }
  }
}
最后一行出现编译时错误
found : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]
required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error] val stream = source.via(askFlow).runWith(sink)
请指导我,我在这里遗漏了什么?
发生这种情况是因为代码试图将发出 `FlowActor.TextTransformed` 元素的 Flow 连接到需要 `String` 元素的 Sink。需要先把 `TextTransformed` 元素转换为 `String`:
// Unwrap the String from each TextTransformed before the elements reach the sink.
val stream = source
  .via(askFlow)
  .map(_.text) // or equivalently: .map(element => element.text)
  .runWith(sink)
至少还有两种其他可能的方法可以做到这一点,但我认为上面的例子是最好的方法,至少对于更现实和复杂的应用程序来说是这样。
- `FlowActor` 可以直接回复 `String` 而不是 `TextTransformed`,方法是将 `TransformText` 中 `replyTo` 字段的类型更改为 `ActorRef[String]`。这将更接近于原始 classic 示例的工作方式,但在 actor 协议中使用像 `TextTransformed` 这样的包装器类型是一个很好的做法,因为它提高了类型安全性,并且使协议更容易演进。
- 接收器(Sink)可以接收 `TextTransformed` 元素而不是 `String`,方法是让 `messageAdapter` 函数负责从 `TextTransformed` 对象中解包出 `String`,或者让 `Message` 类在其 `msg` 字段中使用 `TextTransformed` 而不是 `String`。这种方法的缺点是它将接收器与流紧密耦合。