我正在写一个程序,它每隔N秒启动一个线程来产生"工作",并将其添加到一个队列中,然后,我有一个线程池来处理队列中的项目。
下面的程序运行得非常好,直到我注解掉/删除第97行(主函数中的time.sleep(0.5)
)。一旦我这样做了,它会生成一个RuntimeError,试图优雅地停止程序(通过向主进程发送SIGINT或SIGTERM)。它甚至在0.1s这样的极小睡眠时间下也能正常工作,但根本没有问题。
我试着研究"重入",但不幸的是它有点超出了我的理解范围。
有谁能帮我弄明白这一点吗?
- 代码:**
import random
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Empty, Queue, SimpleQueue
from typing import Any
class UniqueQueue:
"""
A thread safe queue which can only ever contain unique items.
"""
def __init__(self) -> None:
self._q = Queue()
self._items = []
self._l = threading.Lock()
def get(self, block: bool = False, timeout: float | None = None) -> Any:
with self._l:
try:
item = self._q.get(block=block, timeout=timeout)
except Empty:
raise
else:
self._items.pop(0)
return item
def put(self, item: Any, block: bool = False, timeout: float | None = None) -> None:
with self._l:
if item in self._items:
return None
self._items.append(item)
self._q.put(item, block=block, timeout=timeout)
def size(self) -> int:
return self._q.qsize()
def empty(self) -> bool:
return self._q.empty()
def stop_app(sig_num, sig_frame) -> None:
# global stop_app_event
print("Signal received to stop the app")
stop_app_event.set()
def work_generator(q: UniqueQueue) -> None:
last_execution = time.time()
is_first_execution = True
while not stop_app_event.is_set():
elapsed_seconds = int(time.time() - last_execution)
if elapsed_seconds <= 10 and not is_first_execution:
time.sleep(0.5)
continue
last_execution = time.time()
is_first_execution = False
print("Generating work...")
for _ in range(100):
q.put({"n": random.randint(0, 500)})
def print_work(w) -> None:
print(f"{datetime.now()}: {w}")
def main():
# Create a work queue
work_queue = UniqueQueue()
# Create a thread to generate the work and add to the queue
t = threading.Thread(target=work_generator, args=(work_queue,))
t.start()
# Create a thread pool, get work from the queue, and submit to the pool for processing
pool = ThreadPoolExecutor(max_workers=20)
futures: list[Future] = []
while True:
print("Processing work...")
if stop_app_event.is_set():
print("stop_app_event is set:", stop_app_event.is_set())
for future in futures:
future.cancel()
break
print("Queue Size:", work_queue.size())
try:
while not work_queue.empty():
work = work_queue.get()
future = pool.submit(print_work, work)
futures.append(future)
except Empty:
pass
time.sleep(0.5)
print("Stopping the work generator thread...")
t.join(timeout=10)
print("Work generator stopped")
print("Stopping the thread pool...")
pool.shutdown(wait=True)
print("Thread pool stopped")
if __name__ == "__main__":
stop_app_event = threading.Event()
signal.signal(signalnum=signal.SIGINT, handler=stop_app)
signal.signal(signalnum=signal.SIGTERM, handler=stop_app)
main()
1条答案
按热度按时间enyaitl31#
这是因为您在信号处理程序
stop_app()
中调用了print()
。在C语言中,信号处理程序在后台线程中执行,但在Python中,它在主线程中执行(参见参考文献)。在您的示例中,当执行
print()
调用时,另一个print()
被调用,因此术语“reentrant”非常合适。当前的IO堆栈禁止reentrant调用。(如果您感兴趣,请参见实现)您可以通过使用
os.write()
和sys.stdout
来解决这个问题,如下所示。