我使用合流源连接器将postgres中的数据复制到kafka
下面是我想复制给Kafka的表格:
CREATE TABLE public.format_ts_v
(
alerts json,
updated timestamp with time zone NOT NULL DEFAULT now()
)
警报字段是json,格式如下
{"alerts":[{"timedate":4587052516256796700,"elementName":"pariatur commodo","description":"ex eiusmod consectetur"}]
我创建了一条流
CREATE STREAM alerts_stream_json_array
(alerts ARRAY<STRUCT<
timedate INT,
elementName VARCHAR,
description VARCHAR>>,
updated INT)
WITH (KAFKA_TOPIC='cdc1_format_ts_v', VALUE_FORMAT='JSON');
并使用以下命令查看数据
SELECT EXPLODE(alerts)->timedate AS timedate,
EXPLODE(alerts)->elementName AS elementName,
EXPLODE(alerts)->description AS description
FROM alerts_stream_json_array
EMIT CHANGES;
如果我不使用源连接器直接向Kafka插入一条消息,效果会很好
/kafkacat/kafkacat/kafkacat -b 127.0.0.1:9092 -P -t cdc1_format_ts_v <<EOF
{"alerts":{"timedate":4587052516256796700,"elementName":"pariatur commodo","description":"ex eiusmod consectetur"}}
EOF
但是,当数据与源连接器一起插入时,一个额外的引号被添加到json字符串中,ksqlselect不再工作
{"alerts":"[{\x5C"timedate\x5C":4587052516256796700,"elementName\x5C":\x5C"pariatur commodo\x5C",\x5C"description\x5C":\x5C"ex eiusmod consectetur\x5C"}]","updated":1594909653789}
如何避免在json中使用引号?我可以让它工作,甚至与报价?
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!