我将json(20个键值对)推入kafka,并能够使用它,并对它进行了良好的测试,以验证数据是否成功地被推入kafka。
下面的脚本正在创建一个管道,但它没有将数据加载到memsql表中。我是否需要修改json数据类型的创建管道脚本。
CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs'
INTO procedure INGEST_OMNITRACS_EVT_PROC;
DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_OMNITRACS_EVT_PROC(batch query(evt_json json))
AS
BEGIN
INSERT INTO TEST(id, name)
SELECT evt_json::ignition,evt_json::positiontype
FROM batch;
ECHO SELECT 'HELLO';
END
//
DELIMITER ;
TEST PIPELINE omnitracs_gps_evt_pipeline LIMIT 5;
START PIPELINE omnitracs_gps_evt_pipeline FOREGROUND LIMIT 5 BATCHES;
有人能帮我一下吗。
3条答案
按热度按时间ig9co6j11#
从kafka中的生产者中删除producerconfig.transactional\u id\u config配置后,管道现在正在工作。
只是一个小小的疑问,现在连双引号都被添加到表列中了。如何逃离它。发送给Kafka的json:“{id':1,'name':'a'}”
hs1rzwqc2#
您可能应该修改create pipeline的as load data子句以执行本机json加载,如下所述:https://docs.memsql.com/sql-reference/v6.7/load-data/#json-加载数据。
原因有两个:
所编写的管道将期望来自kafka的输入是带有1个字段的tsv格式。tsv是默认格式,它推断从参数到目标存储过程的预期字段数。实际上,输入json记录很可能会成功地进行解析,但我并不依赖于此。
使用本机json管道的subvalue\u mapping子句来提取和插入::ignition和::positiontype会更有效,完全跳过存储过程的开销。另外,所编写的管道将示例化内存中的临时json数据结构,这是相对昂贵的。
我建议如下:
ocebsuys3#
在管道的存储过程中不允许echo select。当你运行启动管道时,你应该得到一个错误。。。前台,或在创建管道时(如果已定义过程)。