DebeziumMySQL连接器可以按事件的操作类型将数据更改事件路由到不同的主题吗?

qfe3c7zg  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(317)

debezium支持三种类型的数据更改事件:
插入
删除
更新
我知道有一个 op 在debezium发布的消息的有效负载中标识事件类型的字段,但我想知道,是否还有其他方法可以按操作类型将这三种类型的数据更改事件路由到不同的kafka主题,如smt?

3mpgtkmj

3mpgtkmj1#

单消息转换

正如您所建议的,这里使用单个消息转换是一个很好的选择。debezium有一个转换,目前在beta中称为 ContentBasedRouter 使用它,您可以使用包括groovy在内的语言对路由进行编码。

ksqldb公司

您可以使用ksqldb执行此操作:

-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');

-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';

查看数据

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 ORDERS_CREATES                        | 1          | 1
 ORDERS_DELETES                        | 1          | 1
 ORDERS_UPDATES                        | 1          | 1

检查计数

ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
+-------+----------+
|OP     |EVENTS    |
+-------+----------+
|u      |3         |
|c      |502       |
|d      |5         |

ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_UPDATES  |3            |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_CREATES  |503          |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_DELETES  |5            |
Limit Reached
Query terminated

相关问题