python-3.x: Using concurrent.futures for new tasks in a real-time scenario

sd2nnvve · published 2023-03-04 in Python
Follow (0) | Answers (4) | Views (135)

Doing parallel work with Python 3's concurrent.futures module is fairly easy, as shown below.

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()

Inserting and retrieving items with a Queue is also very convenient.

q = queue.Queue()
for task in tasks:
    q.put(task)
while not q.empty():
    q.get()

I have a script that runs in the background listening for updates. In theory, when those updates arrive, I would queue them up and have a ThreadPoolExecutor process them concurrently.
All of these pieces work fine in isolation and make sense, but how do I use them together? I don't know whether it is actually possible to feed a ThreadPoolExecutor work from a queue, unless the data to be worked on is predetermined.
In short, all I want to do is receive updates, say 4 messages per second, push them onto a queue, and have my concurrent futures process them. If I can't do that, I'm stuck with a slow, sequential approach.
Consider the canonical example from the Python docs below:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

The list of URLS is fixed. Is it possible to feed this list in real time and have the workers process items as they come past, perhaps from a queue for management purposes? I'm a bit confused about whether my approach is *actually possible*?

b4lqfgs4 · answer #1

The example from the Python docs, expanded to take its work from a queue. One thing to note: this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, to allow new work to be started while waiting for other work to complete.

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]

Output from fetching each url twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
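A side note on the wait semantics the answer above relies on: concurrent.futures.wait returns a (done, not_done) pair, and with FIRST_COMPLETED it comes back as soon as any future finishes, which is what leaves the loop free to submit new work in between. A minimal standalone check (my own illustration, with a made-up work function):

```python
import concurrent.futures
import time

def work(delay):
    # Hypothetical task: just sleep for the given delay and return it.
    time.sleep(delay)
    return delay

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(work, d): d for d in (0.01, 0.5)}
    # wait() returns two sets; FIRST_COMPLETED returns as soon as at least
    # one future is done, so new work could be submitted right here.
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)

print(len(done) >= 1)                  # True
print(len(done) + len(not_done) == 2)  # True
```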
qxgroojn · answer #2

At work I ran into a situation where I needed to process an unbounded stream of data in parallel, so I wrote a small library, inspired by the excellent answer Stephen Rauch already gave here.
I originally approached the problem with two separate threads: one submitting work to a queue, and one watching the queue for completed tasks to make room for new work to come in. That is similar to what Stephen Rauch proposed, where he consumes the stream with a feed_the_workers function running in a separate thread.
Talking it over with a colleague, he helped me realize that you can do everything in a single thread if you define a buffered iterator that lets you control how many elements are released from the input stream each time you are ready to submit more work to the thread pool.
So we introduce the BufferedIter class:

class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals
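As a quick sanity check (my own illustration, not part of the original answer), nextN releases exactly n elements per call, picking up where the previous call left off:

```python
class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals

buf = BufferedIter(iter(range(10)))
print(buf.nextN(3))  # [0, 1, 2]
print(buf.nextN(2))  # [3, 4]
```

Note that nextN propagates StopIteration if the underlying iterator runs out mid-batch, which is fine for a truly infinite stream.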

This allows us to define a stream processor as follows:

import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG
log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1  # second, adjust this based on the timescale of your tasks

def stream_processor(input_stream, task, num_workers):

    # Use a queue to signal shutdown.
    shutting_down = queue.Queue()

    def shutdown(signum, frame):
        log.warning('Caught signal %d, shutting down gracefully ...' % signum)
        # Put an item in the shutting down queue to signal shutdown.
        shutting_down.put(None)

    # Register the signal handler
    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    def is_shutting_down():
        return not shutting_down.empty()

    futures = dict()
    buffer = BufferedIter(input_stream)
    with ThreadPoolExecutor(num_workers) as executor:
        num_success = 0
        num_failure = 0
        while True:
            idle_workers = num_workers - len(futures)

            if not is_shutting_down():
                items = buffer.nextN(idle_workers)
                for data in items:
                    futures[executor.submit(task, data)] = data

            done, _ = wait(futures, timeout=WAIT_SLEEP, return_when=ALL_COMPLETED)
            for f in done:
                data = futures[f]
                try:
                    f.result(timeout=0)
                except Exception as exc:
                    log.error('future encountered an exception: %r, %s' % (data, exc))
                    num_failure += 1
                else:
                    log.info('future finished successfully: %r' % data)
                    num_success += 1

                del futures[f]

            if is_shutting_down() and len(futures) == 0:
                break

        log.info("num_success=%d, num_failure=%d" % (num_success, num_failure))

Below is an example of how to use the stream processor:

import itertools

def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i

def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x

stream_processor(integers(), task, num_workers=3)

The output of this example looks like this:

2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
r7xajy2e · answer #3

I really like @pedro's interesting approach above. However, when processing thousands of files, I noticed that a StopIteration would be thrown at the end and some files were always skipped. I had to make a small modification, as follows. A very useful answer regardless.

class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        try:
            for _ in range(n):
                vals.append(next(self.iter))
            return vals, False
        except StopIteration:
            return vals, True

-- which is called like this:

...
if not is_shutting_down():
   items, is_finished = buffer.nextN(idle_workers)
   if is_finished:
        stop()
...

-- where stop is a function that simply signals the shutdown:

def stop():
    shutting_down.put(None)
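Putting the modification together, here is a standalone sketch (my own demonstration, not from the answer) of how the modified nextN reports exhaustion instead of raising:

```python
class BufferedIter(object):
    def __init__(self, iterator):
        self.iter = iterator

    def nextN(self, n):
        vals = []
        try:
            for _ in range(n):
                vals.append(next(self.iter))
            return vals, False
        except StopIteration:
            # Return whatever was collected plus a flag marking the end.
            return vals, True

buf = BufferedIter(iter(range(5)))
print(buf.nextN(3))  # ([0, 1, 2], False)
print(buf.nextN(3))  # ([3, 4], True) -- partial final batch, stream is done
```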
bprjcwpo · answer #4

The benefits of an executor can be had without strictly using a queue. New tasks are submitted from the main thread. Futures that are not done are tracked and waited on until all futures are done.

import concurrent.futures
import sys
import time

sys.setrecursionlimit(64)  # This is only for demonstration purposes to trigger a RecursionError. Do not set in practice.

def slow_factorial(n: int) -> int:
    time.sleep(0.01)
    if n == 0:
        return 1
    else:
        return n * slow_factorial(n-1)

initial_inputs = [0, 1, 5, 20, 200, 100, 50, 51, 55, 40, 44, 21, 222, 333, 202, 1000, 10, 9000, 9009, 99, 9999]

for executor_class in (concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor):
    for max_workers in (4, 8, 16, 32):
        start_time = time.monotonic()
        with executor_class(max_workers=max_workers) as executor:
            futures_to_n = {executor.submit(slow_factorial, n): n for n in initial_inputs}
            while futures_to_n:
                futures_done, futures_not_done = concurrent.futures.wait(futures_to_n, return_when=concurrent.futures.FIRST_COMPLETED)
                # Note: Length of futures_done is often > 1.
                for future in futures_done:
                    n = futures_to_n.pop(future)
                    try:
                        factorial_n = future.result()
                    except RecursionError:
                        n_smaller = int(n ** 0.9)
                        future = executor.submit(slow_factorial, n_smaller)
                        futures_to_n[future] = n_smaller
                        # print(f'Failed to compute factorial of {n}. Trying to compute factorial of a smaller number {n_smaller} instead.')
                    else:
                        # print(f'Factorial of {n} is {factorial_n}.')
                        pass
        used_time = time.monotonic() - start_time
        executor_type = executor_class.__name__.removesuffix('PoolExecutor').lower()
        print(f'Workflow took {used_time:.1f}s with {max_workers} {executor_type} workers.')
    print()

Output:

Workflow took 9.4s with 4 thread workers.
Workflow took 6.3s with 8 thread workers.
Workflow took 5.4s with 16 thread workers.
Workflow took 5.2s with 32 thread workers.

Workflow took 9.0s with 4 process workers.
Workflow took 5.9s with 8 process workers.
Workflow took 5.1s with 16 process workers.
Workflow took 4.9s with 32 process workers.

Uncomment the two print statements for more clarity. As the output above shows, more workers make it faster.
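The core of this pattern, popping each finished future from the tracking dict and resubmitting a replacement from the main thread on failure, can be condensed to a few lines. This is my own reduced sketch with a made-up attempt function, not the answer's code:

```python
import concurrent.futures

def attempt(n):
    # Hypothetical task that fails for inputs above a threshold.
    if n > 2:
        raise ValueError('input too large')
    return n * n

results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(attempt, n): n for n in (1, 2, 5)}
    while futures:
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for future in done:
            n = futures.pop(future)  # stop tracking the finished future
            try:
                results[n] = future.result()
            except ValueError:
                # Retry with a smaller input, submitted from the main thread.
                smaller = n - 1
                futures[executor.submit(attempt, smaller)] = smaller

print(results == {1: 1, 2: 4})  # True: 5 was retried down to 2
```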
