Java: Kafka Connect does not load a custom connector

Posted by r6l8ljro on 2023-06-04 in Java

I created a custom connector that extends JdbcSinkConnector, together with a matching task class, as shown below:

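import io.confluent.connect.jdbc.JdbcSinkConnector;
import org.apache.kafka.connect.connector.Task;

public class CustomJdbcSinkConnector extends JdbcSinkConnector {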

    @Override
    public Class<? extends Task> taskClass() {
        return CustomJdbcSinkTask.class;
    }

    @Override
    public String version() {
        // Version string reported for this custom connector
        return "1.0.0";
    }
}

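import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.sink.JdbcDbWriter;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.JdbcSinkTask;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

public class CustomJdbcSinkTask extends JdbcSinkTask {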

    private static final Logger log = LoggerFactory.getLogger(CustomJdbcSinkTask.class);

    ErrantRecordReporter reporter;
    DatabaseDialect dialect;
    JdbcSinkConfig config;
    JdbcDbWriter writer;
    int remainingRetries;
    private Map<String, String> configProps;

    @Override
    public void put(Collection<SinkRecord> records) {
        log.info("hello from custom put");
        log.info("custom put method starting...");
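        // Delegate to the inherited implementation. A freshly constructed
        // JdbcSinkTask has never been started (no config, no writer), so
        // calling put() on it would fail for any non-empty batch.
        super.put(records);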
    }
}

I built the jar and added its path to plugin.path in connect-distributed.properties.
I then started Kafka Connect, but the custom connector was not loaded.
The log shows:

[2023-05-30 14:54:01,585] INFO Loading plugin from: C:\Kafka\JDBC\custom-jdbc-sink-1.0.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:275)
[2023-05-30 14:54:01,613] INFO Registered loader: PluginClassLoader{pluginLocation=file:/C:/Kafka/JDBC/custom-jdbc-sink-1.0.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:297)

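The jar is loaded and its class loader is registered, but no plugin is added. The jar file is not corrupted; I can run it from the terminal.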

izkcnapc #1

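The custom connector loaded once the jar was built with the maven-assembly-plugin in pom.xml. The jar-with-dependencies descriptor bundles the project's dependencies into the jar, so the classes the connector extends are available to the plugin's class loader. The configuration was: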

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.example.somedir.MainApplication</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
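
To check whether a jar is self-contained before pointing plugin.path at it, a small diagnostic like the one below can help. It is only a sketch and not part of Kafka Connect: the class name PluginJarCheck and the method tryLoad are made up for illustration, and it assumes Java 9 or later (for ClassLoader.getPlatformClassLoader). It tries to load the connector class from the jar alone, so a superclass that is not in the jar shows up as a missing dependency.

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical helper for illustration; not part of Kafka Connect.
class PluginJarCheck {

    /**
     * Tries to load className through the given loader without initializing it.
     * Returns "OK" on success, or a short description of what could not be resolved.
     */
    static String tryLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return "OK";
        } catch (ClassNotFoundException e) {
            // The requested class itself is not visible to this loader.
            return "MISSING class: " + e.getMessage();
        } catch (LinkageError e) {
            // NoClassDefFoundError lands here when a superclass or interface
            // of the requested class is absent from the loader's classpath.
            return "MISSING dependency: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: PluginJarCheck <jar-path> <connector-class-name>");
            return;
        }
        URL jarUrl = new File(args[0]).toURI().toURL();
        // The parent is the platform loader, so only the JDK and the jar
        // itself are visible; nothing leaks in from the application classpath.
        try (URLClassLoader loader =
                 new URLClassLoader(new URL[] {jarUrl}, ClassLoader.getPlatformClassLoader())) {
            System.out.println(tryLoad(loader, args[1]));
        }
    }
}
```

Run it with the jar path and the fully qualified name of the connector class as the two arguments. Note that this check is stricter than Connect itself, which also makes its own API classes visible to plugins, so a report about an org.apache.kafka.connect class may be harmless, while a report about a class from the JDBC connector suggests the jar needs its dependencies bundled.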
