linux 用python在后台监控文件描述符

jfgube3f  于 2023-08-03  发布在  Linux
关注(0)|答案(2)|浏览(154)

由于我的问题here,我必须在后台监视文件描述符。
我试过这个:

import gpiod
import asyncio
import select

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    poll = select.poll()
    poll.register(fd)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, poll.poll, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("Enter something: ")
        print(input)

main()

字符串
执行在调用关闭poll.poll时被阻止。我也试过这个:

import gpiod
import asyncio

def readCb(line):
    print(f"{line.consumer}: {line.event_read()}")

def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd, readCb, line)
    loop.run_forever()

def main():
    read()
    while True:
        input = input("Enter something: ")
        print(input)

main()


也是读取功能中的块。
对于第二个例子,我也试图使read()async并使用asyncio.run()调用它,但随后发生以下错误:

File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')


除了真实的的线程和子进程之外,在python中有没有可能做到这一点?
非常感谢您的帮助和最好的问候,
圆锥体

qncylg1j

qncylg1j1#

不管您从名称中可能会想到什么,poll.poll会阻塞,直到其中一个注册的fd s准备就绪。它不投票。
如果不存在事件,则从具有libgpiod的行阅读事件将被阻塞。因此,如果您不希望它阻塞,那么首先检查是否存在可用的事件-通过测试fd是否可读。
如果您想要运行的其他代码也是在fd上的I/O绑定,那么您可以让poll等待来自其中一个的事件。这是传统的async风格。
如果你的其他代码在fd上没有阻塞,那么你有两个选择,线程或轮询。通过线程化,您可以在一个Python线程中运行libgpiod调用,而在另一个Python线程中运行其他代码。通过轮询,您可以定期检查线路,查看是否发生了边缘事件。轮询通常比线程效率低,但是如果您认为Python线程是“真实的的线程”,那么您剩下的解决方案就是轮询。

jpfvwuh4

jpfvwuh42#

感谢Kent Gibson对我的other question的帮助,我能够构建以下内容:

import gpiod
import asyncio

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, line.event_wait, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("Enter something: ")
        print(input)

main()

字符串
它的作用是意图。

相关问题