// eventStream: stream of case classes of type GithubEvent
eventStream
  // only look at IssuesEvent
  .filter(e => e.`type` == "IssuesEvent")
  // key by the name of the repository
  .keyBy("repo.name")
  // tumbling time window of a week
  .timeWindow(Time.days(7))
  // apply window function
  .apply { (key, _, vals, out: Collector[String]) =>
    // Note: this counts every IssuesEvent in the window; events that refer
    // to the same issue are not deduplicated here.
    val count = vals.size
    out.collect(s"Repo name: $key Unique issues: $count")
  }
1 Answer
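Assuming you want to process the events in a tumbling time window of 7 days:
To count the unique issues per repository, we only need to look at events of type IssuesEvent. We key the stream by the repository name and then apply a window function that emits a string reporting the number of unique issues for that repository in each window.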
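The filter, key, window, and count steps described above can be sketched without Flink, using plain Scala collections. This is only an illustration under assumptions: the `IssueEvent` case class, its field names, and the `uniqueIssues` helper are hypothetical stand-ins, not the real GithubEvent type or any Flink API. It assumes each event carries an issue id and a non-negative epoch timestamp in milliseconds, and it deduplicates by issue id so that the result is a count of distinct issues rather than a count of events.

```scala
// Hypothetical, simplified event shape (not the real GithubEvent).
final case class IssueEvent(
  eventType: String,
  repoName: String,
  issueId: Long,
  timestampMs: Long
)

object UniqueIssuesPerWindow {
  // Length of one tumbling window: 7 days in milliseconds.
  val WindowMs: Long = 7L * 24 * 60 * 60 * 1000

  // Start of the epoch-aligned tumbling window that contains the timestamp.
  def windowStart(timestampMs: Long): Long =
    timestampMs - (timestampMs % WindowMs)

  // Maps (repoName, windowStart) to the number of distinct issue ids seen.
  def uniqueIssues(events: Seq[IssueEvent]): Map[(String, Long), Int] =
    events
      .filter(_.eventType == "IssuesEvent")                  // only issue events
      .groupBy(e => (e.repoName, windowStart(e.timestampMs))) // key + window
      .map { case (key, es) => key -> es.map(_.issueId).distinct.size }

  def main(args: Array[String]): Unit = {
    val day = 24L * 60 * 60 * 1000
    val events = Seq(
      IssueEvent("IssuesEvent", "a/b", 1L, 0L),
      IssueEvent("IssuesEvent", "a/b", 1L, day),     // same issue again
      IssueEvent("IssuesEvent", "a/b", 2L, 2 * day),
      IssueEvent("PushEvent", "a/b", 9L, 3 * day),   // filtered out
      IssueEvent("IssuesEvent", "a/b", 3L, 8 * day)  // falls in the next window
    )
    uniqueIssues(events).toSeq.sortBy(_._1).foreach { case ((repo, start), n) =>
      println(s"Repo name: $repo Window start: $start Unique issues: $n")
    }
  }
}
```

In a streaming job the same deduplication could be done inside the window function, for example by collecting the issue ids of the window's events into a set before counting; which field holds the issue id depends on how GithubEvent is defined.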
References:
Introducing Stream Windows in Apache Flink
Flink Windows