publish-subscribe通道都指向kafka,这会导致重复的kafkaproducercontext

nimxete2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(282)

我尝试使用springintegration将数据从一个通道发送到两个不同的kafka队列,这些数据在发送到各自队列的过程中经过不同的转换。问题是我显然有重复的生产者上下文,我不知道为什么。
以下是我的流程配置:

flow -> flow
        .channel(“firstChannel")
        .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                .subscribe(f -> f
                                .transform(firstTransformer::transform)
                                .channel(MessageChannels.queue(50))
                                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                                        .get())
                )
                .subscribe(f -> f
                                .transform(secondTransformer::transform)
                                .channel(MessageChannels.queue(50))
                                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                                        .get())
                ));

例外情况如下:
无法注册对象[org.springframework.integration.kafka.support]。kafkaproducercontext@3163987e]在bean名称“not\u specified”下:已经有对象[org.springframework.integration.kafka.support]。kafkaproducercontext@15f193b8]束缚
我试过用不同的 kafkaConfig 物体,但那没用。与此同时 ProducerMetadata 示例是不同的,从不同的第一个参数可以看出 addProducer . 它们在其他元数据中提供了相应的目标队列的名称。
听起来有一些正在创建的隐式bean定义相互冲突。
如何解决这两个异常 KafkaProducerContext 什么?

inn6fuwd

inn6fuwd1#

你不应该使用 .get() 在那些上面 KafkaProducerMessageHandlerSpec 让框架为你设计环境。
问题是因为 KafkaProducerMessageHandlerSpec implements ComponentsRegistration 没有人关心:

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

手动后 .get() 调用。
我同意,这是一个一些不便,我们应该找到一些更好的解决方案,为最终应用,但目前还没有选择,除非遵循 Spec 框架组件的样式,如 Kafka.outboundChannelAdapter() .
希望我明白。
更新
好吧,这绝对是我们这边的问题。我们很快就会解决的:https://jira.spring.io/browse/intext-216httpshttp://jira.spring.io/browse/intext-217
同时,您的解决方法如下:

KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
 kafkaProducerContext.setBeanName(null);

你应该搬到哪里去

Kafka.outboundChannelAdapter(kafkaConfig)
                                    .addProducer(firstMetadata(), brokerAddress)

到单独的 private 方法来访问它 kafkaProducerContext .

相关问题