pubsub:信道用户丢失

j2cgzkjk  于 2021-07-14  发布在  Java
关注(0)|答案(0)|浏览(291)

一切都在标题里。我有一个特定的通道将数据发送到pubsub,使用spring集成和有关gcp pubsub的信息
我在本地和qa环境上没有任何问题。但是,在prod中,我有以下错误:

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 
'pubSubFlow.channel#1'; nested exception is java.lang.IllegalStateException: The [bean 
'pubSubFlow.channel#1'; defined in: 'class path resource [fr/auchan/lark/tracking/api/v1
/pubsub/PubSubRequestIntegration.class]'; from source: 'bean method pubSubFlow'] doesn't 
have subscribers to accept messages

org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167) ~[spring-integration-core-5.3.5.RELEASE.jar:5.3.5.RELEASE]
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:600) ~[spring-integration-core-5.3.5.RELEASE.jar:5.3.5.RELEASE]
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.5.RELEASE.jar:5.3.5.RELEASE]
org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83) ~[spring-integration-core-5.3.5.RELEASE.jar:5.3.5.RELEASE]
reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189) ~[reactor-core-3.3.13.RELEASE.jar:3.3.13.RELEASE]
reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:439) ~[reactor-core-3.3.13.RELEASE.jar:3.3.13.RELEASE]
reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:526) ~[reactor-core-3.3.13.RELEASE.jar:3.3.13.RELEASE]
reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.3.13.RELEASE.jar:3.3.13.RELEASE]
reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.3.13.RELEASE.jar:3.3.13.RELEASE]
java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[na:na]
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[na:na]
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[na:na]
java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: java.lang.IllegalStateException: The [bean 'pubSubFlow.channel#1'; defined in: 'class path resource [PubSubRequestIntegration.class]'; from source: 'bean method pubSubFlow'] doesn't have subscribers to accept messages
org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.2.12.RELEASE.jar:5.2.12.RELEASE]
org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61) ~[spring-integration-core-5.3.5.RELEASE.jar:5.3.5.RELEASE]
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.5.RELEASE.jar:5.3.5.RELEASE]
 12 common frames omitted

下面是我的频道声明,serviceactivator的使用是pubsub指南(参见上面的链接)。

@Bean
public MessageChannel dataChannel() {
    return MessageChannels.publishSubscribe(Executors.newCachedThreadPool()).get();
}

@Bean
public MessageChannel pubSubChannel() {
    return MessageChannels.publishSubscribe(Executors.newCachedThreadPool()).get();
}

@Bean
public IntegrationFlow pubSubFlow(
        MessageChannel dataChannel,
        MessageChannel pubSubChannel) {
    return IntegrationFlows
            .from(dataChannel)
            .fluxTransform(this::toPubSubFormat)
            .channel(pubSubChannel)
            .get();
}

@Bean
@ServiceActivator(inputChannel = "pubSubChannel")
public PubSubMessageHandler sendToPubSub(PubSubTemplate pubSubTemplate) {
    PubSubMessageHandler adapter = new PubSubMessageHandler(pubSubTemplate,
            pubSubIntegrationProperties.getTopic());

    adapter.setPublishCallback(
            new ListenableFutureCallback<>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.warn("There was the following error sending the message. " + throwable);
                }

                @Override
                public void onSuccess(String result) {
                    log.debug("Message was sent via the outbound channel adapter to {} : {}", pubSubIntegrationProperties.getTopic(), result);
                }
            });
    return adapter;
}

我错过什么了吗?为什么pubsubchannel被标记为没有订户?谢谢你的帮助

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题