python Astroscrapy在多处理Jupyter上不工作

km0tfn4u  于 2022-12-17  发布在  Python
关注(0)|答案(2)|浏览(119)

bounty将于明天到期。回答此问题可获得+50的声望奖励。James Huang希望吸引更多人关注此问题。

我这里有一些多处理代码,试图同时运行多个astroscrappy进程。然而,当它实际上必须调用astroscrappy时,一切都停止了。我在jupyter笔记本上运行这个。

def a_test(i, q):
    import astroscrappy
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)

    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    q.put([a, i])
bundle = []
import multiprocessing as mp
queue = mp.Manager().Queue()

processes = [] 
for k, item in enumerate(paths):
    processes.append(mp.Process(target=a_test, args=(item, queue)))
    
# Run processes
for p in processes:
    p.start()
for p in processes:
    bundle.append(queue.get())

它只能打印出1,2,2.5,但是不能打印出调用astroscrapy后的3。你知道为什么它不能工作吗?

h7appiyu

h7appiyu1#

这段代码产生了多个进程,每个进程都必须有适当的终止时间。为每个进程调用join()就是这样做的。我测试了下面的3个文件的代码,并且能够观察到进程的并发性。在我的机器上执行时间是7-9秒。

import time
import astroscrappy
from astropy.io import fits
import multiprocessing as mp

bundle = []
processes = []
paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def a_test(i,q):    
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)

    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)    
    q.put([a, path])

if __name__ == '__main__':  
    start = time.time()          
    queue = mp.Manager().Queue()    
    for item in paths:
        processes.append(mp.Process(target=a_test, args=(item,queue)))

    # # Run processes
    for p in processes:
        p.start()
    for p in processes:        
        bundle.append(queue.get())

    #wait for child processes to finish
    for p in processes:      
        p.join()   
    
    end = time.time()

    print(f'Execution time: {end - start} seconds')

输出:

1
2
2.5
1
2
2.5
1
2
2.5
3
3
3
Execution time: 9.224327564239502 seconds

这段代码不能在jupyter notebook中运行,要了解为什么multiprocessing不能在jupyter notebook中正常工作,你可以参考this discussion
但是有一个变通方法,要让这段代码在Jupyter notebook中工作,需要从一个单独的python文件调用a_test,例如,我创建了一个名为 * functest.py * 的python文件(在您的notebook运行的同一目录中),代码如下:

import astroscrappy
from astropy.io import fits

def a_test(*args):        
    path = args[0]   
    s = fits.getdata(path)
    a = astroscrappy.detect_cosmics(s)
    return [a,path]

现在,在你的笔记本电脑中运行下面的代码。注意,我使用了Pool而不是Process,并且输出将不会有来自a_test的print语句,如1,2,2,5等。我特意将它们从a_test中删除,因为它们不会打印到Jupyter笔记本电脑输出。相反,我打印出bundle来验证处理。

import time
import multiprocessing as mp
import functest as f

paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def main():
    t1 = time.time()
    pool_size = len(paths)
    with mp.Pool(processes=pool_size) as pool:
        bundle = pool.map(f.a_test, [item for item in paths]) 
    print(bundle)
    print(f"Execution time: {time.time() - t1} seconds")

if __name__ == '__main__':
    main()

输出:

[[(array([[False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       ...,
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]]), array([[4.8158903, 4.808729 , 4.7792015, ..., 4.6927767, 4.7188225,
        4.706318 ],
       [4.765932 , 4.75....
Execution time: 8.333278179168701

multiprocessing的另一个替代品是concurrent.futures模块,它在jupyter notebook上运行没有任何问题,下面的代码可以在jupyter notebook上运行,我可以把执行时间降低到5-6秒。

import time
import astroscrappy
from astropy.io import fits
from concurrent.futures import ThreadPoolExecutor

paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def a_test1(*args):    
    print(1)
    path = args[0]
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    return [a, path]

def main():
    t1 = time.time()
    n_threads = len(paths)    
    with ThreadPoolExecutor(n_threads) as executor:        
        futures = [executor.submit(a_test1, item) for item in paths]
        bundle = [future.result() for future in futures]
    print(bundle)  
    print(f"Execution time: {time.time() - t1}")

if __name__ == '__main__':  
    main()

输出:

1
1
1
2
2.5
2
2.5
2
2.5
3
3
3
[[(array([[False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       ...,
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]]), array([[4.8158903, 4.808729 , 4.7792015, ..., 4.6927767, 4.7188225,
        4.706318 ]....
Execution time: 5.936185836791992

另一个选择是使用threading模块。这里有一个可以在jupyter中运行的简单示例。我得到的执行时间大约为5-6秒。

import time
import astroscrappy
from astropy.io import fits
from threading import Thread

paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]
thread_objs = []

def a_test(i):    
    print(1)
    path = i   
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)    

def main():
    t1 = time.time()
   
    for item in paths:
        thread_objs.append(Thread(target=a_test, args=(item,)))

    # run each thread
    for thread in thread_objs:
        thread.start()
        
    # wait for each thread to finish
    for thread in thread_objs:
        thread.join() 
    print(f"Execution time: {time.time() - t1}")

main()

输出:

1
1
1
2
2.5
2
2.5
2
2.5
3
3
3
Execution time: 6.320343971252441

请注意,如果您不需要在该函数之外处理a_test的结果,那么您就不需要从该函数返回任何内容,这将进一步保存时间。
我也用joblib运行了一些测试,重构了您的代码,并在下面分享了一些测试结果:

from joblib import Parallel, delayed
import astroscrappy
from astropy.io import fits

paths = [r"C:\Users\xxxxx\FITS\sample.fits", r"C:\Users\xxxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def a_test(i):    
    print(1)
    path = i   
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3) 
    return ([a,i])
    
def main():
    t1 = time.time()
    a = Parallel(n_jobs=len(paths))(delayed(a_test)(i) for i in paths)    
    print(f"Execution time: {time.time() - t1}")

main()

输出:

Execution time: 6.360047101974487

print语句的输出没有出现在jupyter notebook的输出中,但是执行时间在6-7秒的范围内。
总之,我没有观察到任何方法在执行时间上有显著的时间差异。这也可能是因为我尝试了一个小数据集(只有3个文件)。然而,concurrent.futures始终显示出稍好的结果。您可以尝试所有这些方法,并比较哪一种方法最适合您的用例。

bxpogfeg

bxpogfeg2#

使用joblib的Parallel,我能够使这段代码运行得更快,而不会卡住。

def a_test_parallel(i):
    import astroscrappy
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)

    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)

    return ([a,i])

a = Parallel(n_jobs=15)(delayed(a_test_parallel)(i) for i in paths[:100])

我运行了几次,与不使用并行的代码相比,它的运行速度几乎快了两倍,仍然不知道为什么多处理不起作用,但至少它起作用了。
编辑:在较大的数据集上运行并做了一些额外的调整后,这里的代码实际上并没有运行速度的两倍快,实际上比同步代码要慢一些。

相关问题