我使用下面的代码(不是真的,但假设是这样)创建一个模式,并由生产者将其发送给kafka。
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
问题是代码只允许我用这个模式发送一条消息。然后我需要更改架构名称以便发送下一条消息。。。所以名称字符串现在是随机生成的,所以我可以发送更多的消息。这是一个黑客,所以我想知道正确的方法来做这件事。
我还研究了如何在没有模式的情况下发送消息(即已经向kafka发送了一条带有模式的消息,现在所有其他消息都不再需要模式了) new GenericData.Record(..)
需要架构参数。如果为空,则会抛出一个错误。
那么,向Kafka发送avro模式消息的正确方法是什么呢?
下面是另一个代码示例-与我的非常相同:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/producerexample.java
它也没有显示如何在不设置模式的情况下发送。
1条答案
按热度按时间deikduxw1#
我不懂台词:
问题是代码只允许我用这个模式发送一条消息。然后我需要更改模式名称以便发送下一条消息。
在这两个示例(您的示例和您提供的合并示例)中,模式都不会发送给kafka。
在您提供的示例中,用于创建genericrecord对象的模式。提供模式是因为您希望根据某个模式验证记录(例如,验证您将只能在genericrecord对象中放置整数int1字段)。
在您的代码中,唯一的区别是您决定将数据序列化为byte[],这可能是不需要的,因为您可以将此职责委托给kafkaavroserializer,正如您在合流示例中看到的那样。
genericrecord是一个avro对象,它不是Kafka的强制执行。如果要将任何类型的对象发送到kafka(有或没有架构),只需创建(或使用现有的)序列化程序,将对象转换为byte[],并在为生产者创建的属性中设置此序列化程序。
通常,用avro消息本身发送指向模式的指针是一种好的做法。您可以在以下链接中找到原因:http://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/