我正在运行schema registry和kafka connect,现在配置bigquery连接器向bigquery发送json消息。
为了重现这些错误,我使用kafka json模式控制台生成器生成了一些基本的json消息,如下所示(架构注册表内部)
/usr/bin/kafka-json-schema-console-producer \
--broker-list $MY_BROKERS \
--topic json-test \
--property schema.registry.url=http://localhost:8081 \
--property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
因此,查询模式注册表( curl localhost:8081/schemas/ids/1
)给我以下结果。
{"schemaType":"JSON","schema":"{\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"}
接下来,我用下面的配置创建了实际的bigquery接收器连接器。
{
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max": 1,
"autoCreateTables" : "true",
"autoUpdateSchemas" : "true",
"schemaRetriever" : "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
"schemaRegistryLocation":"http://kafka-schema-registry-cp-schema-registry.kafka-schema-registry.svc.cluster.local:8081",
"topics": "json-test",
"sanitizeTopics": "true",
"autoCreateTables": "true",
"project": "MYPROJECT1",
"datasets": ".*=MYDATASET1",
"keyfile": "/bigquery-key.json",
"value.converter.schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.kafka-schema-registry.svc.cluster.local:8081",
"schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.kafka-schema-registry.svc.cluster.local:8081",
"value.deserializer": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter"
}
产生的误差如下。我不能理解的一点是,我使用的是jsonschemaconverter,从来没有在任何地方指定过avro。在我配置kafka连接本身的时候,我从来没有给avro选项,所以不用担心重写。
为什么会出现avro schemaparse错误?
暂无答案!
目前还没有任何答案,快来回答吧!