- bounty将在4天后过期**。回答此问题可获得+50的声誉奖励。Kush Patel正在寻找来自声誉良好来源的答案:从Azure服务总线读取数据的方法有3种:1. spring-cloud-azure-starter-integration-servicebus
2.Spring云Azure启动器服务总线jms
3.Spring Cloud蓝流绑定服务总线
但我只想使用第一个选项实现。但我找不到使用该选项的解决方案。应该有一些标志或参数或属性可以控制Azure服务总线使用者。
我希望实现的目标-* Azure Service Bus消息侦听器开始/停止从队列/主题接收消息。*
以下是详细解释。
目前,我已将Azure服务总线集成到我的应用程序中,我们将在 Spring 启动应用程序启动时立即侦听消息。现在,我希望修改此逻辑。默认情况下,Azure服务总线消息侦听器将被禁用。在ApplicationReadyEvent
上,我希望执行某个任务,然后再次启用Azure服务总线消息侦听器以开始从主题或队列侦听。
那么我如何才能做到呢?
- 应用程序. yml**
spring:
cloud:
azure:
servicebus:
namespace: **********
xxx:
azure:
servicebus:
connection: ***********
queue: **********
- 蓝色配置. java**
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
@Configuration
public class AzureConfiguration{
@Value("${xxx.azure.servicebus.connection}")
private String serviceBusConnection;
@Value("${xxx.azure.servicebus.queue}")
private String serviceBusQueue;
private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
private static final String SENSOR_DATA_CHANNEL = "zzzzz";
private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";
@Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setConnectionString(serviceBusConnection);
containerProperties.setEntityName(serviceBusQueue);
containerProperties.setAutoComplete(true);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean
public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
@Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean(name = SERVICE_BUS_INPUT_CHANNEL)
public MessageChannel serviceBusInputChannel() {
return new DirectChannel();
}
@Bean(name = SENSOR_DATA_CHANNEL)
public MessageChannel sensorDataChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow serviceBusMessageFlow() {
return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
.<byte[], String>transform(String::new)
.channel(SENSOR_DATA_CHANNEL)
.get();
}
}
- 应用事件侦听器服务. java**
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{
@EventListener(ApplicationReadyEvent.class)
public void OnApplicationStarted() {
log.debug("Enter OnApplicationStarted");
// By Default Azure Service Bus Message Listener will be disable
// do some task
// Enable Azure Bus Message Listener
log.debug("Exit OnApplicationStarted");
}
}
在www.example.com的上述代码中,AppEventListenerService.java ,
//启用Azure总线消息侦听器-在这里,我希望启动ServiceBusConsumer以从主题/队列接收消息。
2条答案
按热度按时间qv7cva1a1#
@JMSListener
时,我们可以开始停止它。Autowired
这个JmsListenerEndpointRegistry
对象并停止侦听器。要停止JMS,必须使用stop函数:
/stop
API,它将停止JMS,只有在调用/start
API后,消息才会开始到来。输出:
ig9co6j12#
当您使用注册为bean的集成流时,启动/停止它的最简单方法是将其自动连接为
StandardIntegrationFlow
并调用相应的方法,如下所示:注意,可能需要
@DependsOn
注解来强制流bean在事件监听器bean之前初始化。另外,应该注意,一些消息可能碰巧在流初始化之后和监听器触发之前通过。