从 Spark 作业中的 Akka Stream 写入 kafka
Akka Stream from within a Spark Job to write into kafka
愿意最有效地将数据写回kafka,我有兴趣使用Akka Stream将我的RDD分区写回Kafka。
问题是我需要一种方法来为每个执行者而不是每个分区创建一个 actor 系统,这很荒谬。一个 JVM 上的一个节点上可能最终有 8 个 actorSystem。但是,每个分区都有一个 Stream 很好。
有人做过吗?
My understanding, an actor system can't be serialized, hence can't be
sent has broadcast variable which would be per executor.
如果有人有过解决该问题的经验并经过测试,请分享一下?
否则我总是可以回退到 https://index.scala-lang.org/benfradet/spark-kafka-writer/spark-kafka-0-10-writer/0.3.0?target=_2.11 但我不确定这是最有效的方法。
您始终可以使用 actor 系统定义全局惰性 val:
object Execution {
implicit lazy val actorSystem: ActorSystem = ActorSystem()
implicit lazy val materializer: Materializer = ActorMaterializer()
}
然后你只需将它导入任何你想使用 Akka Streams 的类:
import Execution._
val stream: DStream[...] = ...
stream.foreachRDD { rdd =>
...
rdd.foreachPartition { records =>
val (queue, done) = Source.queue(...)
.via(Producer.flow(...))
.toMat(Sink.ignore)(Keep.both)
.run() // implicitly pulls `Execution.materializer` from scope,
// which in turn will initialize `Execution.actorSystem`
... // push records to the queue
// wait until the stream is completed
Await.result(done, 10.minutes)
}
}
以上是一种伪代码,但我认为它应该传达了一般思想。
这样,系统将在每个执行程序 JVM 上仅在需要时初始化一次。此外,您可以使 actor 系统 "daemonic" 以便它在 JVM 完成时自动关闭:
object Execution {
private lazy val config = ConfigFactory.parseString("akka.daemonic = on")
.withFallback(ConfigFactory.load())
implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config)
implicit lazy val materializer: Materializer = ActorMaterializer()
}
我们正在我们的 Spark 作业中执行此操作,并且运行完美。
这在没有任何类型的广播变量的情况下工作,并且自然地可以用于各种 Spark 作业、流或其他。因为系统是在单例对象中定义的,所以保证每个 JVM 实例只初始化一次(模数各种类加载器恶作剧,但在 Spark 的上下文中并不重要),因此即使一些分区被放置在同一个 JVM 上(可能在不同的线程中),它只会初始化 actor 系统一次。 lazy val
保证了初始化的线程安全,而ActorSystem
是线程安全的,所以也不会出现这方面的问题。
愿意最有效地将数据写回kafka,我有兴趣使用Akka Stream将我的RDD分区写回Kafka。
问题是我需要一种方法来为每个执行者而不是每个分区创建一个 actor 系统,这很荒谬。一个 JVM 上的一个节点上可能最终有 8 个 actorSystem。但是,每个分区都有一个 Stream 很好。
有人做过吗?
My understanding, an actor system can't be serialized, hence can't be sent has broadcast variable which would be per executor.
如果有人有过解决该问题的经验并经过测试,请分享一下?
否则我总是可以回退到 https://index.scala-lang.org/benfradet/spark-kafka-writer/spark-kafka-0-10-writer/0.3.0?target=_2.11 但我不确定这是最有效的方法。
您始终可以使用 actor 系统定义全局惰性 val:
object Execution {
implicit lazy val actorSystem: ActorSystem = ActorSystem()
implicit lazy val materializer: Materializer = ActorMaterializer()
}
然后你只需将它导入任何你想使用 Akka Streams 的类:
import Execution._
val stream: DStream[...] = ...
stream.foreachRDD { rdd =>
...
rdd.foreachPartition { records =>
val (queue, done) = Source.queue(...)
.via(Producer.flow(...))
.toMat(Sink.ignore)(Keep.both)
.run() // implicitly pulls `Execution.materializer` from scope,
// which in turn will initialize `Execution.actorSystem`
... // push records to the queue
// wait until the stream is completed
Await.result(done, 10.minutes)
}
}
以上是一种伪代码,但我认为它应该传达了一般思想。
这样,系统将在每个执行程序 JVM 上仅在需要时初始化一次。此外,您可以使 actor 系统 "daemonic" 以便它在 JVM 完成时自动关闭:
object Execution {
private lazy val config = ConfigFactory.parseString("akka.daemonic = on")
.withFallback(ConfigFactory.load())
implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config)
implicit lazy val materializer: Materializer = ActorMaterializer()
}
我们正在我们的 Spark 作业中执行此操作,并且运行完美。
这在没有任何类型的广播变量的情况下工作,并且自然地可以用于各种 Spark 作业、流或其他。因为系统是在单例对象中定义的,所以保证每个 JVM 实例只初始化一次(模数各种类加载器恶作剧,但在 Spark 的上下文中并不重要),因此即使一些分区被放置在同一个 JVM 上(可能在不同的线程中),它只会初始化 actor 系统一次。 lazy val
保证了初始化的线程安全,而ActorSystem
是线程安全的,所以也不会出现这方面的问题。