kstream leftjoin具有相同密钥的kstream

8cdiaqws  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(333)

我有一个问题,我尝试左加入2流。mergedkey有100多个相同密钥的列表,datastream只有1个与mergedkey相同密钥的列表。我想输入mergedkey与datastream合并后的endstream值。

//get DataStream 
        final KStream<String, GenericRecord> DataStream = builder.stream("Datastreams");
        // Transform merged to Equals Keys to DataStream.Iot
        final KStream<String, GenericRecord> mergedKey = mergedFoIObs
                .map((key, value) -> KeyValue.pair(value.get("Datastream").toString(), value)); 
        // Join the DataStream with MergedStream

        final KStream<String, String> mergedFoIObsData = mergedKey.leftJoin(
                DataStream,
            (value, data) -> {
                try {
                    if(data != null{
                        value.put("Datastream", data.toString());
                        JSONObject jo = (JSONObject) new JSONParser().parse(value.toString());
                        return jo.toJSONString();}
                      return null

                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;

            }, JoinWindows.of(10000));

但我的问题是,在endstream中,我只得到一个具有正确值的列表,而其他列表的值为null。
taht意味着第一轮之后数据为空。
当我从datastream转换到ktable时,我遇到了一个问题:我没有得到正确的列表,但是只有37个列表,所以60个列表错了。
我希望你能帮助我。

eit6fx6z

eit6fx6z1#

对于kstream kstream join,是否加入取决于记录的时间戳。查看此博客文章了解更多详细信息:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
对于kstream ktable连接,这取决于ktable记录何时加载到ktable中——kafka streams尝试基于时间戳同步加载,但这是一种尽力而为的方法。因此,当ktable仍然为空时,可能会首先处理一些kstream记录。只有在处理完ktable记录后(即ktable更新并包含该记录),连续的kstream记录才会成功加入。
请注意,下一个kafka版本2.1将改进这个时间戳同步,并提供更强的保证,用户甚至可以配置保证的严格程度。

brccelvz

brccelvz2#

此外,对于此kstream的每个不满足连接 predicate 的输入记录,将使用另一个流的null值调用提供的valuejoiner
所以当数据(右值)为null时不应该返回null,应该返回value(左值)。
leftjoin上的kafka文档链接

相关问题