我已经开始探索Kafka和Kafka连接最近做了一些初步的设置。但希望在模式注册部分进行更多的探索。
我的模式注册表现在启动,我应该做什么。
我有一个avro schema存储在avro\u schema.avsc中。
这是模式
{
"name": "FSP-AUDIT-EVENT",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "ID",
"type": "string"
},
{
"name": "VERSION",
"type": "int"
},
{
"name": "ACTION_TYPE",
"type": "string"
},
{
"name": "EVENT_TYPE",
"type": "string"
},
{
"name": "CLIENT_ID",
"type": "string"
},
{
"name": "DETAILS",
"type": "string"
},
{
"name": "OBJECT_TYPE",
"type": "string"
},
{
"name": "UTC_DATE_TIME",
"type": "long"
},
{
"name": "POINT_IN_TIME_PRECISION",
"type": "string"
},
{
"name": "TIME_ZONE",
"type": "string"
},
{
"name": "TIMELINE_PRECISION",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_UTC_DT",
"type": [
"string",
"null"
]
},
{
"name": "AUDIT_EVENT_TO_DATE_PITP",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_DATE_TZ",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_DATE_TP",
"type": "string"
},
{
"name": "GROUP_ID",
"type": "string"
},
{
"name": "OBJECT_DISPLAY_NAME",
"type": "string"
},
{
"name": "OBJECT_ID",
"type": [
"string",
"null"
]
},
{
"name": "USER_DISPLAY_NAME",
"type": [
"string",
"null"
]
},
{
"name": "USER_ID",
"type": "string"
},
{
"name": "PARENT_EVENT_ID",
"type": [
"string",
"null"
]
},
{
"name": "NOTES",
"type": [
"string",
"null"
]
},
{
"name": "SUMMARY",
"type": [
"string",
"null"
]
}
]
}
我的架构是否有效。我从json在线转换了它?我不确定这个模式文件的位置应该放在哪里。请指导我走哪一步。我从lambda函数和jdbc源代码发送记录。
那么基本上我如何执行avro模式和测试呢?我必须更改avro消费者属性文件中的任何内容吗?
或者这是注册模式的正确方法
./bin/kafka-avro-console-producer \
--broker-list b-3.**:9092,b-**:9092,b-**:9092 --topic AVRO-AUDIT_EVENT \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"VERSION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ACTION_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"EVENT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"CLIENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"DETAILS\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"UTC_DATE_TIME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"POINT_IN_TIME_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIME_ZONE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIMELINE_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"GROUP_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PARENT_EVENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"NOTES\"},{\"type\":\"string\",\"optional\":true,\"field\":\"SUMMARY\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_UTC_DT\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_PITP\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TZ\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TP\"}],\"optional\":false,\"name\":\"test\"}"}' http://localhost:8081/subjects/view/versions
接下来我要做什么
但当我试图看到我的模式时,我只得到下面的
curl --silent -X GET http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/latest
这就是结果
{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,"schema":"{\"type\":\"string\",\"optional\":false}"}
为什么看不到完整注册的架构
当我尝试删除模式时也是如此
我得到下面的错误
{"error_code":405,"message":"HTTP 405 Method Not Allowed"
我不确定我的模式是否正确注册。
请帮帮我。提前谢谢
1条答案
按热度按时间omqzjyyz1#
我的架构有效吗
您可以使用注册表的RESTAPI尝试并提交它,然后查看。。。
我不确定这个模式文件的位置应该放在哪里
不清楚你是怎么发信息的。。。
如果您真的编写了kafka生产者代码,那么您可以将其存储在代码中(作为字符串)或作为资源文件。。如果使用java,则可以使用schemabuilder类来创建schema对象
如果尚未使用avro模式和序列化程序,则需要重写producer以使用它们
如果我们创建avro模式,它也可以用于json。
avro是一种二进制格式,但它有一个jsondecoder。
我们的avro架构属性文件的url应该是什么?
一旦你想好如何启动它,它就必须是你的模式注册表的ip(与
schema-registry-start
)我必须更改avro消费者属性文件中的任何内容吗?
你需要使用avro反序列化程序
这是注册模式的正确方法吗
.>
/bin/kafka-avro-console-producer \
不完全是。这就是使用模式生成消息的方式(需要使用正确的模式)。你还必须提供--property schema.registry.url
您可以使用注册表的restapi来注册和验证模式