我使用conluent jdbc sink将数据从kafka加载到oracle。
但是我用数据来写我的模式。
我不想用数据来编写模式,如何才能在kafka主题上编写模式,然后只从我的客户机发送数据?
提前谢谢
json数据
{
"schema": {
"type": "struct",
"fields": [
{
"field": 'ID',
"type": "int32",
"optional": False
},
{
"field": 'PRODUCT',
"type": "string",
"optional": True
},
{
"field": 'QUANTITY',
"type": "int32",
"optional": True
},
{
"field": 'PRICE',
"type": "int32",
"optional": True
}
],
"optional": True,
"name": "myrecord"
},
"payload": {
"ID": 1071,
"PRODUCT": 'ersin',
"QUANTITIY": 1071,
"PRICE": 1453
}
python代码:
producer.send(topic, key=b'1071'
, value=json.dumps(v, default=json_util.default).encode('utf-8'))
我怎样才能解决这个问题?
提前谢谢
1条答案
按热度按时间kqlmhetl1#
如果要使用jdbc接收器连接器,必须提供模式。这可以通过三种方式实现:
启用模式时使用json
使用avro和schema注册表
将json模式与模式注册表一起使用
您当前使用的json启用了模式,需要将模式与实际负载一起发送。实现需求的唯一方法是使用avro和confluent schema registry,以便在schema registry中注册模式。这样,就不需要每次都发送有效负载模式。
另一种选择是将json与schema registry(#1289)结合使用。对于Kafka连接,您可以使用
JsonSchemaConverter
对于java消费者和生产者,您可以使用KafkaJsonSchemaSerializer
以及KafkaJsonSchemaDeserializer
.