我有一个问题,我尝试左加入2流。mergedkey有100多个相同密钥的列表,datastream只有1个与mergedkey相同密钥的列表。我想输入mergedkey与datastream合并后的endstream值。
//get DataStream
final KStream<String, GenericRecord> DataStream = builder.stream("Datastreams");
// Transform merged to Equals Keys to DataStream.Iot
final KStream<String, GenericRecord> mergedKey = mergedFoIObs
.map((key, value) -> KeyValue.pair(value.get("Datastream").toString(), value));
// Join the DataStream with MergedStream
final KStream<String, String> mergedFoIObsData = mergedKey.leftJoin(
DataStream,
(value, data) -> {
try {
if(data != null{
value.put("Datastream", data.toString());
JSONObject jo = (JSONObject) new JSONParser().parse(value.toString());
return jo.toJSONString();}
return null
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}, JoinWindows.of(10000));
但我的问题是,在endstream中,我只得到一个具有正确值的列表,而其他列表的值为null。
taht意味着第一轮之后数据为空。
当我从datastream转换到ktable时,我遇到了一个问题:我没有得到正确的列表,但是只有37个列表,所以60个列表错了。
我希望你能帮助我。
2条答案
按热度按时间eit6fx6z1#
对于kstream kstream join,是否加入取决于记录的时间戳。查看此博客文章了解更多详细信息:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
对于kstream ktable连接,这取决于ktable记录何时加载到ktable中——kafka streams尝试基于时间戳同步加载,但这是一种尽力而为的方法。因此,当ktable仍然为空时,可能会首先处理一些kstream记录。只有在处理完ktable记录后(即ktable更新并包含该记录),连续的kstream记录才会成功加入。
请注意,下一个kafka版本2.1将改进这个时间戳同步,并提供更强的保证,用户甚至可以配置保证的严格程度。
brccelvz2#
此外,对于此kstream的每个不满足连接 predicate 的输入记录,将使用另一个流的null值调用提供的valuejoiner
所以当数据(右值)为null时不应该返回null,应该返回value(左值)。
leftjoin上的kafka文档链接