python 使用Rollcio正确调度嵌套函数

1zmg4dgp  于 12个月前  发布在  Python
关注(0)|答案(2)|浏览(124)

我在安排与Duncio的工作时遇到了麻烦。我有一个这样的代码:

import asyncio

async def stream():
    char_string = "Hi. Hello. Hello."

    for char in char_string:
        await asyncio.sleep(0.1)  # something time consuming happening here
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence)*0.1)
    print("sentence processed!")

async def main():
    i=0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence)
        i += 1

asyncio.run(main())

字符串
这是我的输出:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: y
got char: .
got sentence:   Hey.
processing sentence:  1
waiting for processing sentence:   Hey.
sentence processed!
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  2
waiting for processing sentence:   Hello.
sentence processed!


这不是最佳的。当process_sentence正在等待asyncio.sleep()(代表其他一些耗时的过程)时,它应该已经从流中获取了下一个字符。所以,我希望输出如下:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
got char:  # (space char)
got char: H
sentence processed!
got char: e
got char: y
got char: .
got sentence:   Hey.
processing sentence:  1
waiting for processing sentence:   Hey.
got char  # (space char)
got char H
got char: e
sentence processed!
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  2
waiting for processing sentence:   Hello.
sentence processed!


我怎么才能做到呢?

tag5nh1u

tag5nh1u1#

你可以通过创建独立运行的异步任务来实现你想要的输出。在你的代码中,你没有正确地创建任务。

注意:您的char_string不应该是"Hi. Hello. Thank you.",而应该是"Hi. Hey. Hello."

下面的代码将给予您所需的输出,即在process_sentence()等待时处理流中的下一个字符:Here's the link to your running code

import asyncio

async def stream(queue):
    char_string = "Hi. Hey. Hello."

    for char in char_string:
        await asyncio.sleep(0.1)  # something time-consuming happening here
        print("got char:", char)
        await queue.put(char)

    # Signal the end of the stream
    await queue.put(None)

async def sentences_generator(queue):
    sentence = ""
    while True:
        char = await queue.get()
        if char is None:
            break
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence:", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence:", sentence)
    await asyncio.sleep(len(sentence) * 0.1)
    print("sentence processed!")

async def main():
    q = asyncio.Queue()

    # Run the stream coroutine concurrently
    stream_task = asyncio.create_task(stream(q))

    counter = 0
    async for sentence in sentences_generator(q):
        print("processing sentence:", counter)
        counter += 1
        sentence_task = asyncio.create_task(process_sentence(sentence))
        await sentence_task

    # Wait for the stream to finish
    await stream_task

# Run the coroutine
asyncio.run(main())

字符串
运行后,将生成以下输出:

got char: H
got char: i
got char: .
got sentence: Hi.
processing sentence: 0
waiting for processing sentence: Hi.
got char:  # <empty char>
got char: H
sentence processed!
got char: e
got char: y
got char: .
got sentence:  Hey.
processing sentence: 1
waiting for processing sentence:  Hey.
got char:  # <empty char>
got char: H
got char: e
got char: l
sentence processed!
got char: l
got char: o
got char: .
got sentence:  Hello.
processing sentence: 2
waiting for processing sentence:  Hello.
sentence processed!

dldeef67

dldeef672#

这并不是最佳的。当process_sentence正在等待receipt.sleep()时,它应该已经从流中获取了下一个字符。
在这个设置中,这是不可能的。await基本上阻止了正在运行的任务/协程。
在下面的代码中:

async def main():
    i = 0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence)
        i += 1

字符串
什么时候你会看到"got char:"字符串?当第一行执行(async for)时,但这里await process_sentence(sentence)阻止了main()
你可以做的是使用队列。生产字符的任务(生产者)可以将它们推入队列,而使用字符的任务(消费者)可以从队列中获取项目。这样,即使消费者在获取字符之前仍然等待一段时间,生产者也会填充队列。它们将独立工作。

import asyncio

async def stream():
    char_string = "Hi. Hello. Thank you."
    for char in char_string:
        await asyncio.sleep(0.1)
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence)
            sentence = ""
    flag.set()

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    global i
    while not (q.empty() and flag.is_set()):
        item = await q.get()
        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1)
        print("sentence processed!")
        i += 1

async def main():
    global i
    i = 1
    event = asyncio.Event()
    queue = asyncio.Queue[str]()
    producer = sentences_generator(queue, event)
    consumer = process_sentence(queue, event)
    await asyncio.gather(producer, consumer)

asyncio.run(main())

相关问题