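I am new to Kafka. My Kafka producer is not distributing messages in a round-robin fashion; it sends all of them to the same partition, even though I have not set any key. I am sharing the producer code and output (Kafka logs) below.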
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemoWithCallback {
    public static void main(String[] args) {
        final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        // Create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            // Producer record with no key, only a value
            final ProducerRecord<String, String> record = new ProducerRecord<>("Topic-A", "test" + i);

            // Send data asynchronously; the callback runs once the broker responds
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        logger.info("Received new metadata: \n" +
                                "Topic : " + recordMetadata.topic() + "\n" +
                                "Partition : " + recordMetadata.partition() + "\n" +
                                "Offset : " + recordMetadata.offset() + "\n" +
                                "Timestamp : " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
        }

        // Flush pending records, then release producer resources
        producer.flush();
        producer.close();
    }
}
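One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the client library is kafka-clients 2.4 or later, the default partitioner uses sticky partitioning for records that have no key, so a handful of small records sent back to back tends to end up in one batch and therefore one partition. Under that assumption, a minimal sketch of a configuration that asks for round-robin distribution could look like the following. The class name `RoundRobinProps` and the method `build` are hypothetical helpers introduced only for illustration; the property keys and the `RoundRobinPartitioner` class name are the ones shipped with kafka-clients 2.4+.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original post): builds producer
// properties that request the round-robin partitioner by class name.
final class RoundRobinProps {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Same settings as in the question, written as plain string keys
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: kafka-clients 2.4 or later, where this class exists
        props.setProperty("partitioner.class",
                "org.apache.kafka.clients.producer.RoundRobinPartitioner");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("127.0.0.1:9092");
        System.out.println("partitioner.class = " + props.getProperty("partitioner.class"));
    }
}
```

Passing the result of `RoundRobinProps.build("127.0.0.1:9092")` to `new KafkaProducer<String, String>(...)` in place of `properties` would be the intended use. Whether this changes the observed partition assignment depends on the client version and the number of partitions in `Topic-A`, neither of which is stated in the question.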