Returning a value from a callback function in RabbitMQ

Posted by 2skhul33 on 2022-11-08 in RabbitMQ

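I wrote a consumer in Spring Boot with RabbitMQ. After receiving a message I want to process it further, and for that I need to access it outside the callback. I have omitted some unrelated code from the PostMapping function.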

@CrossOrigin(origins = "*")
@RestController
public class PlottingRabbitMQ {
    private static Map<String, String> linkParams = new HashMap<>();
    static Logger logger
        = LoggerFactory.getLogger(LoginRabbitMQ.class);

    private final static String PLOTTING_RESEND_QUEUE = "plotting_resend";

    @JsonDeserialize
    @PostMapping(value = "/plotting")
    public Object createLink(@RequestBody PlottingModel plotting, @RequestHeader Map<String, String> plottingHeaders) throws Exception {

        ConnectionFactory factory_plotting_resend = new ConnectionFactory();
        factory_plotting_resend.setHost("localhost");
        Connection connection_plotting_resend = factory_plotting_resend.newConnection();
        Channel channel_plotting_resend = connection_plotting_resend.createChannel();
        channel_plotting_resend.queueDeclare(PLOTTING_RESEND_QUEUE,
                false, false,false, null);
        logger.info("[!] Waiting for messages. To exit press Ctrl+C");

        Consumer consumer_plotting_resend = new DefaultConsumer(channel_plotting_resend){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String respBodyPlotting = new String(body, "UTF-8");
                logger.info("[x] Message Received' " + respBodyPlotting + "'");
            }
        };

        channel_plotting_resend.basicConsume(PLOTTING_RESEND_QUEUE, true, consumer_plotting_resend);
        System.out.println("Received outside callback is: " + respBodyPlotting);

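As you can see at the bottom, I added a System.out.println call to print respBodyPlotting outside the channel_plotting_resend consumer callback, but I get an error when I try to print it. The message does show up in logger.info, but I don't know how to get it outside that function. Can anyone help?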

fquxozlt #1

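import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// `channel` and `QUEUE` stand for your own channel and queue name.
// Thread-safe, single-slot hand-off between the consumer thread and the calling thread.
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        // Runs on a RabbitMQ client thread; offer() does not block and
        // returns false (dropping the message) if the slot is already full.
        response.offer(new String(body, StandardCharsets.UTF_8));
    }
};

channel.basicConsume(QUEUE, true, consumer);
// take() blocks until the callback has delivered a message; it can throw InterruptedException.
System.out.println("Received outside callback is: " + response.take());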
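The reason this works is that respBodyPlotting in the question is a local variable of handleDelivery, so it is out of scope after the callback, and the callback runs later on a different thread anyway. A BlockingQueue gives the two threads a safe place to hand the value over. The sketch below shows the same hand-off without a broker so it can be run on its own. The names CallbackHandoff, simulateDelivery and receiveOne are hypothetical, and a plain thread stands in for the RabbitMQ consumer thread. It also swaps take() for poll() with a timeout, which is an assumption about what a REST handler would want, so that a request does not wait forever when no message arrives.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CallbackHandoff {

    // Hypothetical stand-in for the broker: invokes the callback on another
    // thread, the way the RabbitMQ client calls handleDelivery.
    static void simulateDelivery(Consumer<byte[]> callback, String payload) {
        Thread t = new Thread(
                () -> callback.accept(payload.getBytes(StandardCharsets.UTF_8)));
        t.setDaemon(true);
        t.start();
    }

    // Waits up to timeoutMs for one message; returns null if none arrived in time.
    static String receiveOne(String payload, long timeoutMs) throws InterruptedException {
        // Single-slot queue shared by the callback thread and the caller.
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // The callback only publishes the decoded body into the queue.
        simulateDelivery(
                body -> response.offer(new String(body, StandardCharsets.UTF_8)),
                payload);

        // Unlike take(), poll() gives up after the timeout instead of blocking forever.
        return response.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        String message = receiveOne("hello", 1000);
        System.out.println("Received outside callback is: " + message);
    }
}
```

In the real controller the lambda body would go inside handleDelivery, and a null result from poll() would need its own handling, for example an error response.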
