我尝试将JSON数据从Kafka复制到vertica。
COPY public.from_kafka
SOURCE KafkaSource(stream='example_data|0|-2, example_data|1|-2',
brokers='kafka01.example.com:9092',
duration=interval '10000 milliseconds') PARSER KafkaJSONParser()
REJECTED DATA AS TABLE public.rejections;
主题中的每个消息看起来都像这样:
{"location_id":30277, "start_date":1667911800000}
当我运行查询时,没有创建新行。当我检查rejections
表时,我看到以下rejected_reason
:
Missing or null value for column with NOT NULL constraint [start_date]
但是rejected_data
是{"location_id":30277, "start_date":1667911800000}
为什么Vertica无法识别start_date
字段?我该如何解决该问题?
垂直表:
CREATE TABLE public.from_kafka
(
location_id int NOT NULL,
start_date timestamp NOT NULL
)
CREATE PROJECTION public.from_kafka /*+createtype(L)*/
(
location_id ENCODING RLE,
start_date ENCODING GCDDELTA
)
AS
SELECT from_kafka.location_id,
from_kafka.start_date,
FROM public.from_kafka
ORDER BY from_kafka.start_date,
from_kafka.location_id
SEGMENTED BY hash(from_kafka.location_id, from_kafka.start_date) ALL NODES KSAFE 1;
编辑-解决方案
PARSER KafkaJSONParser()
不知道如何将long转换为timestamp,因为我必须使用java转换JSON消息,将更新后的JSON插入到新的主题中,然后使用KafkaJSONParser()
函数
1条答案
按热度按时间ia2d9nvy1#
在 任何 SQL 数据 库 中 , 时间 戳 都 是 时间 戳 , 而 不是 整数 。
要 加载 JSON 格式 并 具有 时间 戳 , 请 重新 定义 表 以 接收 整数 并 将 其 动态 转换 为 时间 戳 。
我 从 文件 中 做 , 在 这里 , 但 它 将 工作 与 Kafka 流 , 太 。
中 的 每 一 个
下面 是 我 使用 的 JSON 文件 :
格式
这 是 我 使用 的 copy 命令 :
格式
最 后 ,
from_kafka
将 包含 以下 内容 :格式