在confluent 4.1+kafka 1.1中为kafka connect打包一个自定义的java`partitioner.class`插件?

pw136qt2  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(524)

我已经成功地为confluent 3.2.x(kafka 0.10.x)上的kafka连接接收器使用了一个用java编写的简单自定义分区器类。我想升级到confluent 4.1(kafka 1.1)并且遇到错误。
kafkaconnect的插件加载机制似乎在cp3.3.0中发生了变化。以前,只有classpath选项,但是在cp3.3.0+中有一个更新的和推荐的选项 plugin.path 机制。
如果我尝试继续使用遗留类路径插件机制,当我尝试使用我的插件时,我会得到:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

这是一个cp内部类。对于旧的cp3.2.x,它在类路径上是可用的,但是对于cp>=3.3.0中的新类路径隔离工作,我认为它必须与插件一起提供。
我想换成新的是明智的 plugin.path 机制。我删除类路径条目。默认情况下 /etc/kafka/connect-distributed.properties ,我明白了 plugin.path=/usr/share/java ,所以我将plugin.jar安装到 /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar . 我也尝试过添加和不添加dependency.jar文件。
我的插件似乎在kafka connect服务启动时加载:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

当我这样做时:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config

我得到:

{"error_code":500,"message":null}%

我可以在kafka connect系统日志中看到:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

不清楚出了什么问题,也不清楚为什么我的partitioner类加载不正确。
仅供参考,我已经用cp4.1+kafka1.1依赖性重新构建了我的java插件,并进行了一些小的更新,以匹配api的更改,比如为 getSchemaGeneratorClass 去我的分区班。

cbjzeqam

cbjzeqam1#

自定义kafka connect分区器类将无法通过旧的classpath机制工作,它们也无法作为具有较新的kafka 0.11.0+独立插件机制的插件工作。
唯一可行的解决方案是将带有自定义kafka connect分区器类的自定义.jar文件复制到 kafka-connect-storage-common 插件目录 /usr/share/java/kafka-connect-storage-common/ . 自定义kafka connect partitioner插件类必须存在于同一目录中,以便它们位于同一个独立类装入器中。
仅供参考,您可以看到kafka 0.11.0+独立插件机制将只加载四个特定java类的子类,这些类不包括kafka connect分区器:
https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/delegatingclassloader.java#l279
感谢cricket\u007推荐了这个精确的解决方案:将自定义kafka connect partitioner.jar文件放入 /share/java/kafka-storage-common 目录。我从中学到了为什么必须这么做,为什么其他选择不起作用。

相关问题