从json创建模式,用kafka sink将parquet写入hdfs

qvk1mo1f  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(137)

我用的是Kafkahdfs连接。
我想从kafka主题的json中编写Parquet文件。
我想用“schema”、“payload”创建json,如下所示(来自so问题):

{
"schema": {
    "type": "struct",
    "fields": [{
        "type": "int32",
        "optional": true,
        "field": "c1"
    }, {
        "type": "string",
        "optional": true,
        "field": "c2"
    }, {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "create_ts"
    }, {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "update_ts"
    }],
    "optional": false,
    "name": "foobar"
},
"payload": {
    "c1": 10000,
    "c2": "bar",
    "create_ts": 1501834166000,
    "update_ts": 1501834166000
}
}

有一个自动工具是用kafka connect类型从json创建模式?
我的属性看起来像:

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false

在创建模式之后,我应该更改/添加什么?
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题