我一直在写一个自定义Kafka连接器插件。为此,我必须从元数据文件动态创建一个模式,然后读取数据文件中的每个记录,并将单个记录发布到主题中。下面是创建架构的示例代码:
创建单个字段
JsonObject fileNameJSONObj = new JsonObject();
fileNameJSONObj.addProperty("name", "Update_Timestamp");
fileNameJSONObj.addProperty("type", "string");
fileNameJSONObj.addProperty("logicalType", "timestamp-millis");
fileNameJSONObj.addProperty("default", "");
创建记录
JsonObject fileNameJSONObj = new JsonObject();
jsonObject.addProperty("type", "record");
jsonObject.addProperty("name", "myrecord");
jsonObject.addProperty("namespace","abc");
jsonObject.add("fields", array);
我已经为所有字段设置了logicaltype,并且我也将这些字段写入avsc文件。在文件中,它显示为:
{"name":"Update_TimeStamp","type":"string","logicalType":"timestamp-millis","default":""},
但是,当我在调用schema registry端点时http://localhost:8081/subjects/myvalue/versions/1,显示如下:
{\"name\":\"Update_TimeStamp\",\"type\":{\"type\":\"string\",\"connect.default\":\"\"},\"default\":\"\"}
我已经调试够了,但是没有任何进展。我会做错什么?
暂无答案!
目前还没有任何答案,快来回答吧!