kafka流式提交kgroupedtable的最新消息

nkoocmlb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(310)

我有Kafka流应用程序如下:

static KafkaStreams build(AppConfig appConfig, SerdesHelper serdes) {
  final KStreamBuilder builder = new KStreamBuilder();

  builder
      .table(serdes.sourceKeySerde, serdes.sourceValueSerde, appConfig.sourceTopic)
      .groupBy(StreamBuilder::groupByMapper, serdes.intSerde, serdes.longSerde)
      .aggregate(
          StreamBuilder::initialize,
          StreamBuilder::add,
          StreamBuilder::subtract,
          serdes.sinkValueSerde)
      .to(serdes.intSerde, serdes.sinkValueSerde, appConfig.sinkTopic);

  return new KafkaStreams(builder, appConfig.streamConfig);
}

我的具体例子如下

((k, v)) -> ((k), v[])

在使用只有两个唯一键的3.000.000条消息的虚拟数据运行时,我在 sinkTopic 在不到一分钟,我希望得到4/6(根据当时我设法停止应用程序)。
如何确保只有具有最新分组值的密钥才会提交回kafka,而不是每个中间消息?

rbl8hiat

rbl8hiat1#

这是流处理,而不是批处理。没有“最新分组值”——输入是无限的,因此输出是无限的。。。
你只能减少中间产物的数量
增加ktable缓存大小(但这对于您的情况来说似乎不是问题,因为您只有2个唯一的密钥,因此如果不禁用缓存或
增加提交间隔

相关问题