I have a fairly large dataset (about 38,000 records) that I need to train on, so I am trying to break the loop up into batches. Since each batch is independent of the others, I want to run the batches in parallel to speed up execution. How can I do this with multiprocessing? Here is my code:
if __name__ == '__main__':
    # Create a list of tuples with each key and the corresponding df for each key
    site_data = [(site, df) for site, df in df.groupby('site')]
    # set batch size
    batch_size = 1000
    # split the list of tuples into batches
    batches = [site_data[i:i+batch_size] for i in range(0, len(site_data), batch_size)]
    # loop through each batch
    for batch_num, batch in enumerate(batches):
        batch_results = []
        batch_logs = []
        print(f'----Batch {batch_num+1} of {len(batches)}-------')
        batch_logs.append(f'\nBatch no {batch_num+1} of {len(batches)} \n')
        # run loops and get results for the current batch
        for i, (k, v) in enumerate(batch):
            print(f'----Iteration {i+1} of {len(batch)}-------')
            result, msg = run_model(k, v, param)
            batch_results.append(result)
            batch_logs.append(msg)
        # Combine the results and save the output files
        batch_results = pd.concat(batch_results)
        batch_results = batch_results.reset_index(drop=True)
        # Save logs to the file
        log_file = f"logs/logs_{today}"
        save_logs(log_file, batch_logs)
1 Answer
You need to split the "worker" code out into a separate function and use pool.apply_async to run that function in parallel. The sketch below moves all of the worker logic into a single function and submits one task per (site, df) pair.
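A minimal sketch, assuming run_model, save_logs, param, today, and df are defined as in the question; worker, all_results, and all_logs are names introduced here for illustration:

import multiprocessing as mp

import pandas as pd

def worker(site, site_df, param):
    # All per-site work happens here, in a child process.
    return run_model(site, site_df, param)  # returns (result_df, log_msg)

if __name__ == '__main__':
    site_data = [(site, group) for site, group in df.groupby('site')]

    all_results = []
    all_logs = []
    # One process per CPU core; the pool queues the remaining tasks.
    with mp.Pool(processes=mp.cpu_count()) as pool:
        async_results = [pool.apply_async(worker, args=(site, site_df, param))
                         for site, site_df in site_data]
        for i, ar in enumerate(async_results):
            result, msg = ar.get()      # blocks until this task finishes,
            all_results.append(result)  # re-raising any worker exception
            all_logs.append(msg)
            print(f'----Task {i+1} of {len(async_results)} done-------')

    # Combine and write output in the main process only.
    combined = pd.concat(all_results).reset_index(drop=True)
    save_logs(f"logs/logs_{today}", all_logs)

Note that with a pool, the manual batching is no longer needed: the pool itself runs at most processes tasks at a time and queues the rest, so you can submit all ~38,000 groups at once.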
For writing the log file, make sure you do it in the main process, as the sketch above does, to avoid race conditions. You can write from the worker processes instead, but you need a lock for that to work correctly; that variant is sketched below.
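A hedged sketch of the lock-based variant: the multiprocessing.Lock is handed to each worker through the pool's initializer (a Lock cannot be passed through apply_async arguments), and each worker appends its own message under the lock. run_model, param, and df are again assumed from the question; LOG_PATH, init_worker, and worker_with_logging are hypothetical names for illustration.

import multiprocessing as mp

LOG_PATH = "logs/worker_logs.txt"  # hypothetical log path for illustration

_lock = None  # set in each worker process by init_worker

def init_worker(lock):
    # Pool initializer: runs once per worker process and stores the
    # shared lock in a module-level global for worker_with_logging.
    global _lock
    _lock = lock

def worker_with_logging(site, site_df, param):
    result, msg = run_model(site, site_df, param)
    with _lock:  # only one process appends to the file at a time
        with open(LOG_PATH, "a") as f:
            f.write(msg + "\n")
    return result

if __name__ == '__main__':
    lock = mp.Lock()
    with mp.Pool(processes=mp.cpu_count(),
                 initializer=init_worker, initargs=(lock,)) as pool:
        tasks = [pool.apply_async(worker_with_logging, args=(site, site_df, param))
                 for site, site_df in df.groupby('site')]
        results = [t.get() for t in tasks]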
In both sketches, all of the worker logic is moved into a separate function, so the pool can pickle the call and dispatch it to a child process; combining the results with pd.concat and saving the output stay in the parent.