我可以让kafka消费者/接收器连接跳过主题中的特定分区吗?

t5fffqht  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(320)

kafka connect中指定从哪个分区读取消息的任何选项。基本上,我在kafka connects中寻找一个选项来手动分配要读取的分区列表。
类似于kafkaconsumer api中的assign()方法
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#assign(java.util.collection)集合

i34xakig

i34xakig1#

你不能只听Kafka连接中的特定分区。
但是您可以实现只从特定分区插入消息的功能。
要拥有这样的特性,您需要实现您的定制 Transformation . 如果 Transformation 退货 null 消息被跳过,因此您的自定义 Transformation 必须返回 null 不需要的分区。
示例代码如下:

public class PartitionFilter <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        int neededPartition = 1; // some parititon
        if (record.kafkaPartition() != neededPartition)
           return null;
        return record;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}

有关转换的更多信息,请参见:https://kafka.apache.org/documentation/#connect_transforms

相关问题