有人在flink中有通用processfunction的例子吗?

qmb5sa22  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(488)

所谓“泛型”,我的意思是能够接受任何类型的对象作为输入,并返回相同的对象作为输出。
假设函数的任务是将每个元素序列化为json,并将其作为一个边输出写入。

class MyProcessFunction() extends ProcessFunction[? , ?] {

    def processElement(element: ?, ctx: ProcessFunction[?, ?]#Context, out: Collector[?]): Unit = ??? 

    ... 
}

我是否可以这样定义它:它将被不同类型的输入使用?

iswrvxsc

iswrvxsc1#

你可以通过使你的类成为泛型来做到这一点。所以,你会有这样的结果:

class MyProcessFunction[T] extends ProcessFunction[T, T] {
  override def processElement(value: T, ctx: ProcessFunction[T, T]#Context, out: Collector[T]): Unit = ???
}

这样,您就可以在创建函数示例时确定类型。

相关问题