我正在使用GCP和Cloud PubSub开发一个聊天服务器。
我想用Spring Integration从一个示例向Cloud PubSub中的主题发送消息,但它发送了太多消息。
DEFAULT 2023-06-14T18:53:50.785528Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:50.785542Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
ERROR 2023-06-14T18:53:51.690026Z org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage], failedMessage=GenericMessage [payload={"roomId":1,"userId":1,"message":"zxcv"}, headers={replyChannel=nullChannel, errorChannel=, id=8e1e8628-36cf-ebd5-bd36-925668f6644c, timestamp=1686768830200}] at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:111) at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:108) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1075) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:558) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354) at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114) at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222) at org.springframework.integration.dispatcher.BroadcastingDispatcher.…
DEFAULT 2023-06-14T18:53:52.215535Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.215551Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355030Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355046Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516490Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516506Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662022Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662037Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789312Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789327Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991170Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991186Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115730Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115747Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241358Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241373Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358250Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358270Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504470Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504486Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637282Z 2023-06-15T03:53:53.631+09:00 WARN 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.637335Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.637519Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637531Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.649037Z 2023-06-15T03:53:53.648+09:00 INFO 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.786696Z 2023-06-15T03:53:53.784+09:00 WARN 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.786742Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.786965Z 2023-06-15T03:53:53.786+09:00 INFO 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.827465Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.827481Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973948Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973962Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116426Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116443Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237137Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237158Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
这是该示例的代码。
// Create an inbound channel adapter to listen to the subscription `chat-sub` and send
// messages to the input message channel.
@Bean
fun inboundChannelAdapter(
@Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
pubSubTemplate: PubSubTemplate?
): PubSubInboundChannelAdapter? {
val adapter = PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub")
adapter.outputChannel = messageChannel
adapter.ackMode = AckMode.AUTO
adapter.payloadType = String::class.java
return adapter
}
// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
fun messageReceiver(
payload: String,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
) {
val model = gson.fromJson(payload, ChatMessageToPubSub::class.java)
messagingTemplate.convertAndSend("/sub/message/" + model.roomId.toString(), ChatMessageToClient(model.userId, model.message))
message.ack()
//LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
}
// Create an outbound channel adapter to send messages from the input message channel to the
// topic `chat`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? {
val adapter = PubSubMessageHandler(pubsubTemplate, "chat")
adapter.setSuccessCallback { ackId, message -> println("send success : " + message.payload) }
adapter.setFailureCallback { cause, message -> println("send fail : " + message.payload) }
return adapter
}
@MessagingGateway(defaultRequestChannel = "inputMessageChannel")
interface PubSubOutBoundGateway {
@kotlin.jvm.Throws(MessagingException::class)
fun sendToPubsub(text : String)
}
因此,我在客户端关闭示例时收到了大约200条消息。
第一个错误是
org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage]
我以为这是消息重发的关键,所以我尝试添加GcpPubSubHeaders.ORIGINAL_MESSAGE头。但是我找不到实现BasicAcknowledgeablePubsubMessage来生成对象的类。
我现在错过了什么?
或者有什么方法可以在异常发生时停止重试?
1条答案
按热度按时间umuewwlo1#
这是因为发送到主题的消息返回到inputMessageChannel,因此messageSender适配器再次工作。
所以我把入站通道和出站通道分开,所以代码看起来像这样。
那就成功了