我创建了一个自定义的partitioner类,它扩展了默认的partitioner。
问题:我想在kafkasender.send method()中添加此自定义分区器
kafkasender.send method()代码:
sender.send(flux.just(senderrecord.create(new producerrecord<>(topic,partition,key,record,recordheaders),1)))
这里的分区器是一个整数
自定义分区器代码:
public class CustomPartitioner extends DefaultPartitioner {
private final static String CHAR_FORMAT = "UTF-8";
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// my logic
try {
return super.partition(topic, key, iocKey.toString().getBytes(CHAR_FORMAT), value, valueBytes, cluster);
} catch (UnsupportedEncodingException e) {
//error message
}
}
}
注意:我试着用下面的代码硬编码
Properties properties = new Properties();
properties.put("partitioner.class", "CustomPartitioner ");
如何强制kafkasender.send method()使用自定义分区器?
1条答案
按热度按时间vulvrdjw1#
您必须将属性Map作为producer配置的一部分传递给kafkatemplatebean。