Unable to insert data into Greenplum using gpkafka

vc9ivgsu · asked 12 months ago · Apache

I am running into a problem using gpkafka to read data from Kafka and insert it into a Greenplum database. My target table has multiple columns of different data types, and the external table that gpkafka creates is a copy of my target table. However, when the data is pushed into the target table I get this error:
DEBUG, rolling back batch 0 due to failure to execute batch: pq: avro_import: only single json column is supported
The Greenplum database version I am using is 6.19.3. Details are below; how can I resolve this?
Kafka message in JSON format with an Avro schema (nullable union fields are encoded as objects keyed by the branch type, e.g. {"int": 1}):

{
    "key1": "try81",
    "col1": {
        "int": 1
    },
    "col2": {
        "string": "def"
    },
    "col3": {
        "string": "ghi"
    },
    "col4": {
        "string": "jkl"
    },
    "instance_id": {
        "string": "009"
    }
}

DDL of the table:

CREATE TABLE
    testgpkafka
    (
        key1 CHARACTER VARYING(5) NOT NULL,
        col1 INTEGER,
        col2 CHARACTER VARYING(55),
        col3 CHARACTER VARYING(19),
        col4 CHARACTER VARYING(13),
        instance_id CHARACTER VARYING(15),
        PRIMARY KEY (key1)
    );


The gpkafka.yaml file:

DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
        TOPIC: mcp_kafka_net_21.mcp.testgpkafka
        PARTITIONS: (0)
      COLUMNS:
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081   
   OUTPUT:
      SCHEMA: smi
      TABLE:  testgpkafka
      MODE:   insert

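For reference (the command is not shown in the post), a job defined by a file like this is started with the Greenplum-Kafka utility, e.g. gpkafka load gpkafka.yaml.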

I also tried a JSON-format schema and got a similar error:
DEBUG, rolling back batch 0 due to failure to execute batch: pq: json_import: only single json/jsonb/gp_jsonb column is supported
Here is the Kafka message:

{
    "key1": "tr159",
    "col1": 1,
    "col2": "def",
    "col3": "ghi",
    "col4": "jkl",
    "instance_id": "009"
}


I also tried wrapping the Kafka message in a single JSON field, but that did not work either.
Kafka message:

{
    "data": {
        "key1": "tr150",
        "col1": 1,
        "col2": "def",
        "col3": "ghi",
        "col4": "jkl",
        "instance_id": "009"
    }
}
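Both error messages point at the same constraint: gpkafka's avro_import/json_import path expects the load target to expose exactly one json-typed column, which the six-column table above does not. A minimal sketch of that staging pattern, assuming a hypothetical table smi.testgpkafka_stage (the table name and the follow-up INSERT are illustrative, not from the post):

-- Hypothetical staging table: a single json column, which is the shape
-- gpkafka's avro_import/json_import expects when no MAPPING is given.
CREATE TABLE smi.testgpkafka_stage (jdata json);

-- Expand the staged messages into the real table afterwards.
INSERT INTO smi.testgpkafka (key1, col1, col2, col3, col4, instance_id)
SELECT (jdata->>'key1')::varchar(5),
       (jdata->>'col1')::integer,
       (jdata->>'col2')::varchar(55),
       (jdata->>'col3')::varchar(19),
       (jdata->>'col4')::varchar(13),
       (jdata->>'instance_id')::varchar(15)
FROM smi.testgpkafka_stage;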

nbnkbykc #1

Insert mode is a new feature. Using a MAPPING is more stable.
Something like this:

DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
VERSION: 2   # the VALUE block belongs to the version 2 config format
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
        TOPIC: mcp_kafka_net_21.mcp.testgpkafka
        PARTITIONS: (0)
      VALUE:
        COLUMNS:
          - NAME: jdata
            TYPE: gp_json # custom type from the dataflow EXTENSION in Greenplum; plain "json" also works but crashes on the null character "\u0000"
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081
   OUTPUT:
      SCHEMA: smi
      TABLE: testgpkafka
      MAPPING:
        - NAME: key1
          EXPRESSION: (jdata->>'key1')::varchar(5)
        - NAME: col1
          EXPRESSION: (jdata->'col1'->>'int')::integer
        - NAME: col2
          EXPRESSION: (jdata->'col2'->>'string')::varchar(55)
        - NAME: col3
          EXPRESSION: (jdata->'col3'->>'string')::varchar(19)
        - NAME: col4
          EXPRESSION: (jdata->'col4'->>'string')::varchar(13)
        - NAME: instance_id
          EXPRESSION: (jdata->'instance_id'->>'string')::varchar(15)

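You can sanity-check a MAPPING expression directly in psql before running the job; -> steps into the Avro union wrapper and ->> extracts the value as text (the literal below is illustrative):

-- Step into the union wrapper with ->, then pull the value out as text
-- with ->> and cast it to the target column type.
SELECT ('{"col1": {"int": 1}}'::json -> 'col1' ->> 'int')::integer AS col1;  -- returns 1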
Or, if the messages are plain JSON without the union wrappers, without that last unwrapping step, like this:

MAPPING:
  - NAME: key1
    EXPRESSION: (jdata->>'key1')::varchar(5)
  - NAME: col1
    EXPRESSION: (jdata->>'col1')::integer
  - NAME: col2
    EXPRESSION: (jdata->>'col2')::varchar(55)
  - NAME: col3
    EXPRESSION: (jdata->>'col3')::varchar(19)
  - NAME: col4
    EXPRESSION: (jdata->>'col4')::varchar(13)
  - NAME: instance_id
    EXPRESSION: (jdata->>'instance_id')::varchar(15)
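Note that the two variants are not interchangeable: if the messages still carry the union wrappers, the plain ->> form returns the wrapper object as text, and the cast then fails (illustrative literal again):

-- With a union-wrapped field, ->> returns the wrapper object as text,
-- so a cast such as ::integer would fail on it.
SELECT ('{"col1": {"int": 1}}'::json ->> 'col1') AS raw;  -- {"int": 1}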


Your Kafka message does not look correct for this Avro schema.
