我是Apache·Kafka的新手。我阅读了steam应用程序的代码,偶然发现了聚合操作。我试着自己去理解它,如果我的解释是正确的,我需要确认。
下面提供了从主题和聚合中读取的代码段,
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
// read from the topic 'bank-transactions' as `KStream`. I provided the producer below
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
// we define the grouping and aggregation here
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
jsonSerde,
"bank-balance-agg"
);
数据流到 bank-transactions
主题如下:,
public static ProducerRecord<String, String> newRandomTransaction(String name) {
// creates an empty json {}
ObjectNode transaction = JsonNodeFactory.instance.objectNode();
Integer amount = ThreadLocalRandom.current().nextInt(0, 100);
// Instant.now() is to get the current time using Java 8
Instant now = Instant.now();
// we write the data to the json document
transaction.put("name", name);
transaction.put("amount", amount);
transaction.put("time", now.toString());
return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
初始余额如下:,
// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());
这个 newBalance
方法获取事务和余额并返回新的余额,
private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
// create a new balance json object
ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
newBalance.put("count", balance.get("count").asInt() + 1);
newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
newBalance.put("time", newBalanceInstant.toString());
return newBalance;
}
我有两个关于分组和聚合的问题,
答。是 groupByKey
是按 Serdes.String()
以及 jsonSerde
是否只对steam数据执行序列化和反序列化?这个 Serdes.String()
中的名称字符串 newRandomTransaction
方法。
b。我的主张是正确的 key, transaction
内部 aggregation
线路功能 (key, transaction, balance) -> newBalance(transaction, balance)
是从 bank-transactions
主题和主题 balance
是从 initialBalance
从上一行开始。对吗?
在调试应用程序时,我也感到困惑,尽管它可以无缝运行。
1条答案
按热度按时间ygya80vv1#
groupbykey是否由serdes.string()进行分组,而jsonserde只对steam数据执行序列化和反序列化?
是的,groupbykey是按键分组的,这些键可以作为字符串进行反序列化和比较
我的Assert是行(key,transaction,balance)聚合函数中的key,transaction->newbalance(transaction,balance)是从bank transactions主题读取的,余额来自前一行的initialbalance
几乎。初始值设定项位于第一个参数上,是的,但是聚合的结果在应用程序的整个执行过程中进行,无休止地聚合。
换句话说,你从
initialBalance
总是,那么对于每个相同的键,你加上transaction
的余额balance
为了那把钥匙。如果您还没有看到该键被重复,只有这样它才会被添加到初始余额中是的,您的输入主题是由kstreams指定的
builder.stream
方法