我有推特流api,我从那里检索推特。
我也有一个我想要考虑的单词列表。
我要做的是将最精确的值存储到我的cassandra数据库中,该值对应于这个词在一天中被使用的次数。
我在考虑使用窗口函数每5秒钟合并一次结果,然后在数据库中写入这个合并值。
我不知道这是不是最好的办法。如果这是最好的方法,我试着在文档后面做一个简单的例子,但是它没有每5秒钟对单词进行分组。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val counts =
env.fromElements("foo bar test test baz foo", "yes no no yes", "hi hello hi hello")
.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.filter(word => Words.listOfWords.contains(word) || Words.listOfWords2.contains(word))
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5)).sum( 1)
counts.print()
env.execute("test-code")
}
1条答案
按热度按时间vktxenjb1#
好吧,目前它将不起作用,因为您正在创建
DataStream
从元素,这不是窗口化的最佳方法,因为您实际上没有5秒的运行时间来创建多个窗口,所以所有消息都将转到同一个窗口。但是,如果您在实际的twitterapi上运行它,通常应该将这些项目正确地分组到windows中。