javadsl等价于用xml配置的spring集成kafka端点

pqwbnv8z  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(347)

对于kafka出站通道适配器,我有以下xml配置:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="activityOutputChannel">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>

</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

这很管用。我正试图在javadsl中复制这一点,但我不能做得太远。到目前为止,我只有这个:

.handle(Kafka.outboundChannelAdapter(kafkaConfig)
        .addProducer(producerMetadata, brokerAddress)
        .get());

我不知道如何添加 taskExecutor 以及 poller 使用dsl。
关于如何将这些整合到我的整体计划中有什么见解吗 IntegrationFlow 非常感谢。

pxyaymoc

pxyaymoc1#

spring集成组件(例如。 <int-kafka:outbound-channel-adapter> )由两个豆子组成: AbstractEndpoint 接受来自 input-channel 以及 MessageHandler 处理消息。
所以, Kafka.outboundChannelAdapter() 是关于 MessageHandler . 任何其他特定于端点的属性都由第二个 Consumer<GenericEndpointSpec<H>> endpointConfigurer 的论点 .handle() eip方法:

.handle(Kafka.outboundChannelAdapter(kafkaConfig)
    .addProducer(producerMetadata, brokerAddress),
           e -> e.id("kafkaOutboundChannelAdapter")
                 .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS)
                                        .receiveTimeout(0)
                                        .taskExecutor(this.taskExecutor)));

有关更多信息,请参阅参考手册。

相关问题