我只是被我写的一些代码弄糊涂了。我惊讶地发现:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(f, iterable))
以及
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(map(lambda x: executor.submit(f, x), iterable))
产生不同的结果。第一个产生f
返回的任何类型的列表,第二个产生concurrent.futures.Future
对象的列表,然后需要使用它们的result()
方法进行计算,以获得f
返回的值。
我主要担心的是,这意味着executor.map
不能利用concurrent.futures.as_completed
,而concurrent.futures.as_completed
似乎是一种非常方便的方法,可以用来评估对数据库的一些长时间调用的结果,因为这些结果是可用的。
我完全不清楚concurrent.futures.ThreadPoolExecutor
对象是如何工作的--天真地说,我更喜欢(稍微详细一点):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
results = [f.result() for f in futures.as_completed(result_futures)]
而不是更简洁的executor.map
,以便利用可能的性能增益。我这样做错了吗?
4条答案
按热度按时间ttcibm8c1#
问题是你把
ThreadPoolExecutor.map
的结果转换成了一个列表。如果你不这样做,而是直接迭代生成的生成器,结果仍然会按照原来的顺序产生,但是在所有结果都准备好之前,循环会继续。你可以用这个例子来测试:保持顺序的原因可能是因为有时候以与Map相同的顺序获得结果很重要。结果可能不会 Package 在未来的对象中,因为在某些情况下,如果需要的话,在列表上执行另一个Map以获得所有结果可能需要太长时间。毕竟在大多数情况下,它是一个简单的Map。很有可能下一个值在循环处理第一个值之前就准备好了。下面的例子说明了这一点:
在这个例子中,
do_some_stuff
可能比crunch_number
花费更长的时间,如果是这样的话,在保持map的易用性的同时,性能也不会有太大的损失。此外,由于工作线程(/processes)从列表的开头开始处理,一直处理到你提交的列表的结尾,结果应该按照迭代器已经产生的顺序完成。这意味着在大多数情况下
executor.map
就可以了,但在某些情况下,例如,如果你处理值的顺序无关紧要,并且你传递给map
的函数需要非常不同的时间来运行,那么future.as_completed
可能会更快。2exbekwf2#
如果使用
concurrent.futures.as_completed
,则可以处理每个函数的异常。在
executor.map
中,如果出现异常,整个执行程序会停止。你需要在worker函数中处理异常。u0sqgete3#
下面是一个
.submit()
与.map()
的示例。它们都立即接受作业(提交|mapped - start)。它们完成的时间相同,都是11秒(最后一个结果时间- start)。但是,.submit()
会在ThreadPoolExecutor
maxThreads=2
中的任何线程完成后立即给出结果(无序!)。而.map()
会按照提交的顺序给出结果。输出:
xpcnnkqh4#
除了这里的答案中的解释之外,直接去找来源也是有帮助的。它重申了这里另一个答案的陈述:
.map()
按照提交的顺序给出结果,而concurrent.futures.as_completed()
迭代Future
对象列表不能保证这种顺序,因为这是as_completed()
的本质.map()
在基类concurrent.futures._base.Executor
中定义:正如您提到的,还有
.submit()
,它将在子类中定义,即ProcessPoolExecutor
和ThreadPoolExecutor
,并返回一个_base.Future
示例,您需要调用.result()
来实际执行任何操作。.map()
中的重要行可以归结为:.reverse()
加上.pop()
是一种方法,它首先得到第一个提交的结果(来自iterables
),然后得到第二个提交的结果,依此类推。它们本身就是实际的结果。