我有一个springboot应用程序,它有两个通过springcloudMap的流处理器。对于不同的主题,每个处理器都有自己的@streamlistener。一个处理器将聚合数据写入quarable状态存储。我在单元测试中遇到了通过@service(service get aggregated data from state store)获取数据的问题。出于某些原因,有时会捕获异常: org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
当我从另一个处理器中删除streamlistener时,所有的工作都很好而且稳定。
如何用合适的处理器绑定示例的状态存储?
1条答案
按热度按时间tpgth1q71#
我已经找到了解决我问题的办法,也许它会帮助别人。我用的不是最新版本的springcloudstream活页夹kafka-streams。我的版本是2.0.0,它有一个bughttps://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366 . 它只在版本2.0.1中修复