管道未使用过程将数据摄取到memsql表中

rggaifut  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(531)

我将json(20个键值对)推入kafka,并能够使用它,并对它进行了良好的测试,以验证数据是否成功地被推入kafka。
下面的脚本正在创建一个管道,但它没有将数据加载到memsql表中。我是否需要修改json数据类型的创建管道脚本。

CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs' 
INTO procedure INGEST_OMNITRACS_EVT_PROC;

DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_OMNITRACS_EVT_PROC(batch query(evt_json json))
AS
BEGIN
    INSERT INTO TEST(id, name) 
      SELECT evt_json::ignition,evt_json::positiontype
      FROM batch;
      ECHO SELECT 'HELLO';
END
//
DELIMITER ; 

TEST PIPELINE omnitracs_gps_evt_pipeline LIMIT 5;
START PIPELINE omnitracs_gps_evt_pipeline FOREGROUND LIMIT 5 BATCHES;

有人能帮我一下吗。

ig9co6j1

ig9co6j11#

从kafka中的生产者中删除producerconfig.transactional\u id\u config配置后,管道现在正在工作。

CREATE PIPELINE FEB13_PIPELINE_2
AS LOAD DATA KAFKA '192.168.188.110:9092/FEB13_PROC' 
INTO procedure INGEST_EVT_PROC;

DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_EVT_PROC(batch query(evt_json json))
AS
BEGIN
    INSERT INTO TEST_FEB13(ID, NAME) 
      SELECT evt_json::ID,evt_json::NAME
      FROM batch;
END
//
DELIMITER ;

只是一个小小的疑问,现在连双引号都被添加到表列中了。如何逃离它。发送给Kafka的json:“{id':1,'name':'a'}”

hs1rzwqc

hs1rzwqc2#

您可能应该修改create pipeline的as load data子句以执行本机json加载,如下所述:https://docs.memsql.com/sql-reference/v6.7/load-data/#json-加载数据。
原因有两个:
所编写的管道将期望来自kafka的输入是带有1个字段的tsv格式。tsv是默认格式,它推断从参数到目标存储过程的预期字段数。实际上,输入json记录很可能会成功地进行解析,但我并不依赖于此。
使用本机json管道的subvalue\u mapping子句来提取和插入::ignition和::positiontype会更有效,完全跳过存储过程的开销。另外,所编写的管道将示例化内存中的临时json数据结构,这是相对昂贵的。
我建议如下:

CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs' 
INTO TABLE TEST
FORMAT JSON
( 
  id <- ignition_event,
  name <- position_type
);
ocebsuys

ocebsuys3#

在管道的存储过程中不允许echo select。当你运行启动管道时,你应该得到一个错误。。。前台,或在创建管道时(如果已定义过程)。

相关问题