kafka:avrodeserializer中的classcastexception

zdwk9cvp  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(216)

我正在使用kafka流来处理avro消息。因为我将使用来自外部系统的avro消息,所以我已经为此编写了一个producer。我已经通过maven avro插件生成了我的模型avro类。我得到一个classcast异常(第44行):
genericdata$record无法转换为org.apache.avro.specific.specificrecordbase
在我的反序列化程序代码中。

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    public void close() {
    }

    public void configure(Map<String, ?> arg0, boolean arg1) {
    }

    public T deserialize(String topic, byte[] data) {
        try {
            T result = null;
            if (data != null) {
                LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
                DatumReader<T> datumReader = new SpecificDatumReader(((SpecificRecordBase)this.targetType.newInstance()).getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, (BinaryDecoder)null);
                result = (T)datumReader.read((T)null, decoder);
                LOGGER.debug("deserialized data='{}'", result);
            }

            return result;
        } catch (Exception var6) {
            throw new SerializationException("Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", var6);
        }
    }
}

对于producer,我使用以下序列化程序:

public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

  private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);

  @Override
  public void close() {
    // No-op
  }

  @Override
  public void configure(Map<String, ?> arg0, boolean arg1) {
    // No-op
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try {
      byte[] result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", data);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();

        result = byteArrayOutputStream.toByteArray();
        LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
      }
      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }
}

我试过使用其他地方提到的avrodeserializer,但没有用。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题