Will this method force parallelization of "for" loops in spark?

I have a nested for loop that runs 10 iterations over each of 4 time windows. The pseudocode looks like this:

df = spark.read.parquet("very large dataset")
for i in range(1, 5):
    time_window = 5 * i
    for j in range(0, 10):
        df_out = [operations performed in 10 different slices of the time window]
        df_out.write.parquet("output path")

I know that each loop iteration runs sequentially, so the parallel processing in Spark is limited to the work inside each inner-loop iteration. Therefore, across the 40 total iterations defined by the loops (4 x 10), Spark's distributed computation only happens within the operations defined in each inner loop, while the iterations themselves execute one after another.

I'm wondering: if I modify the code and manually unroll the outer loop into 4 separate blocks, as shown below:

from pyspark import StorageLevel

df = spark.read.parquet("very large dataset")\
    .persist(StorageLevel.MEMORY_AND_DISK)

time_window = 5 * 1
for j in range(0, 10):
    df_out1 = [operations performed in 10 different slices of the time window]
    df_out1.write.parquet("output path")

time_window = 5 * 2
for j in range(0, 10):
    df_out2 = [operations performed in 10 different slices of the time window]
    df_out2.write.parquet("output path")

time_window = 5 * 3
for j in range(0, 10):
    df_out3 = [operations performed in 10 different slices of the time window]
    df_out3.write.parquet("output path")

time_window = 5 * 4
for j in range(0, 10):
    df_out4 = [operations performed in 10 different slices of the time window]
    df_out4.write.parquet("output path")

Since the output of each outer-loop block is independent of the ones before it, those operations should be able to run in parallel. So instead of running all 40 iterations sequentially (one at a time), would the modified code run 4 of them at a time?

You can use Python's multiprocessing ThreadPool for this. Your pseudocode would look like this:

from multiprocessing.pool import ThreadPool

# define a function that takes the original DataFrame and a time_window argument
def do_something(df, time_window):
    for j in range(0, 10):
        df_out = [operations performed in 10 different slices of the time window]
        df_out.write.parquet("output path")

# run a pool of 4 worker threads (ThreadPool uses threads, not separate processes)
with ThreadPool(4) as p:
    args = [(df, 5 * i) for i in range(1, 5)]
    p.starmap(do_something, args)
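
This works because each worker thread submits its own Spark jobs through the shared SparkSession on the driver, and Spark can run jobs submitted from separate threads concurrently (optionally set spark.scheduler.mode to FAIR so they share executors more evenly). A thread pool is the right tool here rather than a process pool: the SparkSession and DataFrame cannot be shared across Python processes, and the heavy computation happens on the executors anyway, so the GIL is not the bottleneck.

Here is an equivalent sketch using the standard library's concurrent.futures; the event_minute column and the output paths are hypothetical placeholders standing in for your actual slicing logic:

from concurrent.futures import ThreadPoolExecutor

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("very large dataset").persist()

def process_window(df, time_window):
    # hypothetical slicing: assumes an event_minute column; replace with your real operations
    for j in range(0, 10):
        df_out = df.filter(
            (F.col("event_minute") >= j * time_window)
            & (F.col("event_minute") < (j + 1) * time_window)
        )
        df_out.write.mode("append").parquet(f"output path/window={time_window}/slice={j}")

# 4 threads, one per time window; result() re-raises any exception from a worker thread
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_window, df, 5 * i) for i in range(1, 5)]
    for f in futures:
        f.result()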