如何确定在SpringCloudStreamAPI中更新了密钥库

ix0qys7i  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(243)

我使用springcloudstreamapi中的聚合函数从主题创建一个物化视图。如下所示:

public void process(KStream<Object, Object> input){
input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

然后我查询我创建的statestore:

ReadOnlyWindowStore<Object, Object> windowStore =
  queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());

现在我的问题是,如何确定在process方法处理新事件之后,这个statestore是否已更新?他们的活动是我可以听的还是我可以创造一个?

7z5jn7bk

7z5jn7bk1#

你的计划是:

input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

事实上,最后 aggregate() 返回一个 KTable 对象。如果通过禁用缓存 Materialized 您可以了解到 KTable 通过:

input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
  .toStream()
  .foreach(...) // react to every update to the KTable

相关问题