kafka stream dsl非键联接当前解决方法说明

pu82cl6c  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(329)

我正在努力理解以下内容中提到的解决方法:
https://issues.apache.org/jira/browse/kafka-3705
如在
如今,在kafka streams dsl中,ktable连接仅基于密钥。如果用户希望通过键a将ktable a与另一个ktable b连接起来,但是要使用“外键”a,并且假设他们是从分别在a和b上划分的两个主题中读取的,那么他们需要执行以下模式:

tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"

tableA.join(tableB', joiner);

我很难理解到底发生了什么。
尤其是这句话让人困惑:“如果用户想通过键a将一个ktable a与另一个ktable b连接起来,但是要使用“外键”a”。我也不明白上面的代码。
有人能澄清一下这里发生了什么事吗?
这里也提到了这一点:
缩小流中ktable和关系数据库中表的语义之间的差距。通常的做法是将rdbms中表的更改捕获到kafka主题中(JDBCConnect、debezium、maxwell)。这些实体通常具有多个一对多关系。通常RDBMS提供了很好的支持,可以通过连接来解决这种关系。streams在这里是不够的,而且解决方法(groupby-join-lateral视图)也没有得到很好的支持,不符合基于记录的处理的思想。https://cwiki.apache.org/confluence/display/kafka/kip-213+support+non-key+joining+in+ktable
什么意思(分组-连接-侧视图)?我怀疑它与上面的代码有关,但还是有点难以理解。有人能解释一下吗?

e0uiprwp

e0uiprwp1#

下面的代码是用非键连接两个ktable的伪代码:

tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"

tableA.join(tableB', joiner);

说明:
比如说,tablea有一个关键字段“a”。为了将另一个ktable与tablea连接起来,它应该是共分区的。它应该有相同的钥匙。因此,我们将用字段“a”重新设置ktable tableb的键

tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
``` `groupBy()` 是的缩写 `selectKey()+ groupByKey()` 操作。 `groupBy(/* select on field "a" */)` 将在字段“a”上重新设置tableb的键,并按该键分组。因此,现在有了一个kgroupedtable,其中字段“a”是键。为了获得ktable,需要对此调用.aggregate()。以上代码就是这样。
附笔。 `.agg()` 应重命名为 `.aggregate()` 一旦tableb'准备就绪,就可以使用下面的代码与tablea连接。

tableA.join(tableB', joiner);

这里joiner指的是 `ValueJoiner` 实施。
例子:

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right,
/* Below line is ValueJoiner */
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue
);

目前,这是在ktables上进行非键联接的方法,您可以在文档中找到很好的解释:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-K表联接

相关问题