在两个不同@streamlistener示例之间嵌入kafka迁移状态存储

i5desfxk  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(457)

我有一个springboot应用程序,它有两个通过springcloudMap的流处理器。对于不同的主题,每个处理器都有自己的@streamlistener。一个处理器将聚合数据写入quarable状态存储。我在单元测试中遇到了通过@service(service get aggregated data from state store)获取数据的问题。出于某些原因,有时会捕获异常: org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47) 当我从另一个处理器中删除streamlistener时,所有的工作都很好而且稳定。
如何用合适的处理器绑定示例的状态存储?

tpgth1q7

tpgth1q71#

我已经找到了解决我问题的办法,也许它会帮助别人。我用的不是最新版本的springcloudstream活页夹kafka-streams。我的版本是2.0.0,它有一个bughttps://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366 . 它只在版本2.0.1中修复

相关问题