python-3.x Asyncio:如何处理多个打开的文件操作系统错误

92vpleto  于 2023-07-01  发布在  Python
关注(0)|答案(2)|浏览(210)

我正在尝试运行~500个异步子进程。我在下面的main函数中将文件作为listp_coros传递。

async def run_check(shell_command):
    p = await asyncio.create_subprocess_shell(shell_command,
                    stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    fut = p.communicate()
    try:
        pcap_run = await asyncio.wait_for(fut, timeout=5)
    except asyncio.TimeoutError:
        p.kill()
        await p.communicate()

def get_coros():
    for pcap_loc in print_dir_cointent():
        for pcap_check in get_pcap_executables():
            tmp_coro = (run_check('{args}'
            .format(e=sys.executable, args=args)))
            if tmp_coro != False:
                coros.append(tmp_coro)
     return coros

async def main(self):
    ## Here p_coros has over 500 files
    p_coros = get_coros()
    for f in asyncio.as_completed(p_coros):
        res = await f



loop = asyncio.get_event_loop()
loop.run_until_complete(get_coros())
loop.close()

我认为这里的问题是asyncio.as_completed,因为它试图并行打开所有文件,因为如果我删除asyncio.as_completed,它可以正常工作,但需要很多时间。我想处理打开文件的问题 OSError(24,'Too many open files') 而不浪费太多时间。
日志:

Exception ignored when trying to write to the signal wakeup fd:
BlockingIOError: [Errno 11] Resource temporarily unavailable

ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<ClassificationCheck.run_check() running at ./regression.py:74> wait_for=<Future finished exception=RuntimeError('Event loop is closed',)> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:478]>

追溯:

Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "./regression.py", line 74, in run_check
    stdin=PIPE, stdout=PIPE, stderr=STDOUT)
  File "/usr/lib/python3.5/asyncio/subprocess.py", line 197, in create_subprocess_shell
    stderr=stderr, **kwds)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 1049, in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 184, in _make_subprocess_transport
    **kwargs)
  File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 40, in __init__
    stderr=stderr, bufsize=bufsize, **kwargs)
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 640, in _start
    stdin, stdin_w = self._loop._socketpair()
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 53, in _socketpair
    return socket.socketpair()
  File "/usr/lib/python3.5/socket.py", line 478, in socketpair
    a, b = _socket.socketpair(family, type, proto)
OSError: [Errno 24] Too many open files
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<ClassificationCheck.run_check() done, defined at ./regression.py:72> exception=OSError(24, 'Too many open files')>
k97glaaz

k97glaaz1#

由于我传递了很多文件进行异步工作,它抛出了OS错误。我处理它的方式是创建一个列表列表,每个子列表包含固定数量的PCAP,不会导致 *OS错误 *,然后一次传递一个列表。
因此,我了解到,在继续处理更多文件之前,关闭已经打开的文件是很重要的。

def get_coros(pcap_list):
    for pcap_loc in pcap_list:
        for pcap_check in get_pcap_executables():
            tmp_coro = (run_check('{args}'
            .format(e=sys.executable, args=args)))
            if tmp_coro != False:
                coros.append(tmp_coro)
     return coros

async def main():
    pcap_list_gen = print_dir_cointent() # Passing a list of lists
    for pcap_list in pcap_list_gen:
        p_coros = get_coros(pcap_list)
        for f in asyncio.as_completed(p_coros):
            res = await f
xt0899hw

xt0899hw2#

我发现,即使使用with语句打开文件,并且许多I/O正在并发执行,有时也会发生此问题。因此,在块的末尾显式关闭文件句柄帮助我避免了这个错误。

相关问题