python线程未使用atexit退出

u4dcyp6a  于 2023-03-20  发布在  Python
关注(0)|答案(3)|浏览(182)

这是我的脚本,当我在shell中运行它时,它只是无限期地挂起,而我希望它干净地终止。

import logging
from logging import StreamHandler
import pymsteams
import queue
import threading
import atexit

class TeamsHandler(StreamHandler):
    def __init__(self, channel_url):
        super().__init__()
        self.channel_url = channel_url
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._worker)
        self.thread.start()
        atexit.register(self.queue.put, None)

    def _worker(self):
        while True:
            record = self.queue.get()
            if record is None:
                break
            msg = self.format(record)
            print(msg)

    def emit(self, record):
        # enqueue the record to log and return control to the caller
        self.queue.put(record)

if __name__ == "__main__":
    my_logger = logging.getLogger('TestLogging')
    my_logger.setLevel(logging.DEBUG)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    my_logger.addHandler(console_handler)

    CHANNEL_ID = "not_used_anyway"
    teamshandler = TeamsHandler(CHANNEL_ID)
    teamshandler.setFormatter(logging.Formatter('%(levelname)s %(message)s'))
    teamshandler.setLevel(logging.DEBUG)
    my_logger.addHandler(teamshandler)
    for i in range(1, 2):
        my_logger.error(f"this is an error [{i}]")
        my_logger.info(f"this is an info [{i}]")

应该由atexit发送的None记录(第28行)从未到达,因此线程永远保持打开状态。
如何通过只修改TeamsHandler来确保程序干净地退出?

pw9qyyiw

pw9qyyiw1#

我有办法了,看看:

import queue
import threading

class Worker:
    def __init__(self):
        self.queue = queue.Queue()
        threading.Thread(target=self._worker).start()

    def _worker(self):
        print("starting thread")
        while True:
            record = self.queue.get()
            if record is None:
                print("exiting")
                break
            print(f"Got message: {record}")

    def emit(self, record):
        self.queue.put(record)

class Wrapper:
    def __init__(self):
        self._worker = Worker()

    def __del__(self):
        print("Wrapper is being deleted")
        self._worker.emit(None)

    def emit(self, record):
        self._worker.emit(record)

def main():
    worker = Wrapper()
    worker.emit("foo")
    worker.emit("bar")
    print("main exits")

if __name__ == "__main__":
    main()

这里的要点是,当main退出时,workerWrapper的一个示例)将超出作用域,它的__del__方法将被调用,并且它将向一个真实的的工作对象发送停止消息。
运行此代码的结果(当然,“Got message”行可以位于不同的位置):

starting thread
main exits
Wrapper is being deleted
Got message: foo
Got message: bar
exiting
jdg4fx2g

jdg4fx2g2#

正如avysk所指出的,问题很可能是atexit处理程序触发得太晚,在等待非守护进程线程已经(应该)完成之后,这会导致死锁。
如果我是你,我只会在if __name__ == '__main__'块的末尾添加一个类似TeamsHandler.finish()的调用,并沿着以下代码行修改TeamsHandler(未测试):

_queues = []

class TeamsHandler(StreamHandler):
    def __init__(self, channel_url):
        super().__init__()
        self.channel_url = channel_url
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._worker)
        self.thread.start()
        _queues.append(self.queue)

    def _worker(self):
        while True:
            record = self.queue.get()
            if record is None:
                break
            msg = self.format(record)
            print(msg)

    def emit(self, record):
        # enqueue the record to log and return control to the caller
        self.queue.put(record)

    @staticmethod
    def finish(self):
        for q in _queues:
            q.put(None)
        del _queues[:]
2q5ifsrm

2q5ifsrm3#

为了确保线程在程序结束时结束,应该将其设置为守护线程,这样就不需要任何atexit的东西,因为它启动得不够早,所以根本不起作用。

class TeamsHandler(StreamHandler):
    def __init__(self, channel_url):
        super().__init__()
        self.channel_url = channel_url
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._worker)
    
        self.thread.setDaemon(True)  # INSERT THIS LINE
    
        self.thread.start()

# ...

相关问题