Kafka流媒体中的节俭序列化与反序列化

ibrsph3r  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(435)

我使用thrift只是为了从kafka流式传输字节数据时提高性能而进行序列化和反序列化
当我反序列化时,会不断出现以下错误:
org.apache.thrift.protocol.tprotocolexception:无法识别的类型123
我的代码

public void streamMessageByte() {   
    final StreamsBuilder builder = new StreamsBuilder();
    <Integer, byte[]> stream = builder.stream(kafka_topic);
    deserializer = new TDeserializer();
    serializer = new TSerializer();
    //Thrift class pojo object is 'deser' which matches byte array data format
    stream.map((k,v){

        try{
            deserializer.deserialize(deser, v);
        }

        catch(TException e){

        }
    null;
});
bqf10yzr

bqf10yzr1#

我在使用不同的协议进行序列化和反序列化时遇到了这个问题。
序列化程序正在使用 ObjectMapper 反序列化程序正在使用 TDeserializerTBinaryProtocol . 例子:

@Test
public void testSerDe() throws TException, JsonProcessingException {
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   JsonSerializer serializer = new JsonSerializer();
   ObjectMapper mapper = new ObjectMapper();
   byte[] serialized = mapper.writeValueAsString(person).getBytes();
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   Person desPerson = new Person();
   deserializer.deserialize(desPerson, serialized);
   assertEquals(person, desPerson);
}

那会扔 org.apache.thrift.protocol.TProtocolException: Unrecognized type 123 如果以相同的方式序列化和反序列化,它应该可以工作。举个例子:

@Test
public void testSerDe() throws TException {
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   TSerializer serializer = new TSerializer(TBinaryProtocol::new);
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   byte[] serializedPerson = serializer.serialize(person);
   GraphEvent dePerson = new Person();
   deserializer.deserialize(dePerson, serializedPerson);
   assertEquals(person, dePerson);
}

相关问题