使用ConfluentKafka for python向主题注册模式id

mbzjlibv  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(394)

到目前为止,我得到的唯一答案是,你必须给模式和主题取一个相同的名字,然后把它们联系起来。但是在注册了一个名为 test_topic 比如:

{
  "type": "record",
  "name": "test_topic",
  "namespace": "com.test",
  "doc": "My test schema",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

并运行以下命令,它可以毫无问题地插入。

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"

但是当我同时运行以下命令时,它会插入而不给出任何错误(注意,我更改了属性名)

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"

我可能会怀疑一条错误消息说我的数据与此主题的架构不匹配。。。
我的模式id是10,所以我知道它正在工作和注册,但目前不是很有用。
python代码:

from confluent_kafka import Producer
import socket
import json
conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)

def acked(err, msg):
    if err is not None:
        print(f'Failed to deliver message: {str(msg)}, {str(err)}')
    else:
        print(f'Message produced: {str(msg)}')

    producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii') , callback=acked)

producer.poll(5)
nfg76nw0

nfg76nw01#

你必须给模式和主题取一个相同的名字,然后将它们链接在一起
schema注册表不是这样工作的。
每个Kafka记录都有一个键和一个值。
注册表中有主题,这些主题没有严格Map到主题。
但是,kafka客户机(反)序列化程序实现将同时使用这两种方法 topic-key 以及 topic-value 要从注册表中注册/提取架构的主题名称。
客户机无法告诉注册表将架构放在哪个id。这个逻辑是在服务器端计算的
我不确定我是否理解您的帖子与rest代理有什么关系,但是您发布的是纯json,没有告诉它数据应该是avro(您使用的标题不正确)
如果使用avro,则内容类型为 application/vnd.kafka.avro.v2+json

相关问题