python-3.x 运行时错误:〈_io. BufferedWriter name =''>内部的可重入调用< stdout>'&gt;

tkqqtvp1  于 2023-02-10  发布在  Python
关注(0)|答案(1)|浏览(322)

我正在写一个程序,它每隔N秒启动一个线程来产生"工作",并将其添加到一个队列中,然后,我有一个线程池来处理队列中的项目。
下面的程序运行得非常好,直到我注解掉/删除第97行(主函数中的time.sleep(0.5))。一旦我这样做了,它会生成一个RuntimeError,试图优雅地停止程序(通过向主进程发送SIGINT或SIGTERM)。它甚至在0.1s这样的极小睡眠时间下也能正常工作,但根本没有问题。
我试着研究"重入",但不幸的是它有点超出了我的理解范围。
有谁能帮我弄明白这一点吗?

    • 代码:**
import random
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Empty, Queue, SimpleQueue
from typing import Any

class UniqueQueue:
    """
    A thread safe queue which can only ever contain unique items.
    """

    def __init__(self) -> None:
        self._q = Queue()
        self._items = []
        self._l = threading.Lock()

    def get(self, block: bool = False, timeout: float | None = None) -> Any:
        with self._l:
            try:
                item = self._q.get(block=block, timeout=timeout)
            except Empty:
                raise
            else:
                self._items.pop(0)
                return item

    def put(self, item: Any, block: bool = False, timeout: float | None = None) -> None:
        with self._l:
            if item in self._items:
                return None
            self._items.append(item)
            self._q.put(item, block=block, timeout=timeout)

    def size(self) -> int:
        return self._q.qsize()

    def empty(self) -> bool:
        return self._q.empty()

def stop_app(sig_num, sig_frame) -> None:
    # global stop_app_event
    print("Signal received to stop the app")
    stop_app_event.set()

def work_generator(q: UniqueQueue) -> None:
    last_execution = time.time()
    is_first_execution = True
    while not stop_app_event.is_set():
        elapsed_seconds = int(time.time() - last_execution)
        if elapsed_seconds <= 10 and not is_first_execution:
            time.sleep(0.5)
            continue
        last_execution = time.time()
        is_first_execution = False
        print("Generating work...")
        for _ in range(100):
            q.put({"n": random.randint(0, 500)})

def print_work(w) -> None:
    print(f"{datetime.now()}: {w}")

def main():
    # Create a work queue
    work_queue = UniqueQueue()

    # Create a thread to generate the work and add to the queue
    t = threading.Thread(target=work_generator, args=(work_queue,))
    t.start()

    # Create a thread pool, get work from the queue, and submit to the pool for processing
    pool = ThreadPoolExecutor(max_workers=20)
    futures: list[Future] = []
    while True:
        print("Processing work...")
        if stop_app_event.is_set():
            print("stop_app_event is set:", stop_app_event.is_set())
            for future in futures:
                future.cancel()
            break
        print("Queue Size:", work_queue.size())
        try:
            while not work_queue.empty():
                work = work_queue.get()
                future = pool.submit(print_work, work)
                futures.append(future)
        except Empty:
            pass
        time.sleep(0.5)

    print("Stopping the work generator thread...")
    t.join(timeout=10)
    print("Work generator stopped")
    print("Stopping the thread pool...")
    pool.shutdown(wait=True)
    print("Thread pool stopped")

if __name__ == "__main__":
    stop_app_event = threading.Event()
    signal.signal(signalnum=signal.SIGINT, handler=stop_app)
    signal.signal(signalnum=signal.SIGTERM, handler=stop_app)
    main()
enyaitl3

enyaitl31#

这是因为您在信号处理程序stop_app()中调用了print()
在C语言中,信号处理程序在后台线程中执行,但在Python中,它在主线程中执行(参见参考文献)。在您的示例中,当执行print()调用时,另一个print()被调用,因此术语“reentrant”非常合适。当前的IO堆栈禁止reentrant调用。(如果您感兴趣,请参见实现)
您可以通过使用os.write()sys.stdout来解决这个问题,如下所示。

import sys
import os
...
def stop_app(sig_num, sig_frame):
    os.write(sys.stdout.fileno(), b"Signal received to stop the app\n")
    stop_app_event.set()

相关问题