apache flink与kafka:invalidtypesexception

yvfmudvl  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(361)

我有以下代码:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka-topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

mycustomclassdeserializer是用来将字节数组转换成java对象的。
在本地运行此程序时,出现错误:
原因:org.apache.flink.api.common.functions.invalidtypesexception:输入不匹配:应为基本类型。
我得到这个代码行:

.map(new MapFunction<MyCustomClass,String>() {

不知道为什么我会得到这个?

nhjlsmyf

nhjlsmyf1#

所以,您有一个返回pojo的反序列化程序,但是您告诉flink它应该从 byte[]String 通过使用 SimpleStringSchema . 现在看到问题了吗?:)
我不认为你应该使用自定义Kafka反序列化程序 FlinkKafkaConsumer 一般来说。相反,您的目标应该是创建一个扩展 DeserializationSchema 来自Flink。它在类型安全性和可测试性方面应该更好。

相关问题