Java: Spring Cloud Stream sometimes throws MessageDispatchingException: Dispatcher has no subscribers

Posted by 6ojccjat on 2023-03-16 in Java

I have a Spring Cloud Stream application that consumes a Kafka topic and ultimately updates an Elasticsearch index.

@Bean
public Consumer<Flux<Message<GraphTextKafkaRecord>>> fetchSeed() {
    return messages -> messages
            .map(message -> {
                var ack = (Acknowledgment) message.getHeaders().get(ACKNOWLEDGEMENT_KEY);
                var graphTextRecord = message.getPayload();
                log.debug("Fetch message from kafka. message: {}", graphTextRecord);
                return Tuples.of(ack, graphTextRecord);
            })
            .filter(tuple -> {
                var result = appRules.test(tuple.getT2());
                if (!result) {
                    tuple.getT1().acknowledge();
                    log.debug("Message: {} has been filtered due to rules defined in application", tuple.getT2());
                }
                return result;
            })
            .map(tuple -> {
                GraphTextKafkaRecord kafkaRecord = tuple.getT2();
                return PageQuality
                        .builder(tuple.getT1(), kafkaRecord.url(), kafkaRecord.pageQuality())
                        .pageQualityAlphaScore(kafkaRecord.pageQualityAlpha())
                        .statusCode(kafkaRecord.statusCode())
                        .build();
            })
            .flatMap(esHandler::insertPageQualityScore)
            .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
            .subscribe();
}

Here is my relevant configuration:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: false
          brokers: ${PAGE_QUALITY_PROD_KAFKA_BROKERS:x.x.x.x:9092}
          enable-observation: false
          consumer-properties:
            allow.auto.create.topics: false
        bindings:
          fetchSeed-in-0:
            consumer:
              ack-mode: manual
              enable-dlq: false
              poll-timeout: 21474836470
              start-offset: earliest
      bindings:
        fetchSeed-in-0:
          group: page-quality-group-prod
          destination: ${PAGE_QUALITY_PROD_KAFKA_TOPIC_NAME:graph-text}
          consumer:
            max-attempts: 10
            back-off-initial-interval: 500
            back-off-max-interval: 200
            back-off-multiplier: 2.0
  elasticsearch:
    uris: ${PAGE_QUALITY_PROD_ELASTIC_SEARCH_HOST:http://x.x.x.x:9200}
    username: ${PAGE_QUALITY_PROD_ELASTIC_SEARCH_USERNAME:user}
    password: ${PAGE_QUALITY_PROD_ELASTIC_SEARCH_PASSWORD:pass}
    socket-timeout: 30s
    connection-timeout: 30S

It runs fine, but after a few days it throws the following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2944)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2891)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2857)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2780)
    at io.micrometer.observation.Observation.observe(Observation.java:559)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2778)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2630)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2516)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2168)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1523)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1487)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1362)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2877)
    ... 12 more
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.fetchSeed-in-0'., failedMessage=GenericMessage [payload=byte[88549], headers={kafka_offset=9184809, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19b7a34e, deliveryAttempt=10, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=34, kafka_receivedMessageKey=[B@2d72e8c7, kafka_receivedTopic=graph-text, kafka_receivedTimestamp=1678842937842, kafka_acknowledgment=Acknowledgment for graph-text-34@9184809, contentType=application/json, kafka_groupId=page-quality-group-prod}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
    at io.micrometer.observation.Observation.observe(Observation.java:492)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464)
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66)
    ... 15 more
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[88549], headers={kafka_offset=9184809, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19b7a34e, deliveryAttempt=10, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=34, kafka_receivedMessageKey=[B@2d72e8c7, kafka_receivedTopic=graph-text, kafka_receivedTimestamp=1678842937842, kafka_acknowledgment=Acknowledgment for graph-text-34@9184809, contentType=application/json, kafka_groupId=page-quality-group-prod}]

Here is also the health output from the actuator:

{
  "status": "UP",
  "components": {
    "binders": {
      "status": "UP",
      "components": {
        "kafka": {
          "status": "UP",
          "details": {
            "topicsInUse": [
              "graph-text"
            ],
            "listenerContainers": [
              {
                "isPaused": false,
                "listenerId": "KafkaConsumerDestination{consumerDestinationName='graph-text', partitions=0, dlqName='null'}.container",
                "isRunning": true,
                "groupId": "page-quality-group-prod",
                "isStoppedAbnormally": false
              }
            ]
          }
        }
      }
    }
  }
}

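As you can see, it reports that everything is fine, but the application no longer consumes messages.
I don't know where the problem is.
Let me know if you need more information.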

pftdvrlh #1

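When a reactive function fails and the error is not properly handled inside it, the stream breaks and cannot be recovered by s-c-stream (Spring Cloud Stream), because the framework has no control over it. This is a well-known limitation of using the reactive API. An imperative function is invoked by the framework every time there is a message, so the framework has full control over failures and retries. A reactive function is invoked only once, during startup, to connect the stream provided by the user's function. Once the stream is connected, s-c-stream plays no role in subsequent processing. You can read more here.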
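
To make this concrete, here is a minimal sketch using plain Project Reactor, with no Spring or Kafka involved. The class ResilientPipeline and the method insert are hypothetical stand-ins (insert plays the role of esHandler::insertPageQualityScore from the question), and the retry counts and delays are arbitrary. In fragile, one failing element terminates the whole Flux. In the question's code, once the outer retryWhen is exhausted the subscription ends, which is probably why the channel later reports that it has no subscribers. In resilient, the retry and the fallback are attached to the per-message Mono inside flatMap, so the outer stream never receives an error signal.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class ResilientPipeline {

    // Hypothetical stand-in for esHandler::insertPageQualityScore.
    // It fails for any input containing "bad" to simulate an indexing error.
    static Mono<String> insert(String url) {
        if (url.contains("bad")) {
            return Mono.error(new IllegalStateException("indexing failed: " + url));
        }
        return Mono.just("indexed " + url);
    }

    // No error handling: the first failure is propagated as a terminal
    // signal, and the elements after it are never processed.
    static Flux<String> fragile(Flux<String> urls) {
        return urls.flatMap(ResilientPipeline::insert);
    }

    // Retry and fallback are applied to the inner Mono of each message,
    // so a failing message is skipped and the outer Flux stays alive.
    static Flux<String> resilient(Flux<String> urls) {
        return urls.flatMap(url -> insert(url)
                .retryWhen(Retry.fixedDelay(2, Duration.ofMillis(10)))
                .onErrorResume(e -> {
                    // Assumption: skipping is acceptable here. A real application
                    // would likely log the error or publish the record elsewhere.
                    return Mono.empty();
                }));
    }

    public static void main(String[] args) {
        List<String> broken = fragile(Flux.just("a", "bad", "c"))
                .onErrorResume(e -> Flux.empty()) // only so that block() does not throw
                .collectList()
                .block();
        System.out.println("fragile:   " + broken);

        List<String> ok = resilient(Flux.just("a", "bad", "c"))
                .collectList()
                .block();
        System.out.println("resilient: " + ok);
    }
}
```

If the per-message retry and skip semantics above are not what you want, the other option implied by this answer is to switch to an imperative Consumer<Message<GraphTextKafkaRecord>>, in which case the binder's own max-attempts and back-off settings apply to each message.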
