通过 Scala Spark 并行读取单独的目录并创建单独的 RDD
Read Separate Directories & Create Separate RDDs in Parallel via Scala Spark
我需要从单独的源目录读取 JSON 个文件并为每个目录创建单独的表。我希望并行执行此操作,但 Spark 不支持嵌套 RDD,因此目前它是按顺序执行的。是否有一个很好的解决方案来并行获取这些目录 read/processed?
这是我正在尝试的示例片段,但由于嵌套的 RDD,它不起作用:
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
将最后一行更改为 dirRDD.collect.foreach 可以,但是工作不会分发并按顺序执行,因此非常慢。
也试过 dirRDD.collect.par.foreach,但它只在驱动程序上运行并行线程,并没有利用所有其他节点。
我研究了 foreachAsync,但由于嵌套,我不确定在这种情况下异步是否一定是并行的。
这是通过 Databricks 使用 Spark 2.0 和 Scala 2.11。
===========
加法:
我尝试了 foreachAsync returns Spark 中的 FutureAction,但也给出了错误。
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
显然 SimpleFutureAction 不可序列化
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
您可以使用 Scala parallel collections or futures 在 Spark 驱动程序上并行化代码 运行。 Spark 驱动程序是 thread-safe,因此这将按预期工作。
这是一个使用并行集合的示例 explicitly-specified thread-pools:
val dirList = List(
("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
("dbfs:/databricks-datasets/amazon/users/", "users")
).par
val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)
try {
dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
dirList.foreach { case (filename, tableName) =>
println(s"Starting to create table for $tableName")
val df = spark.read.json(filename)
println(s"Done creating table for $tableName")
df.createOrReplaceTempView(tableName)
}
} finally {
pool.shutdown() // to prevent thread leaks.
// You could also re-use thread pools across collections.
}
当我 运行 在 Databricks 中执行此操作时,它会生成流式日志输出,表明这两个表正在并行加载:
Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users
这种并行性也反映在 Spark UI 的作业时间线视图中。
当然,您也可以为此使用 Java 个线程。简而言之,从多个线程调用 Spark 驱动程序 API 是安全的,因此选择您选择的 JVM 并发框架并向 Spark 驱动程序发出并行调用以创建您的表。
我需要从单独的源目录读取 JSON 个文件并为每个目录创建单独的表。我希望并行执行此操作,但 Spark 不支持嵌套 RDD,因此目前它是按顺序执行的。是否有一个很好的解决方案来并行获取这些目录 read/processed?
这是我正在尝试的示例片段,但由于嵌套的 RDD,它不起作用:
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
将最后一行更改为 dirRDD.collect.foreach 可以,但是工作不会分发并按顺序执行,因此非常慢。
也试过 dirRDD.collect.par.foreach,但它只在驱动程序上运行并行线程,并没有利用所有其他节点。
我研究了 foreachAsync,但由于嵌套,我不确定在这种情况下异步是否一定是并行的。
这是通过 Databricks 使用 Spark 2.0 和 Scala 2.11。
===========
加法:
我尝试了 foreachAsync returns Spark 中的 FutureAction,但也给出了错误。
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
显然 SimpleFutureAction 不可序列化
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
您可以使用 Scala parallel collections or futures 在 Spark 驱动程序上并行化代码 运行。 Spark 驱动程序是 thread-safe,因此这将按预期工作。
这是一个使用并行集合的示例 explicitly-specified thread-pools:
val dirList = List(
("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
("dbfs:/databricks-datasets/amazon/users/", "users")
).par
val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)
try {
dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
dirList.foreach { case (filename, tableName) =>
println(s"Starting to create table for $tableName")
val df = spark.read.json(filename)
println(s"Done creating table for $tableName")
df.createOrReplaceTempView(tableName)
}
} finally {
pool.shutdown() // to prevent thread leaks.
// You could also re-use thread pools across collections.
}
当我 运行 在 Databricks 中执行此操作时,它会生成流式日志输出,表明这两个表正在并行加载:
Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users
这种并行性也反映在 Spark UI 的作业时间线视图中。
当然,您也可以为此使用 Java 个线程。简而言之,从多个线程调用 Spark 驱动程序 API 是安全的,因此选择您选择的 JVM 并发框架并向 Spark 驱动程序发出并行调用以创建您的表。