将avro消息发送到kafka主题的代码
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
producer = new KafkaProducer<>(props);
public void send(List<String> results){
TestCallback callback = new TestCallback();
for (Object result : results) {
ProducerRecord<String, String> record = new ProducerRecord(topic,result.toString());<==confused at this point
producer.send(record,callback);
}
producer.close();
}
send方法包含从sql查询获取的记录列表。
错误
2017-10-05 23:54:36 DEBUG RestService:118 - Sending POST with input {"schema":"\"string\""} to http://localhost:8081/subjects/my_topicq1-value/versions
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
最后,我想从主题中获取这些记录,并使用kafka connect hdfs将其放入hdfs中。
你能给我一些建议吗?这样我就可以继续了。谢谢您。!
1条答案
按热度按时间wz8daaqr1#
如果要拉入sql记录,是否需要一个定制的生产者——可以使用kafka connect jdbc连接器直接从sql查询中拉入数据,然后使用kafka connect将其连接到hdfs吗?