实际上,我正在处理一个流,接收一堆字符串,需要对所有字符串进行计数。总和是加总的,这意味着对于第二条记录,总和是在输出的前一天添加的,必须是类似于
{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
{"date" : "2018-03-03",
"sum" : 120},
{"date" :"2018-03-04",
"sum" : 203}
]
}
我创建了一条流,看起来像:
val eventStream : DataStream [String] =
eventStream
.addSource(source)
.keyBy("")
.TimeWindow(Time.days(1), Time.days(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
.addSink(sink)
提前感谢您的帮助:)
1条答案
按热度按时间dgjrabp21#
在flink中使用json的注意事项:
使用
JSONDeserializationSchema
对事件进行反序列化ObjectNode
s。你可以绘制MapObjectNode
至YourObject
为了方便或继续与ObjectNode
.使用的教程
ObjectNode
: http://www.baeldung.com/jackson-json-node-tree-model回到您的案例,您可以按以下方式进行:
将输出1分钟的聚合流
然后将另一个操作符链接到“oneminuteagg”,该操作符将把1分钟的聚合添加到1天的聚合中:
这将输出您所需要的
我曾经
windowAll()
假设您不需要为流设置密钥。