我的流正在生成
Tuple2<String,String> .toString()
输出 (usr12345,{"_key":"usr12345","_temperature":46.6})
钥匙在哪里 usr12345
价值就是 {"_key":"usr12345","_temperature":46.6}
这个 .print()
在流上正确输出值: (usr12345,{"_key":"usr12345","_temperature":46.6})
但当我把这条流写给Kafka时,关键就变成了 usr12345
(开头有空格)和 ({"_key":"usr12345","_temperature":46.6}
注意键开头的空格和值开头的左括号。
很奇怪。为什么会这样?
以下是序列化代码:
TypeInformation<String> resultType = TypeInformation.of(String.class);
KeyedSerializationSchema<Tuple2<String, String>> schema =
new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream,
"topic",
schema,
kafkaProducerProperties);
1条答案
按热度按时间v9tzhpje1#
这个
TypeInformationKeyValueSerializationSchema
使用flink的自定义序列化程序序列化数据,这意味着结果必须解释为二进制数据。flink的字符串序列化程序写入字符串的长度,然后对所有字符进行编码。我假设您使用纯字符串反序列化器反序列化kafka主题。对于键,序列化长度被解释为空白字符。对于值,长度解释为
'('
.尝试使用其他序列化程序将键和值序列化为普通字符串,或使用兼容的反序列化程序。