Kafka循环分配策略不起作用

s2j5cfk0  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(223)

我是Kafka的新人。kafka producer不是以循环方式分发我的消息,而是将所有消息发送到同一分区,尽管我没有设置任何密钥。共享下面的生产者代码和输出。Kafka原木

public static void main(String[] args) {

    final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

    //create producer properties
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

    //create producer
    KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

    for( int i=0;i<5;i++) {

        //Producer Record
        final ProducerRecord<String, String> record = new ProducerRecord<String, String>("Topic-A", "test" + Integer.toString(i));

        //send data
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    logger.info("Recieved new metadata: \n" +
                            "Topic : " + recordMetadata.topic() + "\n" +
                            "Partition : " + recordMetadata.partition() + "\n" +
                            "Offsets : " + recordMetadata.offset() + "\n" +
                            "Timestamp : " + recordMetadata.timestamp());
                } else {
                    logger.error("error : " + e);
                }
            }
        });

    }

        producer.flush();

        producer.close();

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题