如何每小时聚合数据?

js81xvg6  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(441)

每当用户喜欢我们网站上的某些内容时,我们收集事件,我们计划每小时提交一次内容的聚合收藏夹,并更新数据库中的总收藏夹数。
我们在评估Kafka流。遵循字数计算的例子。我们的拓扑结构很简单,生成一个主题a,读取聚合数据并将其提交给另一个主题b。然后每小时使用主题b中的事件并在db中提交。

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
   public StreamsConfig kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    return new StreamsConfig(props);
}

@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
    StreamsBuilder builder = streamBuilder();
    KStream<String, String> source = builder.stream(topic);
    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
            .to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
    streams.start();
    return source;
}

@Bean
public StreamsBuilder streamBuilder() {
    return new StreamsBuilder();
}

但是,当我使用这个主题b时,它会从一开始就为我提供聚合数据。我的问题是,我们是否可以有一些规定,其中我可以使用前几个小时的分组数据,然后提交给db,然后kakfa忘记前几个小时的数据,并每小时提供新的数据,而不是累计总和。设计拓扑是正确的还是我们可以做得更好?

kgqe7b3p

kgqe7b3p1#

如果希望每小时获得一个聚合结果,可以使用窗口大小为1小时的窗口聚合。

stream.groupBy(...)
      .windowedBy(TimeWindow.of(1 *3600 * 1000))
      .count(...)

查看文档了解更多详细信息:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing
输出类型为 Windowed<String> 钥匙(不是 String ). 你需要提供一个自定义 Window<String> serde,或转换密钥类型。参考sessionwindowsexample。

相关问题