java—这种聚合在kafka流中是如何工作的?

iezvtpos  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(410)

我是Apache·Kafka的新手。我阅读了steam应用程序的代码,偶然发现了聚合操作。我试着自己去理解它,如果我的解释是正确的,我需要确认。
下面提供了从主题和聚合中读取的代码段,

// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();

// read from the topic 'bank-transactions' as `KStream`. I provided the producer below 
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");

// we define the grouping and aggregation here 
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
    .aggregate(
            () -> initialBalance,
            (key, transaction, balance) -> newBalance(transaction, balance),
            jsonSerde,
            "bank-balance-agg"
    );

数据流到 bank-transactions 主题如下:,

public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();

    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);

    // Instant.now() is to get the current time using Java 8
    Instant now = Instant.now();

    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());

    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}

初始余额如下:,

// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();

initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

这个 newBalance 方法获取事务和余额并返回新的余额,

private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
    // create a new balance json object
    ObjectNode newBalance = JsonNodeFactory.instance.objectNode();

    newBalance.put("count", balance.get("count").asInt() + 1);
    newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

    Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
    Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();

    Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
    newBalance.put("time", newBalanceInstant.toString());

    return newBalance;
}

我有两个关于分组和聚合的问题,
答。是 groupByKey 是按 Serdes.String() 以及 jsonSerde 是否只对steam数据执行序列化和反序列化?这个 Serdes.String() 中的名称字符串 newRandomTransaction 方法。
b。我的主张是正确的 key, transaction 内部 aggregation 线路功能 (key, transaction, balance) -> newBalance(transaction, balance) 是从 bank-transactions 主题和主题 balance 是从 initialBalance 从上一行开始。对吗?
在调试应用程序时,我也感到困惑,尽管它可以无缝运行。

ygya80vv

ygya80vv1#

groupbykey是否由serdes.string()进行分组,而jsonserde只对steam数据执行序列化和反序列化?
是的,groupbykey是按键分组的,这些键可以作为字符串进行反序列化和比较
我的Assert是行(key,transaction,balance)聚合函数中的key,transaction->newbalance(transaction,balance)是从bank transactions主题读取的,余额来自前一行的initialbalance
几乎。初始值设定项位于第一个参数上,是的,但是聚合的结果在应用程序的整个执行过程中进行,无休止地聚合。
换句话说,你从 initialBalance 总是,那么对于每个相同的键,你加上 transaction 的余额 balance 为了那把钥匙。如果您还没有看到该键被重复,只有这样它才会被添加到初始余额中
是的,您的输入主题是由kstreams指定的 builder.stream 方法

相关问题