如何将自定义分区与kafkasender.send method()一起使用?

b1uwtaje  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(318)

我创建了一个自定义的partitioner类,它扩展了默认的partitioner。
问题:我想在kafkasender.send method()中添加此自定义分区器
kafkasender.send method()代码:
sender.send(flux.just(senderrecord.create(new producerrecord<>(topic,partition,key,record,recordheaders),1)))
这里的分区器是一个整数
自定义分区器代码:

public class CustomPartitioner extends DefaultPartitioner {
private final static String CHAR_FORMAT = "UTF-8";
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // my logic 
    try {
        return super.partition(topic, key, iocKey.toString().getBytes(CHAR_FORMAT), value, valueBytes, cluster);
    } catch (UnsupportedEncodingException e) {
       //error message
    }
}

}
注意:我试着用下面的代码硬编码

Properties properties = new Properties();
     properties.put("partitioner.class", "CustomPartitioner ");

如何强制kafkasender.send method()使用自定义分区器?

vulvrdjw

vulvrdjw1#

您必须将属性Map作为producer配置的一部分传递给kafkatemplatebean。

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put("partitioner.class", "<packagename>.CustomPartitioner");
    return new KafkaTemplate<>(configProps );
}

相关问题