I want to aggregate a series of trades into windows of equal traded volume, where the volume is the sum of the trade sizes of all trades in the interval.
I was able to write a custom trigger that partitions the data into windows. Here is the code:
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.slf4j.{Logger, LoggerFactory}

case class Trade(key: Int, millis: Long, time: LocalDateTime, price: Double, size: Int)

class VolumeTrigger(triggerVolume: Int, config: ExecutionConfig) extends Trigger[Trade, Window] {
  val LOG: Logger = LoggerFactory.getLogger(classOf[VolumeTrigger])
  val stateDesc = new ValueStateDescriptor[Double]("volume", createTypeInformation[Double].createSerializer(config))

  override def onElement(event: Trade, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    // An unset ValueState[Double] reads as 0.0 in Scala, so no null check is
    // needed (comparing a Scala Double to null never succeeds anyway).
    val volume = ctx.getPartitionedState(stateDesc)
    val accumulated = volume.value + event.size
    if (accumulated < triggerVolume) {
      volume.update(accumulated)
      TriggerResult.CONTINUE
    }
    else {
      // Keep the overshoot as the starting volume of the next window.
      volume.update(accumulated - triggerVolume)
      TriggerResult.FIRE_AND_PURGE
    }
  }

  override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    throw new UnsupportedOperationException("Not a processing time trigger")
  }

  override def clear(window: Window, ctx: TriggerContext): Unit = {
    ctx.getPartitionedState(stateDesc).clear()
  }
}
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val trades = env
    .readTextFile("/tmp/trades.csv")
    .map { line =>
      val cells = line.split(",")
      val time = LocalDateTime.parse(cells(0), DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss.SSSSSSSSS"))
      val millis = time.toInstant(ZoneOffset.UTC).toEpochMilli
      Trade(0, millis, time, cells(1).toDouble, cells(2).toInt)
    }

  val aggregated = trades
    .assignAscendingTimestamps(_.millis)
    .keyBy("key")
    .window(GlobalWindows.create)
    .trigger(new VolumeTrigger(500, env.getConfig))
    .sum(4) // sum the size field (position 4) of the case class

  aggregated.writeAsText("/tmp/trades_agg.csv")
  env.execute("volume agg")
}
For example, the data looks like this:
20180102 04:00:29.715706404,169.10,100
20180102 04:00:29.715715627,169.10,100
20180102 05:08:29.025299624,169.12,100
20180102 05:08:29.025906589,169.10,214
20180102 05:08:29.327113252,169.10,200
20180102 05:09:08.350939314,169.00,100
20180102 05:09:11.532817015,169.00,474
20180102 06:06:55.373584329,169.34,200
20180102 06:07:06.993081961,169.34,100
20180102 06:07:08.153291898,169.34,100
20180102 06:07:20.081524768,169.34,364
20180102 06:07:22.838656715,169.34,200
20180102 06:07:24.561360031,169.34,100
20180102 06:07:37.774385969,169.34,100
20180102 06:07:39.305219107,169.34,200
I have a timestamp, a price and a trade size.
The code above partitions the data into windows of approximately equal volume:
Trade(0,1514865629715,2018-01-02T04:00:29.715706404,169.1,514)
Trade(0,1514869709327,2018-01-02T05:08:29.327113252,169.1,774)
Trade(0,1514873215373,2018-01-02T06:06:55.373584329,169.34,300)
Trade(0,1514873228153,2018-01-02T06:07:08.153291898,169.34,464)
Trade(0,1514873242838,2018-01-02T06:07:22.838656715,169.34,600)
Trade(0,1514873294898,2018-01-02T06:08:14.898397117,169.34,500)
Trade(0,1514873299492,2018-01-02T06:08:19.492589659,169.34,400)
Trade(0,1514873332251,2018-01-02T06:08:52.251339070,169.34,500)
Trade(0,1514873337928,2018-01-02T06:08:57.928680090,169.34,1000)
Trade(0,1514873338078,2018-01-02T06:08:58.078221995,169.34,1000)
Now I would like to partition the data so that the volume of each window matches the trigger value exactly. For this, the data has to be changed slightly: the trade at the end of an interval must be split into two parts, one belonging to the window that is about to fire, and the remainder above the trigger value, which must be assigned to the next window. For example, with a trigger volume of 500, the first window above (100 + 100 + 100 + 214 = 514) should close at exactly 500: the 214-size trade would be split into 200 and 14, with the 14 carried over into the next window.
Can this be handled with a custom aggregation function? It would need to know the result of the previous window, but I could not find out how to do that.
Do any Apache Flink experts have ideas on how to handle this case?
Adding an evictor does not work, as it only removes some elements at the beginning of the window.
I hope my move from Spark Structured Streaming to Flink was a good choice, as I will have to handle even more complicated scenarios later on.
2 Answers
Answer 1
A simple (though not super-efficient) approach would be to put a FlatMapFunction in front of the windowed stream. If it is keyed the same way, you can use ValueState to track the total volume and emit two records (the split) whenever the limit is reached.
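A minimal sketch of that idea, assuming the Trade case class from the question; SplitTrade is a hypothetical name, and the loop generalizes the two-record split so that a trade larger than the remaining window capacity is also handled:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class SplitTrade(triggerVolume: Int) extends RichFlatMapFunction[Trade, Trade] {
  // Volume accumulated in the window that is currently filling up.
  private var volume: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    volume = getRuntimeContext.getState(new ValueStateDescriptor[Int]("splitVolume", classOf[Int]))
  }

  override def flatMap(trade: Trade, out: Collector[Trade]): Unit = {
    var current = volume.value // an unset Int state reads as 0
    var remaining = trade.size
    // Emit slices that fill the current window exactly until the rest fits.
    while (current + remaining > triggerVolume) {
      val fill = triggerVolume - current
      out.collect(trade.copy(size = fill))
      remaining -= fill
      current = 0
    }
    if (remaining > 0) {
      out.collect(trade.copy(size = remaining))
    }
    volume.update((current + remaining) % triggerVolume)
  }
}

Applied in front of the existing pipeline, e.g. trades.keyBy("key").flatMap(new SplitTrade(500)).keyBy("key").window(GlobalWindows.create).trigger(new VolumeTrigger(500, env.getConfig)).sum(4), every fired window should then sum to exactly 500.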
Answer 2
Since all of your records have the same key, you may not need windowing at all in this case. See this page of the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-keyed-state. It has a CountWindowAverage class in which the aggregation of a value for each record of the stream is done using a state variable. You can implement the same pattern, send the output whenever the state variable reaches your trigger volume, and reset the state variable with the remaining volume.
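A sketch along those lines, again assuming the Trade case class from the question; VolumeAggregate is a hypothetical name modeled on the CountWindowAverage pattern, and for simplicity it assumes no single trade spans more than one interval:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class VolumeAggregate(triggerVolume: Int) extends RichFlatMapFunction[Trade, Trade] {
  // Volume accumulated since the last emitted aggregate.
  private var volume: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    volume = getRuntimeContext.getState(new ValueStateDescriptor[Int]("aggVolume", classOf[Int]))
  }

  override def flatMap(trade: Trade, out: Collector[Trade]): Unit = {
    val accumulated = volume.value + trade.size // an unset Int state reads as 0
    if (accumulated < triggerVolume) {
      volume.update(accumulated)
    } else {
      // Emit an aggregate of exactly triggerVolume and carry the excess
      // over as the opening volume of the next interval.
      out.collect(trade.copy(size = triggerVolume))
      volume.update(accumulated - triggerVolume)
    }
  }
}

The window/trigger/sum part of the pipeline is then replaced by trades.keyBy("key").flatMap(new VolumeAggregate(500)). Note that the emitted record carries the price and timestamp of the trade that closed the interval rather than the first one, so you may want to track those in state as well.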