rabbitmq 如何使用spring boot在rabbit mq中创建动态队列?

s4n0splo  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(254)

我需要一些帮助。
我正在开发一个spring boot应用程序,我想把消息发布到rabbitMQ。我想把它发送到一个队列,这个队列是在消息本身中命名的。这样我就想动态地创建队列。我只找到了使用“静态”队列的例子。
我研究了一些东西,但没有发现任何东西。我是RabbitMQ的新手,学习了基本概念。我对Spring也相当陌生。
RabbotMQ配置

@Configuration
public class RabbitMQConfig {

    @Value("amq.direct")
    String exchange;

    @Value("queue-name") // Don't want to do this
    String queueName;

    @Value("routing-key") // Or this
    String routingkey;

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

消息发送者

@Service
public class RabbitMQSender {

    @Autowired
    private AmqpTemplate template;

    @Value("amq.direct")
    private String exchange;

    public void send(MessageDTO message) {
        template.convertAndSend(exchange, message);

    }
}
ht4b089n

ht4b089n1#

我来了一个方案:
您需要在您的配置中创建一个AmqpAdmin:

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory);
}

然后将其添加到服务中:

@Autowired
private AmqpAdmin admin;

最后,您可以使用它来创建队列和绑定。

Queue queue = new Queue(queueName, durable, false, false);
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, EXCHANGE, routingKey, null);
admin.declareQueue(queue);
admin.declareBinding(binding);

我找到了解决方案here

wlp8pajw

wlp8pajw2#

不确定你使用的是哪个版本的RabbitMQ,但是你的原始代码很接近。这也可以。

@Bean
Queue fanoutQueue() {
    // empty name, durable false, exclusive false, autoDelete false
    return new Queue("", false, false, true);
}

@Bean
FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout-exchange", true, false);
}

@Bean
Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}

相关问题