scala流处理,计算每个日期的发生次数

vsmadaxz  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(277)

我正在使用scala流处理,我有一个事件列表,并且我正在尝试以这种格式计算每个日期的出现次数( dd-mm-yyyy )
这是我对案例类的定义:

case class Event(prId: Int, author: String, event: String, timestamp: Date)

我的解决办法如下

def CountOccur(events: DataStreaming[Event]): DataStreaming[(String, Int)] = {
    events.map(c => (c.timestamp, 1)).keyBy(x => x._1).timeWindow(Time.seconds(5)).sum(1)
  }

假设返回的答案如下:

2010-09-25,10
2010-09-27,7
.
.
.

我现在的问题是 timestamp 属于 date format ,但我需要 string .
我怎样才能把它转换成字符串格式的日期呢?
固定的:
以下是解决此问题的方法:

events.map(c => (sdf.format(c.timestamp), 1)).keyBy(x => sdf.format(x._1)).window(EventTimeSessionWindows.withGap(Time.seconds(10))

但问题是现在它什么都不回!!

s8vozzvw

s8vozzvw1#

我不认识Flink,但我猜

def CountOccur(events: DataStreaming[Event]): DataStreaming[(String, Int)] = {
    val sdf = new SimpleDateFormat("dd-mm-yyyy")

    events.map(c => (sdf.format(c.timestamp), 1)).keyBy(x => x._1).timeWindow(Time.seconds(5)).sum(1)
}

您需要删除第二种格式 sdf.format(x._1) ,它将引发异常。

相关问题