kafka-ksql重划分和重密钥问题

epfja78i  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(496)

我已经定义了一条流

CREATE STREAM QUOTE (quoteId VARCHAR,
                      counterPartyId VARCHAR)
        WITH (KAFKA_TOPIC='quotes',
              VALUE_FORMAT='JSON',
              KEY='quoteId');

我想汇总到目前为止我得到了多少引用,以及那个事件的最后一个引用

CREATE TABLE KQUOTE AS
    SELECT Max(CAST(quoteId as INT)) as quoteId,COUNT(*) AS COUNT
        FROM QUOTE
        GROUP BY 1;

将此表转到流,因为我想知道聚合结果的历史记录(似乎我必须使用底层主题来创建流。无法直接从表“kquote”)创建流。

CREATE stream KQuoteStream (quoteId VARCHAR,
                      count INT)
        WITH (KAFKA_TOPIC='KQUOTE',
              VALUE_FORMAT='JSON',
              KEY='quoteId');

我希望上面使用rawkey quoteid,但事实并非如此。正如我们在下面看到的,rawkey总是1(因为我们在创建表kquote时按常量1分组)。

ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

尝试按quoteid重新划分流,以将rawkey更改为quoteid

CREATE stream KQuoteStreamByQuoteId
        as
    SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;

ramkey仍然是常量1

ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

顺便说一句:所有的主题都和1有相同的分区,使事情更简单。有人知道吗?谢谢!

yqkkidmi

yqkkidmi1#

这绝对是一个有趣的错误,你已经浮出水面!
这里的诀窍是要明白 WITH(KEY='quoteId') 实际上并没有做任何事情,这是对ksqldb的一个提示,即key字段恰好也存在于值中 quoteId . 然后,当你 PARTITION BY quoteId ,它认为您正在按rowkey进行分区,所以它什么也不做!我同意这种行为是相当不直观的,这就是为什么我们计划删除 WITH(KEY=...) 支持更直观的功能(待定)。
同时,解决方法应该是在创建 KQuoteStream 所以ksql不会优化重分区。

相关问题