我已经用Kafka实现了spring批处理远程分区,其中3个工作机运行在不同的JVM上,在初始化作业存储库之前,我需要在每个工作机上设置租户。
在分区期间,我从管理器JVM向executionContext传递了一个租户代码值。在工作器JVM中,我能够从stepExecution读取此参数,但在jobRepository初始化期间,我需要读取此值。
在入站流量以下使用
@Bean // request coming from manager
public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
return IntegrationFlows
.from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
.channel(requestForWorkers())
.get();
}
我的理解是,当工人消费者读到Kafka的信息时,我能读到这个价值吗?
1条答案
按热度按时间31moq8wy1#
我可以通过动态创建步骤、作业存储库、入站、出站流bean来解决远程分区上的多租户问题。