我在连续处理相对大量的消息时遇到了这个问题。这是我的完整设置:
spring cloud stream kafka streams(boot 2.3.1,scs horsham.sr6,kafka client 2.5.0)应用程序,具有两个功能性kstream
kstream0接收来自主题x的消息,并在数据库中进行一些处理保存
kstream1接收来自主题a的消息并更新状态存储store1
kstream2从主题b接收消息。使用 InteractiveQuery
并在此基础上将消息保存到mongodb数据库中
合流云中的Kafka簇
假设如下:
store1完全填充了kstream2所需的数据
scs应用程序只有一个示例
问题:
我在主题b中发布了约5000条消息。kstream2开始接收它们,检查状态存储并正确处理大量消息。
在某个点上,独立于其他元素的kstream0失败,将kafkastreams的状态变为错误
在这一刻之后,kstream2不断地获取记录,但是调用 interactiveQueryService.getQueryableStore(STORE1, keyValueStore<Id, Something>())
失败,然后重试(我设置了一个状态存储retry max attempts:20和backoff period:1500),直到它最终进入kstream2的catch块失败:
@Bean
fun processSomething() = Consumer<KStream<Id, AThing>> { aThing ->
.foreach { key, value ->
try{
val x = interactiveQueryService.getQueryableStore(STORE1, keyValueStore<Id, Something>())
//save to db
} catch (ex: RuntimeException) {
logger.error("Error processing metric update", ex)
}
尽管我绕过了这个错误,但例外是 KafkaStreams is not running. State is ERROR
所以kstream2什么也不做
如果我重新启动应用程序,我会再次正常工作一段时间,直到一段时间后失败。
错误不是由于错误的消息格式或处理管道中的任何内容造成的。它似乎总是在状态存储重试之后出现。
堆栈跟踪:
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:316) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1182) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1169) ~[kafka-streams-2.5.0.jar:na]
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$0(InteractiveQueryService.java:100) ~[spring-cloud-stream-binder-kafka-streams-3.0.6.RELEASE.jar:3.0.6.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService$$Lambda$2955/0000000000000000.doWithRetry(Unknown Source) ~[na:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:92) ~[spring-cloud-stream-binder-kafka-streams-3.0.6.RELEASE.jar:3.0.6.RELEASE]
at xxx.service.metric.deployment.MyService.getStateStore1Data(MyService.kt:45) ~[main/:na]
...
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.5.0.jar:na]
为什么kstream0的崩溃会影响在kstream2中对state store1的调用?我怎样才能减轻这一点?
暂无答案!
目前还没有任何答案,快来回答吧!