我用python将数据和模式放入kafka和模式注册表。
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro value schema for the produced messages. Every field is a
# ["null", <type>] union with "default": null, so any field may be
# omitted from a record and will deserialize as null.
value_schema_str: str = """
{
"type":"record",
"name":"myrecord",
"fields":[
{
"name":"ID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"PRODUCT",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"QUANTITY",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"PRICE",
"type":[
"null",
"int"
],
"default":null
}
]
}
"""
# Avro key schema: the message key carries only the record's ID
# (a non-nullable int, unlike the nullable value fields above).
key_schema_str: str = """
{
"type":"record",
"name":"key_schema",
"fields":[
{
"name":"ID",
"type":"int"
}
]
}
"""
def delivery_report(err, msg):
    """Per-message delivery callback for the producer.

    Invoked once for every produced message (triggered from within
    ``poll()`` or ``flush()``) to report whether the broker accepted it.

    Args:
        err: Delivery error, or None on success.
        msg: The delivered message (exposes ``topic()`` and ``partition()``).
    """
    # Success path: report where the message landed, then bail out early.
    if err is None:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
        return
    print('Message delivery failed: {}'.format(err))
if __name__ == '__main__':
    # Parse the Avro schema definitions into schema objects once at startup.
    parsed_value_schema = avro.loads(value_schema_str)
    parsed_key_schema = avro.loads(key_schema_str)

    # One sample record plus its key (key mirrors the record's ID).
    record = {
        "ID": 199,
        "PRODUCT": "Yagiz Gulbahar",
        "QUANTITY": 1453,
        "PRICE": 61,
    }
    record_key = {"ID": 199}

    # Producer wired to both the Kafka broker and the Schema Registry;
    # delivery_report is invoked per message from poll()/flush().
    producer_conf = {
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081',
    }
    producer = AvroProducer(
        producer_conf,
        default_key_schema=parsed_key_schema,
        default_value_schema=parsed_value_schema,
    )

    # Publish the record and block until all delivery reports have fired.
    producer.produce(topic='ersin_test_2', key=record_key, value=record)
    producer.flush()
我可以只向模式注册表注册模式（schema），而不向 Kafka 发送任何数据吗？
提前谢谢。
1条答案
（按热度/按时间排序）
可以。您可以使用 confluent-kafka-python 或者 python-schema-registry-client 来单独注册模式。下面是使用 confluent-kafka-python 的示例：
```from confluent_kafka.avro import SchemaRegistryClient, Schema
schema_str = """
{
"type": "record",
"name": "user",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""
avro_schema = Schema(schema_str, 'AVRO')
sr = SchemaRegistryClient("http://schema-registry-host:8081")
_schema_id = client.register_schema("yourSubjectName", avro_schema)