如何使用python编程注册kafka模式注册表中的avro模式

gr8qqesn  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(508)

我用python将数据和模式放入kafka和模式注册表。

from confluent_kafka import avro

from confluent_kafka.avro import AvroProducer

value_schema_str = """

    {

       "type":"record",

       "name":"myrecord",

       "fields":[

          {

             "name":"ID",

             "type":[

                "null",

                "int"

             ],

             "default":null

          },

          {

             "name":"PRODUCT",

             "type":[

                "null",

                "string"

             ],

             "default":null

          },

          {

             "name":"QUANTITY",

             "type":[

                "null",

                "int"

             ],

             "default":null

          },

          {

             "name":"PRICE",

             "type":[

                "null",

                "int"

             ],

             "default":null

          }

       ]

    }

    """

key_schema_str = """

    {

       "type":"record",

       "name":"key_schema",

       "fields":[

          {

             "name":"ID",

             "type":"int"

          }

       ]

    }

    """

def delivery_report(err, msg):

    """ Called once for each message produced to indicate delivery result.

        Triggered by poll() or flush(). """

    if err is not None:

        print('Message delivery failed: {}'.format(err))

    else:

        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

if __name__ == '__main__':

    value_schema = avro.loads(value_schema_str)

    key_schema = avro.loads(key_schema_str)

    value = {"ID": 199 , "PRODUCT":"Yagiz Gulbahar", "QUANTITY":1453,"PRICE":61}

    key = {"ID": 199}

    avroProducer = AvroProducer({

        'bootstrap.servers': '10.0.0.0:9092',

        'on_delivery': delivery_report,

        'schema.registry.url': 'http://10.0.0.0:8081'

    }, default_key_schema=key_schema, default_value_schema=value_schema)

    avroProducer.produce(topic='ersin_test_2', key=key, value=value)

    avroProducer.flush()

我可以只放模式而不放数据吗?
提前谢谢

xam8gpfp

xam8gpfp1#

您还可以使用 confluent-kafka-python 或者 python-schema-registry-client .
使用 confluent-kafka-python ```
from confluent_kafka.avro import SchemaRegistryClient, Schema

schema_str = """
{
"type": "record",
"name": "user",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""

avro_schema = Schema(schema_str, 'AVRO')
sr = SchemaRegistryClient("http://schema-registry-host:8081")
_schema_id = client.register_schema("yourSubjectName", avro_schema)

使用 `python-schema-registry-client` ```
from schema_registry.client import SchemaRegistryClient, schema

schema_ = schema.AvroSchema({
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "firstName", "type": "string"},
        {"name": "age",  "type": "int"}
    ]
})

my_schema = sr.register("yourSubjectName", schema_)

相关问题