如何为kafka connect avro使用模式注册表

thigvfpy  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(475)

我已经开始探索Kafka和Kafka连接最近做了一些初步的设置。但希望在模式注册部分进行更多的探索。
我的模式注册表现在启动,我应该做什么。
我有一个avro schema存储在avro\u schema.avsc中。
这是模式

{
  "name": "FSP-AUDIT-EVENT",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "ID",
      "type": "string"
    },
    {
      "name": "VERSION",
      "type": "int"
    },
    {
      "name": "ACTION_TYPE",
      "type": "string"
    },
    {
      "name": "EVENT_TYPE",
      "type": "string"
    },
    {
      "name": "CLIENT_ID",
      "type": "string"
    },
    {
      "name": "DETAILS",
      "type": "string"
    },
    {
      "name": "OBJECT_TYPE",
      "type": "string"
    },
    {
      "name": "UTC_DATE_TIME",
      "type": "long"
    },
    {
      "name": "POINT_IN_TIME_PRECISION",
      "type": "string"
    },
    {
      "name": "TIME_ZONE",
      "type": "string"
    },
    {
      "name": "TIMELINE_PRECISION",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_UTC_DT",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_PITP",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_TZ",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_TP",
      "type": "string"
    },
    {
      "name": "GROUP_ID",
      "type": "string"
    },
    {
      "name": "OBJECT_DISPLAY_NAME",
      "type": "string"
    },
    {
      "name": "OBJECT_ID",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "USER_DISPLAY_NAME",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "USER_ID",
      "type": "string"
    },
    {
      "name": "PARENT_EVENT_ID",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "NOTES",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "SUMMARY",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

我的架构是否有效。我从json在线转换了它?我不确定这个模式文件的位置应该放在哪里。请指导我走哪一步。我从lambda函数和jdbc源代码发送记录。
那么基本上我如何执行avro模式和测试呢?我必须更改avro消费者属性文件中的任何内容吗?
或者这是注册模式的正确方法

./bin/kafka-avro-console-producer \
                 --broker-list b-3.**:9092,b-**:9092,b-**:9092 --topic AVRO-AUDIT_EVENT \
                 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"     --data '{"schema" : "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"VERSION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ACTION_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"EVENT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"CLIENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"DETAILS\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"UTC_DATE_TIME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"POINT_IN_TIME_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIME_ZONE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIMELINE_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"GROUP_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PARENT_EVENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"NOTES\"},{\"type\":\"string\",\"optional\":true,\"field\":\"SUMMARY\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_UTC_DT\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_PITP\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TZ\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TP\"}],\"optional\":false,\"name\":\"test\"}"}' http://localhost:8081/subjects/view/versions

接下来我要做什么
但当我试图看到我的模式时,我只得到下面的

curl --silent -X GET http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/latest

这就是结果

{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,"schema":"{\"type\":\"string\",\"optional\":false}"}

为什么看不到完整注册的架构
当我尝试删除模式时也是如此
我得到下面的错误

{"error_code":405,"message":"HTTP 405 Method Not Allowed"

我不确定我的模式是否正确注册。
请帮帮我。提前谢谢

omqzjyyz

omqzjyyz1#

我的架构有效吗
您可以使用注册表的RESTAPI尝试并提交它,然后查看。。。
我不确定这个模式文件的位置应该放在哪里
不清楚你是怎么发信息的。。。
如果您真的编写了kafka生产者代码,那么您可以将其存储在代码中(作为字符串)或作为资源文件。。如果使用java,则可以使用schemabuilder类来创建schema对象
如果尚未使用avro模式和序列化程序,则需要重写producer以使用它们
如果我们创建avro模式,它也可以用于json。
avro是一种二进制格式,但它有一个jsondecoder。
我们的avro架构属性文件的url应该是什么?
一旦你想好如何启动它,它就必须是你的模式注册表的ip(与 schema-registry-start )
我必须更改avro消费者属性文件中的任何内容吗?
你需要使用avro反序列化程序
这是注册模式的正确方法吗
.> /bin/kafka-avro-console-producer \ 不完全是。这就是使用模式生成消息的方式(需要使用正确的模式)。你还必须提供 --property schema.registry.url 您可以使用注册表的restapi来注册和验证模式

相关问题