从 Spark 作业中的 Akka Stream 写入 kafka

Akka Stream from within a Spark Job to write into kafka

愿意最有效地将数据写回kafka,我有兴趣使用Akka Stream将我的RDD分区写回Kafka。

问题是我需要一种方法来为每个执行者而不是每个分区创建一个 actor 系统,这很荒谬。一个 JVM 上的一个节点上可能最终有 8 个 actorSystem。但是,每个分区都有一个 Stream 很好。

有人做过吗?

My understanding, an actor system can't be serialized, hence can't be sent has broadcast variable which would be per executor.

如果有人有过解决该问题的经验并经过测试,请分享一下?

否则我总是可以回退到 https://index.scala-lang.org/benfradet/spark-kafka-writer/spark-kafka-0-10-writer/0.3.0?target=_2.11 但我不确定这是最有效的方法。

您始终可以使用 actor 系统定义全局惰性 val:

object Execution {
  implicit lazy val actorSystem: ActorSystem = ActorSystem()
  implicit lazy val materializer: Materializer = ActorMaterializer()
}

然后你只需将它导入任何你想使用 Akka Streams 的类:

import Execution._

val stream: DStream[...] = ...

stream.foreachRDD { rdd =>
  ...
  rdd.foreachPartition { records =>
    val (queue, done) = Source.queue(...)
      .via(Producer.flow(...))
      .toMat(Sink.ignore)(Keep.both)
      .run()  // implicitly pulls `Execution.materializer` from scope,
              // which in turn will initialize `Execution.actorSystem`

    ... // push records to the queue

    // wait until the stream is completed
    Await.result(done, 10.minutes)
  }
}

以上是一种伪代码,但我认为它应该传达了一般思想。

这样,系统将在每个执行程序 JVM 上仅在需要时初始化一次。此外,您可以使 actor 系统 "daemonic" 以便它在 JVM 完成时自动关闭:

object Execution {
  private lazy val config = ConfigFactory.parseString("akka.daemonic = on")
    .withFallback(ConfigFactory.load())
  implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config)
  implicit lazy val materializer: Materializer = ActorMaterializer()
}

我们正在我们的 Spark 作业中执行此操作,并且运行完美。

这在没有任何类型的广播变量的情况下工作,并且自然地可以用于各种 Spark 作业、流或其他。因为系统是在单例对象中定义的,所以保证每个 JVM 实例只初始化一次(模数各种类加载器恶作剧,但在 Spark 的上下文中并不重要),因此即使一些分区被放置在同一个 JVM 上(可能在不同的线程中),它只会初始化 actor 系统一次。 lazy val保证了初始化的线程安全,而ActorSystem是线程安全的,所以也不会出现这方面的问题。