flink流:序列化字符串消息中的意外字符

tsm1rwdh  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(406)

我的流正在生成
Tuple2<String,String> .toString() 输出 (usr12345,{"_key":"usr12345","_temperature":46.6}) 钥匙在哪里 usr12345 价值就是 {"_key":"usr12345","_temperature":46.6} 这个 .print() 在流上正确输出值: (usr12345,{"_key":"usr12345","_temperature":46.6}) 但当我把这条流写给Kafka时,关键就变成了 usr12345 (开头有空格)和 ({"_key":"usr12345","_temperature":46.6} 注意键开头的空格和值开头的左括号。
很奇怪。为什么会这样?
以下是序列化代码:

TypeInformation<String> resultType = TypeInformation.of(String.class);

KeyedSerializationSchema<Tuple2<String, String>> schema =
      new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
      stream,   
      "topic",    
      schema,  
      kafkaProducerProperties);
v9tzhpje

v9tzhpje1#

这个 TypeInformationKeyValueSerializationSchema 使用flink的自定义序列化程序序列化数据,这意味着结果必须解释为二进制数据。flink的字符串序列化程序写入字符串的长度,然后对所有字符进行编码。
我假设您使用纯字符串反序列化器反序列化kafka主题。对于键,序列化长度被解释为空白字符。对于值,长度解释为 '(' .
尝试使用其他序列化程序将键和值序列化为普通字符串,或使用兼容的反序列化程序。

相关问题