flink流媒体应用功能

w46czmvw  于 2021-06-25  发布在  Flink
关注(0)|答案(3)|浏览(423)

我对flink和streaming也是新手。我想将每个分区的某个函数应用于流的每个窗口(使用事件时间)。到目前为止,我所做的是:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputStream = env.readTextFile("dataset.txt")
      .map(transformStream(_))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))

def transformStream(input: String): EventStream = {...}

case class EventStream(val eventTime: Long, val id: String, actualEvent: String)

我想做的是对每个窗口批处理的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我见过这个方法在datastreamapi中的应用,但我不明白它是如何工作的。在flink api中,它表示在scala中的用法如下:

inputStream.apply { WindowFunction }

有人能解释一下apply方法是做什么的或者它是如何使用的吗?最好用scala作为例子。应用方法是否符合我的要求?

3lxsmp7m

3lxsmp7m1#

结果它需要一点scala魔法。到目前为止,我所做的是:

val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_))

    def processPartition(key: String, window: TimeWindow,
                         batch: Iterable[EventStream],
                         out: Collector[Long]): Unit =  {..}

根据我的实验,processpartition方法在整个批处理上应用了一个“key partitioned”(批处理将只包含具有相同key的元素)的函数。我从javaapi获取了这个方法的参数。如果有人能详细介绍一下apply函数及其工作原理,那将非常有用。

hgc7kmma

hgc7kmma2#

因此,基本上有两个可能的方向,根据你想做的计算类型。使用方法之一: fold / reduce / aggregate 或者更普通的,你已经提到了- apply . 所有这些都适用于windows的一个键。
至于 apply 这是一种非常通用的计算方法。最基本的版本(在scala中)是:

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]

其中函数采用4个参数:
窗口的键(记住您正在处理keyedstream)
窗口(您可以从中提取窗口的开始或结束)
分配给此特定窗口和键的元素
应该向其发出处理结果的收集器
但是必须记住,在发出窗口之前,这个版本必须保持每个元素的状态。一个更好的内存性能解决方案是使用一个带有预聚集器的版本,它在启动上述函数之前执行一些计算。
在这里,您可以看到一个简短的片段,其中包含预汇总的:

val stream: DataStream[(String,Int)] =   ...

stream.keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
      .apply((e1, e2) => (e1._1, e1._2 + e2._2),
             (key, window, in, out: Collector[(String, Long, Long, Int)]) => {
                out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
      })

它统计会话窗口中密钥的出现次数。
所以基本上,如果你不需要窗口的元信息,我会坚持 fold \ reduce \ aggregate 如果足够的话。而不是考虑应用某种类型的预聚合,如果这还不够,看看最通用的 apply .
更完整的例子,你可以看看这里。

7lrncoxx

7lrncoxx3#

据我所知,您可以对有状态窗口数据应用map/flatmap/keyby函数调用 val inputStream 以便更改数据。所以如果你要创造 class DoSthWithYourStream {...} 如果要定义方法和输入数据限制,则可以创建另一个值: val inputStreamChanged = inputStream .map( a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a) .flatMap(new DoSthWithYourStream()) 使用map/flapmap/key等扩展java类并将scala类应用到流中的示例
如果您想使用cep,那么我认为最好的选择是利用cep模式api val pattern = Pattern.begin("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end") val patternStream = CEP.pattern(inputStream, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))

相关问题