Python3: Limit the queue filled by a thread

5ssjco0h posted on 2023-01-27 in Python

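I'm running both I/O-bound and CPU-bound tasks. multiprocessing is used to crawl websites taken from a multiprocessing.Queue(), parse their HTML, and extract links; threading is used to read a text file that contains a list of subdomains of giant marketplace sites (shopee.com and tokopedia.com).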

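from threading import Thread
from multiprocessing import cpu_count, Process, Queue

def consumer(tasker):
    # Worker process: handle tasks until the None sentinel arrives.
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

def adding_task(tasker):
    # Feeder thread: read subdomains from the file into the queue.
    with open('file.txt', 'r') as f:
        for line in f:
            tasker.put(line.strip())
    # One sentinel per consumer so every process exits.
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    # Pass the queue to the workers explicitly so it is shared under
    # every start method (spawn does not inherit module globals).
    tasker = Queue(cpu_count()*10)
    add_task = Thread(target=adding_task, args=(tasker,))
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer, args=(tasker,))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

    #Suspected problem.
    add_task.join()

if __name__ == '__main__':
    producer()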

The problem is that the Queue grows faster than multiprocessing can finish the tasks. At the moment I'm using this to check whether tasker is empty:

from threading import Thread
from netaddr import IPNetwork
from multiprocessing import cpu_count, Process, Queue

ips = '173.245.48.0/20'

def consumer(tasker):
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

# Spin until the queue is empty before adding the next task
# (tasker.full() could be checked instead).
def check_tasker(tasker):
    while not tasker.empty():
        pass

def adding_task(tasker):
    for ip in IPNetwork(ips):
        check_tasker(tasker)
        tasker.put(str(ip))
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    tasker = Queue(cpu_count()*10)
    add_task = Thread(target=adding_task, args=(tasker,))
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer, args=(tasker,))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

    add_task.join()

if __name__ == '__main__':
    producer()

Is there a better way to temporarily pause the thread once the multiprocessing.Queue holds N tasks, and resume it when the count drops?

3bygqnnd


Your queue is already bounded:

tasker = Queue(cpu_count()*10)

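That is the only limit you need. If the queue is at maximum capacity, the capacity limit *already* blocks new tasks from being added. (OS-level limits on the size of the underlying pipe may also block adding items before the queue reaches its nominal capacity.)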
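A minimal sketch of what this means for the question's program, under some assumptions: the queue is passed to the workers as an argument, `task.upper()` is a hypothetical stand-in for the real crawling and parsing, and the `results` queue and `run()` helper are illustrative names that are not part of the original code. The feeder thread just calls `put()`; there is no polling loop like `check_tasker()`, because `put()` on a bounded `multiprocessing.Queue` waits while the queue is full and returns once a consumer frees a slot.

```python
import queue
from multiprocessing import Process, Queue, cpu_count
from threading import Thread


def consumer(tasks, results):
    """Worker process: take tasks until the None sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:
            break
        # Hypothetical stand-in for the real crawl/parse step.
        results.put(task.upper())


def add_tasks(tasks, items, n_consumers):
    """Feeder thread: no polling loop needed."""
    for item in items:
        # On a bounded Queue, put() blocks while the queue is full, so
        # this thread pauses by itself and resumes after a get().
        tasks.put(item)
    for _ in range(n_consumers):
        tasks.put(None)  # one sentinel per consumer process


def run(items, n_consumers=None):
    n = n_consumers or cpu_count()
    tasks = Queue(n * 10)  # bounded, as in the question
    results = Queue()      # unbounded; drained below
    procs = [Process(target=consumer, args=(tasks, results)) for _ in range(n)]
    for p in procs:
        p.start()          # start workers before any extra thread exists
    feeder = Thread(target=add_tasks, args=(tasks, items, n))
    feeder.start()
    # Drain results before join(): a process that has put items on a
    # queue may not terminate until those items are consumed.
    out = [results.get() for _ in range(len(items))]
    for p in procs:
        p.join()
    feeder.join()
    return out


if __name__ == "__main__":
    # A full bounded queue makes put() wait; with a timeout it gives up.
    demo = Queue(2)
    demo.put("a")
    demo.put("b")
    try:
        demo.put("c", timeout=0.5)
    except queue.Full:
        print("put() blocked: queue is full")
    demo.get()
    demo.get()

    print(sorted(run(["shopee.com", "tokopedia.com"], n_consumers=2)))
```

Starting the worker processes before the feeder thread avoids forking while another thread is running, and collecting every result before calling `join()` follows the `multiprocessing` guideline about joining processes that use queues. The `if __name__ == "__main__":` guard is needed on platforms whose default start method re-imports the main module (spawn).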
