我有以下代码:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
"test-kafka-topic",
new SimpleStringSchema(),
properties);
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);
DataStream<String> stringStream = kafkaInputStream
.map(new MapFunction<MyCustomClass,String>() {
@Override
public String map(MyCustomClass message) {
logger.info("--- Received message : " + message.toString());
return message.toString();
}
});
streamEnv.execute("Published messages");
mycustomclassdeserializer是用来将字节数组转换成java对象的。
在本地运行此程序时,出现错误:
原因:org.apache.flink.api.common.functions.invalidtypesexception:输入不匹配:应为基本类型。
我得到这个代码行:
.map(new MapFunction<MyCustomClass,String>() {
不知道为什么我会得到这个?
1条答案
按热度按时间nhjlsmyf1#
所以,您有一个返回pojo的反序列化程序,但是您告诉flink它应该从
byte[]
至String
通过使用SimpleStringSchema
. 现在看到问题了吗?:)我不认为你应该使用自定义Kafka反序列化程序
FlinkKafkaConsumer
一般来说。相反,您的目标应该是创建一个扩展DeserializationSchema
来自Flink。它在类型安全性和可测试性方面应该更好。