如何加入Kafka的两个流?

8hhllhi2  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(306)

学习Kafka流,尝试在5分钟的窗口内连接两个流(json值)。我的理解是对值使用相同的键来匹配联接条件。如果我的理解是正确的,像钥匙一样只能加入,对吗?如果是的话,我该如何连接json值。即。 Stream1: Key=a, value={a,b,c}. Stream2: Key=a, value={x} and key=a, value={y}. Expected o/p: {a,b,c,x} and {a,b,c,y}. 要做到这一点,我的valuejoiner应该是什么样子。帮我拿这个。我的示例代码:

KStream<String, JsonNode> resultStream = stream1.leftJoin(stream2,
                new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
                    @Override
                    public JsonNode apply(JsonNode value1, JsonNode value2) {
                        if (value1 != null && value2 != null) {

                            return value1;
                        }
                        return null;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(20)), Joined.with(Serdes.String(), /* key */
                        jsonSerde, /* left value */
                        jsonSerde) /* right value */
        );
fwzugrvs

fwzugrvs1#

您对连接如何工作的理解是正确的(假设记录时间戳different小于连接窗口大小)。
要操作jsonnode,只需搜索internet:如何在java中修改jsonnode?

相关问题