检索avro架构时出错-找不到主题

cigdeys3  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(384)

我有一个主题,最终会有很多不同的模式。现在它只有一个。我通过rest创建了一个连接作业,如下所示:

{
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"

 }
}

然后,我用一个字符串键和一个avro序列化负载将一条消息推送到该主题。如果我检查控制中心中的主题,我会看到正确的反序列化数据通过。查看connect示例的输出,尽管我在日志中看到了这一点

RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

从这里可以看出,有两个相关的问题:
Error retrieving Avro schema for id 7 Subject not found.; error code: 40401 让我困扰的是,我已经将策略指定为recordnamestrategy,我认为应该使用magic byte来获取schema,而不是主题名,但是没有找到主题错误。我不确定它实际上是在寻找主题名还是通过id获取模式 http://schema-registry-service:8081/schemas/ids/7 我确实得到了返回的模式。这个堆栈跟踪上面还有一些额外的日志记录,令人失望的是,它仍然使用错误的名称策略:

INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

有人知道怎么解决这个问题吗?我正在使用以下图像:
合流公司/cpKafka-connect:5.2.0
汇合公司/cp-kafka:5.1.0
谢谢

zbq4xfa0

zbq4xfa01#

在痕迹里, lookUpSubjectVersion 意味着它试图在 /subjects/:name/versions 对于列出的每个id,则找不到 schemaId=7 (注:not version=7),尽管从日志中不太清楚 :name 它试图用在这里,但如果没有找到,那么你会得到你的 Subject not found 错误。如果我的公关被接受,主题名称会更清楚
我相信这可能是因为 RecordNameStrategy . 查看该属性的pr,我发现它实际上只针对生产者/消费者代码进行了测试,而没有完全在connectapi中进行测试。与违约行为相比 TopicNameStrategy 你可以看到它试图使用

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

但仔细看,我想你可能配置错了。
你怎么会 value.converter.schema.registry.url ,您实际上需要设置 value.converter.value.subject.name.strategy 相反。

相关问题