控制Azure Service Bus消息侦听器在Spring Boot 时开始或停止从主题或队列进行侦听

0ejtzxu1  于 2023-02-04  发布在  Spring
关注(0)|答案(2)|浏览(222)
    • bounty将在4天后过期**。回答此问题可获得+50的声誉奖励。Kush Patel正在寻找来自声誉良好来源的答案:从Azure服务总线读取数据的方法有3种:1. spring-cloud-azure-starter-integration-servicebus

2.Spring云Azure启动器服务总线jms
3.Spring Cloud蓝流绑定服务总线
但我只想使用第一个选项实现。但我找不到使用该选项的解决方案。应该有一些标志或参数或属性可以控制Azure服务总线使用者。

我希望实现的目标-* Azure Service Bus消息侦听器开始/停止从队列/主题接收消息。*

以下是详细解释。
目前,我已将Azure服务总线集成到我的应用程序中,我们将在 Spring 启动应用程序启动时立即侦听消息。现在,我希望修改此逻辑。默认情况下,Azure服务总线消息侦听器将被禁用。在ApplicationReadyEvent上,我希望执行某个任务,然后再次启用Azure服务总线消息侦听器以开始从主题或队列侦听。
那么我如何才能做到呢?

    • 应用程序. yml**
spring:
  cloud:
    azure:
      servicebus:
        namespace: **********
        
xxx:
  azure:
    servicebus:
      connection: ***********
      queue: **********
    • 蓝色配置. java**
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class AzureConfiguration{

    @Value("${xxx.azure.servicebus.connection}")
    private String serviceBusConnection;

    @Value("${xxx.azure.servicebus.queue}")
    private String serviceBusQueue;

    private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
    private static final String SENSOR_DATA_CHANNEL = "zzzzz";
    private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";

    @Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {

        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setConnectionString(serviceBusConnection);
        containerProperties.setEntityName(serviceBusQueue);
        containerProperties.setAutoComplete(true);
        return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    }

    @Bean
    public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {

        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        
        return adapter;
    }

    @Bean(name = SERVICE_BUS_INPUT_CHANNEL)
    public MessageChannel serviceBusInputChannel() {

        return new DirectChannel();
    }

    @Bean(name = SENSOR_DATA_CHANNEL)
    public MessageChannel sensorDataChannel() {

        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow serviceBusMessageFlow() {

        return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
                .<byte[], String>transform(String::new)
                .channel(SENSOR_DATA_CHANNEL)
                .get();
    }
}
    • 应用事件侦听器服务. java**
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{

   
    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");
        // By Default Azure Service Bus Message Listener will be disable
        // do some task
        // Enable Azure Bus Message Listener
        log.debug("Exit OnApplicationStarted");
    }
}

在www.example.com的上述代码中,AppEventListenerService.java ,
//启用Azure总线消息侦听器-在这里,我希望启动ServiceBusConsumer以从主题/队列接收消息。

qv7cva1a

qv7cva1a1#

  • 在这里,我有一个使用JMS来使用服务总线消息的解决方案
  • 使用JMS的原因是,当我们使用@JMSListener时,我们可以开始停止它。
  • 现在,要使用ServiceBus实现JMS,请参阅MSDOC
  • 现在,您必须Autowired这个JmsListenerEndpointRegistry对象并停止侦听器。
@Autowired  
JmsListenerEndpointRegistry registry;

要停止JMS,必须使用stop函数:

registry.stop();
  • 这里我创建了两个API,它们将启动/停止JMS和消息接收器:
@Component  
@RestController  
public class Reciever {  
  
    @Autowired  
  JmsListenerEndpointRegistry registry;  
  
  @GetMapping("/stop")  
    public String readBlobFile ()  
    {  
        registry.stop();  
 return "Stopped" ;  
  }  
  
    @GetMapping("/start")  
    public String readBlobFile1 ()  
    {  
        registry.start();  
 return "StARTED" ;  
  }  

    private static final String QUEUE_NAME = "test";  
 private final Logger logger = LoggerFactory.getLogger(Reciever.class);  
  
  
  
  @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")  
    public void receiveMessage(String s) {  
        logger.info("Received message: {}", s);  
  }  
}
  • 现在,首先我调用/stop API,它将停止JMS,只有在调用/start API后,消息才会开始到来。

输出:

ig9co6j1

ig9co6j12#

当您使用注册为bean的集成流时,启动/停止它的最简单方法是将其自动连接为StandardIntegrationFlow并调用相应的方法,如下所示:

@Slf4j
@Service
@DependsOn({"serviceBusMessageFlow"})
@RequiredArgsConstructor
public class AppEventListenerService {
    private final StandardIntegrationFlow serviceBusMessageFlow;

    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");
        // Disable Azure Bus Message Listener
        serviceBusMessageFlow.stop();

        // do some task

        // Enable Azure Bus Message Listener
        serviceBusMessageFlow.start();

        log.debug("Exit OnApplicationStarted");
    }
}

注意,可能需要@DependsOn注解来强制流bean在事件监听器bean之前初始化。另外,应该注意,一些消息可能碰巧在流初始化之后和监听器触发之前通过。

相关问题