mongo kafka连接源

vd2z7a6w  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(463)

我使用kafka connect从mongo读取数据并将其写入kafka主题。
我正在使用mongo kafka源连接器。
我得到以下错误:

ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:115)
java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
    at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:209)
    at com.mongodb.kafka.connect.source.MongoSourceConfig.<clinit>(MongoSourceConfig.java:138)
    at com.mongodb.kafka.connect.MongoSourceConnector.config(MongoSourceConnector.java:56)
    at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:282)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 7 more

jar里好像有一堆乱七八糟的东西。为了得到jar,我使用了两种不同的方法,但是得到了相同的错误。首先我使用下载fro:maven存储库,然后从github repo克隆源代码,然后我自己构建jar。我把jar推到plugins.path。当我解压生成的jar并遍历calss时,我找不到提到的类:com.mongodb.connectionstring
我使用了以下配置文件
worker.properties属性:

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins

# (connectors, converters, transformations). The list should consist of top level directories that include

# any combination of:

# a) directories immediately containing jars with plugins and their dependencies

# b) uber-jars with plugins and their dependencies

# c) directories immediately containing the package directory structure of classes of plugins and their dependencies

# Note: symlinks will be followed to discover dependencies or plugins.

# Examples:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

plugin.path=/usr/share/java/plugins

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=127.0.0.1:9092

mongo-connector.properties(mongo-connector.properties属性):

name=mongo
tasks.max=1
connector.class =com.mongodb.kafka.connect.MongoSourceConnector
database=
collection=alerts
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options

pipeline=[]
batch.size=0
change.stream.full.document=updateLookup

然后我通过以下命令启动连接器:

/usr/local/kafka/bin/connect-standalone.sh worker.properties mongo-connector.properties

你知道怎么解决吗

v8wbuo2f

v8wbuo2f1#

我正在创建这个答案,因为我花了一些时间来找出解决方案,正如scalacode所指出的,最简单的解决方案是从confluent下载jar,而不是从maven下载。
https://www.confluent.io/hub/mongodb/kafka-connect-mongodb

csga3l58

csga3l582#

必须将连接器的jar文件放在 plugin.path 你的情况是 /usr/share/java/plugins .
这些说明已经存在于confluent的文档中:
kafka connect插件是:
一个uberjar,在一个jar文件中包含插件的所有类文件和它的第三方依赖项;或者文件系统上包含插件及其第三方依赖项的jar文件的目录。但是,插件不应该包含kafka connect运行时提供的任何库。
kafka connect使用其插件路径查找插件,该路径是kafka connect的worker配置中定义的以逗号分隔的目录列表。要安装插件,请将插件目录或uberjar(或解析为其中一个的符号链接)放置在插件路径上列出的目录中,或者更新插件路径以包含包含插件的目录的绝对路径。

相关问题