How do I run multiple batches in parallel with multiprocessing in Python?

ui7jx7zq asked on 2023-03-21 in Python

I have a large dataset (about 38,000 records) that I need to train on, so I am trying to break the loop up into batches. Since each batch is independent, I want to run the batches in parallel to speed up execution. How can I do that with multiprocessing? Here is my code:

if __name__ == '__main__':
    # Create a list of tuples with each key and the corresponding df for each key
    site_data = [(site, df) for site, df in df.groupby('site')]
    
    # set batch size
    batch_size = 1000
    
    # split dictionary into batches
    batches = [site_data[i:i+batch_size] for i in range(0, len(site_data), batch_size)]
       
    # loop through each batch
    for batch_num, batch in enumerate(batches):
        batch_results = []
        batch_logs = []
        
        print(f'----Batch {batch_num+1} of {len(batches)}-------')
        batch_logs.append(f'\nBatch no {batch_num+1} of {len(batches)} \n')

        # run loops and get results for the current batch
        for i, (k, v) in enumerate(batch):
            print(f'----Iteration {i+1} of {len(batch)}-------')

            result, msg = run_model(k, v, param)
            batch_results.append(result)
            batch_logs.append(msg)

        # Combine the results and save the output files
        batch_results = pd.concat(batch_results)
        batch_results = batch_results.reset_index(drop=True)

        # Save logs to the file
        log_file = f"logs/logs_{today}"
        save_logs(log_file, batch_logs)

gmxoilav 1#

You need to split the "worker" code out into its own function and use pool.apply_async to run that function in parallel.
For writing the log file, make sure you do it in the main process to avoid race conditions. You can do it in the worker processes instead, but then you need a lock to make it work correctly (a sketch of that variant follows the main example below).
The following shows how to move all of the worker logic into a separate function.

from multiprocessing import Pool

import pandas as pd

# run_model, param, save_logs and today are assumed to be defined at module
# level (as in the question) so that the worker processes can see them.

def worker_func(batch_num, n_batches, batch):
    batch_results = []
    batch_logs = []

    # the total batch count is passed in explicitly: names defined under
    # `if __name__ == '__main__'` are not visible to child processes when
    # the "spawn" start method is used (e.g. on Windows)
    print(f'----Batch {batch_num + 1} of {n_batches}-------')
    batch_logs.append(f'\nBatch no {batch_num + 1} of {n_batches} \n')

    # run loops and get results for the current batch
    for i, (k, v) in enumerate(batch):
        print(f'----Iteration {i + 1} of {len(batch)}-------')

        result, msg = run_model(k, v, param)
        batch_results.append(result)
        batch_logs.append(msg)

    # Combine the results for this batch
    batch_results = pd.concat(batch_results)
    batch_results = batch_results.reset_index(drop=True)

    # return both the combined results and the logs to the main process
    return batch_results, batch_logs

if __name__ == '__main__':
    # Create a list of tuples with each key and the corresponding df for each key
    site_data = [(site, df) for site, df in df.groupby('site')]

    # set batch size
    batch_size = 1000

    # split dictionary into batches
    batches = [
        site_data[i:i + batch_size] for i in range(0, len(site_data), batch_size)
    ]

    with Pool() as pool:
        futures = []
        # submit each batch to the pool; apply_async expects the arguments as a tuple
        for batch_num, batch in enumerate(batches):
            futures.append(
                pool.apply_async(worker_func, (batch_num, len(batches), batch))
            )

        all_results = []
        for future in futures:
            batch_results, batch_logs = future.get()  # wait for work to be done
            all_results.append(batch_results)
            # Save logs to the file
            log_file = f"logs/logs_{today}"
            # write to log from main process to avoid race conditions
            save_logs(log_file, batch_logs)

    # combine the per-batch results into a single frame
    all_results = pd.concat(all_results).reset_index(drop=True)
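
If you do prefer to have each worker append to the log file itself, it needs a shared lock so that two processes never write at the same time. A minimal sketch of that variant (the init_worker and append_log helpers are illustrative, not part of the original code):

from multiprocessing import Pool, Lock

log_lock = None

def init_worker(lock):
    # store the shared lock in a module-level global inside each worker process
    global log_lock
    log_lock = lock

def append_log(log_file, lines):
    # only one process at a time may hold the lock and append to the file
    with log_lock:
        with open(log_file, 'a') as f:
            f.writelines(lines)

if __name__ == '__main__':
    lock = Lock()
    # the lock is handed to each worker via the pool initializer
    with Pool(initializer=init_worker, initargs=(lock,)) as pool:
        ...  # submit work with pool.apply_async as in the main example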

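As a note on the design choice: because every batch is submitted up front and the results are only consumed after all submissions, pool.starmap gives equivalent behaviour with less bookkeeping. A minimal sketch, assuming the same worker_func as above:

    with Pool() as pool:
        # starmap blocks until every batch is done and returns results in submission order
        per_batch = pool.starmap(
            worker_func,
            [(batch_num, len(batches), batch)
             for batch_num, batch in enumerate(batches)],
        )

    for batch_results, batch_logs in per_batch:
        save_logs(f"logs/logs_{today}", batch_logs)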