ProducerRecord<byte[],byte[]> record = new
ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
2条答案
按热度按时间q43xntqr1#
将数据生成到topic时,RecordMetadata对象在
KafkaProducer.send
上返回send()方法是异步的。当调用它时,它将记录添加到一个等待发送记录的缓冲区中,并立即返回。这允许生产者将单个记录批处理在一起以提高效率。
生成器由一个缓冲区空间池和一个后台I/O线程组成,缓冲区空间池用于保存尚未传输到服务器的记录,后台I/O线程负责将这些记录转换为请求并传输到群集。使用后如果未能关闭生成器,将泄漏这些资源。
由于
send
方法返回Future<RecordMetadata>
,并且每条记录的结果都由异步线程执行,因此可以添加回调以打印offset
、partition
和记录信息或失败时的异常**注意:**在Kafka主题中生成记录时,由于异步机制,调用
send
方法的线程将不同于执行Future<RecordMetadata>
结果的线程。vojdkbi02#
正如另一个答案所说,
RecordMetadata
是在生产者端创建的。如果您谈论的是
ConsumerRecordMetadata
(Spring的概念),那么它并不适用于使用原始的ConsumerRecord
;当您有一个POJO侦听器(仅用于value()
)但仍希望获取元数据时,可以使用它。