python-3.x 如何测量异步发电机的运行时间?

pkln4tw6  于 2023-05-19  发布在  Python
关注(0)|答案(2)|浏览(191)

我想测量发生器花费的时间(阻塞主循环的时间)。
假设我有以下两个生成器:

async def run():
    for i in range(5):
        await asyncio.sleep(0.2)
        yield i
    return

async def walk():
    for i in range(5):
        time.sleep(0.3)
        yield i
    return

我想测量run每次迭代花费大约0.0s,而walk至少使用0.3s
我想使用类似于this的东西,但无法让它为我工作。
澄清:
我想排除在任何await部分花费的时间。如果因为某种原因协程停止了,那么我不想考虑这段时间。

pxyaymoc

pxyaymoc1#

所以-一件事(虽然有点)是在常规的协同例程中测量时间-我用装饰器来实现它。
然而,当你进一步进入异步生成器时,它是另一个野兽-我仍在试图弄清楚-它是公共暴露的异步迭代器方法(__anext__asend等...)与返回对象中的传统迭代器方法的混合,我还不能弄清楚(我刚刚打开PEP 525,看看我是否能理解它)。
至于常规的协同例程,有一个问题:如果你创建了一个可等待类(我的decorator),asyncio将要求它的__await__方法返回一个__next__方法,这个方法将被调用。但是原生Python协同例程没有__next__:asyncio在这些上调用send()--所以我必须做这个“转置”,以便能够测量时间(在协同例程中)。

import asyncio
import time

class ProfileCoro:
    def __init__(self, corofunc):
        self.corofunc = corofunc

    def __call__(self, *args, **kw):
        # WARNING: not parallel execution-safe: fix by
        # keeping "self.ellapsed" in a proper contextvar
        self.ellapsed = 0
        self.coro = self.corofunc(*args, **kw)
        return self

    def __await__(self):
        return self

    def __iter__(self):
        return self

    def __next__(self):
        return self.send(None)

    def throw(self, exc):
        print(f"Arghh!, got an {exc}")
        self.coro.throw(exc)

    def send(self, arg):
        start = time.monotonic()
        try:
            result = self.coro.send(arg)
        except StopIteration:
            duration = time.monotonic() - start
            self.ellapsed += duration
            print(f"Ellapsed time in execution of {self.corofunc.__name__}: {self.ellapsed:.05f}s")
            raise
        duration = time.monotonic() - start
        self.ellapsed += duration
        return result

    def __repr__(self):
        return f"<ProfileCoro wrapper for {self.corofunc}>"

@ProfileCoro
async def run():
    for i in range(5):
        await asyncio.sleep(0.2)
        # yield i
    return 1

@ProfileCoro
async def walk():
    for i in range(5):
        time.sleep(0.3)
        #yield i
    return 3

async def main():
    await run()
    await walk()
    return

asyncio.run(main())

如果我能弄清楚如何 Package 异步生成器,* 也许 * 会继续。
(我认为大多数现有的分析工具使用语言中可用的工具进行调试和跟踪(通过sys.settrace()启用:所有的东西在回调中都是“可见的”,不用担心 Package 所有的内部调用(由async机制和asyncio循环进行)
...所以,这里是在异步生成器中捕获时间的代码。
它会得到一个好的路径-如果有复杂的等待类,实现或使用asendathrow,这不会做-但对于一个简单的异步生成器函数plugget到async for语句,它现在工作:
免责声明:在下面的代码中可能有未使用的代码,甚至是未使用的状态--我来回了很多次才让它工作(很多是因为我没有尝试__anext__本身必须是异步的)。尽管如此,还是这样:

import asyncio
import time

from functools import wraps

async def _a():
    yield 1

async_generator_asend_type = type(_a().__anext__())

class ProfileCoro:
    def __init__(self, corofunc):
        self.corofunc = corofunc
        self.measures = 0

    def measure_ellapsed(func):
        @wraps(func)
        def wrapper(self, *args, **kw):
            self.measures += 1
            if self.measures > 1:
                try:
                    return func(self, *args, **kw)
                finally:
                    self.measures -= 1
            start = time.monotonic()
            try:
                result = func(self, *args, **kw)
            except (StopIteration, StopAsyncIteration):
                self.ellapsed += time.monotonic() - start
                #self.print_ellapsed()
                raise
            finally:
                self.measures -= 1
            self.ellapsed += time.monotonic() - start

            return result
        return wrapper

    def print_ellapsed(self):
        name = getattr(self.corofunc, "__name__", "inner_iterator")
        print(f"Ellapsed time in execution of {name}: {self.ellapsed:.05f}s")

    def __call__(self, *args, **kw):
        # WARNING: not parallel execution-safe: fix by
        # keeping "self.ellapsed" in a proper contextvar
        self.ellapsed = 0
        self.measures = 0
        if not isinstance(self.corofunc, async_generator_asend_type):
            self.coro = self.corofunc(*args, **kw)
        else:
            self.coro = self.corofunc
        return self

    def __await__(self):
        return self

    def __iter__(self):
        return self

    @measure_ellapsed
    def __next__(self):
        target = self.coro
        if hasattr(target, "__next__"):
            return target.__next__()
        elif hasattr(target, "send"):
            return target.send(None)

    async def athrow(self, exc):
        print(f"Arghh!, got an async-iter-mode {exc}")
        return await self.async_iter.athrow(exc)

    def throw(self, exc):
        print(f"Arghh!, got an {exc}")
        self.coro.throw(exc)

    @measure_ellapsed
    def send(self, arg):
        return self.coro.send(arg)

    def __aiter__(self):
        return self

    #async def asend(self, value):
        ...

    async def aclose(self):
        return await self.async_iter.close()

    def close(self):
        return self.async_iter.close()

    async def __anext__(self):
        if not hasattr(self, "async_iter"):
            self.async_iter = aiter(self.coro)
        self.inner_profiler = ProfileCoro(self.async_iter.__anext__())
        #start = time.monotonic()
        try:
            result = await self.inner_profiler()
        except StopAsyncIteration:
            #self.print_ellapsed()
            raise
        finally:
            self.ellapsed += self.inner_profiler.ellapsed
        return result

    def __repr__(self):
        return f"<ProfileCoro wrapper for {self.corofunc}>"

@ProfileCoro
async def run():
    for i in range(5):
        await asyncio.sleep(0.05)
        # yield i
    return 1

@ProfileCoro
async def walk():
    for i in range(5):
        time.sleep(0.05)
        #yield i
    return 3

@ProfileCoro
async def irun():
    for i in range(5):
        await asyncio.sleep(0.05)
        yield i

@ProfileCoro
async def iwalk():
    for i in range(5):
        time.sleep(0.05)
        yield i

async def main():
    await run()
    run.print_ellapsed()
    await walk()
    walk.print_ellapsed()
    async for _ in irun():
        print(".", end="", flush=True)
    irun.print_ellapsed()
    async for _ in iwalk():
        pass
    iwalk.print_ellapsed()
    return

asyncio.run(main())
myzjeezk

myzjeezk2#

为什么不使用Profiler?
yappi非常擅长分析,尤其是coroutine s。

相关问题