我启动了zookeeper,kafka服务器,kafka生产者和kafka消费者和我把从confluent下载的jdbc sql连接器jar放在路径中,我在connect standalone properties中提到了plugin.path,我运行了connect-standalone.bat….\config\connect-standalone.properties….\config\sink-quickstart-mysql.properties,没有任何错误,但是它有很多错误警告和它不是开始,但我的数据没有得到反映在表中。我错过了什么?你能帮我解决我有下面的警告吗
org.reflections.ReflectionsException: could not get type for name io.netty.inter
nal.tcnative.SSLPrivateKeyMethod
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$Inte
rnalReflections.<init>(DelegatingClassLoader.java:433)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scan
PluginPath(DelegatingClassLoader.java:325)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scan
UrlsAndAddPlugins(DelegatingClassLoader.java:261)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.init
PluginLoader(DelegatingClassLoader.java:209)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.init
Loaders(DelegatingClassLoader.java:202)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.jav
a:60)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone
.java:79)
Caused by: java.lang.ClassNotFoundException: io.netty.internal.tcnative.SSLPriva
teKeyMethod
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 9 more
1条答案
按热度按时间ymzxtsji1#
不需要自己编写源连接器,除非您需要将kafka连接到某个外来数据源。像mysql这样的流行工具已经被很好地覆盖了。confluent已经有了一个“jdbc连接器”,它可以满足您的需要。
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html
您将需要一个正常工作的kafka connect安装,然后您可以使用kafka connect api的http post将mysql表“连接”到kafka。只需指定一个逗号分隔的列表,其中列出了您希望用作
tables.whitelist
属性。例如,像这样的事情。。。。