I am trying to receive byte-array-serialized Avro messages with Kafka Connect. Producer configuration used to serialize the Avro data:
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
HDFS sink configuration:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=csvtopic
hdfs.url=hdfs://10.15.167.119:8020
flush.size=3
locale=en-us
timezone=UTC
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schema.registry.url=http://localhost:8081
hive.metastore.uris=thrift://10.15.167.119:9083
hive.integration=true
schema.compatibility=BACKWARD
If I remove hive.integration and format.class from quickstart-hdfs.properties, I am able to save the data to HDFS. When Hive integration is enabled, I get the following exception stack trace:
java.lang.RuntimeException: org.apache.kafka.connect.errors.SchemaProjectorException: Schema version required for BACKWARD compatibility
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:401)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
How can I deserialize the byte stream received from the Kafka topic and save it in Hive?
2 answers

Answer #1 (gstyhher)
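I looked at your comments and code. You are encoding the data with a ByteArrayOutputStream, and Kafka Connect cannot interpret data produced that way. Send the data as follows instead.
Use this when sending the data:
In the Kafka Connect configuration, use: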
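A minimal sketch of a producer configuration whose output Kafka Connect can decode, assuming Confluent's KafkaAvroSerializer (from the kafka-avro-serializer artifact) is on the producer classpath and that the Schema Registry runs at http://localhost:8081, the URL already used in the sink configuration above:

```properties
# Sketch (assumption): replaces ByteArraySerializer so every record is written in the
# Schema Registry wire format (magic byte + 4-byte schema ID + Avro binary body)
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
# Assumed registry location, matching *.converter.schema.registry.url in the sink config
schema.registry.url=http://localhost:8081
```

With this serializer the producer passes Avro objects (for example a GenericRecord) directly to send() instead of encoding them into a ByteArrayOutputStream first; the serializer registers the schema and prefixes each message with its schema ID. If the keys are not Avro, keep a non-Avro key serializer and a matching key converter on the Connect side.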
Answer #2 (oprakyz7)
If you use Avro with the Schema Registry for your messages, you should use AvroConverter, not ByteArrayConverter, i.e.:
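A sketch of the converter settings for the HDFS sink, assuming the messages were produced with Confluent's Avro serializer and the registry is at http://localhost:8081 as in the original config. AvroConverter looks up the writer schema by its ID and hands the connector a Connect schema that carries a version, whereas ByteArrayConverter only yields an optional bytes schema with no version. That missing version is the likely cause of "Schema version required for BACKWARD compatibility" with schema.compatibility=BACKWARD in this setup.

```properties
# Replace the two ByteArrayConverter lines in the HDFS sink config
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

AvroConverter expects the Schema Registry wire format, so raw Avro bytes written through a ByteArrayOutputStream and ByteArraySerializer will fail to deserialize; the producer must switch to the Avro serializer as well (or keep a non-Avro key converter if the keys are not Avro).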