无法扩展flink processfunction

yhxst69z  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(412)

我最近从Flink1.2升级到Flink1.3,我正在尝试更新我的 ProcessFunction 使用1.3。我有一个函数,我正在寻找建立扩展 ProcessFunction 类,但它抛出了一个编译错误,表示我没有重写 processElement 以及 onTimer 以下是我的代码:

class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {
    lazy val state : ListState[CountWithTimestamp] = getRuntimeContext
      .getListState(new ListStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

     override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {
         //Stuff here
     }

     override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
         //More Stuff here
     }
}

下面是我得到的编译错误:

Error:(8, 7) class TimeoutStateFunction needs to be abstract, since method processElement in class ProcessFunction of type (x$1: com.fasterxml.jackson.databind.node.ObjectNode, x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context, x$3: org.apache.flink.util.Collector[(String, Long)])Unit is not defined
class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

Error:(17, 18) method processElement overrides nothing.
Note: the super classes of class TimeoutStateFunction contain the following, non final members named processElement:
def processElement(x$1: com.fasterxml.jackson.databind.node.ObjectNode,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
  override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {

Error:(36, 16) method onTimer overrides nothing.
Note: the super classes of class TimeoutStateFunction contain the following, non final members named onTimer:
def onTimer(x$1: Long,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#OnTimerContext,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
   override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {

我目前正在使用scala 2.11和flink 1.3.2

brjng4g3

brjng4g31#

这个 Context 以及 OnTimerContext 取决于 ProcessFunction 以及它的输入和输出类型。
所以这应该有效:

override def processElement(
     value: ObjectNode,
     ctx: ProcessFunction[ObjectNode, (String, Long)]#Context,
     out: Collector[(String, Long)])
   : Unit = {
     //Stuff here
 }

 override def onTimer(
     timestamp: Long,
     ctx: ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext,
     out: Collector[(String, Long)])
   : Unit = {
     //More Stuff here
 }

相关问题