使用自定义转换器与Kafka连接?

uurity8g  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(434)

我试图使用Kafka连接自定义转换器,我似乎无法得到它的权利。我希望有人有这方面的经验,可以帮助我找到它!

初始情况

我的自定义转换器的类路径是 custom.CustomStringConverter .
为了避免任何错误,我的自定义转换器目前只是预先存在的stringconverter的一个拷贝/粘贴(当然,当我让它工作时,它会改变)。https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/stringconverter.java
我有一个Kafka连接集群的3个节点,这些节点都运行合流的官方docker图片( confluentinc/cp-kafka-connect:3.3.0 ).
每个节点都配置为加载一个jar,其中包含我的转换器(使用docker卷)。

会发生什么?

当连接器启动时,它们会正确地加载jar并找到自定义转换器。事实上,这就是我在日志中看到的:

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)

然后将json配置发布到其中一个连接器节点以创建连接器:

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}

并收到以下回复:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

我错过了什么?

如果我尝试单独运行kafka connect statdnone,错误消息是相同的。
有人已经面对过了吗?我错过了什么?

2uluyalo

2uluyalo1#

好吧,多亏了Kafka用户邮件列表上的菲利普·施密特,我找到了解决方案。
他提到这个问题:https://issues.apache.org/jira/projects/kafka/issues/kafka-6007 ,这确实是我面临的问题。
引用他的话:
为了测试这一点,我只需将我的smt jar复制到我正在使用的连接器的文件夹中,并调整plugin.path属性。
实际上,我将转换器放在连接器的文件夹中,从而消除了这个错误。
我还尝试了其他方法:创建一个自定义连接器,并将该自定义连接器与自定义转换器一起使用,两者都作为插件加载。它也起作用。
摘要:转换器由连接器加载。如果连接器是插件,那么转换器也应该是插件。如果连接器不是插件(与kafka connect发行版捆绑在一起),那么转换器也不应该是插件。

相关问题