我正在使用kafka流来处理avro消息。因为我将使用来自外部系统的avro消息,所以我已经为此编写了一个producer。我已经通过maven avro插件生成了我的模型avro类。我得到一个classcast异常(第44行):
genericdata$record无法转换为org.apache.avro.specific.specificrecordbase
在我的反序列化程序代码中。
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
protected final Class<T> targetType;
public AvroDeserializer(Class<T> targetType) {
this.targetType = targetType;
}
public void close() {
}
public void configure(Map<String, ?> arg0, boolean arg1) {
}
public T deserialize(String topic, byte[] data) {
try {
T result = null;
if (data != null) {
LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
DatumReader<T> datumReader = new SpecificDatumReader(((SpecificRecordBase)this.targetType.newInstance()).getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, (BinaryDecoder)null);
result = (T)datumReader.read((T)null, decoder);
LOGGER.debug("deserialized data='{}'", result);
}
return result;
} catch (Exception var6) {
throw new SerializationException("Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", var6);
}
}
}
对于producer,我使用以下序列化程序:
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);
@Override
public void close() {
// No-op
}
@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
// No-op
}
@Override
public byte[] serialize(String topic, T data) {
try {
byte[] result = null;
if (data != null) {
LOGGER.debug("data='{}'", data);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder =
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
datumWriter.write(data, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
result = byteArrayOutputStream.toByteArray();
LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
}
return result;
} catch (IOException ex) {
throw new SerializationException(
"Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
}
}
}
我试过使用其他地方提到的avrodeserializer,但没有用。
暂无答案!
目前还没有任何答案,快来回答吧!