Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema

Asked by mnowg1ta on 2021-06-04 in Kafka

I have a pipeline in which I connect the Debezium CDC MySQL connector from Confluent Platform to Confluent Cloud, since the cloud's built-in Debezium MySQL connector is still in preview. The connection works, and the messages from the topics are consumed by an S3 sink connector. My stream was originally in JSON format, but I now want it in Avro, so I changed the key and value converters in the connector configuration as shown below:
Debezium connector:

{
    "name":"mysql_deb3",
    "config":{
       "connector.class":"io.debezium.connector.mysql.MySqlConnector",
       "tasks.max":"1",
       "database.hostname":"host_name",
       "database.port":"3306",
       "database.user":"user_name",
       "database.password":"password",
       "database.server.id":"123456789",
       "database.server.name": "server_name",
       "database.whitelist":"db_name",
       "database.history.kafka.topic":"dbhistory.db_name",
       "include.schema.changes": "true",
       "table.whitelist": "db_name.table_name",
       "tombstones.on.delete": "false",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "value.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "key.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "value.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "decimal.handling.mode": "double",
       "transforms": "unwrap",
       "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.drop.tombstones": "true",
       "transforms.unwrap.delete.handling.mode": "rewrite",
"database.history.kafka.bootstrap.servers":"confluent_cloud_kafka_server_endpoint:9092",
"database.history.consumer.security.protocol":"SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm":"https",
"database.history.consumer.sasl.mechanism":"PLAIN",
"database.history.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"cloud_kafka_api\" password=\"cloud_kafka_api_secret\";",
"database.history.producer.security.protocol":"SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm":"https",
"database.history.producer.sasl.mechanism":"PLAIN",
"database.history.producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"cloud_kafka_api\" password=\"cloud_kafka_api_secret\";",
    }
 }
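For reference, with the Confluent Avro converter the credentials in *.schema.registry.basic.auth.user.info are only read when basic.auth.credentials.source is set to USER_INFO, and that setting is missing above. A minimal sketch of the extra converter entries (this is an assumption about what the 401 points to, not a confirmed fix; the placeholders match the ones already used in the "config" block):

       "key.converter.basic.auth.credentials.source": "USER_INFO",
       "key.converter.schema.registry.basic.auth.user.info": "schema_registry_api_key:schema_registry_api_secret",
       "value.converter.basic.auth.credentials.source": "USER_INFO",
       "value.converter.schema.registry.basic.auth.user.info": "schema_registry_api_key:schema_registry_api_secret"

These entries would sit alongside the key.converter / value.converter properties already present above.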

####################################################################

connect-distributed.properties:

bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
request.timeout.ms=20000
retry.backoff.ms=500

producer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
producer.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

consumer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500

offset.flush.interval.ms=10000
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

schema.registry.url=https://cloud_schema_registry_endpoint
schema.registry.basic.auth.user.info=<schema_registry_api_key>:<schema_registry_api_secret>
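Note that converters only pick up worker properties prefixed with key.converter. / value.converter.; the unprefixed schema.registry.* lines above are not passed to the Avro converter. If Avro were set as the worker-wide default instead of per connector, a rough sketch would look like the following (property names are from the Confluent Avro converter; the endpoint and credentials are the same placeholders as above):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://cloud_schema_registry_endpoint
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<schema_registry_api_key>:<schema_registry_api_secret>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://cloud_schema_registry_endpoint
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<schema_registry_api_key>:<schema_registry_api_secret>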

#################################################

-- I start Kafka Connect with --> bin/connect-distributed etc/connect-distributed.properties
-- Connect starts fine, but when I try to load the Debezium connector with a curl command, it fails with the "Unauthorized" error shown in the stack trace below, even though the API key and secret I supplied are correct (I also verified them manually with the CLI).
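For context, loading the connector against the Connect REST API is presumably a call along these lines (the worker address and the file holding the JSON above are assumptions):

curl -X POST -H "Content-Type: application/json" \
     --data @mysql_deb3.json \
     http://localhost:8083/connectors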
Caused by: org.apache.kafka.connect.errors.DataException: staging dev rds cluster
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:78)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"SchemaChangeKey","namespace":"io.debezium.connector.mysql","fields":[{"name":"databaseName","type":"string"}],"connect.name":"io.debezium.connector.mysql.SchemaChangeKey"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:119)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:156)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:117)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:76)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-11-30 05:30:47,389] ERROR WorkerSourceTask{id=mysql_deb3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-11-30 05:30:47,389] INFO Stopping connector (io.debezium.connector.common.BaseSourceTask:187)
[2020-11-30 05:30:47,389] INFO Stopping MySQL connector task (io.debezium.connector.mysql.MySqlConnectorTask:458)
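One way to narrow this down is to call the Schema Registry REST API directly with the same key and secret: a 401 there would point at the credentials themselves, while a 200 with a JSON list of subjects would point back at the converter configuration. A sketch, using the same placeholders as above:

curl -u "schema_registry_api_key:schema_registry_api_secret" \
     https://cloud_schema_registry_endpoint/subjects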
Please help. Thanks in advance.
