Camel 应如何配置Azure服务总线以成批发送消息?

3okqufwl  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(173)

目标是使用来自kafka的记录并重定向到Azure服务总线。kafka使用者可以在调试时进行配置和确认,以便在每次轮询kafka主题时使用数千条消息。
另一方面,Azure服务总线将始终发布一条消息。因此,当此集成运行时,Kafka日志将显示收到了数千条消息,然后Azure服务总线日志将遍历每条消息,一次向队列发送一条消息。每次迭代需要花费几分钟,这会显著降低流程速度。
组件documentation指出,默认情况下批量发送设置,但对于如何实现这一点并不清楚。

public class SampleKafkaConsumer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Kafka Server -> Log ");
    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
             + "&maxPollRecords={{consumer.maxPollRecords}}"
             + "&consumersCount={{consumer.consumersCount}}"
             + "&seekTo={{consumer.seekTo}}"
             + "&groupId={{consumer.group}}"
             + "&lingerMs={{consumer.lingerMs}}"
             + "&producerBatchSize={{consumer.producerBatchSize}}"
             + "&saslJaasConfig={{consumer.saslJaasConfig}}"
             + "&saslMechanism={{consumer.saslMechanism}}"
             + "&securityProtocol={{consumer.securityProtocol}}")
             .routeId("Kafka")
             .to("azure-servicebus:topic?connectionString={{producer.connectionString}}&producerOperation=sendMessages");

  }
}

有什么见解吗?

b4lqfgs4

b4lqfgs41#

Azure服务总线文档试图暗示的正确解决方案是将结果作为对象或消息的列表进行批处理,并将其传递到Azure生产者。
在ApacheCamel中实现这一点的关键是使用Aggregator。这与下面代码中的完成间隔相结合,允许管道将消息聚集在一起,并成批发送到服务总线。

public class SampleKafkaConsumer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Kafka Server -> Log ");
    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
             + "&maxPollRecords={{consumer.maxPollRecords}}"
             + "&consumersCount={{consumer.consumersCount}}"
             + "&seekTo={{consumer.seekTo}}"
             + "&groupId={{consumer.group}}"
             + "&lingerMs={{consumer.lingerMs}}"
             + "&producerBatchSize={{consumer.producerBatchSize}}"
             + "&saslJaasConfig={{consumer.saslJaasConfig}}"
             + "&saslMechanism={{consumer.saslMechanism}}"
             + "&securityProtocol={{consumer.securityProtocol}}")
             .routeId("Kafka")
             .aggregate(new AzureAggregationStrategy()).constant(true)
             .completionInterval(300L)
             .to("azure-servicebus:topic?connectionString={{producer.connectionString}}&producerOperation=sendMessages");

  }
}

相关问题