我可以在 Spark 中动态控制 map 函数的并发吗?

Can I dynamically control concurrency of a map function in Spark?

我正在应用 map 函数对我的数据执行一些 ETL。此功能通常非常快速,并且由于数据分布良好,因此创建了足够多的任务,因此我得到了很好且统一的利用率。

问题是 map 函数将在某些数据组合上成为 I/O 绑定。发生的情况是,通常触发数据将出现在单个块上(它们按顺序到达),因此被单个 node/task 拾取。然后发生的事情是处理 100GB 需要 5-6 秒,而处理单个块(在 MapR 中为 256mb)需要 20 分钟,因为它是由单个线程执行的。

有没有办法只为这个块增加并行化? 在这种情况下,人们通常会做什么?

到目前为止我确定的选项(我将其描述为解决方法)是:

  1. spark.default.parallelism :这将影响全局执行并导致次优的整体时间。尽管文档声明这是混洗操作的并行性,但我观察到它也会影响 map 并行性。您能否详细说明内部发生的情况?这会覆盖块的处理方式吗?
  2. spark.task.cpus :这太粗粒度了,它会再次影响全局执行特性。
  3. map 函数内使用 fork/join,并在检测到 I/O 绑定延迟时委托给 ExecutorService:这会使事情变得复杂,并从框架中获取资源控制将在难以解决的糟糕情况下实现。
  4. sc.textFile("theFile.txt", 100) :这将影响我的主要 RDD(即 100GB)和整个集合的后续 transformations/actions。 tahn 1 好一点,但仍然不理想(根据 pzecevic 的回答更新)

您可以在要应用映射转换的 RDD 上设置并行度。

rdd.repartition(100)

我不知道您是如何创建 RDD 的,但有时您可以在创建 RDD 时指定并行度:

sc.textFile("theFile.txt", 100)

这将直接影响映射任务的数量(在本例中为 100)。