I have a Spring Cloud Stream application that consumes a Kafka topic and ultimately updates an Elasticsearch index.
@Bean
public Consumer<Flux<Message<GraphTextKafkaRecord>>> fetchSeed() {
    return messages -> messages
            .map(message -> {
                var ack = (Acknowledgment) message.getHeaders().get(ACKNOWLEDGEMENT_KEY);
                var graphTextRecord = message.getPayload();
                log.debug("Fetch message from kafka. message: {}", graphTextRecord);
                return Tuples.of(ack, graphTextRecord);
            })
            .filter(tuple -> {
                var result = appRules.test(tuple.getT2());
                if (!result) {
                    tuple.getT1().acknowledge();
                    log.debug("Message: {} has been filtered due to rules defined in application", tuple.getT2());
                }
                return result;
            })
            .map(tuple -> {
                GraphTextKafkaRecord kafkaRecord = tuple.getT2();
                return PageQuality
                        .builder(tuple.getT1(), kafkaRecord.url(), kafkaRecord.pageQuality())
                        .pageQualityAlphaScore(kafkaRecord.pageQualityAlpha())
                        .statusCode(kafkaRecord.statusCode())
                        .build();
            })
            .flatMap(esHandler::insertPageQualityScore)
            .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
            .subscribe();
}
Here is the relevant configuration:
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: false
          brokers: ${PAGE_QUALITY_PROD_KAFKA_BROKERS:x.x.x.x:9092}
          enable-observation: false
          consumer-properties:
            allow.auto.create.topics: false
        bindings:
          fetchSeed-in-0:
            consumer:
              ack-mode: manual
              enable-dlq: false
              poll-timeout: 21474836470
              start-offset: earliest
      bindings:
        fetchSeed-in-0:
          group: page-quality-group-prod
          destination: ${PAGE_QUALITY_PROD_KAFKA_TOPIC_NAME:graph-text}
          consumer:
            max-attempts: 10
            back-off-initial-interval: 500
            back-off-max-interval: 200
            back-off-multiplier: 2.0
  elasticsearch:
    uris: ${PAGE_QUALITY_PROD_ELASTIC_SEARCH_HOST:http://x.x.x.x:9200}
    username: ${PAGE_QUALITY_PROD_ELASTIC_SEARCH_USERNAME:user}
    password: ${PAGE_QUALITY_PROD_ELASTIC_SEARCH_PASSWORD:pass}
    socket-timeout: 30s
    connection-timeout: 30S
It works fine, but after a few days it throws the following exception:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2944)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2891)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2857)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2780)
at io.micrometer.observation.Observation.observe(Observation.java:559)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2778)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2630)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2516)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2168)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1523)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1362)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2877)
... 12 more
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.fetchSeed-in-0'., failedMessage=GenericMessage [payload=byte[88549], headers={kafka_offset=9184809, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19b7a34e, deliveryAttempt=10, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=34, kafka_receivedMessageKey=[B@2d72e8c7, kafka_receivedTopic=graph-text, kafka_receivedTimestamp=1678842937842, kafka_acknowledgment=Acknowledgment for graph-text-34@9184809, contentType=application/json, kafka_groupId=page-quality-group-prod}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464)
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66)
... 15 more
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[88549], headers={kafka_offset=9184809, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19b7a34e, deliveryAttempt=10, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=34, kafka_receivedMessageKey=[B@2d72e8c7, kafka_receivedTopic=graph-text, kafka_receivedTimestamp=1678842937842, kafka_acknowledgment=Acknowledgment for graph-text-34@9184809, contentType=application/json, kafka_groupId=page-quality-group-prod}]
And this is what the actuator reports:
{
  "status": "UP",
  "components": {
    "binders": {
      "status": "UP",
      "components": {
        "kafka": {
          "status": "UP",
          "details": {
            "topicsInUse": [
              "graph-text"
            ],
            "listenerContainers": [
              {
                "isPaused": false,
                "listenerId": "KafkaConsumerDestination{consumerDestinationName='graph-text', partitions=0, dlqName='null'}.container",
                "isRunning": true,
                "groupId": "page-quality-group-prod",
                "isStoppedAbnormally": false
              }
            ]
          }
        }
      }
    }
  }
}
As you can see, it reports that everything is fine, yet the application no longer consumes any messages.
I can't figure out where the problem is.
Let me know if you need more information.
1 Answer
When a reactive function fails and you don't handle the error properly inside it, the stream breaks and cannot be recovered by s-c-stream (Spring Cloud Stream), because s-c-stream has no control over it. This is a well-known limitation of using the reactive API. Basically, an imperative function is invoked by the framework every time a message arrives, so we have full control over failure and retries. A reactive function is invoked only once, at startup, to connect the stream provided by the user function. Once the stream is connected, s-c-stream plays no role in the subsequent processing. You can read more about it here.
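To illustrate the point, here is a minimal sketch of one way to keep the reactive pipeline alive. It reuses the names from the question (esHandler, PageQuality, ACKNOWLEDGEMENT_KEY, GraphTextKafkaRecord) and assumes insertPageQualityScore returns a reactive Publisher, as the flatMap in the question implies; the appRules filter is omitted for brevity. The idea is to do the work per message inside concatMap, retry only that message, and resume on error so a single poison message never cancels the outer subscription.

import java.time.Duration;
import java.util.function.Consumer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

@Bean
public Consumer<Flux<Message<GraphTextKafkaRecord>>> fetchSeed() {
    return messages -> messages
            .concatMap(message -> {
                var ack = (Acknowledgment) message.getHeaders().get(ACKNOWLEDGEMENT_KEY);
                var kafkaRecord = message.getPayload();
                // Defer the Elasticsearch call so each retry re-executes it for this message only.
                return Flux.defer(() -> esHandler.insertPageQualityScore(
                                PageQuality.builder(ack, kafkaRecord.url(), kafkaRecord.pageQuality())
                                        .pageQualityAlphaScore(kafkaRecord.pageQualityAlpha())
                                        .statusCode(kafkaRecord.statusCode())
                                        .build()))
                        .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
                        // If it still fails, log (or send to a dead-letter topic), acknowledge
                        // so the consumer is not stuck on this offset, and drop the element
                        // instead of letting the error terminate the whole stream.
                        .onErrorResume(ex -> {
                            log.error("Giving up on record {}", kafkaRecord, ex);
                            ack.acknowledge();
                            return Flux.empty();
                        });
            })
            .subscribe();
}

The alternative the answer describes is to switch to an imperative function, e.g. a Consumer<Message<GraphTextKafkaRecord>>, so that the binder invokes it once per message and its own retry and error-handling settings (max-attempts, back-off-*, enable-dlq) apply again.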