kafka connect s3 sink在加载avro时抛出illegalargumentexception

qni6mghb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(390)

我使用qubole的s3接收器将avro数据以Parquet格式加载到s3中。
在java应用程序中,我创建了一个producer

Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);

然后转换一个 GenericRecord 进入 byte[] 格式:

GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);

for (Map.Entry<String, ?> entry : map.entrySet()) {
    String key = entry.getKey();
    Object value = entry.getValue();
    avroRecord.put(key, value);
}

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);

我在kafka connect属性中使用以下值:

key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

以及“我的文件接收器属性”中的以下配置选项:

connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

当我运行连接器时,会收到以下错误消息:“java.lang.illegalargumentexception:avro schema must be a record”。
我对kafka connect还很陌生,我知道可以设置一个模式注册表服务器——但我不知道sink是否需要注册表来将avro数据转换为parquet,或者这是否是某种格式或配置问题。在这个错误的上下文中,“记录”指的是什么样的数据格式?任何指示或帮助将不胜感激。

p8ekf7hl

p8ekf7hl1#

这个 ByteArrayConverter 不会执行任何数据转换:它假定连接器知道如何处理原始数据,而不是实际执行任何序列化/反序列化 byte[] 数据。然而 ParquetFormat (事实上大多数格式)不能只处理原始数据。相反,他们希望数据被反序列化并结构化为记录(您可以将其看作c结构、pojo等)。
请注意,qubole streamx自述文件指出 ByteArrayConverter 在可以安全地直接复制数据的情况下非常有用。例如,如果数据是json或csv格式的。这些不需要反序列化,因为每个kafka记录值的字节可以简单地复制到输出文件中。在这些情况下,这是一个很好的优化,但通常不适用于所有输出文件格式。

相关问题