无法反序列化 ActorRef 以将结果发送到不同的 Actor

Unable to deserialize ActorRef to send result to different Actor

我开始使用 Spark Streaming 来处理我收到的实时数据馈送。我的场景是我有一个使用 "with ActorHelper" 的 Akka actor 接收器,然后我让我的 Spark 作业进行一些映射和转换,然后我想将结果发送给另一个 actor。

我的问题是最后一部分。尝试发送给另一个演员时,Spark 引发异常:

15/02/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'

我创建最后一个演员的方式如下:

val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")

然后像这样使用它:

result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))

我不确定在哪里或如何提出建议 "Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'"。我需要通过配置设置什么特别的东西吗?或者以不同的方式创建我的演员?

查看以下示例以访问 Spark 域之外的 actor。

/* * 以下是使用actorStream插入自定义actor作为receiver * * 需要注意的重要一点: * 由于 Actor 可能存在于 spark 框架之外,因此是用户的责任 * 确保类型安全,即接收到的数据类型和 InputDstream * 应该相同。 * * 例如:actorStream和SampleActorReceiver都是参数化的 * 相同类型以确保类型安全。 */

val lines = ssc.actorStream[String](
  Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
    host, port.toInt))), "SampleReceiver")

我发现如果我在发送给演员之前收集它就像一个魅力:

result.foreachRDD(rdd =>  rdd.collect().foreach(producer ! _))