我有一系列的异步步骤要做,我想把它们放在一个方法中。每一步都生成一个子结果,并诱导下一个异步步骤,它再次生成一个子结果,直到到达最后一步。
看看我的代码:
class TextDetector(Detector):
lock: asyncio.Lock = asyncio.Lock()
async def detect(self, input: TextDetectorInput) -> TextDetectorResult:
pass
class TextAugmentationProcedure(AugmentationProcedure):
async def augment(
self, input: AugmentationProcedureTextInput
) -> AugmentationProcedureTextOutput:
pass
TextDetectorCoolDownAndResultType = Awaitable[Awaitable[ImageDetectorResult]]
RunTextDetectionType = AsyncIterator[tuple[DetectorIdentifier, TextDetector, TextDetectorCoolDownAndResultType]]
GetFilesTextType = AsyncIterator[tuple[InputFileIdentifier, TextInputFile, AugmentationProcedureTextOutput, RunTextDetectionType]]
CheckTextType = AsyncIterator[tuple[AugmentationProcedureIdentifier, TextAugmentationProcedure, GetFilesTextType]]
@dataclass
class TextTestCase(TestCase):
inputs: dict[InputFileIdentifier, TextInputFile]
augmentation_procedures: dict[AugmentationProcedureIdentifier, TextAugmentationProcedure]
detectors: dict[DetectorIdentifier, TextDetector]
async def get_files(self, augmentation_procedure: TextAugmentationProcedure) -> GetFilesTextType:
for input_identifier, text_file in self.inputs.items():
augmentation_input = await read_text_file(
text_file.file_name, text_file.language
)
augmentation_output = await augmentation_procedure.augment(augmentation_input)
detector_input = TextDetectorInput(
augmentation_output.language, augmentation_output.text
)
yield (input_identifier, text_file, augmentation_output, self.run_detection(detector_input))
async def run_detection(self, detector_input: TextDetectorInput) -> RunTextDetectionType:
for detector_identifier, detector in self.detectors.items():
async def cooldown_and_detect(detector: TextDetector, detector_input: TextDetectorInput):
# Acquire lock
with detector.lock
# Cooldown
cooleddown = await detector.cooldown()
return detector.detect(detector_input)
yield (detector_identifier, detector, cooldown_and_detect(detector, detector_input))
async def check(self) -> CheckImageType:
for augmentation_procedure_identifier, augmentation_procedure in self.augmentation_procedures.items():
yield (augmentation_procedure_identifier, augmentation_procedure, self.get_files(augmentation_procedure))
字符串
基本上,我想在TextTestCase
的示例上调用方法check(...)
时得到子结果。有趣的部分是run_detection()
。对于每个检测器,都应该通知调用者。之后,获取一个锁。然后调用detector.cooldown()
并等待。如果等待,则应该通知调用者并调用detector.detect()
。当结果可用时,则应该通知调用方并且应该释放锁。
目前,我用以下方式调用check()
:
test_case = TextTestCase()
async for (augmentation_procedure_identifier, augmentation_procedure, augmentation_results) in test_case.check():
async for (file_identifier, image_file, augmentation_output_awaitable, detectors) in augmentation_results:
results.append([image_file.file_name, str(procedure), "Augmenting...", ""])
live.update(update_table_with_results(results))
async for (detector_identifier, detector, cooldown_awaitable) in detectors:
try:
detection_awaitable = await cooldown_awaitable
detection_result = await detection_awaitable
# TODO: Do stuff here
except:
pass
# Error occured
# TODO: print error
型
因为cooldown_and_detect()
* 返回 * 一个可等待的detector.detect()
,上下文管理器显然会在可等待的detector.detect()
返回时释放锁,也就是在detector.cooldown()
被等待并且detector.detect()
被触发之后。但是我想在detector.detect()
被等待之后释放它,但是我仍然想把控制流传递给调用者。
1条答案
按热度按时间rqqzpn5f1#
从上面的代码中理解你想要的东西确实有点混乱,但是通过添加适当的回调,一切都应该是可能的。也就是说,如果你的任务可以在一个可以并行运行的普通非循环流中描述,你应该只使用
await
和上下文管理器,并且只使用一个await
.TaskGroup,或者调用gather
,其中包含您希望并行运行的所有“根级别”任务。否则,由于您希望自定义管理中间资源,例如由
lock
保护的资源,因此可以显式调用acquire
,并在相应步骤的done回调中释放锁,而不是将锁用作上下文管理器(with
块),或者无条件地在finally
块上释放它。正如我之前所写的,你的例子看起来比它所能写的更做作,所以一个基于你的代码的很好的例子可以被写出来-请阅读关于发布minimal reproducible example before asking your next question -的文章。
也就是说,如果我能正确理解您的意思,那么沿着下面的代码为您的内部
cooldown_and_detect
协同例程所做的一些事情可能会产生您想要的效果。字符串