I am running a mix of I/O-bound and CPU-bound tasks: multiprocessing is used to crawl websites taken from a multiprocessing.Queue() and parse their HTML to extract links, while threading is used to read a text file containing a list of subdomains of huge marketplace sites (shopee.com and tokopedia.com).
import multiprocessing
from threading import Thread
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)  # bounded queue shared by the feeder thread and the worker processes

def consumer():
    while True:
        task = tasker.get()
        if task is None:  # sentinel: no more work
            break
        print(task)

def adding_task():
    with open('file.txt', 'r') as f:
        for line in f:
            tasker.put(line.strip())
    for i in range(cpu_count()):
        tasker.put(None)  # one sentinel per consumer process

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    # Suspected problem.
    add_task.join()

producer()
The problem is that the Queue grows faster than multiprocessing can finish the tasks. At the moment I throttle the feeder by checking whether the queue is empty, like this:
import multiprocessing
from threading import Thread
from netaddr import IPNetwork
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)
ips = '173.245.48.0/20'

def consumer():
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

# Check if Queue is empty or full: tasker.full()
def check_tasker():
    # busy-wait until the queue has been drained completely
    while True:
        if tasker.empty():
            break

def adding_task():
    for ip in IPNetwork(ips):
        check_tasker()
        tasker.put(str(ip))
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    add_task.join()
    exit()

producer()
Is there a better way to temporarily pause the threading side once the multiprocessing.Queue has reached N tasks, and resume it once the backlog shrinks again?
1 Answer
Your queue is already bounded:
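tasker = Queue(cpu_count() * 10)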
That is the only limit you need. If the queue reaches its maximum capacity, the capacity limit *already* blocks new tasks from being added. (OS-level limits on the size of the underlying pipe may block items from being added even before the queue reaches its nominal capacity.)
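In other words, check_tasker() is unnecessary: a plain tasker.put() already blocks the feeder thread whenever the bounded queue is full and wakes it up as soon as a consumer removes an item. Below is a minimal sketch of that approach, assuming the same file-driven feeder as your first snippet and the fork start method (so the module-level queue is inherited by the child processes):

from threading import Thread
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)  # at most cpu_count()*10 pending tasks

def consumer():
    while True:
        task = tasker.get()        # blocks until a task is available
        if task is None:           # sentinel: no more work for this process
            break
        print(task)

def adding_task():
    with open('file.txt', 'r') as f:      # same input file as in the question
        for line in f:
            tasker.put(line.strip())      # blocks while the queue is full, resumes when space frees up
    for _ in range(cpu_count()):
        tasker.put(None)                  # one sentinel per consumer process

def producer():
    feeder = Thread(target=adding_task)
    feeder.start()
    procs = [Process(target=consumer) for _ in range(cpu_count())]
    for p in procs:
        p.start()
    feeder.join()
    for p in procs:
        p.join()

if __name__ == '__main__':
    producer()

The only behavioural difference from your version is that the feeder pauses as soon as the queue is full instead of waiting for it to empty completely, so the workers are never starved.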