我有一个连接器配置来在集群之间复制一个主题。这两个集群都有自己的合流模式注册表(我知道这不是推荐的做法)。为了保持模式的同步,我打算将avro转换器用于源和目标转换(仅用于值)。
配置如下(用于通过RESTAPI发布的json):
{
"connector.name": "my_replicator_connector",
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"topic.whitelist": "topic.source",
"topic.rename.format": "topic.dest",
"topic.auto.create": "true",
"src.key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.value.converter": "io.confluent.connect.avro.AvroConverter",
"src.value.converter.schema.registry.url": "https://my-schema-reg-source",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://my-schema-reg-dest",
"src.kafka.bootstrap.servers": "my-src-bootstrap-server",
"offset.translator.tasks.max": 0
}
当我将配置发布到以分布式模式运行的连接服务器时,连接器会失败,并出现以下跟踪:
“io.confluent.common.config.configexception:配置键的值io.confluent.kafka.serializers.subject.topicnamestrategy无效。subject.name.strategy:找不到类io.confluent.kafka.serializers.subject.topicnamestrategy。\n\t io.confluent.common.configdef.parsetype(configdef)。java:392)\否\tio.confluent.common.config.configdef.define(configdef。java:110)\n\t io.confluent.common.config.configdef.define(configdef。java:154)\n\t io.confluent.kafka.serializers.abstractkafkaschemaserdeconfig.baseconfigdef(abstractkafkaschemaserdeconfig。java:146)\n\t io.confluent.connect.avro.avroconverterconfig.(avroconverterconfig。java:27)\否\tio.confluent.connect.avro.avroconverter.configure(avroconverter。java:64)\n\t io.confluent.connect.replicator.replicatorsourceconnectorconfig.getdataconverter(replicatorsourceconnectorconfig。java:800)\n\t io.confluent.connect.replicator.replicatorsourceconnectorconfig.getsourcevalueconverter(replicatorsourceconnectorconfig。java:809)\否\tio.confluent.connect.replicator.replicatorsourcetask.start(replicatorsourcetask。java:278)\n\t org.apache.kafka.connect.runtime.workersourcetask.execute(workersourcetask。java:208)\n\t org.apache.kafka.connect.runtime.workertask.dorun(工作任务。java:177)\n\t org.apache.kafka.connect.runtime.workertask.run(workertask。java:227)\否\tjava.util.concurrent.executors$runnableadapter.call(executors。java:511)\n\t java.util.concurrent.futuretask.run(futuretask。java:266)\n\t java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1149)\n\t java.util.concurrent.threadpoolexecutor$worker.run(线程池执行器。java:624)\否\tjava.lang.thread.run(线程。java:748)\n“
我可以使用合流工具在本地以独立模式成功运行配置。很明显,我的连接服务器在某些方面配置错误。
连接服务器版本为 5.4.2-ccs
replicatorsourceconnector版本为 5.5.0
但是,我可以在服务器的其他连接器中成功地使用avro转换器。我甚至可以在复制器中将它成功地用作dest转换器,但不能用作src转换器。所以 io.confluent.kafka.serializers.subject.TopicNameStrategy
类可用于加载。但它似乎不能由连接器的源部分加载。
这段有问题的代码(configdef.parsetype())在git的历史中有一些反复,关于如何加载类,最新的讨论如下:https://github.com/confluentinc/common/pull/241 这似乎有关系,但我不知道怎么回事。
问题是为什么会发生这种情况,以及如何解决?
暂无答案!
目前还没有任何答案,快来回答吧!