kafka连接,获取jsonconverter的json模式

iih3973s  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(439)

我正在尝试用自定义值转换器设置kafka连接器。
我在用Kafka传送连载的节俭物品。
我想设置一个kafka连接器,它反序列化节俭消息,将它们转换为json并发送到elasticsearch。
方法 org.apache.kafka.connect.storage.Converter#toConnectData 退货 SchemaAndValue ,需要 org.apache.kafka.connect.data.Schema .
如何获取json的模式?
到目前为止我尝试的是:
我试着延伸 org.apache.kafka.connect.json.JsonConverter ,但它有自己的模式。
我尝试使用此库生成架构:https://github.com/reinert/jjschema,但是 JsonConverter 似乎有它自己的格式:它期望 map 而不是 object 等等。
请参见:https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/jsonconverter.java#l408
尽管我已经禁用了模式( "value.converter.schemas.enable":"false" )在我的配置中,连接器仍在崩溃并抱怨模式。这个模式从何而来?它们是如何产生的?
我将要编写一个方法,递归地重命名json模式中所有“错误”的东西,但这太尴尬了。有合适的方法吗?
我的配置是

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "mytopic",
    "key.ignore": "true",
    "connection.url": "https://my-elastic:443",
    "type.name": "event",
    "elasticsearch.index.prefix" : "kafka",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "com.example.ThriftToJsonDeserializer",
    "value.converter.schemas.enable":"false"
}
nimxete2

nimxete21#

问题是elasticsearch连接器试图基于消息模式推断elasticsearch的Map。消息架构由 Converter 修改人 Transforms . 如果你设置 value.converter.schemas.enablefalse 你的记录是空的。
你必须把 schema.ignore 如果为true,elasticsearch连接器将不会推断架构。

相关问题