apache-flink:processwindowfunction keyby()多个值

gg58donl  于 2021-06-26  发布在  Flink
关注(0)|答案(3)|浏览(479)

我正在尝试将windowfunction用于datastream,我的目标是实现如下查询

SELECT  *,
    count(id) OVER(PARTITION BY country) AS c_country,
    count(id) OVER(PARTITION BY city) AS c_city,
    count(id) OVER(PARTITION BY city) AS c_addrs
FROM fm
ORDER BY country

帮助我按国家字段进行聚合,但我需要在同一时间窗口中按两个字段进行聚合。我不知道在keyby()中是否可以有两个或更多的键

val parsed = stream2.map(x=> {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2))
    })

    parsed
    .keyBy(x => x._2) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
      }
      }).print().setParallelism(1)

这对于第一次聚合很好,但是我错过了同一时间窗口中城市字段的第二次聚合。
输入数据:

10,"SPAIN","BARCELONA","C1"
20,"SPAIN","BARCELONA","C2"
30,"SPAIN","MADRID","C3"
30,"SPAIN","MADRID","C3"
80,"SPAIN","MADRID","C4"
90,"SPAIN","VALENCIA","C5"
40,"ITALY","ROMA","C6"
41,"ITALY","ROMA","C7"
42,"ITALY","VENECIA","C8"
50,"FRANCE","PARIS","C9"
60,"FRANCE","PARIS","C9"
70,"FRANCE","MARSELLA","C10"

预期产量

(10,"SPAIN","BARCELONA",6,2,1)
(20,"SPAIN","BARCELONA",6,2,1)
(30,"SPAIN","MADRID",6,3,2)
(30,"SPAIN","MADRID",6,3,2)
(80,"SPAIN","MADRID",6,3,1)
(90,"SPAIN","VALENCIA",6,1,1)
(50,"FRANCE","PARIS",3,2,1)
(60,"FRANCE","PARIS",3,2,1)
(70,"FRANCE","MARSELLA",3,1,1)
(40,"ITALY","ROMA",3,2,2)
(41,"ITALY","ROMA",3,2,2)
(42,"ITALY","VENECIA",3,1,1)
hfyxw5xn

hfyxw5xn1#

作为 city 是的子类别 country ,您可以通过 city 维度,然后通过 country 尺寸。

val parsed = stream2.map(x=> {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2))
    })

    parsed
    .keyBy(x => x._3) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
      }
      })
      .keyBy(x => x._2)
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val cnt = 0
          for(e:elements){
             cnt += e._4
          }

          lst.foreach(x => out.collect((x._1, x._2, x._3, cnt)))
      }
      }).print().setParallelism(1)

如果一个维度不是另一个维度的子维度,您可以将这两个维度合并并生成一个新的键,然后自己在process func中实现聚合逻辑。

keyBy(x=>x._2+x._3)

更新

我认为在一个进程函数中计算结果是不可能的,因为您试图用不同的键进行统计。一步完成的唯一方法是将全局并行度设置为1(即使使用 keyby func)或将输入数据广播到所有下游任务。
因为您的计算实际上有一些通用的过程逻辑,所以最好进行一些抽象。

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CountJob {

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val transactions: DataStream[Record] = env
      .addSource(new SourceFunction[Record] {
        override def run(sourceContext: SourceFunction.SourceContext[Record]): Unit = {
          while (true) {
            sourceContext.collect(Record(1, "a", "b", "c", 1, 1, 1))
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {

        }
      })
      .name("generate source")

    transactions.keyBy(_.addr)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("ADDR"))
      .keyBy(_.city)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("CITY"))
      .keyBy(_.country)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("COUNTRY"))
      .print()

    env.execute("Count Job")
  }
}

// a common operator to process different aggregation
class CustomCountProc(aggrType: String) extends ProcessWindowFunction[Record, Record, String, TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[Record], out: Collector[Record]): Unit = {

    for (e <- elements) {
      if ("ADDR".equals(aggrType)) {
        out.collect(Record(-1, e.country, e.city, key, e.country_cnt, e.city_cnt, elements.size))
      }
      else if ("CITY".equals(aggrType)) {
        out.collect(Record(-1, e.country, key, e.country, e.country_cnt, elements.size, e.addr_cnt))
      }
      else if ("COUNTRY".equals(aggrType)) {
        out.collect(Record(-1, key, e.city, e.addr, elements.size, e.city_cnt, e.addr_cnt))
      }
    }

  }
}

case class Record(
                   id: Int,
                   country: String,
                   city: String,
                   addr: String,
                   country_cnt: Int,
                   city_cnt: Int,
                   addr_cnt: Int
                 ) {
}

顺便说一句,我不确定输出是否真的符合您的期望。由于您没有实现有状态的流程函数,我认为您正在尝试计算每批数据的聚合结果,并且每批数据都包含在1秒的时间窗口中接收的数据。输出不会一直累积,每批都会从零开始。
通过使用 timeWindow 函数,您还需要注意 TimeCharacteristic 默认情况下是处理时间。
输出也可能因为使用3个结果而延迟 window 功能。假设第一个进程func在一秒钟内完成了聚合,并将结果转发到下游。作为第二个进程,func还有一个 timewindow 1秒,它将不会发出任何结果,直到它收到来自上游的下一批输出。
让我们看看别人是否有更好的办法来解决你的问题。

zyfwsgd6

zyfwsgd62#


我现在想对3列进行聚合。如果我使用的选项是链接keyby()输出,但这可能会变得非常长和复杂,并且不太可读。除此之外,我还设置了一个time.seconds(1)的时间窗口,因为如果没有这个窗口,上面的keyby()输出将作为单个事件。
我感兴趣的是,我是否可以在一个进程函数中进行这些聚合。
我有那么长的密码。。。

parsed
    .keyBy(_.country) // key by product id.
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        AlarmasIn, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[AlarmasIn],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address, lst.size,0,0)))
      }
      })
      .keyBy( _.city).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .process(new ProcessWindowFunction[
          AlarmasOut, AlarmasOut, String, TimeWindow
        ]() {
          override def process(key: String,
                               context: Context,
                               elements: Iterable[AlarmasOut],
                               out: Collector[AlarmasOut]): Unit = {
            val lst = elements.toList
            lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,lst.size,x.c_addr)))
          }
        })
      .keyBy( _.address).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .process(new ProcessWindowFunction[
        AlarmasOut, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String,
                             context: Context,
                             elements: Iterable[AlarmasOut],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,x.c_city,lst.size)))
        }
      })
      .print()

/// CASE CLASS
 case class AlarmasIn(
                      id: Int,
                      country: String,
                      city: String,
                      address: String
                    )

  case class AlarmasOut(
                       id: Int,
                       country: String,
                       city: String,
                       address: String,
                       c_country: Int,
                       c_city: Int,
                       c_addr: Int
                     )

相关问题