confluentjdbc-sink用法?

yws3nbqq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(330)

我使用conluent jdbc sink将数据从kafka加载到oracle。
但是我用数据来写我的模式。
我不想用数据来编写模式,如何才能在kafka主题上编写模式,然后只从我的客户机发送数据?
提前谢谢
json数据

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "field": 'ID',
                "type": "int32",
                "optional": False
            },
            {
                "field": 'PRODUCT',
                "type": "string",
                "optional": True
            },
            {
                "field": 'QUANTITY',
                "type": "int32",
                "optional": True
            },
            {
                "field": 'PRICE',
                "type": "int32",
                "optional": True
            }
        ],
        "optional": True,
        "name": "myrecord"
    },
    "payload": {
        "ID": 1071,
        "PRODUCT": 'ersin',
        "QUANTITIY": 1071,
        "PRICE": 1453
   }

python代码:

producer.send(topic, key=b'1071'
              , value=json.dumps(v, default=json_util.default).encode('utf-8'))

我怎样才能解决这个问题?
提前谢谢

kqlmhetl

kqlmhetl1#

如果要使用jdbc接收器连接器,必须提供模式。这可以通过三种方式实现:
启用模式时使用json
使用avro和schema注册表
将json模式与模式注册表一起使用
您当前使用的json启用了模式,需要将模式与实际负载一起发送。实现需求的唯一方法是使用avro和confluent schema registry,以便在schema registry中注册模式。这样,就不需要每次都发送有效负载模式。
另一种选择是将json与schema registry(#1289)结合使用。对于Kafka连接,您可以使用 JsonSchemaConverter 对于java消费者和生产者,您可以使用 KafkaJsonSchemaSerializer 以及 KafkaJsonSchemaDeserializer .

相关问题