注册avro架构时出错:“string”

avwztpqn  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(335)

将avro消息发送到kafka主题的代码

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    io.confluent.kafka.serializers.KafkaAvroSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    io.confluent.kafka.serializers.KafkaAvroSerializer.class);
            props.put("schema.registry.url", "http://localhost:8081");
            producer = new KafkaProducer<>(props); 

    public void send(List<String> results){
            TestCallback callback = new TestCallback();
            for (Object result : results) {
                ProducerRecord<String, String> record = new ProducerRecord(topic,result.toString());<==confused at this point
                producer.send(record,callback);

            }
            producer.close();
        }

send方法包含从sql查询获取的记录列表。
错误

2017-10-05 23:54:36 DEBUG RestService:118 - Sending POST with input {"schema":"\"string\""} to http://localhost:8081/subjects/my_topicq1-value/versions
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"

最后,我想从主题中获取这些记录,并使用kafka connect hdfs将其放入hdfs中。
你能给我一些建议吗?这样我就可以继续了。谢谢您。!

wz8daaqr

wz8daaqr1#

如果要拉入sql记录,是否需要一个定制的生产者——可以使用kafka connect jdbc连接器直接从sql查询中拉入数据,然后使用kafka connect将其连接到hdfs吗?

相关问题