我对flink和streaming也是新手。我想将每个分区的某个函数应用于流的每个窗口(使用事件时间)。到目前为止,我所做的是:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("dataset.txt")
.map(transformStream(_))
.assignAscendingTimestamps(_.eventTime)
.keyBy(_.id)
.timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))
def transformStream(input: String): EventStream = {...}
case class EventStream(val eventTime: Long, val id: String, actualEvent: String)
我想做的是对每个窗口批处理的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我见过这个方法在datastreamapi中的应用,但我不明白它是如何工作的。在flink api中,它表示在scala中的用法如下:
inputStream.apply { WindowFunction }
有人能解释一下apply方法是做什么的或者它是如何使用的吗?最好用scala作为例子。应用方法是否符合我的要求?
3条答案
按热度按时间3lxsmp7m1#
结果它需要一点scala魔法。到目前为止,我所做的是:
根据我的实验,processpartition方法在整个批处理上应用了一个“key partitioned”(批处理将只包含具有相同key的元素)的函数。我从javaapi获取了这个方法的参数。如果有人能详细介绍一下apply函数及其工作原理,那将非常有用。
hgc7kmma2#
因此,基本上有两个可能的方向,根据你想做的计算类型。使用方法之一:
fold
/reduce
/aggregate
或者更普通的,你已经提到了-apply
. 所有这些都适用于windows的一个键。至于
apply
这是一种非常通用的计算方法。最基本的版本(在scala中)是:其中函数采用4个参数:
窗口的键(记住您正在处理keyedstream)
窗口(您可以从中提取窗口的开始或结束)
分配给此特定窗口和键的元素
应该向其发出处理结果的收集器
但是必须记住,在发出窗口之前,这个版本必须保持每个元素的状态。一个更好的内存性能解决方案是使用一个带有预聚集器的版本,它在启动上述函数之前执行一些计算。
在这里,您可以看到一个简短的片段,其中包含预汇总的:
它统计会话窗口中密钥的出现次数。
所以基本上,如果你不需要窗口的元信息,我会坚持
fold
\reduce
\aggregate
如果足够的话。而不是考虑应用某种类型的预聚合,如果这还不够,看看最通用的apply
.更完整的例子,你可以看看这里。
7lrncoxx3#
据我所知,您可以对有状态窗口数据应用map/flatmap/keyby函数调用
val inputStream
以便更改数据。所以如果你要创造class DoSthWithYourStream {...}
如果要定义方法和输入数据限制,则可以创建另一个值:val inputStreamChanged = inputStream .map( a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a) .flatMap(new DoSthWithYourStream())
使用map/flapmap/key等扩展java类并将scala类应用到流中的示例如果您想使用cep,那么我认为最好的选择是利用cep模式api
val pattern = Pattern.begin("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(inputStream, pattern)val result: DataStream[Alert] = patternStream.select(createAlert(_))