我创建了一个自定义连接器,它扩展了JdbcSinkConnector,如下所示
public class CustomJdbcSinkConnector extends JdbcSinkConnector {
@Override
public Class<? extends Task> taskClass() {
return CustomJdbcSinkTask.class;
}
@Override
public String version() {
//version of your custom connector
return "1.0.0";
}
}
public class CustomJdbcSinkTask extends JdbcSinkTask {
private static final Logger log = LoggerFactory.getLogger(CustomJdbcSinkTask.class);
ErrantRecordReporter reporter;
DatabaseDialect dialect;
JdbcSinkConfig config;
JdbcDbWriter writer;
int remainingRetries;
private Map<String, String> configProps;
@Override
public void put(Collection<SinkRecord> records) {
log.info("hello from custom put");
log.info("custom put method starting...");
JdbcSinkTask jdbcSinkTask = new JdbcSinkTask();
jdbcSinkTask.put(records);
}
}
我已经把jar构建好了,并把它的路径添加到了www.example.com的connect-distributed.properties
然后启动了Kafka连接,但这个自定义连接器没有加载
日志显示
[2023-05-30 14:54:01,585] INFO Loading plugin from: C:\Kafka\JDBC\custom-jdbc-sink-1.0.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:275)
[2023-05-30 14:54:01,613] INFO Registered loader: PluginClassLoader{pluginLocation=file:/C:/Kafka/JDBC/custom-jdbc-sink-1.0.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:297)
它是加载和注册,但插件没有添加。jar文件没有损坏,我可以从终端运行它。
1条答案
按热度按时间izkcnapc1#
自定义连接器能够通过使用pom.xml中的maven-assembly-plugin加载,如