我已经建立了从Kafka数据检索风暴拓扑。我想构建一个聚合,其中一个字段上的每个批的最小计数。我尝试在流上使用maxby函数,但是它不显示任何结果,尽管数据在系统中流动,输出函数与其他聚合一起工作。如何以不同的方式实现,或者在当前的实现中可以修复哪些内容?
以下是我当前的实现:
val tridentTopology = new TridentTopology()
val stream = tridentTopology.newStream("kafka_spout",
new KafkaTridentSpoutOpaque(spoutConfig))
.map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
"user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
"user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
.maxBy("user.followers_count")
.map(new OutputFunction)
我的自定义输出函数:
class OutputFunction extends MapFunction{
override def execute(input: TridentTuple): Values = {
val values = input.getValues.asScala.toList.toString
println(s"TWEET: $values")
new Values(values)
}
}
暂无答案!
目前还没有任何答案,快来回答吧!