When I create a sink connector with the following configuration:
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# added after comment
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket
and run it, I get the following error:
org.apache.kafka.connect.errors.DataException: coyote-test-avro
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
But my topic does have a schema; I can see it in the UI provided by the landoop/fast-data-dev Docker container.
Even when I try to write the raw data to S3, changing the configuration to
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE
and removing schema.generator.class, the same error appears, even though I would expect this configuration not to involve an Avro schema at all.
To be able to write to S3, I set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables in my container, but the problem seems to occur before that point anyway.
I suspect the versions might be the problem. As mentioned, I am using the landoop/fast-data-dev container in a docker-machine VM (it does not work with the native Docker engine on Mac), and producing and consuming there work perfectly; the versions are listed in the About section of its UI.
I looked through the Connect logs but could not find anything useful. If you can tell me what to look for, I will add the relevant lines (the full logs are too large to post).
1 Answer
Every message in the topic must be Avro-encoded in the format the Schema Registry expects.
The converter looks at bytes 2-5 of the raw Kafka data (both keys and values), converts them to an integer (in your case, the id shown in the error, 91319), and looks that id up in the registry.
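To illustrate: the Confluent wire format is one magic byte (0) followed by the 4-byte schema id, then the Avro payload. A simplified Java sketch of the check the converter performs (not its actual code; class and method names here are just for illustration):
import java.nio.ByteBuffer;

// Simplified illustration of the check done on every raw key and value.
public class WireFormatCheck {
    private static final byte MAGIC_BYTE = 0x0;

    // Returns the Schema Registry id encoded in a record's raw bytes.
    public static int schemaIdOf(byte[] raw) {
        ByteBuffer buffer = ByteBuffer.wrap(raw);
        if (buffer.get() != MAGIC_BYTE) {
            // non-Avro data typically fails right here with an "unknown magic byte" error
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        // bytes 2-5 as a big-endian int, e.g. 91319 in your stack trace; the deserializer
        // then asks the registry for this id and gets the 40403 "Schema not found" you see
        return buffer.getInt();
    }
}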
If the data is not Avro, or is otherwise bad, you either get the error you see here or an error about an invalid magic byte. This is not a Connect-specific error; you can reproduce it with the Avro console consumer, for example once you add the print-key property and it tries to decode the keys as well. Assuming that is the case, one solution is to change the key serde to a byte-array deserializer so that the Avro lookup is skipped for the keys.
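For example, assuming it is the keys that are not valid Avro, you could keep the Avro value converter and switch only the key converter (just the relevant lines, not a full config):
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# key.converter.schema.registry.url is then no longer needed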
Otherwise, since you cannot delete individual messages from Kafka, the only solutions here are to figure out why your producers are sending bad data, fix them, and then either move the Connect consumer group to the latest offset that holds valid data, wait for the invalid data to expire from the topic, or move to a new topic entirely.
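If you take the offset route, here is a rough sketch (my addition, under stated assumptions) of moving the sink connector's consumer group past the bad records with the Kafka AdminClient. Sink connectors use a group named connect-<connector name>; stop the connector first. The bootstrap address, group name, and single partition below are assumptions:
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: skip everything currently in the topic by moving the connector's consumer
// group to the log-end offset. Run only while the connector is stopped.
public class SkipBadRecords {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition("my_topic", 0); // repeat for every partition
            long end = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                    .partitionResult(tp).get().offset();
            // "connect-my-s3-sink" is a placeholder; replace with connect-<your connector name>
            admin.alterConsumerGroupOffsets("connect-my-s3-sink",
                    Map.of(tp, new OffsetAndMetadata(end))).all().get();
        }
    }
}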