kafka jdbc连接器中的java自定义分区分配

ercv8c1e  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(410)

我有一个用例,在这个用例中,我需要编写一个自定义逻辑来根据消息中的某些关键参数分配分区。我对此做了一些研究,发现kafka转换支持重写转换接口中的某些方法,但我无法在git hub或其他地方编写一些示例代码。有人可以分享示例代码或git-hub链接来在kafka jdbc源连接器中进行自定义分区分配吗?
提前谢谢!。

vsaztqbk

vsaztqbk1#

默认情况下,kafka connect to assigning partitions使用: DefaultPartitioner ( org.apache.kafka.clients.producer.internals.DefaultPartitioner )
如果您需要用一些自定义参数覆盖默认值,这是可能的,但您必须记住,覆盖适用于所有源连接器。要做到这一点,你必须 producer.partitioner.class 财产,例如 producer.partitioner.class=com.example.CustomPartitioner . 另外,您必须用分区器将jar复制到带有kafka连接库的目录中。
改造方式:
在转换中设置分区也是可能的,但这不是正确的方法。从 Transformation 您没有访问主题元数据的权限,这对于分配分区至关重要。
如果要为记录设置分区,代码应如下所示:

public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }

    private Integer calculatePartition(R record) {
        // Partitions calcuation based on record information
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}

相关问题