我使用springcloudstreamapi中的聚合函数从主题创建一个物化视图。如下所示:
public void process(KStream<Object, Object> input){
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
然后我查询我创建的statestore:
ReadOnlyWindowStore<Object, Object> windowStore =
queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());
现在我的问题是,如何确定在process方法处理新事件之后,这个statestore是否已更新?他们的活动是我可以听的还是我可以创造一个?
1条答案
按热度按时间7z5jn7bk1#
你的计划是:
事实上,最后
aggregate()
返回一个KTable
对象。如果通过禁用缓存Materialized
您可以了解到KTable
通过: