Producing a JSON array in Avro format and storing it in S3

Asked by 2jcobegt on 2021-06-07 in Kafka

I have the following JSON array:

[{"a":74,"b":1519202998533,"c":"Shipped","d":7318},{"a":11,"b":1519202998546,"c":"Shipped","d":40481}]

I created a topic ord_avro_multiple and defined the Avro schema like this:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic ord_avro_multiple --property value.schema='{"type":"array","name":"arrayjson","items":{"type":"record","name":"arraysjs","fields":[{"name":"a","type":"int"},{"name":"b","type":"long"},{"name":"c","type":"string"},{"name":"d","type":"int"}]}}'
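
Each line typed into the console producer is parsed as one JSON-encoded datum of the declared schema and sent as a single Kafka record, so the whole array above goes in on one line, e.g.:

[{"a":74,"b":1519202998533,"c":"Shipped","d":7318},{"a":11,"b":1519202998546,"c":"Shipped","d":40481}]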

When I use kafka-connect-s3 to write this data to an S3 bucket, it gives me the following error:

[2018-02-21 09:02:51,700] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class org.apache.kafka.connect.data.Struct and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.ArrayList[0])
        at io.confluent.connect.s3.format.json.JsonRecordWriterProvider$1.write(JsonRecordWriterProvider.java:81)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:402)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:204)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class org.apache.kafka.connect.data.Struct and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.ArrayList[0])
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:284)
        at com.fasterxml.jackson.databind.SerializerProvider.mappingException(SerializerProvider.java:1110)
        at com.fasterxml.jackson.databind.SerializerProvider.reportMappingProblem(SerializerProvider.java:1135)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:69)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:32)
        at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
        at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
        at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:292)
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2493)
        at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:378)
        at io.confluent.connect.s3.format.json.JsonRecordWriterProvider$1.write(JsonRecordWriterProvider.java:77)
        ... 14 more
[2018-02-21 09:02:51,704] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2018-02-21 09:02:51,705] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
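
The stack trace shows the sink running with the connector's JSON output format (JsonRecordWriterProvider), which hands the record value to Jackson; Jackson has no serializer for org.apache.kafka.connect.data.Struct, and here the value is an ArrayList of Structs. For reference, a minimal sketch of a sink configuration matching this setup — the bucket, region, and flush size are placeholder values, and an Avro converter with a local Schema Registry is assumed:

# Confluent S3 sink connector, standalone properties (illustrative values)
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=ord_avro_multiple
# Deserialize the Avro records written by kafka-avro-console-producer
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Where and how the data lands in S3 (placeholder bucket/region)
s3.region=us-east-1
s3.bucket.name=my-bucket
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
# The JSON output format that appears in the stack trace above
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE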

How can I resolve this problem?
