如何并行化多个apache spark rdds?
How to parallelize several apache spark rdds?
我有下一个代码:
sc.parquetFile("some large parquet file with bc").registerTempTable("bcs")
sc.parquetFile("some large parquet file with imps").registerTempTable("imps")
val bcs = sc.sql("select * from bcs")
val imps = sc.sql("select * from imps")
我想做:
bcs.map(x => wrapBC(x)).collect
imps.map(x => wrapIMP(x)).collect
但是当我这样做时,运行 不是异步的。我可以用 Future 来做,就像那样:
val bcsFuture = Future { bcs.map(x => wrapBC(x)).collect }
val impsFuture = Future { imps.map(x => wrapIMP(x)).collect }
val result = for {
bcs <- bcsFuture
imps <- impsFuture
} yield (bcs, imps)
Await.result(result, Duration.Inf) //this return (Array[Bc], Array[Imp])
我想在没有Future的情况下做这个,怎么办?
更新: 我误解了问题。想要的结果不是笛卡尔积 Array[(Bc, Imp)]
.
但我认为单个 map
调用花费多长时间并不重要,因为只要您添加其他转换,Spark 就会尝试以有效的方式组合它们。只要你只在 RDD 上链接 transformations,数据就不会发生任何事情。当您最终应用 action 时,执行引擎将找出一种方法来生成请求的数据。
所以我的建议是不要过多考虑中间步骤并尽可能避免collect
,因为它会将所有数据提取到驱动程序。
看来您正在自己构建笛卡尔积。请尝试 cartesian
:
val bc = bcs.map(x => wrapBC(x))
val imp = imps.map(x => wrapIMP(x))
val result = bc.cartesian(imp).collect
请注意,collect
在最终 RDD 上调用,不再在中间结果上调用。
更新 这最初是在问题更新之前写的。鉴于这些更新,我同意 在这种情况下使用 cartesian
。
确实存在数量有限的操作,这些操作将为 RDD[A]
生成 FutureAction[A]
并在后台执行。这些在 AsyncRDDActions class 上可用,只要您导入 SparkContext._
任何 RDD
都可以根据需要隐式转换为 AysnchRDDAction
。对于您的特定代码示例:
bcs.map(x => wrapBC(x)).collectAsync
imps.map(x => wrapIMP(x)).collectAsync
除了在后台评估 DAG 以采取行动外,生成的 FutureAction
还具有 cancel
方法来尝试提前结束处理。
警告
这可能与您认为的不同。如果目的是从两个来源获取数据然后将它们结合起来,您更有可能想要加入或分组RDDs 代替。为此,您可以查看 PairRDDFunctions 中可用的函数,同样可以通过隐式转换在 RDD 上使用。
如果不打算让数据图进行交互,那么根据我的经验,运行并发批处理可能只会减慢两者的速度,尽管这可能是集群如何处理的结果已配置。如果资源管理器设置为让每个执行阶段以 FIFO 顺序垄断集群(我相信独立模式和 YARN 模式下的默认设置;我不确定 Mesos),那么每个异步收集将与每个竞争其他为那个垄断,运行他们的任务,然后再争夺下一个执行阶段。
将此与使用 Future
包装对下游服务或数据库的阻塞调用进行比较,例如,其中所讨论的资源是完全独立的,或者通常具有足够的资源容量来并行处理多个请求而无需争论.
您可以使用 union
来解决这个问题。例如:
bcs.map(x => wrapBC(x).asInstanceOf[Any])
imps.map(x => wrapIMP(x).asInstanceOf[Any])
val result = (bcs union imps).collect()
val bcsResult = result collect { case bc: Bc => bc }
val impsResult = result collect { case imp: Imp => imp }
如果要使用sortBy或其他操作,可以使用trait或main的继承class。
我有下一个代码:
sc.parquetFile("some large parquet file with bc").registerTempTable("bcs")
sc.parquetFile("some large parquet file with imps").registerTempTable("imps")
val bcs = sc.sql("select * from bcs")
val imps = sc.sql("select * from imps")
我想做:
bcs.map(x => wrapBC(x)).collect
imps.map(x => wrapIMP(x)).collect
但是当我这样做时,运行 不是异步的。我可以用 Future 来做,就像那样:
val bcsFuture = Future { bcs.map(x => wrapBC(x)).collect }
val impsFuture = Future { imps.map(x => wrapIMP(x)).collect }
val result = for {
bcs <- bcsFuture
imps <- impsFuture
} yield (bcs, imps)
Await.result(result, Duration.Inf) //this return (Array[Bc], Array[Imp])
我想在没有Future的情况下做这个,怎么办?
更新: 我误解了问题。想要的结果不是笛卡尔积 Array[(Bc, Imp)]
.
但我认为单个 map
调用花费多长时间并不重要,因为只要您添加其他转换,Spark 就会尝试以有效的方式组合它们。只要你只在 RDD 上链接 transformations,数据就不会发生任何事情。当您最终应用 action 时,执行引擎将找出一种方法来生成请求的数据。
所以我的建议是不要过多考虑中间步骤并尽可能避免collect
,因为它会将所有数据提取到驱动程序。
看来您正在自己构建笛卡尔积。请尝试 cartesian
:
val bc = bcs.map(x => wrapBC(x))
val imp = imps.map(x => wrapIMP(x))
val result = bc.cartesian(imp).collect
请注意,collect
在最终 RDD 上调用,不再在中间结果上调用。
更新 这最初是在问题更新之前写的。鉴于这些更新,我同意 cartesian
。
确实存在数量有限的操作,这些操作将为 RDD[A]
生成 FutureAction[A]
并在后台执行。这些在 AsyncRDDActions class 上可用,只要您导入 SparkContext._
任何 RDD
都可以根据需要隐式转换为 AysnchRDDAction
。对于您的特定代码示例:
bcs.map(x => wrapBC(x)).collectAsync
imps.map(x => wrapIMP(x)).collectAsync
除了在后台评估 DAG 以采取行动外,生成的 FutureAction
还具有 cancel
方法来尝试提前结束处理。
警告
这可能与您认为的不同。如果目的是从两个来源获取数据然后将它们结合起来,您更有可能想要加入或分组RDDs 代替。为此,您可以查看 PairRDDFunctions 中可用的函数,同样可以通过隐式转换在 RDD 上使用。
如果不打算让数据图进行交互,那么根据我的经验,运行并发批处理可能只会减慢两者的速度,尽管这可能是集群如何处理的结果已配置。如果资源管理器设置为让每个执行阶段以 FIFO 顺序垄断集群(我相信独立模式和 YARN 模式下的默认设置;我不确定 Mesos),那么每个异步收集将与每个竞争其他为那个垄断,运行他们的任务,然后再争夺下一个执行阶段。
将此与使用 Future
包装对下游服务或数据库的阻塞调用进行比较,例如,其中所讨论的资源是完全独立的,或者通常具有足够的资源容量来并行处理多个请求而无需争论.
您可以使用 union
来解决这个问题。例如:
bcs.map(x => wrapBC(x).asInstanceOf[Any])
imps.map(x => wrapIMP(x).asInstanceOf[Any])
val result = (bcs union imps).collect()
val bcsResult = result collect { case bc: Bc => bc }
val impsResult = result collect { case imp: Imp => imp }
如果要使用sortBy或其他操作,可以使用trait或main的继承class。