多处理-迭代循环遍历文件夹中的文件

35g0bw71  于 2021-08-20  发布在  Java
关注(0)|答案(1)|浏览(409)

我必须循环浏览30个zip文件夹,每个zip文件夹有50000-90000个jpeg文件。理想情况下,我会遍历每个zip文件夹,因为解压缩每个文件夹会花费太长时间。对于每个文件,我需要打开每个文件,从中提取关键信息,并将信息存储到列表中。基于如何在包含多个文件的文件夹上执行多线程处理,我尝试启用多线程处理以使事情更快,但是,我无法理解。在下面的示例中,我正在尝试让它在一个文件夹中工作,然后我需要弄清楚如何使它在所有30个zip文件夹中循环。

  1. import os
  2. from zipfile import ZipFile
  3. data_list = []
  4. def image_processor(file):
  5. with ZipFile("files101.zip") as zip_file:
  6. with zip_file.open(file, "r") as img_file:
  7. img_data = img_file.readlines(1) # data is available in beginning of each file
  8. # Extract data #1
  9. pattern_1 = r'IMG:\d{,3}'
  10. if re.findall(pattern_1, str(img_data)):
  11. img_extract = re.findall(pattern_1, str(img_data))[0]
  12. else:
  13. img_extract = np.nan
  14. # Extract timestamp
  15. time_pattern = r'Time:\s\d{2}-\d{2}-\d{4}\s\s\d{2}:\d{2}:\d{2}'
  16. if re.findall(time_pattern, str(img_data)):
  17. time_extract = re.findall(time_pattern, str(img_data))[0]
  18. else:
  19. time_extract = np.nan
  20. # Create list
  21. return data_list.append([img_extract, time_extract])
  22. os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
  23. for folder in os.listdir():
  24. file_list = ZipFile("files101.zip", "r").namelist()
  25. with ProcessPool(processes=8) as pool:
  26. pool.map(image_processor, file_list)

所发生的是,我的代码只是永远运行,就像它没有启用多处理一样。如果我需要多线程,我有六个内核。如有任何建议,将不胜感激。

iq0todco

iq0todco1#

您缺少几个导入项。但我马上注意到的主要项目有: for folder in os.listdir(): 您正在当前目录中的每个文件和目录上循环,但循环没有引用任何这些文件/目录;您正在重复处理files101.zip。
根据您的配置,您似乎正在windows下运行 chdir 指挥部。创建新流程的代码必须在 if __name__ == '__main__':
处理池中的每个进程都有自己的地址空间,并将附加到 data_list . 返回 data_list 返回到主进程,让主进程将所有返回值附加到主列表中是可行的,但必须确保 image_processor 函数以一个空的 data_list 每一个电话。
我假设您希望处理当前目录中所有扩展名为.zip的文件(其中大约有30个)。我将修改您的处理过程,使提交到处理池工作单元的每个任务不是zip归档中的单个文件(在这种情况下,您将提交30*50000个任务),而是整个归档。因此,您的主要处理功能不再是 image_processor 而是 zip_processor . 我对该文件做了一些其他更改,这些更改对我来说是有意义的(我希望我没有破坏任何内容):

  1. import os
  2. import glob
  3. from zipfile import ZipFile
  4. import re
  5. import numpy as np
  6. from multiprocessing import Pool
  7. def zip_processor(zipfile):
  8. with ZipFile(zipfile) as zip_file:
  9. data_list = []
  10. for file in zip_file.namelist():
  11. with zip_file.open(file, "r") as img_file:
  12. # take element 0 from returned list and convert to a string
  13. img_data = img_file.readlines(1)[0].decode() # data is available in beginning of each file
  14. # Extract data #1
  15. pattern_1 = r'IMG:\d{,3}'
  16. # why do findall if you are only using the first occurrence?
  17. m = re.search(pattern1, img_data)
  18. img_extract = m.group(0) if m else np.nan
  19. # Extract timestamp
  20. time_pattern = r'Time:\s\d{2}-\d{2}-\d{4}\s\s\d{2}:\d{2}:\d{2}'
  21. m = re.search(time_pattern, img_data)
  22. time_extract = m.group(0) if m else np.nan
  23. # Create list
  24. data_list.append([img_extract, time_extract])
  25. return data_list
  26. # required for Windows:
  27. if __name__ == '__main__':
  28. os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
  29. # Default pool size:
  30. with Pool() as pool:
  31. results = pool.imap(zip_processor, glob.iglob('*.zip'))
  32. data_list = []
  33. for result in results:
  34. data_list.extend(result)

现在,由于涉及到大量i/o,使用多线程可能会运行得很好,在这种情况下,更大的池大小将是有利的。进行以下更改:

  1. # from multiprocessing import Pool
  2. from multiprocessing.pool import ThreadPool
  3. ... # etc.
  4. if __name__ == '__main__':
  5. os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
  6. zip_list = glob.glob('*.zip')
  7. with ThreadPool(len(zip_list)) as pool:
  8. results = pool.imap(zip_processor, zip_list)
  9. ... # etc.
展开查看全部

相关问题