Spring Boot中向Cloud PubSub主题发送的消息过多 [Spring Boot]

ezykj2lf  于 2023-06-22  发布在  Spring
关注(0)|答案(1)|浏览(104)

我正在使用GCP和Cloud PubSub开发一个聊天服务器。
我想用Spring Integration从一个实例向Cloud PubSub中的主题发送消息,但实际发送出去的消息太多了。

DEFAULT 2023-06-14T18:53:50.785528Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:50.785542Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
ERROR 2023-06-14T18:53:51.690026Z org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage], failedMessage=GenericMessage [payload={"roomId":1,"userId":1,"message":"zxcv"}, headers={replyChannel=nullChannel, errorChannel=, id=8e1e8628-36cf-ebd5-bd36-925668f6644c, timestamp=1686768830200}] at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:111) at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:108) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1075) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:558) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354) at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114) at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) at 
org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222) at org.springframework.integration.dispatcher.BroadcastingDispatcher.…
DEFAULT 2023-06-14T18:53:52.215535Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.215551Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355030Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355046Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516490Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516506Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662022Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662037Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789312Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789327Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991170Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991186Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115730Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115747Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241358Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241373Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358250Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358270Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504470Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504486Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637282Z 2023-06-15T03:53:53.631+09:00 WARN 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.637335Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.637519Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637531Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.649037Z 2023-06-15T03:53:53.648+09:00 INFO 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.786696Z 2023-06-15T03:53:53.784+09:00 WARN 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.786742Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.786965Z 2023-06-15T03:53:53.786+09:00 INFO 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.827465Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.827481Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973948Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973962Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116426Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116443Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237137Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237158Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}

这是该示例的代码。

// Inbound side: subscribes to `chat-sub` and hands each received payload, as a String,
    // to the channel qualified as "inputMessageChannel". The adapter's ack mode is AckMode.AUTO.
    @Bean
    fun inboundChannelAdapter(
        @Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
        pubSubTemplate: PubSubTemplate?
    ): PubSubInboundChannelAdapter? =
        PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub").apply {
            outputChannel = messageChannel
            ackMode = AckMode.AUTO
            payloadType = String::class.java
        }

    // Consumer for "inputMessageChannel": parses the JSON payload, forwards it to the
    // "/sub/message/<roomId>" destination, and finally acks the original Pub/Sub message.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    fun messageReceiver(
        payload: String,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
    ) {
        val chat = gson.fromJson(payload, ChatMessageToPubSub::class.java)
        val destination = "/sub/message/${chat.roomId}"
        val outgoing = ChatMessageToClient(chat.userId, chat.message)

        messagingTemplate.convertAndSend(destination, outgoing)
        message.ack()
        //LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
    }

    // Outbound side: a handler bean that publishes whatever reaches "inputMessageChannel"
    // to the `chat` topic and prints the payload from its success / failure callbacks.
    // NOTE(review): this listens on the same channel the inbound adapter writes to — confirm
    // that is intended.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? =
        PubSubMessageHandler(pubsubTemplate, "chat").apply {
            setSuccessCallback { _, sent -> println("send success : ${sent.payload}") }
            setFailureCallback { _, failed -> println("send fail : ${failed.payload}") }
        }
/**
 * Messaging gateway declared with "inputMessageChannel" as its default request channel.
 */
@MessagingGateway(defaultRequestChannel = "inputMessageChannel")
interface PubSubOutBoundGateway {
    /** Sends [text] through the gateway; declared to throw [MessagingException]. */
    @Throws(MessagingException::class)
    fun sendToPubsub(text: String)
}

因此,到我关闭实例时,客户端已经收到了大约200条消息。
第一个错误是

org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage]

我以为这就是消息被重发的原因,所以尝试添加GcpPubSubHeaders.ORIGINAL_MESSAGE头。但是我找不到可以用来创建对象的BasicAcknowledgeablePubsubMessage实现类。
我遗漏了什么?
或者有什么方法可以在异常发生时停止重试?

umuewwlo

umuewwlo1#

这是因为发送到主题的消息又回到了inputMessageChannel,于是messageSender适配器再次被触发。
因此我把入站通道和出站通道分开,代码如下。

// Channel bean for the inbound direction; backed by a PublishSubscribeChannel.
    @Bean
    fun inputMessageChannel(): MessageChannel? = PublishSubscribeChannel()

    // Channel bean for the outbound direction; backed by a PublishSubscribeChannel.
    @Bean
    fun outputMessageChannel(): MessageChannel? = PublishSubscribeChannel()

    // Inbound adapter for the `chat-sub` subscription. Payloads are converted to String and
    // emitted on the channel qualified as "inputMessageChannel"; ack mode is AckMode.MANUAL.
    @Bean
    fun inboundChannelAdapter(
        @Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
        pubSubTemplate: PubSubTemplate?
    ): PubSubInboundChannelAdapter? =
        PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub").apply {
            outputChannel = messageChannel
            ackMode = AckMode.MANUAL
            payloadType = String::class.java
        }

    // Handles messages arriving on "inputMessageChannel": logs the raw Pub/Sub message,
    // acknowledges it, and forwards the parsed chat message to "/sub/message/<roomId>".
    //
    // The ack is issued before any step that can throw (JSON parsing, the outgoing send).
    // The inbound adapter in this file uses AckMode.MANUAL, so — as far as this file shows —
    // this call is the only acknowledgement; if a malformed payload aborted the method before
    // it, the message would stay un-acked and could be redelivered repeatedly.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    fun messageReceiver(
        payload: String,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
    ) {
        // Acknowledge first; delivery to chat clients is best-effort from here on.
        message.ack()

        println("received message : " + message.pubsubMessage.data.toStringUtf8())
        println(message.pubsubMessage.attributesMap)

        // Gson returns null for an empty (or literal `null`) document; nothing to forward then.
        val model: ChatMessageToPubSub =
            gson.fromJson(payload, ChatMessageToPubSub::class.java) ?: return

        messagingTemplate.convertAndSend(
            "/sub/message/${model.roomId}",
            ChatMessageToClient(model.userId, model.message)
        )
        //LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
    }

    // Outbound handler bean: everything arriving on "outputMessageChannel" is published to the
    // `chat` topic. Both callbacks print the outcome together with the message payload, and the
    // publish timeout expression is set to "500".
    @Bean
    @ServiceActivator(inputChannel = "outputMessageChannel")
    fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? =
        PubSubMessageHandler(pubsubTemplate, "chat").apply {
            setSuccessCallback { ackId, sent -> println("send success : $ackId - ${sent.payload}") }
            setFailureCallback { cause, failed ->
                println("send fail : ${cause.stackTraceToString()} - ${failed.payload}")
            }
            setPublishTimeoutExpressionString("500")
        }
/**
 * Messaging gateway declared with "outputMessageChannel" as its default request channel,
 * "errorChannel" as its error channel and a default request timeout of "500".
 */
@MessagingGateway(
    defaultRequestChannel = "outputMessageChannel",
    defaultRequestTimeout = "500",
    errorChannel = "errorChannel"
)
interface PubSubOutBoundGateway {
    /** Sends [text] through the gateway. */
    @Throws(MessagingException::class, MessageHandlingException::class)
    fun sendToPubsub(text: String)
}

这样就成功了。

相关问题