Apache Flink custom window aggregation

6kkfgxo0 · published 2021-06-24 in Flink
Follow (0) | Answers (2) | Views (373)

I want to aggregate a stream of trades into windows of equal traded volume, i.e. the sum of the trade sizes of all trades in the interval.
I was able to write a custom trigger that partitions the data into windows. The code looks like this:

import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.slf4j.{Logger, LoggerFactory}

case class Trade(key: Int, millis: Long, time: LocalDateTime, price: Double, size: Int)

class VolumeTrigger(triggerVolume: Int, config: ExecutionConfig) extends Trigger[Trade, Window] {
  val LOG: Logger = LoggerFactory.getLogger(classOf[VolumeTrigger])
  val stateDesc = new ValueStateDescriptor[Double]("volume", createTypeInformation[Double].createSerializer(config))

  override def onElement(event: Trade, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    val volume = ctx.getPartitionedState(stateDesc)
    if (volume.value == null) {
      volume.update(event.size)
      return TriggerResult.CONTINUE
    }

    volume.update(volume.value + event.size)
    if (volume.value < triggerVolume) {
      TriggerResult.CONTINUE
    }
    else {
      volume.update(volume.value - triggerVolume)
      TriggerResult.FIRE_AND_PURGE
    }
  }

  override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    throw new UnsupportedOperationException("Not a processing time trigger")
  }

  override def clear(window: Window, ctx: TriggerContext): Unit = {
    ctx.getPartitionedState(stateDesc).clear()
  }
}
}

def main(args: Array[String]) : Unit = {

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.setParallelism(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val trades = env
    .readTextFile("/tmp/trades.csv")
    .map {line =>
      val cells = line.split(",")
      val time = LocalDateTime.parse(cells(0), DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss.SSSSSSSSS"))
      val millis = time.toInstant(ZoneOffset.UTC).toEpochMilli
      Trade(0, millis, time, cells(1).toDouble, cells(2).toInt)
    }

  val aggregated = trades
    .assignAscendingTimestamps(_.millis)
    .keyBy("key")
    .window(GlobalWindows.create)
    .trigger(new VolumeTrigger(500, env.getConfig))
    .sum(4)

  aggregated.writeAsText("/tmp/trades_agg.csv")

  env.execute("volume agg")
}

例如,数据如下所示:

20180102 04:00:29.715706404,169.10,100
20180102 04:00:29.715715627,169.10,100
20180102 05:08:29.025299624,169.12,100
20180102 05:08:29.025906589,169.10,214
20180102 05:08:29.327113252,169.10,200
20180102 05:09:08.350939314,169.00,100
20180102 05:09:11.532817015,169.00,474
20180102 06:06:55.373584329,169.34,200
20180102 06:07:06.993081961,169.34,100
20180102 06:07:08.153291898,169.34,100
20180102 06:07:20.081524768,169.34,364
20180102 06:07:22.838656715,169.34,200
20180102 06:07:24.561360031,169.34,100
20180102 06:07:37.774385969,169.34,100
20180102 06:07:39.305219107,169.34,200

Each row has a timestamp, a price and a size.
The code above partitions this into windows of roughly equal volume:

Trade(0,1514865629715,2018-01-02T04:00:29.715706404,169.1,514)
Trade(0,1514869709327,2018-01-02T05:08:29.327113252,169.1,774)
Trade(0,1514873215373,2018-01-02T06:06:55.373584329,169.34,300)
Trade(0,1514873228153,2018-01-02T06:07:08.153291898,169.34,464)
Trade(0,1514873242838,2018-01-02T06:07:22.838656715,169.34,600)
Trade(0,1514873294898,2018-01-02T06:08:14.898397117,169.34,500)
Trade(0,1514873299492,2018-01-02T06:08:19.492589659,169.34,400)
Trade(0,1514873332251,2018-01-02T06:08:52.251339070,169.34,500)
Trade(0,1514873337928,2018-01-02T06:08:57.928680090,169.34,1000)
Trade(0,1514873338078,2018-01-02T06:08:58.078221995,169.34,1000)

Now I would like to partition the data so that the volume of each window matches the trigger value exactly. For that I need to alter the data slightly: the trade at the end of an interval has to be split into two parts, one that belongs to the window being fired, and a remainder above the trigger value that has to be assigned to the next window.
Can this be handled with some custom aggregation function? It would need to know the result of the previous window, though, and I could not find out how to do that.
Do any Apache Flink experts have ideas on how to handle this case?
Adding an evictor does not work, since it only purges some elements at the beginning of the window.
I hope the switch from Spark Structured Streaming to Flink was a good choice, since I will have to handle even more complicated situations later.

a6b3iqyw1#

A simple (though not super-efficient) approach is to put a FlatMapFunction ahead of the windowed stream. If it is keyed the same way, you can use ValueState to track the total volume and emit two records (a split) whenever the limit is reached.
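A minimal sketch of that idea, assuming the `Trade` case class from the question; the `TradeSplitter` name, the state name and the use of `RichFlatMapFunction` are illustrative choices, not code from the answer:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class TradeSplitter(triggerVolume: Int) extends RichFlatMapFunction[Trade, Trade] {
  // running volume accumulated since the last window boundary
  private var volume: ValueState[java.lang.Integer] = _

  override def open(parameters: Configuration): Unit = {
    volume = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("splitVolume", classOf[java.lang.Integer]))
  }

  override def flatMap(trade: Trade, out: Collector[Trade]): Unit = {
    var sofar = Option(volume.value()).map(_.toInt).getOrElse(0)
    var remaining = trade.size
    // split off as many boundary-filling pieces as the trade spans
    while (sofar + remaining >= triggerVolume) {
      val fill = triggerVolume - sofar
      out.collect(trade.copy(size = fill))
      remaining -= fill
      sofar = 0
    }
    if (remaining > 0) out.collect(trade.copy(size = remaining))
    volume.update(sofar + remaining)
  }
}
```

Applied as `trades.keyBy(_.key).flatMap(new TradeSplitter(500))` in front of the windowed stream, every emitted piece lines up with a volume boundary, so the existing `VolumeTrigger` fires at exactly the trigger volume.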

9wbgstp72#

Since all records have the same key, you may not need windowing at all in this case. See this page of the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-keyed-state. It contains a CountWindowAverage class in which the aggregation of the values of the records in a stream is done with a state variable. You can implement the same pattern, emit the output whenever the state variable reaches the trigger volume, and reset the state variable to the remaining volume.
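A sketch of this window-free variant, assuming the `Trade` case class from the question; the `VolumeAggregator` name is illustrative. It mirrors the CountWindowAverage pattern from the linked docs page, but fires on accumulated size instead of a count and carries the remainder over into the next bucket:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class VolumeAggregator(triggerVolume: Int) extends RichFlatMapFunction[Trade, Trade] {
  // the partially filled aggregate for the current bucket, or null if none yet
  private var acc: ValueState[Trade] = _

  override def open(parameters: Configuration): Unit = {
    acc = getRuntimeContext.getState(
      new ValueStateDescriptor[Trade]("acc", classOf[Trade]))
  }

  override def flatMap(trade: Trade, out: Collector[Trade]): Unit = {
    // merge the incoming trade into the running aggregate
    var current = Option(acc.value()) match {
      case Some(a) => a.copy(size = a.size + trade.size)
      case None    => trade
    }
    // emit one exact-volume aggregate per full bucket; keep the remainder,
    // stamped with the latest trade's time and price
    while (current.size >= triggerVolume) {
      out.collect(current.copy(size = triggerVolume))
      current = trade.copy(size = current.size - triggerVolume)
    }
    if (current.size > 0) acc.update(current) else acc.clear()
  }
}
```

With `trades.keyBy(_.key).flatMap(new VolumeAggregator(500))`, each output record has size exactly 500, replacing the GlobalWindows/Trigger pipeline entirely.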
