在kafka connect中添加新的自定义属性

imzjd6km  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(258)

我正在尝试添加新的自定义kafka转换器,它是connectjson中jsonconverterconfig的一个修改。我试图在扩展jsonconverterconfig的转换器中添加一些新的自定义属性,比如“schemas.modifications.enable”。但Kafka连接无法找到有关转换器的细节。我的代码段:

public class ModifiedJsonConfig extends JsonConverterConfig {

public static final String SCHEMAS_MODIFY_CONFIG = "schemas.modifications.enable";
public static final boolean SCHEMAS_MODIFY_CONFIG_DEFAULT = true;
private static final String SCHEMAS_MODIFY_CONFIG_DOC = "The maximum number of schemas that can be cached in this converter instance.";
private static final String SCHEMAS_MODIFY_CONFIG_DISPLAY = "Schema Cache Size";

private final static ConfigDef CONFIG;

static {
    String group = "Schemas-modification";
    int orderInGroup = 0;
    CONFIG = ConverterConfig.newConfigDef();
    CONFIG.define(SCHEMAS_MODIFY_CONFIG, Type.BOOLEAN, SCHEMAS_MODIFY_CONFIG_DEFAULT, Importance.HIGH, SCHEMAS_MODIFY_CONFIG_DOC, group,
            orderInGroup++, Width.MEDIUM, SCHEMAS_MODIFY_CONFIG_DISPLAY);
}

public static ConfigDef configDef() {
    return CONFIG;
}

public ModifiedJsonConfig(Map<String, ?> props) {
    super(props);
}

public boolean schemasModified() {
    return getBoolean(SCHEMAS_MODIFY_CONFIG);
    }
}

但我得到了一个错误:
由于错误而停止时出错(org.apache.kafka.connect.cli.connectdistributed:83)org.apache.kafka.common.config.configexception:未知配置“schemas.modifications.enable”
但我已经定义了这个配置。如果你能帮我在这里设置一个自定义转换器属性,那将非常有用。
提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题