Each Dask iteration in a sequential algorithm takes longer than the previous one

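I am currently implementing a variant of Dijkstra's algorithm for airline departures in both Dask and Spark (for comparison purposes); it involves sequential computations over the graph's nodes. In addition, at each step I filter out some records (nodes) of the graph because they become infeasible due to their departure times. However, even though the dataframe gets smaller, each new iteration takes longer than the previous one. I solved this in Spark by writing the intermediate results to parquet, but I have not been able to solve it in Dask.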
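The question does not show the exact variant, so purely as an illustration: one common flight-network reading of Dijkstra is an earliest-arrival search, where a flight can only be taken if it departs no earlier than the time we reach its origin airport. That departure-time check is the kind of condition that makes records infeasible as the search advances. The tuple layout, the function name earliest_arrival and the sample FLIGHTS are hypothetical, and minimum connection times are ignored.

```python
import heapq
from typing import Dict, List, Set, Tuple

# Hypothetical flight record: (origin, destination, departure_time, arrival_time)
Flight = Tuple[str, str, int, int]


def earliest_arrival(flights: List[Flight], source: str, start_time: int) -> Dict[str, float]:
    """Time-dependent Dijkstra: earliest arrival time at every reachable airport."""
    # Index outgoing flights by origin airport.
    outgoing: Dict[str, List[Flight]] = {}
    for flight in flights:
        outgoing.setdefault(flight[0], []).append(flight)

    best: Dict[str, float] = {source: start_time}
    heap: List[Tuple[float, str]] = [(start_time, source)]
    settled: Set[str] = set()
    while heap:
        t, airport = heapq.heappop(heap)
        if airport in settled:
            continue  # stale heap entry: this airport already has its final time
        settled.add(airport)
        for _, dest, dep, arr in outgoing.get(airport, []):
            # Infeasible: the flight leaves before we can be at its origin.
            if dep < t:
                continue
            if arr < best.get(dest, float("inf")):
                best[dest] = arr
                heapq.heappush(heap, (arr, dest))
    return best


FLIGHTS: List[Flight] = [
    ("MAD", "BCN", 8, 9),
    ("BCN", "PAR", 10, 12),
    ("BCN", "PAR", 8, 11),   # departs before we reach BCN at 9
    ("MAD", "PAR", 7, 13),
    ("PAR", "LON", 13, 14),
]
```

Starting from MAD at time 7, earliest_arrival(FLIGHTS, "MAD", 7) reaches PAR at 12: the BCN to PAR flight departing at 8 would arrive earlier (11), but it is skipped because it leaves before the traveller reaches BCN at 9.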

I suspect the dataframe is being executed again at every step of the graph, but I have not been able to prevent it.
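A toy analogy of that suspicion, not a model of how Dask works internally: a lazy collection that only records its parent and one more step per iteration replays its whole lineage every time it is evaluated, so the cost of iteration k grows with k. Materializing after each step (what persist or a parquet round-trip is meant to do) cuts the lineage so each iteration only runs its own step. The class Lazy and the function run are hypothetical.

```python
from typing import Callable, List, Optional

Rows = List[int]


class Lazy:
    """Toy lazy collection: a data source plus the chain of steps that derive it."""

    def __init__(self, source: Callable[[], Rows],
                 parent: Optional["Lazy"] = None,
                 step: Optional[Callable[[Rows], Rows]] = None) -> None:
        self.source = source
        self.parent = parent
        self.step = step

    def filter(self, pred: Callable[[int], bool]) -> "Lazy":
        # Nothing runs here: the new node only records its parent and one more step.
        return Lazy(self.source, self, lambda rows: [r for r in rows if pred(r)])

    def compute(self, work: List[int]) -> Rows:
        # No cache: evaluating a node replays its whole lineage back to the source.
        if self.parent is None:
            work[0] += 1
            return self.source()
        rows = self.parent.compute(work)
        work[0] += 1
        return self.step(rows)

    def persist(self, work: List[int]) -> "Lazy":
        # Materialize once and return a node with no lineage (a checkpoint).
        rows = self.compute(work)
        return Lazy(lambda: rows)


def run(n_iter: int, use_persist: bool) -> int:
    """Return how many steps are executed over n_iter filter iterations."""
    work = [0]
    df = Lazy(lambda: list(range(100)))
    for i in range(n_iter):
        df = df.filter(lambda r, i=i: r != i)  # drop one "infeasible" row per iteration
        if use_persist:
            df = df.persist(work)  # later iterations start from this checkpoint
        else:
            df.compute(work)  # e.g. checking the size of df inside the loop
    return work[0]
```

With 10 iterations, run(10, use_persist=False) executes 65 steps while run(10, use_persist=True) executes 20; without checkpoints the total grows quadratically with the number of iterations, with them it grows linearly.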

So far I have tried the following:

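  1. Using persist (this is the fastest). However, the number of tasks to complete shown in the UI grows with every iteration. For example, iteration 8 shows x/800 and iteration 9 shows x/900 (I am using 100 partitions).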
while i < n_nodes:
    i += 1
    # some computations over df
    df = client.persist(df)
    # add new records from df to explored
    # some computations over explored
    explored = client.persist(explored)
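  2. Writing the current df to disk and reading it back immediately afterwards (this works very well in Spark, but not in Dask: it appends data, and it fails if the directory is deleted). Here I also tried del df and client.cancel(df); they had little effect on the computation time, so I commented them out.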
while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    # del df
    # client.cancel(df)
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    # del explored
    # client.cancel(explored)
    explored = dd.read_parquet('temp_dir/explored')  # assign the result; a bare call discards it

  3. Using client.restart(). This one does not work: it wipes out the contents of df and explored, which causes errors.
while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    client.restart()
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    client.restart()
    explored = dd.read_parquet('temp_dir/explored')  # assign the result; a bare call discards it

The elapsed times (in seconds) printed to the console are:

 Iteration 2 / 280. Elapsed time: 407.85055565834045
 Iteration 3 / 280. Elapsed time: 434.58717703819275
 Iteration 4 / 280. Elapsed time: 436.2463436126709
 Iteration 5 / 280. Elapsed time: 437.9837713241577
 Iteration 6 / 280. Elapsed time: 440.2417469024658
 Iteration 7 / 280. Elapsed time: 442.7933940887451
 Iteration 8 / 280. Elapsed time: 445.7904782295227
 Iteration 9 / 280. Elapsed time: 449.1104226112366
 Iteration 10 / 280. Elapsed time: 452.3273584842682
 Iteration 11 / 280. Elapsed time: 456.3567247390747
 Iteration 12 / 280. Elapsed time: 460.65562629699707
 Iteration 13 / 280. Elapsed time: 464.7628743648529
 Iteration 14 / 280. Elapsed time: 469.59177350997925
 Iteration 15 / 280. Elapsed time: 474.6557366847992
 Iteration 16 / 280. Elapsed time: 479.7272925376892
 Iteration 17 / 280. Elapsed time: 485.53346991539
 Iteration 18 / 280. Elapsed time: 491.11691975593567
 Iteration 19 / 280. Elapsed time: 497.39954662323
 Iteration 20 / 280. Elapsed time: 504.03624844551086
 Iteration 21 / 280. Elapsed time: 510.45858550071716
 Iteration 22 / 280. Elapsed time: 517.7796952724457
 Iteration 23 / 280. Elapsed time: 525.3149480819702
 Iteration 24 / 280. Elapsed time: 532.6355893611908
 Iteration 25 / 280. Elapsed time: 541.2597570419312
 Iteration 26 / 280. Elapsed time: 549.2841284275055
 Iteration 27 / 280. Elapsed time: 558.8050730228424
 Iteration 28 / 280. Elapsed time: 567.617687702179
 Iteration 29 / 280. Elapsed time: 577.8864963054657
 Iteration 30 / 280. Elapsed time: 587.5171909332275
 Iteration 31 / 280. Elapsed time: 598.4596126079559
 Iteration 32 / 280. Elapsed time: 608.7272901535034
 Iteration 33 / 280. Elapsed time: 620.6863214969635
 Iteration 34 / 280. Elapsed time: 631.9231634140015
 Iteration 35 / 280. Elapsed time: 643.090336561203
 Iteration 36 / 280. Elapsed time: 656.1529128551483
 Iteration 37 / 280. Elapsed time: 667.9437139034271
 Iteration 38 / 280. Elapsed time: 681.2613704204559
 Iteration 39 / 280. Elapsed time: 695.7434968948364
 Iteration 40 / 280. Elapsed time: 709.1406977176666
 Iteration 41 / 280. Elapsed time: 723.0397245883942
 Iteration 42 / 280. Elapsed time: 737.5559349060059
 Iteration 43 / 280. Elapsed time: 753.8705065250397
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 44 / 280. Elapsed time: 768.2957532405853
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 45 / 280. Elapsed time: 783.177583694458
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 46 / 280. Elapsed time: 798.720709323883
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 47 / 280. Elapsed time: 814.6071207523346
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 48 / 280. Elapsed time: 830.2278523445129
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 49 / 280. Elapsed time: 846.3982262611389
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 50 / 280. Elapsed time: 865.5728619098663
 Iteration 51 / 280. Elapsed time: 882.612627029419
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 52 / 280. Elapsed time: 900.9131906032562
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 53 / 280. Elapsed time: 919.1079332828522
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 54 / 280. Elapsed time: 937.6077470779419
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 55 / 280. Elapsed time: 957.1775703430176
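The Elapsed time values appear to be cumulative, so the slowdown shows up in the differences between consecutive iterations rather than in the totals. A small helper (the name per_iteration_seconds is hypothetical) that skips the garbage-collection warnings and differences consecutive iterations, applied to a few lines copied from the log above, shows the time per iteration going from about 1.7 s at iteration 4 to about 19.6 s at iteration 55.

```python
import re
from typing import Dict, Optional, Tuple

PATTERN = re.compile(r"Iteration (\d+) / \d+\. Elapsed time: ([0-9.]+)")

LOG = """\
 Iteration 3 / 280. Elapsed time: 434.58717703819275
 Iteration 4 / 280. Elapsed time: 436.2463436126709
 Iteration 5 / 280. Elapsed time: 437.9837713241577
 Iteration 6 / 280. Elapsed time: 440.2417469024658
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 53 / 280. Elapsed time: 919.1079332828522
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 54 / 280. Elapsed time: 937.6077470779419
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 55 / 280. Elapsed time: 957.1775703430176
"""


def per_iteration_seconds(log: str) -> Dict[int, float]:
    """Map iteration k to the seconds spent between iteration k - 1 and k."""
    durations: Dict[int, float] = {}
    prev: Optional[Tuple[int, float]] = None  # (iteration, cumulative elapsed)
    for line in log.splitlines():
        match = PATTERN.search(line)
        if match is None:
            continue  # e.g. distributed.utils_perf GC warnings
        iteration, elapsed = int(match.group(1)), float(match.group(2))
        # Only difference truly consecutive iterations (skips gaps in the excerpt).
        if prev is not None and prev[0] == iteration - 1:
            durations[iteration] = elapsed - prev[1]
        prev = (iteration, elapsed)
    return durations
```

Iteration 53 gets no entry because the excerpt jumps from iteration 6, so no per-iteration time can be derived for it.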

I am running this locally on a laptop with 16 GB of RAM and 12 cores. The dataset is about 7 GB when stored as parquet.

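I would appreciate any pointers on what I am doing wrong, or on how to discard the parts of the graph that have already been computed.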

Thanks!

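Your first solution (persist) seems reasonable. The task count in the UI is cumulative, so it does not mean everything is being recomputed from scratch each time; with 100 partitions, it grows in multiples of 100.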

Here is the example I am using:

import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
import numpy as np
import time

client = Client()
client

max_number_of_nodes = 35
number_of_ties = 1_000
network = pd.DataFrame(np.random.randint(max_number_of_nodes, size=(number_of_ties,2)), columns=['source', 'target'])
ddf = dd.from_pandas(network, npartitions=10)

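for i in range(1, 11):  # start at 1: with i = 0 the floor division would divide by zero
    # keep only the rows whose source // i is not 5
    ddf = ddf[ddf['source']//i!=5]
    # run this step on the cluster; ddf is now backed by the persisted partitions
    ddf = client.persist(ddf)
    time.sleep(1)  # pause so each step can be followed in the dashboard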