我已经在分布式模式下安装并测试了kafka connect,它现在可以工作了,它连接到配置的接收器并从配置的源读取数据。在这种情况下,我采取行动,以加强我的安装。我认为需要立即注意的一个方面是,要创建连接器,唯一可用的方法是通过rest调用,这意味着我需要在不受保护的情况下通过电线发送信息。为了确保这一点,kafka引入了新的configprovider。这很有帮助,因为它允许在服务器中设置属性,然后在rest调用中引用它们,如下所示:
{
.
.
"property":"${file:/path/to/file:nameOfThePropertyInFile}"
.
.
}
只需在服务器上添加属性文件,并在distributed.properties文件上添加以下配置,就可以很好地工作:
config.providers=file # multiple comma-separated provider types can be specified here
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
虽然这个解决方案可行,但它确实无助于缓解我对安全性的担忧,因为现在传递的信息从通过有线发送到现在存放在存储库中,所有人都可以看到清晰可见的文本。kafka团队预见到了这个问题,并允许客户机生成自己的配置提供程序来实现configprovider接口。我已经创建了自己的实现,并打包在一个jar中,给它起了建议的最终名称:
META-INF/services/org.apache.kafka.common.config.ConfigProvider
并在分布式文件中添加了以下条目:
config.providers=cust
config.providers.cust.class=com.somename.configproviders.CustConfigProvider
但是,我从connect中得到一个错误,指出实现configprovider的类名为:
com.somename.configproviders.CustConfigProvider
找不到。我现在不知所措,因为他们网站上的文档没有明确说明如何很好地配置自定义配置提供程序。
是否有人曾处理过类似的问题,并能对此提供一些见解?任何帮助都将不胜感激。
1条答案
按热度按时间ttcibm8c1#
我最近刚刚通过这些设置了一个定制的configprovider。官方文件模棱两可,令人费解。
我已经创建了自己的实现并打包在一个jar中,给它起了建议的最终名称:meta-inf/services/org.apache.kafka.common.config.configprovider
您可以随意命名jar的最终名称,但需要打包成jar格式,其后缀为.jar。
以下是完整的步骤。假设您的自定义configprovider完全限定名为
com.my.CustomConfigProvider.MyClass
. 1在以下目录下创建一个文件:meta-inf/services/org.apache.kafka.common.config.configprovider。文件内容是完全限定的类名:com.my.customconfigprovider.myclass包括您的源代码,以及上面的meta inf文件夹以生成jar包。如果您使用的是maven,那么文件结构如下所示
将最后一个jar文件custom-config-provider-1.0.jar放在kafka worker plugin文件夹下。默认值为/usr/share/java。kafka worker配置文件中的插件路径。
将所有依赖jar也上传到plugin\u路径。使用jar文件中的meta info/manifest.mf文件来配置代码将使用的依赖jar的“类路径”。
在kafka worker config文件中,创建两个附加属性:
重新启动工作进程
通过将post curl 到kafka restful api来更新连接器配置文件。在连接器配置文件中,可以引用其中的值
ConfigData
从返回ConfigProvider:get(path, keys)
使用如下语法: