Dask iteration in a sequential algorithm takes longer than the previous one
I am currently implementing a variant of Dijkstra's algorithm for airline departures in Dask and in Spark (for comparison purposes), and it involves sequential computation over the nodes of a graph. In addition, at every step I filter out some records from the graph (nodes) because they become infeasible due to their departure time. However, even though the dataframe gets smaller, each new iteration takes longer than the previous one. I solved this in Spark by writing intermediate results to parquet, but I have not been able to solve it for Dask.
I suspect the dataframe is being recomputed from scratch at every step over the graph, but I have not been able to prevent this from happening.
So far I have tried the following approaches:
- Using persist (this one is the fastest). However, the number of tasks to complete shown in the UI grows with every iteration. For example: iteration 8 shows x/800, iteration 9 shows x/900 (I am using 100 partitions). A small diagnostic sketch follows the snippet below.
while i < n_nodes:
    i += 1
    # some computations over df
    df = client.persist(df)
    # add new records from df to explored
    # some computations over explored
    explored = client.persist(explored)
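One way I could sanity-check whether persist actually truncates the lineage is to look at the size of the collection's own task graph after each persist. This is only a sketch, assuming the loop above; __dask_graph__ is the collection-protocol accessor, and right after a persist I would expect it to hold roughly one key per partition rather than the whole history of the loop:
# hypothetical diagnostic, placed right after df = client.persist(df)
print(f"iteration {i}: {len(df.__dask_graph__())} keys in df's graph, {df.npartitions} partitions")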
- Writing the current df to disk and reading it back right afterwards (this works really well in Spark, but not in Dask, because to_parquet appends data and fails if the directory has been removed). In this case I also tried del df together with client.cancel(df), with very little effect on the computation time, so I decided to comment them out. A portable version of this checkpoint is sketched after the snippet below.
while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    # del df
    # client.cancel(df)
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    # del explored
    # client.cancel(explored)
    explored = dd.read_parquet('temp_dir/explored')
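For reference, this is roughly how I would wrap that round trip in a helper. It is only a sketch: checkpoint is a name I made up, and it uses shutil.rmtree instead of shelling out to rm so that writing into a fresh directory works everywhere:
import shutil
import dask.dataframe as dd

def checkpoint(ddf, path):
    # write the dataframe to parquet and read it back, so the new
    # collection starts from the files instead of the accumulated graph
    shutil.rmtree(path, ignore_errors=True)  # make sure the directory is empty
    ddf.to_parquet(path)                     # this triggers the computation
    return dd.read_parquet(path)

# inside the loop the round trip would become:
# df = checkpoint(df, 'temp_dir/df_vuelos_dask')
# explored = checkpoint(explored, 'temp_dir/explored')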
- Using client.restart(). This one is no good: it deletes the contents of df and explored, which breaks the algorithm.
while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    client.restart()
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    client.restart()
    explored = dd.read_parquet('temp_dir/explored')
The elapsed times (in seconds) printed to the console are the following:
Iteration 2 / 280. Elapsed time: 407.85055565834045
Iteration 3 / 280. Elapsed time: 434.58717703819275
Iteration 4 / 280. Elapsed time: 436.2463436126709
Iteration 5 / 280. Elapsed time: 437.9837713241577
Iteration 6 / 280. Elapsed time: 440.2417469024658
Iteration 7 / 280. Elapsed time: 442.7933940887451
Iteration 8 / 280. Elapsed time: 445.7904782295227
Iteration 9 / 280. Elapsed time: 449.1104226112366
Iteration 10 / 280. Elapsed time: 452.3273584842682
Iteration 11 / 280. Elapsed time: 456.3567247390747
Iteration 12 / 280. Elapsed time: 460.65562629699707
Iteration 13 / 280. Elapsed time: 464.7628743648529
Iteration 14 / 280. Elapsed time: 469.59177350997925
Iteration 15 / 280. Elapsed time: 474.6557366847992
Iteration 16 / 280. Elapsed time: 479.7272925376892
Iteration 17 / 280. Elapsed time: 485.53346991539
Iteration 18 / 280. Elapsed time: 491.11691975593567
Iteration 19 / 280. Elapsed time: 497.39954662323
Iteration 20 / 280. Elapsed time: 504.03624844551086
Iteration 21 / 280. Elapsed time: 510.45858550071716
Iteration 22 / 280. Elapsed time: 517.7796952724457
Iteration 23 / 280. Elapsed time: 525.3149480819702
Iteration 24 / 280. Elapsed time: 532.6355893611908
Iteration 25 / 280. Elapsed time: 541.2597570419312
Iteration 26 / 280. Elapsed time: 549.2841284275055
Iteration 27 / 280. Elapsed time: 558.8050730228424
Iteration 28 / 280. Elapsed time: 567.617687702179
Iteration 29 / 280. Elapsed time: 577.8864963054657
Iteration 30 / 280. Elapsed time: 587.5171909332275
Iteration 31 / 280. Elapsed time: 598.4596126079559
Iteration 32 / 280. Elapsed time: 608.7272901535034
Iteration 33 / 280. Elapsed time: 620.6863214969635
Iteration 34 / 280. Elapsed time: 631.9231634140015
Iteration 35 / 280. Elapsed time: 643.090336561203
Iteration 36 / 280. Elapsed time: 656.1529128551483
Iteration 37 / 280. Elapsed time: 667.9437139034271
Iteration 38 / 280. Elapsed time: 681.2613704204559
Iteration 39 / 280. Elapsed time: 695.7434968948364
Iteration 40 / 280. Elapsed time: 709.1406977176666
Iteration 41 / 280. Elapsed time: 723.0397245883942
Iteration 42 / 280. Elapsed time: 737.5559349060059
Iteration 43 / 280. Elapsed time: 753.8705065250397
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 44 / 280. Elapsed time: 768.2957532405853
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 45 / 280. Elapsed time: 783.177583694458
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 46 / 280. Elapsed time: 798.720709323883
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 47 / 280. Elapsed time: 814.6071207523346
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 48 / 280. Elapsed time: 830.2278523445129
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 49 / 280. Elapsed time: 846.3982262611389
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 50 / 280. Elapsed time: 865.5728619098663
Iteration 51 / 280. Elapsed time: 882.612627029419
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 52 / 280. Elapsed time: 900.9131906032562
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
Iteration 53 / 280. Elapsed time: 919.1079332828522
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
Iteration 54 / 280. Elapsed time: 937.6077470779419
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
Iteration 55 / 280. Elapsed time: 957.1775703430176
I am running this locally on a laptop with 16 GB of RAM and 12 cores. The dataset is about 7 GB when stored as parquet.
I would appreciate any guidance on what I am doing wrong, or on how to discard the graph of operations that have already been completed.
Thanks!
Your first solution (persist) seems reasonable. The task count in the UI is cumulative (so don't recount from scratch each time; if you have 100 partitions, it will increase in multiples of 100).
Here is the example I am working with:
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
import numpy as np
import time

client = Client()
client

max_number_of_nodes = 35
number_of_ties = 1_000

network = pd.DataFrame(np.random.randint(max_number_of_nodes, size=(number_of_ties, 2)), columns=['source', 'target'])
ddf = dd.from_pandas(network, npartitions=10)

for i in range(10):
    ddf = ddf[ddf['source']//i != 5]
    ddf = client.persist(ddf)
    time.sleep(1)
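If you want to confirm that each pass stays roughly constant in cost, a variant of the same loop that blocks on the persisted result and prints the wall time per iteration could look like this (a sketch only; wait comes from dask.distributed and the filter is the same toy condition as above):
from dask.distributed import wait
import time

for i in range(1, 11):
    start = time.time()
    ddf = ddf[ddf['source'] // i != 5]   # toy per-iteration computation
    ddf = client.persist(ddf)
    wait(ddf)                            # block until the partitions are in memory
    print(f"Iteration {i}: {time.time() - start:.2f}s, {ddf.npartitions} partitions")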