Spring RabbitMQ活页夹:分区生成器是否使用多个通道来发布路由到队列分区之一消息?

ubbxdtey  于 2022-12-10  发布在  Spring
关注(0)|答案(1)|浏览(114)

我们正在使用Spring RabbitMQ Binder来划分队列。我们正在使用队列,然后基于我们的PartitionKeyExtractorStrategy实现,我们将消息发送到队列分区。对我们来说,进入队列分区的消息保持它们的顺序是很重要的,但由于某些原因,它们没有。我们从PartitionKeyExtractorStrategy实现的日志中看到,从主队列使用的消息的顺序是正确的。分区生产者是否会异步地或使用多个通道向队列分区发送消息,从而不时地破坏顺序?
这是我们的应用程序。yml配置:

spring:
  cloud:
    stream:
      bindings:
        mainQueue:
          destination: TopicExchange
          group: MainQueue
          consumer:
            partitioned: false
            concurrency: 1
            maxAttempts: 1
        partitionProducer:
          destination: TopicExchange
          producer:
            partitionCount: ${REPLICAS}
            partitionKeyExtractorName: userIdKeyExtractor
...

rabbit:
  bindings:
    mainQueue:
      consumer:
        bindingRoutingKeyDelimiter: ","
        bindingRoutingKey: routingKey1, routingKey2
        declareExchange: true
        queueNameGroupOnly: true
        exclusive: true
        prefetch: 100
        batchSize: 100
        transacted: true
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
    partitionProducer:
      producer:
        declareExchange: true
    partitionConsumer:
      consumer:
        declareExchange: true
        queueNameGroupOnly: true
        prefetch: 100
        txSize: 1
        transacted: true
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
        enableBatching: true
        batchSize: 1
        receiveTimeout: 100

正如您在上面看到的,我们已经尝试使主队列消费者事务化,但它没有解决我们的问题。

rjee0c15

rjee0c151#

默认情况下,使用CachingConnectionFactory;由于通道是缓存的,因此无法保证在同一线程上发布时使用同一通道;在高容量、多线程环境中,这可能导致无序传递。
您可以通过定义ThreadLocalChannelConnectionFactory@Bean来避免此问题;则 Boot 将不再自动配置CCF,而是使用您的bean。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#choosing-factory

相关问题