我们修改了kafka connect jdbc以支持一个自定义转换器,该转换器将单个sinkrecord转换为多个sinkrecord,从而支持事务性插入。创建接收器时,可以在配置属性中指定实现的类 SinkRecordConverter
然后,我们尝试用这个定制转换器的实现打包一个uber jar,并尝试以两种方式部署它:
我们把它放在kafka connect jdbc的同一个文件夹中
我们将connect-distributed.properties中的plugins.path修改为/usr/local/share/java,并将转换器放置在/usr/local/share/java/myconverter/myconverter-1.0.jar中
然后,我们尝试部署接收器,但在这两种情况下,尝试通过反射创建此转换器示例的代码都失败,并出现错误 java.lang.ClassNotFoundException.
我们试图通过在两种情况下发生问题的位置放置断点来调试类加载问题:
在第一种情况下,jar将显示为urlclasspath上的jar之一
在第二种情况下,它甚至不会显示为urlclasspath上的jar之一
向kafka connect jdbc添加自定义转换器的正确方法是什么?
1条答案
按热度按时间bvpmtnay1#
我们有两个问题:
为了组装jar,我们使用了一个名为onejar的sbt插件,它创建了一个定制的类加载器
我们需要从现有的kafka连接器(jdbc)内部访问这些类,而不是仅从kafka connect访问。
我们找到的解决方案如下:
我们放弃了uberjar,使用sbtpack在kafkaconnect示例上部署了所有lib。
我们将jar物理地放在kafka connect jdbc所在的同一个文件夹中