我正在使用 spring-intergration-kafka
发送和接收Kafka的信息。消息对象基于泛型类型。基类看起来像这样
public abstract class AbstractMessage<T> implements Serializable {
...
}
我想能够发送和接收这个类的不同实现。我使用的是基于avro反射的编码器和解码器
<bean id="kafkaMessageEncoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
<constructor-arg value="com.....model.AbstractMessage" />
</bean>
<bean id="kafkaMessageDecoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
<constructor-arg value="com.....model.AbstractMessage" />
</bean>
此操作失败并出现错误
avrotypeexception:未知类型:t
这是有意义的,因为avro无法找出abstractmessage类中指定的泛型类型,但后来我决定使用自定义编码器和解码器。
public class MessageEncoder implements Encoder<Object> {
@Override
public byte[] toBytes(Object object) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(object);
oos.flush();
oos.close();
return baos.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return "".getBytes();
}
}
public class MessageDecoder implements Decoder<Object> {
@Override
public Object fromBytes(byte[] bs) {
try {
ObjectInputStream bais = new ObjectInputStream(new ByteArrayInputStream(bs));
AbstractMessage<?> message = (AbstractMessage<?>) bais.readObject();
return message;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
这个很好用。
我想我的问题是我在这里没有做什么特别的事情。为什么avro不能做同样的事情,并且有不同的avrod/编码器可以序列化/反序列化我的泛型类型对象呢。
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!