连接ksql中两个(或多个)kafka主题的最佳方式,该主题将从所有主题发出更改?

ghg1uchk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(381)

我们有一个“微服务”平台,我们正在使用debezium从这些平台上的数据库中捕获变化数据,这很好地工作。
现在,我们想让我们更容易加入这些主题,并将结果流成一个新的主题,这个主题可以被多个服务使用。
免责声明:这假设v0.11 ksqldb和cli(似乎这在旧版本中可能不起作用)
来自两个数据库示例的两个表的示例,它们流到kafka主题中:

-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
    id varchar(36) NOT NULL,
    first_name varchar(255) NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');

-- source organization microservice (postgres)
CREATE TABLE public.user_info (
    id varchar(36) NOT NULL,
    user_entity_id varchar(36) NOT NULL,
    business_unit varchar(255) NOT NULL,
    cost_center varchar(255) NOT NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

方案1:溪流

CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id 
EMIT CHANGES;

通知 WITHIN 365 DAYS ,从概念上讲,这些表可以运行很长时间而不被更改,因此这个窗口在技术上是无限大的。这看起来可疑,似乎暗示这不是一个好办法。
选项2:表格

CREATE TABLE ktable_user_info_by_user_entity_id (
    user_entity_id,
    first_name,
    business_unit,
    cost_center
)
with (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id 
EMIT CHANGES;

我们不再需要Windows了 WITHIN 365 DAYS ,所以感觉更正确。但是,这只在消息被发送到流而不是表时发出更改。
在本例中:用户更新第一个\u名称->已发出更改用户更新业务\u单位->未发出更改
也许有一种方法可以创建一个由用户实体id划分的合并流,并连接到子表,该子表将保存当前状态,这使我。。。。
选项3:合并流和表

-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;

CREATE TABLE ktable_user_entity_by_id (
    id VARCHAR PRIMARY KEY,
    first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');

SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

这个看起来最好,但是每个表都有很多移动组件我们有2个流,1个insert查询,1个ktable。这里的另一个潜在问题可能是隐藏的竞争条件,即流在表被更新之前发出更改。
选项4:更多合并的表和流

CREATE STREAM stream_user_entity_changes_enriched
AS SELECT 
    ue.id AS user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

CREATE STREAM stream_user_info_changes_enriched
AS SELECT 
    ui.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;

CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;

这与前面的概念相同,但“合并”发生在连接之后。可以想象,这可能会消除任何潜在的竞争条件,因为我们主要从流而不是表中进行选择。
缺点是,复杂性甚至比选项3还要糟糕,为任何包含两个以上表的连接编写和跟踪所有这些流都会让人有点麻木。。。
问题:对于这个用例,什么方法是最好的,或者我们正在尝试做一些不应该使用ksql的事情?我们最好还是把它转移到传统的rdbms或spark替代品上?

z8dt9xmd

z8dt9xmd1#

我要试着回答我自己的问题,只有在投票赞成的情况下才接受。
答案是:方案3
下面是这个用例的原因这将是最好的,虽然可能是主观的
由主键和外键划分的流是常见且简单的。
基于这些流的表既常见又简单。
以这种方式使用的表将不是竞争条件。
所有选项都有优点,例如,如果您不关心发出所有更改,或者数据的行为类似于流(日志或事件),而不是缓慢变化的维度(sql表)。
至于“竞争条件”,“表”这个词欺骗了你的头脑,你实际上是在处理和持久化数据。实际上,它们实际上不是物理表,它们的行为更像流上的子查询。注意:对于实际生成主题的聚合表来说,这可能是一个例外(我认为这是一个不同的主题,但希望看到注解)
最后(语法可能有一些小错误):

---------------------------------------------------------
-- shared objects (likely to be used by multiple queries)
---------------------------------------------------------

-- shared streams wrapping topics
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

-- shared keyed streams (i like to think of them as "indexes")
CREATE STREAM stream_user_entity_by_id AS 
SELECT * FROM stream_user_entity PARTITION BY id
EMIT CHANGES;
CREATE STREAM stream_user_info_by_user_entity_id AS 
SELECT * FROM stream_user_info PARTITION BY user_entity_id
EMIT CHANGES;

-- shared keyed tables (inferring columns with schema registry)
CREATE TABLE ktable_user_entity_by_id (id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
CREATE TABLE ktable_user_info_by_user_entity_id (user_entity_id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');

---------------------------------------------------------
-- query objects (specific to the produced data)
---------------------------------------------------------
-- "master" change stream (include all tables in join)
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

-- pretty simple looking query
SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

“共享”对象基本上是流模式(诱惑是为所有主题创建,但这是另一个问题),第二部分类似于查询模式。它最终是一个功能性的、干净的、可重复的模式。

相关问题