public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
private Integer calculatePartition(R record) {
// Partitions calcuation based on record information
return 0;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
1条答案
按热度按时间vsaztqbk1#
默认情况下,kafka connect to assigning partitions使用:
DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
)如果您需要用一些自定义参数覆盖默认值,这是可能的,但您必须记住,覆盖适用于所有源连接器。要做到这一点,你必须
producer.partitioner.class
财产,例如producer.partitioner.class=com.example.CustomPartitioner
. 另外,您必须用分区器将jar复制到带有kafka连接库的目录中。改造方式:
在转换中设置分区也是可能的,但这不是正确的方法。从
Transformation
您没有访问主题元数据的权限,这对于分配分区至关重要。如果要为记录设置分区,代码应如下所示: