使用Python 3的concurrent.futures
模块进行并行工作相当容易,如下所示。
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
for future in concurrent.futures.as_completed(future_to):
data = future.result()
在Queue中插入和检索项也非常方便。
q = queue.Queue()
for task in tasks:
q.put(task)
while not q.empty():
q.get()
我有一个脚本在后台运行以监听更新,现在,理论上假设,当这些更新到达时,我会将它们排队,并使用ThreadPoolExecutor
并发地处理它们。
现在,所有这些组件都各自独立地工作,并且是有意义的,但是我如何将它们一起使用呢?我不知道是否可以真实的地从队列中提供ThreadPoolExecutor
工作,除非要工作的数据是预先确定的。
简而言之,我所要做的就是,接收更新,比如说每秒4条消息,将它们推到队列中,然后让我的并发未来处理它们。如果我不这样做,那么我就被缓慢的顺序方法卡住了。
让我们看看下面Python文档中的规范示例:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
URLS
的列表是固定的。是否有可能实时地提供这个列表,并让工作者在它们经过时处理它,也许是出于管理目的从队列中处理它?我有点困惑我的方法是否 * 实际上是可能的 *?
4条答案
按热度按时间b4lqfgs41#
Python文档中的示例,扩展为从队列中获取工作,需要注意的是,这段代码使用
concurrent.futures.wait
而不是concurrent.futures.as_completed
,以允许在等待其他工作完成的同时启动新的工作。获取每个
url
两次的输出:qxgroojn2#
在工作中,我发现了一个需要并行处理无限数据流的情况,于是我创建了一个小型库,灵感来自Stephen Rauch已经给出的出色答案。
我最初通过考虑两个单独的线程来解决这个问题,一个线程向队列提交工作,另一个线程监视队列中任何已完成的任务,并为新工作的进入腾出更多空间。这与Stephen Rauch的提议类似,他使用在单独线程中运行的
feed_the_workers
函数来使用流。通过与我的一位同事交谈,他帮助我认识到,如果定义一个缓冲迭代器,允许您控制每次准备向线程池提交更多工作时从输入流中释放多少元素,那么您可以在单个线程中完成所有任务。
因此,我们引入
BufferedIter
类这允许我们以如下方式定义流处理器
下面我们展示了一个如何使用流处理器的示例
此示例的输出如下所示
r7xajy2e3#
我真的很喜欢@pedro上面有趣的方法。但是,当处理成千上万个文件时,我注意到在最后会抛出一个StopIteration,并且总是会跳过一些文件。我不得不做了一点修改,如下所示。非常有用的答案。
--拨打如下电话
--其中stop是一个函数,它只告诉用户关闭
bprjcwpo4#
不必严格使用队列就可以获得执行器的好处。新任务从主线程提交。未完成的future被跟踪并等待,直到所有future完成。
输出:
为了更清楚起见,取消注解两个
print
语句。根据上面的输出,工人越多,速度越快。