我使用的是scala 2.12,在build.sbt中有以下依赖项。
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "3.1.1"
libraryDependencies += "io.confluent" % "common-config" % "3.1.1"
libraryDependencies += "io.confluent" % "common-utils" % "3.1.1"
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % "3.1.1"
感谢这个社区,我能够将我的原始数据转换成所需的avro格式。
我们需要使用合流库来序列化数据并将其发送到kafka主题。
我正在使用以下属性和avro记录。
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
properties.put("schema.registry.url", "http://myschemahost:8081")
为了简洁起见,只显示所需的代码片段。
val producer = new KafkaProducer[String, GenericData.Record](properties)
val schema = new Schema.Parser().parse(new File(schemaFileName))
var avroRecord = new GenericData.Record(schema)
// code to populate record
// check output below to see the data
logger.info(s"${avroRecord.toString}\n")
producer.send(new ProducerRecord[String, GenericData.Record](topic, avroRecord), new ProducerCallback)
producer.flush()
producer.close()
根据输出的架构和数据。
{"name": "person","type": "record","fields": [{"name": "address","type": {"type" : "record","name" : "AddressUSRecord","fields" : [{"name": "streetaddress", "type": "string"},{"name": "city", "type":"string"}]}}]}
我在发布到Kafka时遇到以下错误。
Error registering Avro schema:
org.apache.kafka.common.errors.SerializationException:
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:877)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:839)
基于模式和数据,有什么遗漏吗?我的记录正确吗?
另外,我想知道如何从scala中填充“avro”null?没有不起作用的。
任何帮助都将不胜感激。我真的被困在这里了。
更新:
感谢@cricket\u 007指出这个问题。我确实遇到以下错误:
2019-03-20 13:26:09.660 [application-akka.actor.default-dispatcher-5] INFO i.c.k.s.KafkaAvroSerializerConfig.logAll(169) - KafkaAvroSerializerConfig values:
schema.registry.url = [http://myhost:8081]
max.schemas.per.subject = 1000
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
但是,当我使用相同的url(http://myhost:8081)在我的浏览器上运行良好。我可以看到主题和其他信息。但是,只要我使用客户机(上面的scala程序),它就会失败,并出现上述错误。
我刚刚用下面的示例代码进行了检查,它给出了相同的问题。
val client = new OkHttpClient
val request = new Request.Builder().url("http://myhost:8081/subjects").build()
val output = client.newCall(request).execute().body().string()
logger.info(s"Subjects: ${output}\n")
我的架构注册表url的连接被拒绝。
Subjects: <HEAD><TITLE>Connection refused</TITLE></HEAD>
<BODY BGCOLOR="white" FGCOLOR="black"><H1>Connection refused</H1><HR>
<FONT FACE="Helvetica,Arial"><B>
Description: Connection refused</B></FONT>
<HR>
<!-- default "Connection refused" response (502) -->
</BODY>
所以,我想看看我是否遗漏了什么。当我在浏览器上运行它时,同样的方法也能起作用,但像上面这样的简单代码会失败。
1条答案
按热度按时间nkoocmlb1#
这是http响应解析错误。似乎您的模式注册表没有返回json响应,而是返回一些以
<
打开标签。您应该检查注册表是否真的在运行
http://myschemahost:8081
,并且您可以使用restapi手动将架构发布到它,以执行与序列化程序相同的操作。