有没有办法使用Kafka模式注册表没有魔术字节?

wko9yo5t  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(380)

我试图使用confluent的schema注册表使我的应用程序工作,但此时我并不完全控制生产者,您甚至可以将它们视为不绑定到confluent产品的遗留应用程序。
我正在查看合并信息,似乎所有消息都应该在有效负载中包含一个神奇的字节和模式id
https://docs.confluent.io/3.2.0/schema-registry/docs/serializer-formatter.html
或者当我尝试使用它时,我会得到一个错误:

[2020-09-25 13:12:09,008] ERROR WorkerSinkTask{id=s3_parquet_connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos to Protobuf: 
            at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
            at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
            ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-25 13:12:09,010] ERROR WorkerSinkTask{id=s3_parquet_connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

我的问题是,是否有一种方法可以禁用这个神奇的字节检查,或者我可以创建一个kafka流,只在初始消息中附加一个this5字节,这样之后我就可以和一个连接到schema注册表的使用者一起使用它。
所发生的事情是,生产者是我无法控制的,所以我需要以某种方式能够反序列化消息,不包含这5个字节,因为它们是由生产者不依赖汇合序列化/反序列化

fwzugrvs

fwzugrvs1#

它们是由不依赖合流序列化程序的生产者生成的
那么问题不在注册处。
您不应该使用confluent编写的转换器来使用这些消息,因为这些消息绑定到注册表,因此无法跳过它。
您可以改为使用blueporator类(假设数据是protobuf),或者编写自己的转换器类。

相关问题