flink流处理计数每个回购的唯一问题

kupeojn6  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(340)

我在scala和flink一起工作,我正在努力获得每次回购的独特发行数量。我有这样一个元组的数据流:(repo\u name,issue\u id,event\u time)。如何获取每个回购名称的唯一问题id的计数?我想我必须使用 mapWithState ,但我不知道怎么用。
提前谢谢。

bf1o4zei

bf1o4zei1#

假设您希望在7天的滚动时间窗口中处理事件。

// eventStream: stream of case classes of type GithubEvent 
eventStream
   // only look at IssuesEvent
  .filter(e => e.`type` == "IssuesEvent")
   // key by the name of the repository
  .keyBy("repo.name")
   // tumbling time window of a week
  .timeWindow(Time.days(7))
   // apply window function
  .apply { (key, _, vals, out: Collector[(String)]) =>
    var count = 0;
    for (_ <- vals) {
      count = count + 1;
    }
    out.collect(s"Repo name: $key Unique issues: $count")
  }

要计算每个存储库的唯一问题数,我们需要查看issuesevents。我们按存储库的名称输入密钥。然后,我们应用一个窗口函数来返回一个字符串,该字符串指示问题的唯一数量。
参考文献:
在apache flink中引入流窗口
燧石窗

相关问题