我有一个简单的集成流程,基于cron作业从数据库轮询数据,在DirectChannel上发布,然后进行拆分和转换,在另一个执行器服务通道上发布,执行一些操作,最后发布到输出通道,它是使用dsl风格编写的。
另外,我有一个端点,在那里我可能会接收一个http请求来触发这个流,此时我发送消息到上面提到的通道之一来触发这个流。
我想确保如果流已经由于cron作业或其他请求而运行,则不会发生手动触发。
我使用了StandardIntegrationFlow的isRunning方法,但它似乎不是线程安全的。
我还尝试使用.wireTap(myService)和.handle(myService),其中此服务有一个atomicBoolean标志,但它是针对每条消息设置的,这不是一个解决方案。
我想知道流是否在没有我的干预的情况下运行,如果不支持,我如何在整个流而不是每个消息上应用原子布尔逻辑。
如何在测试中模拟竞态条件,以确保我的实现能够防止这种情况?
1条答案
按热度按时间j7dteeu81#
IntegrationFlow
只是配置阶段的一个逻辑容器。它确实有那些生命周期方法,但只是用于内部框架逻辑。即使它们在那里,它们也没有帮助,因为如果你想通过一些事件或输入消息为端点做些什么,端点总是在运行。很难控制所有这些,因为正如您所解释的,它处于异步状态。即使我们可以在流的开始停止
SourcePollingChannelAdapter
,让您的手动调用做一些事情,这并不意味着其他线程中的消息不再处理。AtomicBoolean
在这里也无能为力,原因是相同的:即使您在MessageSourceMutator.beforeReceive()
中将其设置为true
,并在其afterReceive()
中重置回false
(当message为空时),也不意味着您在其他线程中下推的消息已经处理。您可能会考虑在批处理结束时使用聚合器进行
AtomicBoolean
重置,因为您提到了从DB中提取数据,因此您可能可以跟踪下游的每个轮询的大量记录。这样,您的手动调用可以跳过,直到聚合器收集到该批处理的结果。您还需要考虑在允许手动操作的时刻停止
SourcePollingChannelAdapter
,这样就不会有任何与cron的进一步竞争条件。