python 优化Dask计算(内存影响和通信开销)

zpjtge22  于 2023-09-29  发布在  Python
关注(0)|答案(1)|浏览(112)

我正在做一个最初针对Python多处理库进行优化的项目,我试图使用Dask将工作分发到多个节点上。特别是我正在尝试使用SSHCluster。
为了尽可能地优化,我已经将worker方法更改为更细粒度的,即:工作在尽可能小的水平上,需要更小的输入并返回更小的输出。我试图利用最少的内存,同时花最少的时间来完成一项任务。
我的数据结构类型如下:一个大字典,包含内部整数值、数组和字典。以及更简单的字典、数组和集合。它们本质上是动态的,即它们应该被工作方法改变,并返回给客户端,然后由对相同(和其他)工作方法的后续调用使用。
我还有一个dict的dict,它是静态的,还有一些其他的对象也具有静态数据。我将这些属性保存为JSON文件,使用client.upload_file方法将文件上传到worker,并使用client.register_worker_callbacks方法将这些文件注册到worker端,以便能够将它们用作worker方法的“全局共享”内存(这会占用一些内存,特别是因为数据在每个worker的内存空间中复制,但是,它工作得很好,因为数据在创建worker时加载一次,然后由任何后续的worker方法(任务)计算共享)。
然而,当涉及到动态存储器时,这些数据(大约为大小为800 MB),需要在开始计算工作方法之前以最有效的方式传递给工作方法。
我想出了三种可能的方法来实现客户和工人之间的这种沟通:
1.根据每个worker方法的需求,将数据结构拆分为“部分数据结构”。例如,如果工作者方法将处理人A、B、C,则仅包括A、B、C的对应数据。然后随后只返回A、B、C的数据。
1.使用以下命令将这些数据结构分散给工作人员:scatter(dynamic_obj,broadcast=true).将分散的数据结构作为future传递给worker(沿着其他小参数)。然后在worker端构建仅供本地使用的“部分数据结构”,并返回类似于1的结果。
1.使用Dask数据结构,如Dask.Bag或Dask. Array。
第一个起作用;我只是不确定我的内存消耗是否是最佳的。例如,客户端使用大约6.3gb的内存,调度器使用大约2.8gb的内存,工作器使用大约2.3gb的内存。我使用client.submit或client.map方法,然后使用as_completed方法评估结果期货。我也会在评估其结果后立即释放每个未来。虽然我认为客户端应该使用6.3GB的内存,但我不确定为什么调度程序要使用那么多内存,而我应该如此迅速地发布结果。工人似乎有一个基线记忆,这是大约1. 9 GB,所以2. 3 GB似乎是可以接受的“工作记忆”,而任务正在进行中。
第二个选择行不通。尝试呼叫时:

client.scatter(dynamic_obj, broadcast=True)

在我所描述的“一个大的dict,具有内部整数值,数组和dict”上,我得到:
distributed.comm.core.CommClosedError:在<TCP(closed)ConnectionPool.scatter local=tcp://127.0.0.1:58858 remote=tcop://127.0.1.1:35767>中:流已关闭
大约32分钟后。这可能是因为“嵌套dict”类型的数据结构吗?集合甚至没有那么大,并且大多数内部dict/数组此时都是空的。
我不确定第三个选项,特别是因为从我读过的文章和我看过的视频来看,这些似乎对大型数据集的分布式计算更有用,而我没有足够的数据集。我正在使用的数据结构(以及上面提到的)是我发现的需要由worker方法返回的结果的最方便的数据表示。然而,我想知道,当使用Dask数据结构时,我是否可以在数据通信开销方面获得自动加速。如果我能在这方面被指出正确的方向,那将是很有趣的。

tzdcorbm

tzdcorbm1#

这并不是一个答案...
我认为,当你的数据结构与dask提供的“集合”(bad,array,dataframe)不匹配时,使用ad-hoc client.submit是一件很好的事情。
请记住,您不需要同时提交所有工作(这相当于您可能在其他地方看到的dask.delayed工作流)。例如,您可以提交一批100个期货,并等待它们完成后再提交更多;或者因为你正在使用as_completed,所以每次完成一个任务时提交一个新任务。这种模式应该有助于保持调度程序的内存使用率较低,因为它可以“忘记”旧的工作,而不是将所有内容排队。
第二个选择行不通。
序列化一堆嵌套的python对象在字节和CPU时间方面可能比您想象的要昂贵。实际上,JSON文本在某些情况下可能更有效,因为它所能表示的内容更有限。而且,这允许您将IO传输内存使用与加载和解析分离,而不是依赖pickle来做正确的事情。
如果你的数据是嵌套的JSON,但实际上每个条目都有一个一致的模式,你可能会对awkward array感兴趣,如果你正在做数值工作,它可能会更有效地利用内存,并提供对你的值的快速矢量化计算。

相关问题