python-3.x ThreadPoolExecutor().map与ThreadPoolExecutor().submit有何不同?

3htmauhk  于 2023-03-24  发布在  Python
关注(0)|答案(4)|浏览(226)

我只是被我写的一些代码弄糊涂了。我惊讶地发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

以及

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个产生f返回的任何类型的列表,第二个产生concurrent.futures.Future对象的列表,然后需要使用它们的result()方法进行计算,以获得f返回的值。
我主要担心的是,这意味着executor.map不能利用concurrent.futures.as_completed,而concurrent.futures.as_completed似乎是一种非常方便的方法,可以用来评估对数据库的一些长时间调用的结果,因为这些结果是可用的。
我完全不清楚concurrent.futures.ThreadPoolExecutor对象是如何工作的--天真地说,我更喜欢(稍微详细一点):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

而不是更简洁的executor.map,以便利用可能的性能增益。我这样做错了吗?

ttcibm8c

ttcibm8c1#

问题是你把ThreadPoolExecutor.map的结果转换成了一个列表。如果你不这样做,而是直接迭代生成的生成器,结果仍然会按照原来的顺序产生,但是在所有结果都准备好之前,循环会继续。你可以用这个例子来测试:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

保持顺序的原因可能是因为有时候以与Map相同的顺序获得结果很重要。结果可能不会 Package 在未来的对象中,因为在某些情况下,如果需要的话,在列表上执行另一个Map以获得所有结果可能需要太长时间。毕竟在大多数情况下,它是一个简单的Map。很有可能下一个值在循环处理第一个值之前就准备好了。下面的例子说明了这一点:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

在这个例子中,do_some_stuff可能比crunch_number花费更长的时间,如果是这样的话,在保持map的易用性的同时,性能也不会有太大的损失。
此外,由于工作线程(/processes)从列表的开头开始处理,一直处理到你提交的列表的结尾,结果应该按照迭代器已经产生的顺序完成。这意味着在大多数情况下executor.map就可以了,但在某些情况下,例如,如果你处理值的顺序无关紧要,并且你传递给map的函数需要非常不同的时间来运行,那么future.as_completed可能会更快。

2exbekwf

2exbekwf2#

如果使用concurrent.futures.as_completed,则可以处理每个函数的异常。

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    # -> using `executor.submit()` **requires** calling
    #      `concurrent.futures.as_completed()` <-
    #
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('resutl is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

executor.map中,如果出现异常,整个执行程序会停止。你需要在worker函数中处理异常。

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # -> Do not call `concurrent.futures.as_completed()`
    #    when using `executor.map()` <-
    #
    for each in executor.map(f, iterable):
        print(each)
# if there is any exception, executor.map would stop
u0sqgete

u0sqgete3#

下面是一个.submit().map()的示例。它们都立即接受作业(提交|mapped - start)。它们完成的时间相同,都是11秒(最后一个结果时间- start)。但是,.submit()会在ThreadPoolExecutormaxThreads=2中的任何线程完成后立即给出结果(无序!)。而.map()会按照提交的顺序给出结果。

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()

输出:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49
xpcnnkqh

xpcnnkqh4#

除了这里的答案中的解释之外,直接去找来源也是有帮助的。它重申了这里另一个答案的陈述:

  • .map()按照提交的顺序给出结果,而
  • 使用concurrent.futures.as_completed()迭代Future对象列表不能保证这种顺序,因为这是as_completed()的本质

.map()在基类concurrent.futures._base.Executor中定义:

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

正如您提到的,还有.submit(),它将在子类中定义,即ProcessPoolExecutorThreadPoolExecutor,并返回一个_base.Future示例,您需要调用.result()来实际执行任何操作。
.map()中的重要行可以归结为:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse()加上.pop()是一种方法,它首先得到第一个提交的结果(来自iterables),然后得到第二个提交的结果,依此类推。它们本身就是实际的结果。

相关问题