因此,我将spring amqp用于以下配置:
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(false);
CachingConnectionFactory ccf = new CachingConnectionFactory(factory);
ccf.setAddresses(addresses);
...
我正在使用amqp admin手动创建队列。在一些更新的库更新之前,使用此代码一切正常:
private void recreateContainer() {
// if we are not already in recreation process
if (this.recreatingContainer.compareAndSet(false, true)) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(
() -> {
int attempt = 0;
while (this.listenerContainer.isRunning() && attempt < MAX_STOP_WAIT_ATTEMPTS) {
try {
attempt++;
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (attempt == MAX_STOP_WAIT_ATTEMPTS) {
logger.error("Container took too long to stop");
}
logger.info(String.format("Recreating listener container after %s attempts", attempt));
startNewContainer();
this.recreatingContainer.set(false);
});
}
}
@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
Object source = event.getSource();
Assert.isInstanceOf(
MessageListenerContainer.class,
source,
"source must be instance of MessageListenerContainer");
MessageListenerContainer listenerContainer = (MessageListenerContainer) source;
// if fatal and this state model is responsible for this listenerContainer
if (event.isFatal() && this.listenerContainer == listenerContainer) {
recreateContainer();
}
}
看来这是从 ListenerContainerConsumerFailedEvent
至 ListenerContainerConsumerTerminatedEvent
现在我的代码不工作了。
如果我更新了异常类型,队列将被重新创建,但我在停止listenercontainer时遇到问题。
ERROR [2021-03-08T16:52:10.291+01:00] lambda$recreateContainer$0: Container took too long to stop
INFO [2021-03-08T16:52:10.291+01:00] lambda$recreateContainer$0: Recreating listener container after 100 attempts
现在,我得到了10个监听器,而不是5个监听器(因此,似乎监听器在重新创建队列后设法恢复,但在状态队列不存在时,我无法停止容器)。
任何帮助都将不胜感激。
1条答案
按热度按时间oalqel3c1#
似乎这段代码和
DirectMessageListenerContainer
```private void recreateContainer() {
// if we are not already in recreation process
if (this.recreatingContainer.compareAndSet(false, true)) {
this.listenerContainer.stop();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(
() -> {
int attempt = 0;
while (this.listenerContainer.isRunning() && attempt < MAX_STOP_WAIT_ATTEMPTS) {
try {
attempt++;
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (attempt == MAX_STOP_WAIT_ATTEMPTS) {
log.error("Container took too long to stop");
}
log.info(String.format("Recreating listener container after %s attempts", attempt));
startNewContainer();
this.recreatingContainer.set(false);
});
}
}
@Override
public void onApplicationEvent(ListenerContainerConsumerTerminatedEvent event) {
Object source = event.getSource();
Assert.isInstanceOf(
MessageListenerContainer.class,
source,
"source must be instance of MessageListenerContainer");
MessageListenerContainer sourceContainer = (MessageListenerContainer) source;
// if this state model is responsible for this listenerContainer
if (this.listenerContainer == sourceContainer) {
recreateContainer();
}
}