Dask: each iteration takes longer than the previous one

kuhbmx9i  posted on 2021-07-13 in Spark

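I am currently implementing a variant of Dijkstra's algorithm for flight departures in both Dask and Spark (for comparison purposes), which involves sequential computations over the nodes of a graph. In addition, at each step I filter out some records from the graph (nodes), because they become infeasible given their departure time. However, even though the dataframe gets smaller, each new iteration takes longer than the previous one. In Spark I solved this by writing intermediate results to parquet, but I have not been able to solve it in Dask.
I suspect that the dataframe is being re-executed from the start of the graph at every step, but I have not been able to prevent that.
So far I have tried the following approaches:
Using persist (this is the fastest). However, the number of tasks to complete shown in the UI grows with each iteration. For example, iteration 8 shows x/800 and iteration 9 shows x/900 (I am using 100 partitions).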

while i < n_nodes:
    i += 1
    # some computations over df
    df = client.persist(df)
    # add new records from df to explored
    # some computations over explored
    explored = client.persist(explored)

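Writing the current df to disk and reading it back immediately (this works really well in Spark, but not so well in Dask, because it appends data, and it fails if the directory is removed). In this case I tried both del df and client.cancel(df) (not at the same time); they had almost no effect on computation time, so I decided to comment them out.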

while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    # del df
    # client.cancel(df)
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    # del explored
    # client.cancel(explored)
    explored = dd.read_parquet('temp_dir/explored')

Using client.restart(). This one is no good, because it deletes the contents of df and explored.

while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    client.restart()
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    client.restart()
    explored = dd.read_parquet('temp_dir/explored')
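
One possible reason the write-then-read variants fail once the directory is deleted: df is still a lazy collection whose graph reads from temp_dir/df_vuelos_dask, so removing that directory before to_parquet runs would remove its own input. This is an assumption, not something the post confirms. Under that assumption, a minimal sketch of a workaround is to write each iteration to a fresh directory and delete the previous one only after the new files exist. The helper names checkpoint_dir and drop_checkpoint are hypothetical, only the standard library is executed, and the Dask calls appear as comments.

```python
import shutil
from pathlib import Path


def checkpoint_dir(base, name, iteration):
    """Return a per-iteration directory such as base/name_0007 (hypothetical layout)."""
    return Path(base) / f"{name}_{iteration:04d}"


def drop_checkpoint(path):
    """Delete an old checkpoint directory; do nothing if it does not exist."""
    shutil.rmtree(path, ignore_errors=True)


# Intended use inside the loop (Dask calls left as comments, not executed here):
#
#   new_dir = checkpoint_dir("temp_dir", "df_vuelos_dask", i)
#   df.to_parquet(str(new_dir))          # the graph still reads the previous directory
#   df = dd.read_parquet(str(new_dir))   # df now depends only on the new files
#   drop_checkpoint(checkpoint_dir("temp_dir", "df_vuelos_dask", i - 1))
```

Because the new df is built from read_parquet on the fresh directory, its graph should no longer carry the history of earlier iterations, which is the effect the Spark version gets from writing intermediate results to parquet.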

Here is the elapsed time (in seconds) printed to the console:

Iteration 2 / 280. Elapsed time: 407.85055565834045
 Iteration 3 / 280. Elapsed time: 434.58717703819275
 Iteration 4 / 280. Elapsed time: 436.2463436126709
 Iteration 5 / 280. Elapsed time: 437.9837713241577
 Iteration 6 / 280. Elapsed time: 440.2417469024658
 Iteration 7 / 280. Elapsed time: 442.7933940887451
 Iteration 8 / 280. Elapsed time: 445.7904782295227
 Iteration 9 / 280. Elapsed time: 449.1104226112366
 Iteration 10 / 280. Elapsed time: 452.3273584842682
 Iteration 11 / 280. Elapsed time: 456.3567247390747
 Iteration 12 / 280. Elapsed time: 460.65562629699707
 Iteration 13 / 280. Elapsed time: 464.7628743648529
 Iteration 14 / 280. Elapsed time: 469.59177350997925
 Iteration 15 / 280. Elapsed time: 474.6557366847992
 Iteration 16 / 280. Elapsed time: 479.7272925376892
 Iteration 17 / 280. Elapsed time: 485.53346991539
 Iteration 18 / 280. Elapsed time: 491.11691975593567
 Iteration 19 / 280. Elapsed time: 497.39954662323
 Iteration 20 / 280. Elapsed time: 504.03624844551086
 Iteration 21 / 280. Elapsed time: 510.45858550071716
 Iteration 22 / 280. Elapsed time: 517.7796952724457
 Iteration 23 / 280. Elapsed time: 525.3149480819702
 Iteration 24 / 280. Elapsed time: 532.6355893611908
 Iteration 25 / 280. Elapsed time: 541.2597570419312
 Iteration 26 / 280. Elapsed time: 549.2841284275055
 Iteration 27 / 280. Elapsed time: 558.8050730228424
 Iteration 28 / 280. Elapsed time: 567.617687702179
 Iteration 29 / 280. Elapsed time: 577.8864963054657
 Iteration 30 / 280. Elapsed time: 587.5171909332275
 Iteration 31 / 280. Elapsed time: 598.4596126079559
 Iteration 32 / 280. Elapsed time: 608.7272901535034
 Iteration 33 / 280. Elapsed time: 620.6863214969635
 Iteration 34 / 280. Elapsed time: 631.9231634140015
 Iteration 35 / 280. Elapsed time: 643.090336561203
 Iteration 36 / 280. Elapsed time: 656.1529128551483
 Iteration 37 / 280. Elapsed time: 667.9437139034271
 Iteration 38 / 280. Elapsed time: 681.2613704204559
 Iteration 39 / 280. Elapsed time: 695.7434968948364
 Iteration 40 / 280. Elapsed time: 709.1406977176666
 Iteration 41 / 280. Elapsed time: 723.0397245883942
 Iteration 42 / 280. Elapsed time: 737.5559349060059
 Iteration 43 / 280. Elapsed time: 753.8705065250397
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 44 / 280. Elapsed time: 768.2957532405853
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 45 / 280. Elapsed time: 783.177583694458
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 46 / 280. Elapsed time: 798.720709323883
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 47 / 280. Elapsed time: 814.6071207523346
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 48 / 280. Elapsed time: 830.2278523445129
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 49 / 280. Elapsed time: 846.3982262611389
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 50 / 280. Elapsed time: 865.5728619098663
 Iteration 51 / 280. Elapsed time: 882.612627029419
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 52 / 280. Elapsed time: 900.9131906032562
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 53 / 280. Elapsed time: 919.1079332828522
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 54 / 280. Elapsed time: 937.6077470779419
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 55 / 280. Elapsed time: 957.1775703430176
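
To make the slowdown easier to see, the figures above can be turned into per-iteration durations. The following small sketch assumes the printed value is the total elapsed time since the start of the run (the post does not say so explicitly); the function name per_iteration_seconds is made up for illustration.

```python
import re

# Matches lines such as " Iteration 4 / 280. Elapsed time: 436.2463436126709"
LINE_RE = re.compile(r"Iteration\s+(\d+)\s*/\s*\d+\.\s*Elapsed time:\s*([0-9.]+)")


def per_iteration_seconds(log_text):
    """Return {iteration: seconds since the previous logged iteration}.

    Assumes the logged elapsed time is cumulative. Warning lines are skipped,
    and only pairs of consecutive iteration numbers are compared.
    """
    points = [(int(m.group(1)), float(m.group(2))) for m in LINE_RE.finditer(log_text)]
    return {
        cur_i: cur_t - prev_t
        for (prev_i, prev_t), (cur_i, cur_t) in zip(points, points[1:])
        if cur_i == prev_i + 1
    }


sample = """
 Iteration 3 / 280. Elapsed time: 434.58717703819275
 Iteration 4 / 280. Elapsed time: 436.2463436126709
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 54 / 280. Elapsed time: 937.6077470779419
 Iteration 55 / 280. Elapsed time: 957.1775703430176
"""

for iteration, seconds in per_iteration_seconds(sample).items():
    print(f"iteration {iteration}: {seconds:.2f} s")
```

On these sample lines it reports about 1.66 s for iteration 4 and about 19.57 s for iteration 55, so under the cumulative-time assumption a single iteration becomes roughly twelve times slower over 50 steps.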

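I am running this locally on a laptop with 16 GB of RAM and 12 cores. The dataset is about 7 GB, stored as parquet.
I would appreciate guidance on what I am doing wrong, or a way to discard graph operations that have already completed.
Thanks!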

hyrbngr7


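Your first solution (using persist) seems reasonable. The number of tasks in the UI is cumulative (so it should not be recomputing from scratch each time; with 100 partitions, the count will go up in multiples of 100).
Here is an example I am working with: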

import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
import numpy as np
import time

client = Client()
client

max_number_of_nodes = 35
number_of_ties = 1_000
network = pd.DataFrame(np.random.randint(max_number_of_nodes, size=(number_of_ties,2)), columns=['source', 'target'])
ddf = dd.from_pandas(network, npartitions=10)

for i in range(10):
    ddf = ddf[ddf['source']//i!=5]
    ddf = client.persist(ddf)
    time.sleep(1)
