如何精确地显式控制Spark的任务调度?
How to explicitly control task schedule of Spark exactly?
我尝试使用 Spark 实现并行化图像处理技术。不同于传统的 Spark 处理数百万个任务。我只想将图像分成我拥有的工人(机器)的数量,并让一名工人处理一个图像补丁。所以一个图像补丁就是一个任务,如果我有 12 个图像补丁,我就有 12 个任务。问题是如何明确控制每个工人的任务时间表。目前的情况是,如果我并行化图像补丁,他们通常会向一两个工作人员发送多个补丁,而让其他人无法工作。我试图设置spark的系统属性来控制spark.cores.max和spark.default.parallelism。但这似乎没有帮助。使任务尽可能分开发送给不同工作人员的唯一方法是扩大 SparkContext.parallelize 的第二个参数 - numSlices。这是代码:
img = misc.imread('test_.bmp')
height, width = img.shape
divisions, patch_width, patch_height = partitionParameters(width, height, 2, 2, border=100)
spark = SparkContext(appName="Miner")
# spark.setSystemProperty('spark.cores.max','1')
spark.setSystemProperty('spark.default.parallelism','24')
broadcast_img = spark.broadcast(img)
start = datetime.now()
print "--------------------", divisions
# run spark
run = spark.parallelize(divisions, 24).cache()
print "--------------- RDD size: ", run._jrdd.splits().size()
result = run.map(lambda (x, y): crop_sub_img(broadcast_img.value, x, y, patch_width, patch_height, width, height)) \
.map(lambda ((x, y), subimg): fastSeg.process(subimg, x, y)) \
.collect()
img = cat_sub_img(result, width, height)
end = datetime.now()
print "time cost:", (end-start)
如你所见,我只有四个补丁集。 divisions 是一个包含图像块 x 轴和 y 轴的元组列表。只是我把numSlices设置为一个很高的值24,远远超过了我在部门的实际任务,现在大部分工人都在使用。但似乎不太合理。如果我设置为 4,它会将所有任务发送给一个工人!必须有某种方式来控制一个工人接受多少任务。我不熟悉 Spark 的核心。谁能帮帮我,谢谢?
有人认为它的发生是图像尺寸对于一个工作人员来说太小了。所以 spark 会假设一个工人可以处理这个并将所有的都发送给一个。
一机多核。 Spark 在您的工作内核之间分配工作,因为它们可以并行执行工作。如果你有 12 台机器,每台机器有 4 个内核,那么你总共有 48 个内核。你应该把图像分成 48 个补丁,这样每个核心都有事做。如果你把imagine分成12个补丁,只有12个核心有事做,其他36个核心就被浪费了。
如果您的图像处理算法有自己的多线程处理,则例外。在这种情况下,您应该在机器上启动 1-core worker。工人每次只会接 1 个任务,你可以随心所欲地进行多线程。如果您 运行 一个独立的 Spark 集群,您可以为此在 worker 上设置 --cores 1
。 (参见documentation。)
我尝试使用 Spark 实现并行化图像处理技术。不同于传统的 Spark 处理数百万个任务。我只想将图像分成我拥有的工人(机器)的数量,并让一名工人处理一个图像补丁。所以一个图像补丁就是一个任务,如果我有 12 个图像补丁,我就有 12 个任务。问题是如何明确控制每个工人的任务时间表。目前的情况是,如果我并行化图像补丁,他们通常会向一两个工作人员发送多个补丁,而让其他人无法工作。我试图设置spark的系统属性来控制spark.cores.max和spark.default.parallelism。但这似乎没有帮助。使任务尽可能分开发送给不同工作人员的唯一方法是扩大 SparkContext.parallelize 的第二个参数 - numSlices。这是代码:
img = misc.imread('test_.bmp')
height, width = img.shape
divisions, patch_width, patch_height = partitionParameters(width, height, 2, 2, border=100)
spark = SparkContext(appName="Miner")
# spark.setSystemProperty('spark.cores.max','1')
spark.setSystemProperty('spark.default.parallelism','24')
broadcast_img = spark.broadcast(img)
start = datetime.now()
print "--------------------", divisions
# run spark
run = spark.parallelize(divisions, 24).cache()
print "--------------- RDD size: ", run._jrdd.splits().size()
result = run.map(lambda (x, y): crop_sub_img(broadcast_img.value, x, y, patch_width, patch_height, width, height)) \
.map(lambda ((x, y), subimg): fastSeg.process(subimg, x, y)) \
.collect()
img = cat_sub_img(result, width, height)
end = datetime.now()
print "time cost:", (end-start)
如你所见,我只有四个补丁集。 divisions 是一个包含图像块 x 轴和 y 轴的元组列表。只是我把numSlices设置为一个很高的值24,远远超过了我在部门的实际任务,现在大部分工人都在使用。但似乎不太合理。如果我设置为 4,它会将所有任务发送给一个工人!必须有某种方式来控制一个工人接受多少任务。我不熟悉 Spark 的核心。谁能帮帮我,谢谢?
有人认为它的发生是图像尺寸对于一个工作人员来说太小了。所以 spark 会假设一个工人可以处理这个并将所有的都发送给一个。
一机多核。 Spark 在您的工作内核之间分配工作,因为它们可以并行执行工作。如果你有 12 台机器,每台机器有 4 个内核,那么你总共有 48 个内核。你应该把图像分成 48 个补丁,这样每个核心都有事做。如果你把imagine分成12个补丁,只有12个核心有事做,其他36个核心就被浪费了。
如果您的图像处理算法有自己的多线程处理,则例外。在这种情况下,您应该在机器上启动 1-core worker。工人每次只会接 1 个任务,你可以随心所欲地进行多线程。如果您 运行 一个独立的 Spark 集群,您可以为此在 worker 上设置 --cores 1
。 (参见documentation。)