无法反序列化 ActorRef 以将结果发送到不同的 Actor
Unable to deserialize ActorRef to send result to different Actor
我开始使用 Spark Streaming 来处理我收到的实时数据馈送。我的场景是我有一个使用 "with ActorHelper" 的 Akka actor 接收器,然后我让我的 Spark 作业进行一些映射和转换,然后我想将结果发送给另一个 actor。
我的问题是最后一部分。尝试发送给另一个演员时,Spark 引发异常:
15/02/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'
我创建最后一个演员的方式如下:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")
然后像这样使用它:
result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))
我不确定在哪里或如何提出建议 "Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'"。我需要通过配置设置什么特别的东西吗?或者以不同的方式创建我的演员?
查看以下示例以访问 Spark 域之外的 actor。
/*
* 以下是使用actorStream插入自定义actor作为receiver
*
* 需要注意的重要一点:
* 由于 Actor 可能存在于 spark 框架之外,因此是用户的责任
* 确保类型安全,即接收到的数据类型和 InputDstream
* 应该相同。
*
* 例如:actorStream和SampleActorReceiver都是参数化的
* 相同类型以确保类型安全。
*/
val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
我发现如果我在发送给演员之前收集它就像一个魅力:
result.foreachRDD(rdd => rdd.collect().foreach(producer ! _))
我开始使用 Spark Streaming 来处理我收到的实时数据馈送。我的场景是我有一个使用 "with ActorHelper" 的 Akka actor 接收器,然后我让我的 Spark 作业进行一些映射和转换,然后我想将结果发送给另一个 actor。
我的问题是最后一部分。尝试发送给另一个演员时,Spark 引发异常:
15/02/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'
我创建最后一个演员的方式如下:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")
然后像这样使用它:
result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))
我不确定在哪里或如何提出建议 "Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'"。我需要通过配置设置什么特别的东西吗?或者以不同的方式创建我的演员?
查看以下示例以访问 Spark 域之外的 actor。
/* * 以下是使用actorStream插入自定义actor作为receiver * * 需要注意的重要一点: * 由于 Actor 可能存在于 spark 框架之外,因此是用户的责任 * 确保类型安全,即接收到的数据类型和 InputDstream * 应该相同。 * * 例如:actorStream和SampleActorReceiver都是参数化的 * 相同类型以确保类型安全。 */
val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
我发现如果我在发送给演员之前收集它就像一个魅力:
result.foreachRDD(rdd => rdd.collect().foreach(producer ! _))