在Flink中使用KafkaSource时出错:“方法setDeserializer(...)不适用于参数...”

db2dz4w8  于 2023-09-28  发布在  Apache
关注(0)|答案(1)|浏览(284)

我正在尝试使用Flink中的KafkaSource从Kafka主题中读取数据,而不是弃用的FlinkKafkaConsumer。下面是我的代码:

// Use KafkaSource instead of FlinkKafkaConsumer
DataStream<String> patientData = env.fromSource(
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("stroke-risk-group")
                .setTopics("patient-data-topic")
                .setProperties(properties)
                .setDeserializer(new SimpleStringSchema())
                .setStartingOffsets(StartupMode.EARLIEST)
                .build(),
        "Kafka Source");

但是,我得到一个错误,它说:
“类型KafkaSourceBuilder中的方法setDeserializer(KafkaRecordDeserializationSchema)不适用于参数(SimpleStringSchema)”
更深入地查看代码:https://github.com/IshaanAdarsh/healthcare-analytics/blob/main/icu-vsm/src/main/java/hes/cs63/CEPMonitor/StrokeRiskAlarm.java
我不知道如何解决这个问题。有人能解释一下为什么我会得到这个错误,以及如何修复它吗?谢谢你,谢谢!

wz3gfoph

wz3gfoph1#

你应该使用

.setValueOnlyDeserializer(new SimpleStringSchema())

而不是setDeserializer。当您使用KafkaRecordDeserializationSchema时,setDeserializer是合适的,但在本例中,您使用的是DeserializationSchema
不同之处在于,KafkaRecordDeserializationSchema反序列化完整的Kafka记录对象--除了记录的值部分之外,还可能使用记录的键、头和其他元数据进行反序列化--而DeserializationSchema只查看值。

相关问题