Python multiprocessing: safely writing to a file

Posted by ppcbkaq5 on 2023-08-02 in Python
Follow (0) | Answers (4) | Views (131)

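I'm trying to solve a large numerical problem that involves many subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split the independent subproblems across different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to memoize these results efficiently by storing them in a file if they have not yet been computed by any process; otherwise the computation is skipped and the result is simply read from the file.
I'm running into concurrency issues with the file: different processes sometimes check whether a sub-subproblem has already been computed (by looking for the file where the results would be stored), see that it hasn't, run the computation, and then try to write the results to the same file at the same time. How do I avoid write collisions like this?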

Answer #1 (t2a7ltrp)

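@GP89 mentioned a good solution: use a queue to send the write tasks to a dedicated process that has sole write access to the file. All the other workers only have read access, which eliminates the collisions. Here is an example that uses apply_async, but it works with map too: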

import multiprocessing as mp
import time

# Shared results file; adjust the path for your system (this one is Windows-style)
fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates a long-running process'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    # workers only ever read the file; all writing goes through the queue
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # create the file up front so a worker can never try to read it
    # before the listener has opened it
    open(fn, 'w').close()

    # must use a Manager queue here: a plain mp.Queue cannot be passed
    # as an argument to Pool tasks
    manager = mp.Manager()
    q = manager.Queue()
    # the listener permanently occupies one of these pool processes
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener and wait for it to finish writing
    q.put('kill')
    watcher.get()
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

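A minimal sketch of the same single-writer idea applied to the memoization described in the question, using `pool.map` as the answer says is possible. Assumptions that are not part of the answer: results are stored as one JSON object per line in a hypothetical `memo.jsonl`, squaring stands in for the real sub-subproblem, and `None` is the stop sentinel. Workers only read the file; because two workers can still compute the same key before either result reaches the file, the listener drops duplicates.

```python
import json
import multiprocessing as mp
import os
from functools import partial

MEMO = "memo.jsonl"  # hypothetical memo file: one JSON record per line

def load_memo(path):
    """Read-only view of the results written so far ({} if the file is missing)."""
    memo = {}
    if os.path.exists(path):
        with open(path) as f:
            for line in f:
                # skip a last line the listener may still be in the middle of writing
                if line.endswith("\n"):
                    rec = json.loads(line)
                    memo[rec["key"]] = rec["value"]
    return memo

def solve(key, q, path=MEMO):
    """Worker: reuse a memoized result if present, else compute it and send it to the writer."""
    memo = load_memo(path)
    if key in memo:
        return memo[key]
    value = key * key  # stand-in for the expensive sub-subproblem
    q.put((key, value))  # only the listener ever writes to the file
    return value

def listener(q, path=MEMO):
    """Sole writer: append each new result once, ignoring duplicates."""
    seen = set(load_memo(path))
    with open(path, "a") as f:
        while True:
            msg = q.get()
            if msg is None:  # sentinel: all workers are done
                break
            key, value = msg
            if key in seen:  # two workers raced on the same key
                continue
            seen.add(key)
            f.write(json.dumps({"key": key, "value": value}) + "\n")
            f.flush()

def main(keys, path=MEMO):
    with mp.Manager() as manager:
        q = manager.Queue()  # a Manager queue can be passed to Pool tasks
        # at least 2 processes, because the listener ties one up
        with mp.Pool(max(2, mp.cpu_count())) as pool:
            watcher = pool.apply_async(listener, (q, path))
            results = pool.map(partial(solve, q=q, path=path), keys)
            q.put(None)    # stop the listener
            watcher.get()  # wait until everything has been written
    return results

if __name__ == "__main__":
    print(main(range(10)))
```

Running it a second time finds every key in the file, so no values are recomputed or sent to the listener.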

Answer #2 (f8rj6qna)

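It seems to me that you need to use a Manager to temporarily collect your results in a list and then write the results from that list to a file. Also, use starmap to pass both the object to be processed and the managed list. The first step is to build the arguments to pass to starmap, which include the managed list.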

from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something here and then append the result to row;
    # appending DataFrames is what lets pd.concat combine them later
    x = param**2
    row.append(pd.DataFrame({'param': [param], 'x': [x]}))

if __name__ == '__main__':
    pool_parameter = list(range(10))  # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])

        with Pool() as p:
            p.starmap(worker, params)

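From here you need to decide what to do with the list. If you have plenty of RAM and a large dataset, you can concatenate it with pandas. Then you can easily save the result as a CSV or pickle file.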

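        # this must still run inside the `with Manager() as mgr:` block,
        # because row is a proxy that stops working once the manager exits
        df = pd.concat(list(row), ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')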
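The same collect-then-write-once idea can also be sketched without a Manager. This is a variation, not what the answer does: `starmap` already returns each worker's result to the parent, so the parent can be the only process that writes. The sketch uses the standard-library `csv` module instead of pandas; the `worker` body, the `results.csv` name and the column names are illustrative assumptions.

```python
import csv
from multiprocessing import Pool

def worker(param, power):
    # stand-in computation; return the result instead of appending to a shared list
    return {"param": param, "x": param ** power}

def write_rows(rows, path):
    # only the parent process calls this, so there is never a second writer
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["param", "x"])
        writer.writeheader()
        writer.writerows(rows)

def run(params, path="results.csv"):
    # starmap unpacks each tuple into worker's arguments
    with Pool() as p:
        rows = p.starmap(worker, params)  # results come back in input order
    write_rows(rows, path)
    return rows

if __name__ == "__main__":
    run([(i, 2) for i in range(10)])
```

As with the managed list, every result is held in memory until the end, but each result crosses the process boundary only once instead of going through a proxy call per append.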

Answer #3 (vc9ivgsu)

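In response to the comment saying this runs on a cluster, a simple option that does not rely on inter-process communication is to lock the memoization file with fcntl from the Python standard library.
This works on macOS, and I expect it to work on most Unix systems, although it needs testing on your particular network storage implementation: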

safe.py

import fcntl
import time

def myprint(*args):
    print(time.ctime(), *args)

def main():
    with open("safe.txt", "r+") as file:

        myprint("locking")

        # this will block (unless LOCK_EX | LOCK_NB is used)
        fcntl.lockf(file, fcntl.LOCK_EX)

        lines = file.readlines()

        # make race conditions more likely
        time.sleep(1)
        
        # "1" or one more than the previous entry
        newval = int(lines[-1])+1 if lines else 1

        print(newval)

        file.write(str(newval) + "\n")
        file.flush()

        myprint("unlocking")

        # LOCK_UN releases the lock (F_UNLCK is not a valid lockf() command)
        fcntl.lockf(file, fcntl.LOCK_UN)

if __name__ == '__main__':
    main()

You can check that it works locally by running the following in a terminal:

touch safe.txt  # this needs to already exist

for x in 1 2 3 4 5
do
  python safe.py &
done

wait  # let the background jobs finish first

cat safe.txt  # should have 1-5 inside


If you combine this with multiprocessing, each process probably needs its own file descriptor (so call open() separately in each process).
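A minimal sketch of how this lock could wrap the memoization from the question, assuming a POSIX system and a hypothetical `memo.jsonl` file holding one JSON record per line; `locked`, `read_memo` and `get_or_compute` are illustrative names. Each call opens its own descriptor, as suggested above. The lookup and the write each take the lock, but the expensive computation runs outside it so processes are not serialized; the cost is that two processes may occasionally compute the same key, in which case the later one discards its value. `fcntl` locks belong to the process, so they do not exclude threads within one process.

```python
import fcntl
import json
from contextlib import contextmanager

@contextmanager
def locked(path):
    """Open path (creating it if needed) and hold an exclusive lock on it."""
    with open(path, "a+") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)  # blocks until no other process holds it
        try:
            f.seek(0)  # "a+" starts at the end; rewind so the caller can read
            yield f
        finally:
            f.flush()  # make our write visible before anyone else can lock
            fcntl.lockf(f, fcntl.LOCK_UN)

def read_memo(f):
    """Parse every JSON record in the (locked) file into {key: value}."""
    memo = {}
    for line in f:
        if line.strip():
            rec = json.loads(line)
            memo[rec["key"]] = rec["value"]
    return memo

def get_or_compute(key, compute, path):
    """Return the memoized value for key, computing and recording it if needed."""
    with locked(path) as f:
        memo = read_memo(f)
    if key in memo:
        return memo[key]
    value = compute(key)  # runs without the lock, so other processes proceed
    with locked(path) as f:
        memo = read_memo(f)
        if key in memo:  # another process finished first; keep its result
            return memo[key]
        f.write(json.dumps({"key": key, "value": value}) + "\n")  # "a+" always appends
    return value
```

Each worker (for example the function passed to Pool.map) would call `get_or_compute` itself; nothing is shared between processes except the file.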

Answer #4 (nnt7mjpx)

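I thought I'd also post my solution to a slightly simpler problem, since this page comes up whenever I search for my problem.
I based it loosely on @MikeHunter's solution above. The reason I needed something slightly different is that the arrays I want to write at the end of each process are fairly large, so putting them in a queue, getting them back out and writing them from a different process would mean a lot of pickling and unpickling of very large arrays. This does *not* handle the OP's problem of checking many subproblems and sub-subproblems, but it does handle the "title" of the question!
What did I do?
Instead of sharing the arrays, I put a 0 in the queue and let the processes get the 0. If there is nothing to get, a process idles until there is. If there is, the process starts writing and, once it has finished, puts something back in the queue. Importantly, because a write can take a long time, note that q.get() uses timeout=None by default. If writing took a long time and the timeout were set to something short, this would of course fail.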

from multiprocessing import Process, Queue
import h5py
import numpy as np
from time import sleep, time

def func(i, q, filename, subfilename):

    # Reading from the subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]

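    # Use a fresh generator per process: with the "fork" start method the
    # global np.random state is copied, so every child would draw the same value
    sleeptime = np.random.default_rng().random() * 4 + 1
    sleep(sleeptime)

    # Print array loaded to compare to output in the summary file
    print(i, f'{sleeptime:.3f}', array)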

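    # Taking the 0 out of the queue means this process may start writing;
    # q.get() blocks (timeout=None) until the token is available
    q.get()

    try:
        with h5py.File(filename, 'r+') as ds:
            ds['array'][i, :] = array
    finally:
        # Put the token back so the next process can write, even if this
        # write failed (otherwise the other processes would wait forever)
        q.put(0)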

if __name__ == '__main__':

    N = 10
    Nsample = 5

    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]

    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(Nsample,)), dtype='f')

    filename = 'test.h5'

    with h5py.File(filename, 'w') as ds:
        disp = ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a queue to communicate the writing status between the
    # processes
    q = Queue()

    # Put a 0 in the queue to indicate that a worker can start writing
    q.put(0)

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []

    print(" T  sleeptime     array", flush=True)
    print("-----------------------", flush=True)

    for i in range(N):
        p = Process(target=func, args=(
            i, q, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')

If you save the script as hello.py, you can run it and sort the output like this:

python hello.py | sort


This should produce something like this:

T  sleeptime     array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s


Check the written HDF5 file:

h5dump test.h5


The result should look like this:

HDF5 "test.h5" {
GROUP "/" {
   DATASET "array" {
      DATATYPE  H5T_IEEE_F32LE
      DATASPACE  SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
      DATA {
      (0,0): 4, 1, 1, 0, 2,
      (1,0): 2, 1, 1, 1, 3,
      (2,0): 1, 2, 2, 4, 3,
      (3,0): 1, 4, 4, 3, 0,
      (4,0): 4, 4, 4, 4, 1,
      (5,0): 1, 3, 1, 0, 4,
      (6,0): 4, 1, 0, 3, 0,
      (7,0): 3, 4, 4, 0, 4,
      (8,0): 4, 3, 3, 1, 4,
      (9,0): 3, 3, 3, 4, 0
      }
   }
}
}


Note that there are better ways to do this with mpi4py, but I needed users not to have to worry about MPI.
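The 0 in the queue acts as a token that only one process can hold at a time, which is exactly what a lock provides. A minimal sketch of the same pattern with `multiprocessing.Lock` instead of a queue, offered as an alternative rather than the author's code; it appends lines to a plain text file (a hypothetical `summary.txt`) so it runs without h5py. As with the queue token, the large data never goes through a pipe; only the lock is shared.

```python
import multiprocessing as mp

def write_row(i, lock, path):
    row = f"{i}: {i * i}\n"  # stand-in for a large per-process result
    with lock:  # waits (no timeout) until no other process is writing
        with open(path, "a") as f:
            f.write(row)

def run(n, path="summary.txt"):
    open(path, "w").close()  # start from an empty summary file
    lock = mp.Lock()  # plays the role of the 0 token in the queue
    procs = [mp.Process(target=write_row, args=(i, lock, path)) for i in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

if __name__ == "__main__":
    run(10)
```

For the HDF5 case, the `with h5py.File(filename, 'r+')` block from the script above would go inside `with lock:`. The lock is released when the `with` block exits, even on an exception, which covers the failure case the queue token has to handle by hand. Line order in this text file depends on which process gets the lock first; the HDF5 version avoids that by writing to row `i`.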
