我是Kafka流的新手,我正在尝试使用groupby函数将一些流数据聚合到一个ktable中。问题如下:
生成的消息是json消息,格式如下:
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
我想隔离json字段“after”,然后创建一个“key”=“id”的ktable,并为整个json“after”赋值。
首先,我创建了一个kstream来隔离“after”json,它工作正常。
kstream代码块:(不要注意if语句,因为“before”和“after”的格式相同。)
KStream<String, String> s_order_list = s_order
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
});
正如预期的那样,输出如下:
...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...
之后,我实现了一个ktable-to-groupby-id的json。
K表代码块:
KTable<String, String> s_table = s_order_list
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getString("ID");
});
我想制造一个错误 KTable<String, String>
但我在创造 GroupedStream<Object,String>
.
Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>
总之,问题是什么是kgroupedstreams以及如何正确地实现ktable?
1条答案
按热度按时间tuwxkamq1#
在groupby processor之后,可以使用有状态处理器,比如aggregate或reduce(处理器返回ktable)。你可以这样做:
stringaggregate看起来像: