Scala: cannot resolve overloaded method (Flink WatermarkStrategy)

7lrncoxx  posted on 2021-06-04  in Kafka

I am following the Flink documentation on how to use a WatermarkStrategy with a Kafka consumer. The code looks like this:

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

val stream: DataStream[MyType] = env.addSource(kafkaSource)

Whenever I try to compile the code above, I get this error:

error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR]  cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR]         consumer.assignTimestampsAndWatermarks(

r7s23pms1#

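The following expression is inferred as WatermarkStrategy[Nothing] rather than WatermarkStrategy[String], because nothing in it tells the compiler what the element type is: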

WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20))
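The inference behaviour can be reproduced without Flink. The sketch below is only an illustration: `Strategy`, `LegacyAssigner`, and `Consumer` are hypothetical stand-ins for the Flink types, not real API. It assumes Scala 2, where the argument of an overloaded method is typed without an expected type, so an unconstrained type parameter falls back to Nothing.

```scala
// Hypothetical stand-ins for the Flink types; none of these names exist in Flink.
final class Strategy[T] // invariant, like the Java interface WatermarkStrategy<T>

object Strategy {
  // Generic factory whose T appears only in the result type
  def bounded[T](seconds: Long): Strategy[T] = new Strategy[T]
}

final class LegacyAssigner[T]

final class Consumer[T] {
  // Two overloads, mirroring the overloaded assignTimestampsAndWatermarks
  def assign(strategy: Strategy[T]): Consumer[T] = this
  def assign(assigner: LegacyAssigner[T]): Consumer[T] = this
}

object InferenceDemo {
  def main(args: Array[String]): Unit = {
    val consumer = new Consumer[String]

    // Expected to be rejected by Scala 2: T is inferred as Nothing,
    // and Strategy[Nothing] is not a Strategy[String] because Strategy is invariant.
    // consumer.assign(Strategy.bounded(20))

    // A type annotation supplies the expected type, so T = String.
    val annotated: Strategy[String] = Strategy.bounded(20)
    consumer.assign(annotated)

    // An explicit type argument has the same effect.
    consumer.assign(Strategy.bounded[String](20))
  }
}
```

If `assign` had only one alternative, the parameter type would serve as the expected type and the commented-out call should compile; it is the overloading that removes that hint.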

I solved the problem with this code:

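val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
// The annotation gives the compiler an expected type, so T is no longer inferred as Nothing.
// Its type argument must match the consumer's element type (String in the error output above).
val watermark: WatermarkStrategy[MyType] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
kafkaSource.assignTimestampsAndWatermarks(watermark)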
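As an alternative to a separate annotated value, the type argument can be passed to the factory method directly. This is a minimal sketch, assuming the Flink dependency that provides `org.apache.flink.api.common.eventtime.WatermarkStrategy` is on the classpath and that the consumer's element type is String, as in the error output; the object name `ExplicitTypeArgument` is only for illustration.

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

object ExplicitTypeArgument {
  // The explicit [String] fixes T at the call site, so the result is a
  // WatermarkStrategy[String]. Substitute the consumer's element type as needed.
  val strategy: WatermarkStrategy[String] =
    WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(20))

  // Usage, assuming kafkaSource is a FlinkKafkaConsumer[String]:
  // kafkaSource.assignTimestampsAndWatermarks(strategy)
}
```

With the type argument written out, the same expression can also be passed inline to `assignTimestampsAndWatermarks`, which keeps the code close to the form shown in the question.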
