kafka流处理批处理数据以重置聚合

hs1rzwqc  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(378)

我的kafka主题“datasource”中有一些数据,其模式如下(此处为演示简化):

{ "deal" : -1, "location": "", "value": -1, "type": "init" }
{ "deal": 123456, "location": "Mars", "value": 100.0, "type": "batch" },
{ "deal" 123457, "location": "Earth", "value", 200.0, "type": "batch" },
{ "deal": -1, "location": "", "value", -1, "type": "commit" }

这些数据来自批量运行,我们获取所有交易并重新计算其价值。把它想象成一个一天开始的过程——在这一点上,这里有一组所有地点的新数据。当init和commit消息没有发送到真正的主题时,它们就会被生产者过滤掉。
在白天,随着情况的变化,会有更新。这将提供新数据(在本例中,我们可以忽略覆盖数据,因为这将通过重新运行批处理来处理):

{ "deal": 123458, "location", "Mars", "value": 150.0, "type": "update" }

这些数据以kstream“positions”的形式进入应用程序。
另一个主题“位置”列出了可能的位置。它们作为一个可扩展的位置拉入java kafka streams应用程序:

{ "id": 1, "name": "Mars" },
{ "id": 2, "name": "Earth"}

计划是使用Java9KafkaStreams应用程序按位置分组来聚合这些值。输出应该类似于:

{ "id": 1, "location": "Earth", "sum": 250.0 },
{ "id": 2, "location": "Mars": "sum": 200.0 }

这是我迄今为止的工作:

StreamsBuilder builder = new StreamsBuilder();

/**snip creating serdes, settings up stores, boilerplate**/

final GlobalKTable<Integer, Location> locations = builder.globalTable(
                LOCATIONS_TOPIC, 
                /* serdes, materialized, etc */
                );

final KStream<Integer, PositionValue> positions = builder.stream(
                POSITIONS_TOPIC,
                /* serdes, materialized, etc */
            );

/* The real thing is more than just a name, so a transformer is used to match locations to position values, and filter ones that we don't care about */
KStream<Location, PositionValue> joined = positions
                .transform(() -> new LocationTransformer(), POSITION_STORE) 
                .peek((location, positionValue) -> { 
                    LOG.debugv("Processed position {0} against location {1}", positionValue, location);
                });

/**This is where it is grouped and aggregated here**/
joined.groupByKey(Grouped.with(locationSerde, positionValueSerde))
            .aggregate(Aggregation::new, /* initializer */
                       (location, positionValue, aggregation) -> aggregation.updateFrom(location, positionValue), /* adder */
                Materialized.<Location, Aggregation>as(aggrStoreSupplier)
                    .withKeySerde(locationSerde)
                    .withValueSerde(aggregationSerde)
            );

Topology topo = builder.build();

我的问题是,这是聚合一切-所以每天一批,加上更新,然后下一个每天一批,所有得到添加。基本上,我需要一种方式说“这里是下一组批处理数据,根据这个重置”。我不知道怎么做-救命啊!
谢谢

wlp8pajw

wlp8pajw1#

所以,如果我理解正确的话,您希望聚合数据,但仅限于最后一天,并丢弃其余的数据。
我建议您聚合到一个中介类中,该类包含流中的所有值,并且还具有用于过滤其他日期的数据的逻辑。如果我理解正确的话,那就是在“batch”类型的最后一个数据之前丢弃所有数据。
虽然在Kotlin,我做了一个类似的解决方案,你可以看看,如果你需要。

ia2d9nvy

ia2d9nvy2#

您可以做一些事情,但我建议使用时间窗口流。您可以将时间设置为1天的滚动窗口,并在该流上执行一次僭越。最终,您将在ktable中的自己的窗口中聚合每天。这样,您就不必担心丢弃数据了(尽管您可以这样做),而且每天都是分开的。
这里有几个很好的例子说明它们是如何工作的:https://www.programcreek.com/java-api-examples/?api=org.apache.kafka.streams.kstream.timewindows

相关问题