python 控制流返回到调用者的嵌套的Pencio协程

plupiseo  于 11个月前  发布在  Python
关注(0)|答案(1)|浏览(120)

我有一系列的异步步骤要做,我想把它们放在一个方法中。每一步都生成一个子结果,并诱导下一个异步步骤,它再次生成一个子结果,直到到达最后一步。
看看我的代码:

class TextDetector(Detector):
    lock: asyncio.Lock = asyncio.Lock()

    async def detect(self, input: TextDetectorInput) -> TextDetectorResult: 
        pass

class TextAugmentationProcedure(AugmentationProcedure):
    async def augment(
        self, input: AugmentationProcedureTextInput
    ) -> AugmentationProcedureTextOutput:
            pass
    
TextDetectorCoolDownAndResultType = Awaitable[Awaitable[ImageDetectorResult]]
RunTextDetectionType = AsyncIterator[tuple[DetectorIdentifier, TextDetector, TextDetectorCoolDownAndResultType]]
GetFilesTextType = AsyncIterator[tuple[InputFileIdentifier, TextInputFile, AugmentationProcedureTextOutput, RunTextDetectionType]]
CheckTextType = AsyncIterator[tuple[AugmentationProcedureIdentifier, TextAugmentationProcedure, GetFilesTextType]]

@dataclass
class TextTestCase(TestCase):

    inputs: dict[InputFileIdentifier, TextInputFile]

    augmentation_procedures: dict[AugmentationProcedureIdentifier, TextAugmentationProcedure]
    detectors: dict[DetectorIdentifier, TextDetector]
    
    async def get_files(self, augmentation_procedure: TextAugmentationProcedure) -> GetFilesTextType:
        for input_identifier, text_file in self.inputs.items():
            augmentation_input = await read_text_file(
                text_file.file_name, text_file.language
            )

            augmentation_output = await augmentation_procedure.augment(augmentation_input)
        
            detector_input = TextDetectorInput(
                        augmentation_output.language, augmentation_output.text
            )

            yield (input_identifier, text_file, augmentation_output, self.run_detection(detector_input))

    async def run_detection(self, detector_input: TextDetectorInput) -> RunTextDetectionType:
        for detector_identifier, detector in self.detectors.items():
            async def cooldown_and_detect(detector: TextDetector, detector_input: TextDetectorInput):
                # Acquire lock
                with detector.lock
                    # Cooldown
                    cooleddown = await detector.cooldown()
                    
                    return detector.detect(detector_input)
            
            yield (detector_identifier, detector, cooldown_and_detect(detector, detector_input))
    
    async def check(self) -> CheckImageType:
        for augmentation_procedure_identifier, augmentation_procedure in self.augmentation_procedures.items():
            yield (augmentation_procedure_identifier, augmentation_procedure, self.get_files(augmentation_procedure))

字符串
基本上,我想在TextTestCase的示例上调用方法check(...)时得到子结果。有趣的部分是run_detection()。对于每个检测器,都应该通知调用者。之后,获取一个锁。然后调用detector.cooldown()并等待。如果等待,则应该通知调用者并调用detector.detect()。当结果可用时,则应该通知调用方并且应该释放锁。
目前,我用以下方式调用check()

test_case = TextTestCase()

async for (augmentation_procedure_identifier, augmentation_procedure, augmentation_results) in test_case.check():
    async for (file_identifier, image_file, augmentation_output_awaitable, detectors) in augmentation_results:
        
        results.append([image_file.file_name, str(procedure), "Augmenting...", ""])
        live.update(update_table_with_results(results))

        async for (detector_identifier, detector, cooldown_awaitable) in detectors:
            try:
                detection_awaitable = await cooldown_awaitable
                detection_result = await detection_awaitable
            
                # TODO: Do stuff here
            except:
                pass
                # Error occured
                # TODO: print error


因为cooldown_and_detect() * 返回 * 一个可等待的detector.detect(),上下文管理器显然会在可等待的detector.detect()返回时释放锁,也就是在detector.cooldown()被等待并且detector.detect()被触发之后。但是我想在detector.detect()被等待之后释放它,但是我仍然想把控制流传递给调用者。

rqqzpn5f

rqqzpn5f1#

从上面的代码中理解你想要的东西确实有点混乱,但是通过添加适当的回调,一切都应该是可能的。也就是说,如果你的任务可以在一个可以并行运行的普通非循环流中描述,你应该只使用await和上下文管理器,并且只使用一个await.TaskGroup,或者调用gather,其中包含您希望并行运行的所有“根级别”任务。
否则,由于您希望自定义管理中间资源,例如由lock保护的资源,因此可以显式调用acquire,并在相应步骤的done回调中释放锁,而不是将锁用作上下文管理器(with块),或者无条件地在finally块上释放它。
正如我之前所写的,你的例子看起来比它所能写的更做作,所以一个基于你的代码的很好的例子可以被写出来-请阅读关于发布minimal reproducible example before asking your next question -的文章。
也就是说,如果我能正确理解您的意思,那么沿着下面的代码为您的内部cooldown_and_detect协同例程所做的一些事情可能会产生您想要的效果。

async def cooldown_and_detect(detector, detector_input: TextDetectorInput):
    # Acquire lock
    await detector.lock.acquire()
    try:    # Cooldown
        cooleddown = await detector.cooldown()
    except Exception as error:
        # something wrong with cooldown - release lcok
        detector.lock.release()
        raise
    detector_task = asyncio.create_task(detector.detect(detector_input))
    detector_task.add_done_callback(lambda task: detector.lock.release())
    return detector_task

字符串

相关问题