我正在尝试使用Flink中的KafkaSource从Kafka主题中读取数据,而不是弃用的FlinkKafkaConsumer。下面是我的代码:
// Use KafkaSource instead of FlinkKafkaConsumer
DataStream<String> patientData = env.fromSource(
KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setGroupId("stroke-risk-group")
.setTopics("patient-data-topic")
.setProperties(properties)
.setDeserializer(new SimpleStringSchema())
.setStartingOffsets(StartupMode.EARLIEST)
.build(),
"Kafka Source");
但是,我得到一个错误,它说:
“类型KafkaSourceBuilder中的方法setDeserializer(KafkaRecordDeserializationSchema)不适用于参数(SimpleStringSchema)”
更深入地查看代码:https://github.com/IshaanAdarsh/healthcare-analytics/blob/main/icu-vsm/src/main/java/hes/cs63/CEPMonitor/StrokeRiskAlarm.java
我不知道如何解决这个问题。有人能解释一下为什么我会得到这个错误,以及如何修复它吗?谢谢你,谢谢!
1条答案
按热度按时间wz3gfoph1#
你应该使用
而不是
setDeserializer
。当您使用KafkaRecordDeserializationSchema
时,setDeserializer
是合适的,但在本例中,您使用的是DeserializationSchema
。不同之处在于,
KafkaRecordDeserializationSchema
反序列化完整的Kafka记录对象--除了记录的值部分之外,还可能使用记录的键、头和其他元数据进行反序列化--而DeserializationSchema
只查看值。