当一个kstream失败时,出现“kafkastreams is not running”状态为error“它影响在另一个kstream中进行的interactivequeries调用为什么?

fdx2calv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(261)

我在连续处理相对大量的消息时遇到了这个问题。这是我的完整设置:
spring cloud stream kafka streams(boot 2.3.1,scs horsham.sr6,kafka client 2.5.0)应用程序,具有两个功能性kstream
kstream0接收来自主题x的消息,并在数据库中进行一些处理保存
kstream1接收来自主题a的消息并更新状态存储store1
kstream2从主题b接收消息。使用 InteractiveQuery 并在此基础上将消息保存到mongodb数据库中
合流云中的Kafka簇
假设如下:
store1完全填充了kstream2所需的数据
scs应用程序只有一个示例
问题:
我在主题b中发布了约5000条消息。kstream2开始接收它们,检查状态存储并正确处理大量消息。
在某个点上,独立于其他元素的kstream0失败,将kafkastreams的状态变为错误
在这一刻之后,kstream2不断地获取记录,但是调用 interactiveQueryService.getQueryableStore(STORE1, keyValueStore<Id, Something>()) 失败,然后重试(我设置了一个状态存储retry max attempts:20和backoff period:1500),直到它最终进入kstream2的catch块失败:

@Bean
    fun processSomething() = Consumer<KStream<Id, AThing>> { aThing ->
       .foreach { key, value ->
           try{
                val x = interactiveQueryService.getQueryableStore(STORE1, keyValueStore<Id, Something>())
                //save to db
           } catch (ex: RuntimeException) {
               logger.error("Error processing metric update", ex)
           }

尽管我绕过了这个错误,但例外是 KafkaStreams is not running. State is ERROR 所以kstream2什么也不做
如果我重新启动应用程序,我会再次正常工作一段时间,直到一段时间后失败。
错误不是由于错误的消息格式或处理管道中的任何内容造成的。它似乎总是在状态存储重试之后出现。
堆栈跟踪:

java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:316) ~[kafka-streams-2.5.0.jar:na]
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1182) ~[kafka-streams-2.5.0.jar:na]
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1169) ~[kafka-streams-2.5.0.jar:na]
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$0(InteractiveQueryService.java:100) ~[spring-cloud-stream-binder-kafka-streams-3.0.6.RELEASE.jar:3.0.6.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService$$Lambda$2955/0000000000000000.doWithRetry(Unknown Source) ~[na:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.5.RELEASE.jar:na]
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:92) ~[spring-cloud-stream-binder-kafka-streams-3.0.6.RELEASE.jar:3.0.6.RELEASE]
    at xxx.service.metric.deployment.MyService.getStateStore1Data(MyService.kt:45) ~[main/:na]
...
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.5.0.jar:na]

为什么kstream0的崩溃会影响在kstream2中对state store1的调用?我怎样才能减轻这一点?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题