我们正在使用Spring RabbitMQ Binder来划分队列。我们正在使用队列,然后基于我们的PartitionKeyExtractorStrategy
实现,我们将消息发送到队列分区。对我们来说,进入队列分区的消息保持它们的顺序是很重要的,但由于某些原因,它们没有。我们从PartitionKeyExtractorStrategy
实现的日志中看到,从主队列使用的消息的顺序是正确的。分区生产者是否会异步地或使用多个通道向队列分区发送消息,从而不时地破坏顺序?
这是我们的应用程序。yml配置:
spring:
cloud:
stream:
bindings:
mainQueue:
destination: TopicExchange
group: MainQueue
consumer:
partitioned: false
concurrency: 1
maxAttempts: 1
partitionProducer:
destination: TopicExchange
producer:
partitionCount: ${REPLICAS}
partitionKeyExtractorName: userIdKeyExtractor
...
rabbit:
bindings:
mainQueue:
consumer:
bindingRoutingKeyDelimiter: ","
bindingRoutingKey: routingKey1, routingKey2
declareExchange: true
queueNameGroupOnly: true
exclusive: true
prefetch: 100
batchSize: 100
transacted: true
autoBindDlq: false
republishToDlq: false
requeueRejected: true
partitionProducer:
producer:
declareExchange: true
partitionConsumer:
consumer:
declareExchange: true
queueNameGroupOnly: true
prefetch: 100
txSize: 1
transacted: true
autoBindDlq: false
republishToDlq: false
requeueRejected: true
enableBatching: true
batchSize: 1
receiveTimeout: 100
正如您在上面看到的,我们已经尝试使主队列消费者事务化,但它没有解决我们的问题。
1条答案
按热度按时间rjee0c151#
默认情况下,使用
CachingConnectionFactory
;由于通道是缓存的,因此无法保证在同一线程上发布时使用同一通道;在高容量、多线程环境中,这可能导致无序传递。您可以通过定义
ThreadLocalChannelConnectionFactory
@Bean
来避免此问题;则 Boot 将不再自动配置CCF,而是使用您的bean。https://docs.spring.io/spring-amqp/docs/current/reference/html/#choosing-factory