如何了解Spring集成流程的运行状态

q3qa4bjr  于 2023-02-22  发布在  Spring
关注(0)|答案(1)|浏览(169)

我有一个简单的集成流程,基于cron作业从数据库轮询数据,在DirectChannel上发布,然后进行拆分和转换,在另一个执行器服务通道上发布,执行一些操作,最后发布到输出通道,它是使用dsl风格编写的。
另外,我有一个端点,在那里我可能会接收一个http请求来触发这个流,此时我发送消息到上面提到的通道之一来触发这个流。
我想确保如果流已经由于cron作业或其他请求而运行,则不会发生手动触发。
我使用了StandardIntegrationFlow的isRunning方法,但它似乎不是线程安全的。
我还尝试使用.wireTap(myService)和.handle(myService),其中此服务有一个atomicBoolean标志,但它是针对每条消息设置的,这不是一个解决方案。
我想知道流是否在没有我的干预的情况下运行,如果不支持,我如何在整个流而不是每个消息上应用原子布尔逻辑。
如何在测试中模拟竞态条件,以确保我的实现能够防止这种情况?

j7dteeu8

j7dteeu81#

IntegrationFlow只是配置阶段的一个逻辑容器。它确实有那些生命周期方法,但只是用于内部框架逻辑。即使它们在那里,它们也没有帮助,因为如果你想通过一些事件或输入消息为端点做些什么,端点总是在运行。
很难控制所有这些,因为正如您所解释的,它处于异步状态。即使我们可以在流的开始停止SourcePollingChannelAdapter,让您的手动调用做一些事情,这并不意味着其他线程中的消息不再处理。AtomicBoolean在这里也无能为力,原因是相同的:即使您在MessageSourceMutator.beforeReceive()中将其设置为true,并在其afterReceive()中重置回false(当message为空时),也不意味着您在其他线程中下推的消息已经处理。
您可能会考虑在批处理结束时使用聚合器进行AtomicBoolean重置,因为您提到了从DB中提取数据,因此您可能可以跟踪下游的每个轮询的大量记录。这样,您的手动调用可以跳过,直到聚合器收集到该批处理的结果。
您还需要考虑在允许手动操作的时刻停止SourcePollingChannelAdapter,这样就不会有任何与cron的进一步竞争条件。

相关问题