当 试图 从 Kafka 复制 到 vertica 时 无法 识别 纪元 时间

lzfw57am  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(175)

我尝试将JSON数据从Kafka复制到vertica。

COPY public.from_kafka 
   SOURCE KafkaSource(stream='example_data|0|-2, example_data|1|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '10000 milliseconds') PARSER KafkaJSONParser()
   REJECTED DATA AS TABLE public.rejections;

主题中的每个消息看起来都像这样:

{"location_id":30277, "start_date":1667911800000}

当我运行查询时,没有创建新行。当我检查rejections表时,我看到以下rejected_reason

Missing or null value for column with NOT NULL constraint [start_date]

但是rejected_data{"location_id":30277, "start_date":1667911800000}
为什么Vertica无法识别start_date字段?我该如何解决该问题?
垂直表:

CREATE TABLE public.from_kafka
(
    location_id int NOT NULL,
    start_date timestamp NOT NULL
)

CREATE PROJECTION public.from_kafka /*+createtype(L)*/ 
(
 location_id ENCODING RLE,
 start_date ENCODING GCDDELTA
)
AS
 SELECT from_kafka.location_id,
        from_kafka.start_date,
 FROM public.from_kafka
 ORDER BY from_kafka.start_date,
          from_kafka.location_id
SEGMENTED BY hash(from_kafka.location_id, from_kafka.start_date) ALL NODES KSAFE 1;

编辑-解决方案

PARSER KafkaJSONParser()不知道如何将long转换为timestamp,因为我必须使用java转换JSON消息,将更新后的JSON插入到新的主题中,然后使用KafkaJSONParser()函数

ia2d9nvy

ia2d9nvy1#

在 任何 SQL 数据 库 中 , 时间 戳 都 是 时间 戳 , 而 不是 整数 。
要 加载 JSON 格式 并 具有 时间 戳 , 请 重新 定义 表 以 接收 整数 并 将 其 动态 转换 为 时间 戳 。
我 从 文件 中 做 , 在 这里 , 但 它 将 工作 与 Kafka 流 , 太 。

-- create your table like so:
DROP TABLE IF EXISTS public.from_kafka;
CREATE TABLE public.from_kafka
(
    location_id int NOT NULL,
    start_date  int NOT NULL,
    start_date_ts timestamp DEFAULT TO_TIMESTAMP(start_date//1000)
);

中 的 每 一 个
下面 是 我 使用 的 JSON 文件 :

$ cat kafka.json
{"location_id":30277, "start_date":1667911800000},
{"location_id":30278, "start_date":1667911900000},
{"location_id":30279, "start_date":1667912000000},
{"location_id":30280, "start_date":1667912100000},
{"location_id":30281, "start_date":1667912200000},
{"location_id":30282, "start_date":1667912300000}

格式
这 是 我 使用 的 copy 命令 :

COPY public.from_kafka (
  location_id
, start_date 
)
FROM LOCAL 'kafka.json' PARSER FJsonParser(record_terminator=E'\n')                                                                                                                  
EXCEPTIONS 'kafka.log';

格式
最 后 , from_kafka 将 包含 以下 内容 :

SELECT * FROM public.from_kafka;
-- out  location_id | start_date |    start_date_ts    
-- out -------------+------------+---------------------
-- out        30277 | 1667911800 | 2022-11-08 13:50:00
-- out        30278 | 1667911900 | 2022-11-08 13:51:40
-- out        30279 | 1667912000 | 2022-11-08 13:53:20
-- out        30280 | 1667912100 | 2022-11-08 13:55:00
-- out        30281 | 1667912200 | 2022-11-08 13:56:40
-- out        30282 | 1667912300 | 2022-11-08 13:58:20

格式

相关问题