我正在尝试用自定义值转换器设置kafka连接器。
我在用Kafka传送连载的节俭物品。
我想设置一个kafka连接器,它反序列化节俭消息,将它们转换为json并发送到elasticsearch。
方法 org.apache.kafka.connect.storage.Converter#toConnectData
退货 SchemaAndValue
,需要 org.apache.kafka.connect.data.Schema
.
如何获取json的模式?
到目前为止我尝试的是:
我试着延伸 org.apache.kafka.connect.json.JsonConverter
,但它有自己的模式。
我尝试使用此库生成架构:https://github.com/reinert/jjschema,但是 JsonConverter
似乎有它自己的格式:它期望 map
而不是 object
等等。
请参见:https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/jsonconverter.java#l408
尽管我已经禁用了模式( "value.converter.schemas.enable":"false"
)在我的配置中,连接器仍在崩溃并抱怨模式。这个模式从何而来?它们是如何产生的?
我将要编写一个方法,递归地重命名json模式中所有“错误”的东西,但这太尴尬了。有合适的方法吗?
我的配置是
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "mytopic",
"key.ignore": "true",
"connection.url": "https://my-elastic:443",
"type.name": "event",
"elasticsearch.index.prefix" : "kafka",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "com.example.ThriftToJsonDeserializer",
"value.converter.schemas.enable":"false"
}
1条答案
按热度按时间nimxete21#
问题是elasticsearch连接器试图基于消息模式推断elasticsearch的Map。消息架构由
Converter
修改人Transforms
. 如果你设置value.converter.schemas.enable
在false
你的记录是空的。你必须把
schema.ignore
如果为true,elasticsearch连接器将不会推断架构。