ApacheKafka源代码连接器在json字段周围添加引号

5hcedyr0  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(229)

我使用合流源连接器将postgres中的数据复制到kafka
下面是我想复制给Kafka的表格:

CREATE TABLE public.format_ts_v
(
    alerts json,
    updated timestamp with time zone NOT NULL DEFAULT now()
)

警报字段是json,格式如下

{"alerts":[{"timedate":4587052516256796700,"elementName":"pariatur commodo","description":"ex eiusmod consectetur"}]

我创建了一条流

CREATE STREAM alerts_stream_json_array 
      (alerts ARRAY<STRUCT<
            timedate INT, 
            elementName VARCHAR,
            description VARCHAR>>, 
       updated INT) 
WITH (KAFKA_TOPIC='cdc1_format_ts_v', VALUE_FORMAT='JSON');

并使用以下命令查看数据

SELECT EXPLODE(alerts)->timedate AS timedate,
       EXPLODE(alerts)->elementName AS elementName,
       EXPLODE(alerts)->description AS description
FROM   alerts_stream_json_array 
EMIT CHANGES;

如果我不使用源连接器直接向Kafka插入一条消息,效果会很好

/kafkacat/kafkacat/kafkacat -b 127.0.0.1:9092 -P -t cdc1_format_ts_v <<EOF
{"alerts":{"timedate":4587052516256796700,"elementName":"pariatur commodo","description":"ex eiusmod consectetur"}}
EOF

但是,当数据与源连接器一起插入时,一个额外的引号被添加到json字符串中,ksqlselect不再工作

{"alerts":"[{\x5C"timedate\x5C":4587052516256796700,"elementName\x5C":\x5C"pariatur commodo\x5C",\x5C"description\x5C":\x5C"ex eiusmod consectetur\x5C"}]","updated":1594909653789}

如何避免在json中使用引号?我可以让它工作,甚至与报价?
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题