-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');
-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';
1条答案
按热度按时间3mpgtkmj1#
单消息转换
正如您所建议的,这里使用单个消息转换是一个很好的选择。debezium有一个转换,目前在beta中称为
ContentBasedRouter
使用它,您可以使用包括groovy在内的语言对路由进行编码。ksqldb公司
您可以使用ksqldb执行此操作:
查看数据
检查计数