Spring AMQP: QueueBuilder should keep a reference to the Queue, not a String name

wfypjpf4  posted on 2023-03-28 in Spring

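I have 2 anonymous queues. Both are set up with a ___Declarables class that extends AbstractAmqpDeclarables, and both are instantiated as a @Bean at startup.
At startup, however, RabbitMQ has not yet named the queues, so one of the two queues ends up with both bindings and the other queue gets none. If QueueBuilder held a reference to the Queue object rather than to its name, resolving the name could be deferred long enough for RabbitMQ to generate it.
This concerns the names generated by RabbitMQ (amq.gen-<random>), not the names generated by spring-amqp (spring.gen-<random>).
Edit: if I call QueueBuilder.durable(); instead of QueueBuilder.durable(queueName);, the generated name becomes spring.gen-<random> instead of amq.gen-<random>.
Minimal reproducible example: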
1. Producer

@Bean("amqpEmitterEventPublisherAdapter")
public EventPublisher<OurChangeEvent, Boolean> emitterEventsPublisherAdapter(
    @Qualifier("amqpTemplate") RabbitTemplate rabbitTemplate
) {
    final var publisher = new OurAmqpProducerImpl(rabbitTemplate);
    return new OurEventAmqpPublisherAdapter(
        publisher,
        amqpConfigProps.getGlobalExchangeName(),
        ourEventsConfigProps.getOurEventsBindingPrefix(),
        ourEventsConfigProps.getOurEmitEventsBindingDataSuffix(),
        amqpConfigProps.isEnabled()
    );
}

2. Producer with a different suffix, which means a new binding

@Bean("amqpEmitterEventPublisherAdapter2")
public EventPublisher<OurChangeEvent, Boolean> emitterEventsPublisherAdapter2(
    @Qualifier("amqpTemplate") RabbitTemplate rabbitTemplate
) {
    final var publisher = new OurAmqpProducerImpl(rabbitTemplate);
    return new OurEventAmqpPublisherAdapter(
        publisher,
        amqpConfigProps.getGlobalExchangeName(),
        ourEventsConfigProps.getOurEventsBindingPrefix(),
        ourEventsConfigProps.getOurDifferentSuffix(),
        amqpConfigProps.isEnabled()
    );
}

3. Declarables: this is where the problem is. The queue name is "". With 1 Declarables bean it works; with 2 it does not.

@Bean("emitEventsDeclarables")
public Declarables emitEventsDeclarables(
    @Qualifier("mainAmqpAdmin") RabbitAdmin admin,
    @Qualifier("GlobalAmqpExchange") Exchange exchange
) {
    final var bindingKey =
        ourEventsConfigProps.getOurEventsBindingPrefix() +
            ".*." +
            ourEventsConfigProps.getOurEmitEventsBindingDataSuffix();
    final var cfg = new OurEventsExclusiveDeclarables(
        exchange,
        ourEventsConfigProps.getOurEmitEventsQueueName(), //=""
        bindingKey,
        true
    );

    final var declarables = cfg.declarables();
    for (Declarable d : declarables.getDeclarables()) {
        d.setAdminsThatShouldDeclare(admin);
    }
    return declarables;
}

4. A second Declarables whose queue name is also ""

@Bean("emitEventsDeclarables2")
public Declarables emitEventsDeclarables2(
    @Qualifier("mainAmqpAdmin") RabbitAdmin admin,
    @Qualifier("GlobalAmqpExchange") Exchange exchange
) {
    final var bindingKey =
        ourEventsConfigProps.getOurEventsBindingPrefix() +
            ".*." +
            ourEventsConfigProps.getOurDifferentSuffix();
    final var cfg = new OurDifferentExclusiveDeclarables(
        exchange,
        ourEventsConfigProps.getOurEmitEventsQueueName(), //=""
        bindingKey,
        true
    );

    final var declarables = cfg.declarables();
    for (Declarable d : declarables.getDeclarables()) {
        d.setAdminsThatShouldDeclare(admin);
    }
    return declarables;
}

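So we end up with 2 Declarables that both use an anonymous queue name. At this point the spring-amqp QueueBuilder gets confused: one of the 2 anonymous queues receives both bindings and the other receives none.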

public class OurEventsExclusiveDeclarables extends AbstractAmqpDeclarables {
    private final boolean isSingleActiveConsumer;

    public OurEventsExclusiveDeclarables(
        Exchange exchange,
        String queueName,
        String bindingKey,
        boolean isSingleActiveConsumer
    ) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.bindingKey = bindingKey;
        this.isSingleActiveConsumer = isSingleActiveConsumer;
    }

    /** Exclusive queue with no name.
     * @return Declarables
     */
    @Override
    protected Declarables declarables() {
        // Here is the problem: the QueueBuilder is built too soon. RabbitMQ has not had time to generate a name.
        QueueBuilder queueBuilder = QueueBuilder.durable(queueName).exclusive();
        // This would result in a Spring-generated name, not a RabbitMQ-generated name:
        // QueueBuilder.durable().exclusive();
        System.out.println("binding queue name:" + queueName + " to exchange:" + exchange.getName() + " with binding key:" + bindingKey);
        if (isSingleActiveConsumer) {
            queueBuilder.singleActiveConsumer();
        }
        final Queue queue = queueBuilder.build();
        final Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingKey).noargs();
        return new Declarables(queue, binding);
    }
}
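
One possible explanation for the symptom, offered as an assumption rather than a confirmed diagnosis: in AMQP 0-9-1, a queue.bind that carries an empty queue name refers to the queue most recently declared on the channel. If both queues are declared first and both bindings are then sent with the name "", both bindings would land on the second queue. The toy model below is plain Java with no Spring or RabbitMQ dependency; `ToyChannel`, its methods and the binding keys are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a single broker channel; not a Spring AMQP or RabbitMQ API.
class ToyChannel {
    private final Map<String, List<String>> bindingsByQueue = new LinkedHashMap<>();
    private String lastDeclaredQueue;
    private int counter;

    // An empty name asks the "broker" to generate one, similar to amq.gen-<random>.
    String queueDeclare(String name) {
        String actual = name.isEmpty() ? "amq.gen-" + (++counter) : name;
        bindingsByQueue.putIfAbsent(actual, new ArrayList<>());
        lastDeclaredQueue = actual;
        return actual;
    }

    // An empty name is resolved to the queue most recently declared on this channel.
    void queueBind(String queueName, String bindingKey) {
        String target = queueName.isEmpty() ? lastDeclaredQueue : queueName;
        if (target == null || !bindingsByQueue.containsKey(target)) {
            throw new IllegalStateException("no such queue: '" + queueName + "'");
        }
        bindingsByQueue.get(target).add(bindingKey);
    }

    List<String> bindingsOf(String queueName) {
        return bindingsByQueue.get(queueName);
    }
}

class ToyChannelDemo {
    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        // Both queues are declared before any binding is sent.
        String q1 = channel.queueDeclare("");
        String q2 = channel.queueDeclare("");
        // Both bindings still carry the configured name "".
        channel.queueBind("", "prefix.*.data");
        channel.queueBind("", "prefix.*.other");
        System.out.println(q1 + " -> " + channel.bindingsOf(q1));
        System.out.println(q2 + " -> " + channel.bindingsOf(q2));
    }
}
```

In this model the first queue ends up with no bindings and the second with two, which matches the behaviour described in the question. Binding with the name returned by `queueDeclare` instead of "" would give each queue its own binding.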

uqdfh47h  #1

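Yes, it looks like BindingBuilder.DestinationConfigurer needs to keep a reference to the Queue object rather than extracting the name during the build.
Please open an issue on GitHub.
The fix will be in the next release; in the meantime, here is a workaround: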

@Bean
Binding b1(Queue q1, DirectExchange ex) {
    return new Binding(q1.getName(), DestinationType.QUEUE, "ex", "foo", Collections.emptyMap()) {

        @Override
        public String getDestination() {
            return q1.getActualName();
        }

    };
}
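
The idea behind the workaround, keeping the Queue object and reading its name only when the binding is actually used, can be sketched without Spring. The classes `ToyQueue`, `EagerBinding` and `LazyBinding` below are hypothetical stand-ins, not Spring AMQP types, and the generated name is made up; only the contrast between copying the name at build time and resolving it later is the point.

```java
// Hypothetical stand-in for a queue whose real name is only known after declaration.
class ToyQueue {
    private final String name;
    private String actualName;

    ToyQueue(String name) {
        this.name = name;
        this.actualName = name;
    }

    String getName() {
        return name;
    }

    String getActualName() {
        return actualName;
    }

    // Called once the "broker" has reported the generated name.
    void setActualName(String actualName) {
        this.actualName = actualName;
    }
}

// Copies the configured name when the binding is built: too early for broker-named queues.
class EagerBinding {
    private final String destination;

    EagerBinding(ToyQueue queue) {
        this.destination = queue.getName();
    }

    String getDestination() {
        return destination;
    }
}

// Keeps the queue reference and resolves the name on every call.
class LazyBinding {
    private final ToyQueue queue;

    LazyBinding(ToyQueue queue) {
        this.queue = queue;
    }

    String getDestination() {
        return queue.getActualName();
    }
}

class BindingDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue("");
        EagerBinding eager = new EagerBinding(queue);
        LazyBinding lazy = new LazyBinding(queue);

        // Simulate the broker answering the declaration with a generated name.
        queue.setActualName("amq.gen-example");

        System.out.println("eager: '" + eager.getDestination() + "'");
        System.out.println("lazy:  '" + lazy.getDestination() + "'");
    }
}
```

The eager binding still reports the empty configured name after the queue has been named, while the lazy one follows the queue's actual name. The anonymous `Binding` subclass in the workaround plays the role of `LazyBinding` here.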
