ksql:将多个子记录附加到父记录

lawou6xi  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(277)

我正在尝试使用ksql(作为confluent-5.0.0的一部分)从一组父记录和子记录中创建一个记录,其中每个父记录都有多个子记录(特别是支付详细信息和参与支付的各方)。这些父/子记录由父/子记录的id链接。为了说明这一点,我将在源系统中处理大致这种结构的记录:

payment:
| id    | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   |
| pmt02 | USD      | 13000  | 2018-11-23   |

payment_parties:
| id    | payment_id | party_type   | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01      | sender       | XXYYZZ23    | (null)        |
| prt02 | pmt01      | intermediary | AADDEE98    | 123456789     |
| prt03 | pmt01      | receiver     | FFGGHH56    | 987654321     |
| prt04 | pmt02      | sender       | XXYYZZ23    | (null)        |
| prt05 | pmt02      | intermediary | (null)      | (null)        |
| prt06 | pmt02      | receiver     | FFGGHH56    | 987654321     |

这些记录以avro格式加载到使用oracle golden gate的一组kafka主题中,每个表有一个主题。这意味着存在以下主题: src_payment 以及 src_payment_parties . 根据源系统的工作方式,这些记录的时间戳在几毫秒内。
现在,我们的目的是将这些记录“展平”为单个记录,这些记录将从传出的主题中使用。为了说明,对于上述记录,所需的输出将沿着以下几行:

payment_flattened:
| id    | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   | XXYYZZ23     | (null)         | AADDEE98           | 123456789            | FFGGHH56       | 987654321        |
| pmt02 | USD      | 13000  | 2018-11-23   | XXYYZZ23     | (null)         | (null)             | (null)               | FFGGHH56       | 987654321        |

我想在这里问的第一个问题是:如何才能最好地实现源主题数据的这种组合*
当然,我自己也尝试过一些行动。为了简洁起见,我将描述在付款记录中添加第一个付款方所达到的效果。
第一步:设置源流
注意:由于ogg安装程序在avro模式中添加了一个名为“table”的属性,因此我必须指定要从主题中获取的字段。另外,我对指定操作类型的字段(例如insert或update)不感兴趣。

create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');

create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');

第二步:为付款发送者创建流
注意:根据我从文档中收集到的信息,并通过实验发现,为了能够将支付流连接到支付方流,后者需要按支付id进行分区。此外,我使连接生效的唯一方法是重命名列。

create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;

第三步:加入两条流
注意:我使用的是左连接,因为不是所有的参与者都出席了每一笔付款。如上面的示例记录,其中 pmt02 没有中间人。

create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;

现在,我希望这个流的输出是这样的:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

相反,我看到的结果是:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

因此,我想问的第二个(两部分)问题是:为什么左连接会产生这些重复的记录?这能避免吗
为文字墙致歉,我尽量完整的描述问题。当然,我很乐意添加任何可能丢失的信息,并尽我所知回答有关设置的问题。

sqxo8psd

sqxo8psd1#

你就快到了:-) WITHIN 1 SECONDS 将为您提供从连接的两侧触发的结果。
相反,试试看 WITHIN (0 SECONDS, 1 SECONDS) . 然后,只有连接右侧的记录才会连接到左侧,反之亦然。
你可以在我写的文章中读到更多关于这种模式的内容。
顺便说一句,如果你想在 table ogg的保留字问题,您可以设置 includeTableNamefalse 在gg配置中。

相关问题