如何使用flink传输json?

laik7k3q  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(316)

实际上,我正在处理一个流,接收一堆字符串,需要对所有字符串进行计数。总和是加总的,这意味着对于第二条记录,总和是在输出的前一天添加的,必须是类似于

{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
    {"date" : "2018-03-03",
    "sum" : 120},
  {"date" :"2018-03-04",
  "sum" : 203}
  ]
}

我创建了一条流,看起来像:

val eventStream : DataStream [String] = 
eventStream
    .addSource(source)
    .keyBy("")
    .TimeWindow(Time.days(1), Time.days(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)
    .addSink(sink)

提前感谢您的帮助:)

dgjrabp2

dgjrabp21#

在flink中使用json的注意事项:
使用 JSONDeserializationSchema 对事件进行反序列化 ObjectNode s。你可以绘制Map ObjectNodeYourObject 为了方便或继续与 ObjectNode .
使用的教程 ObjectNode : http://www.baeldung.com/jackson-json-node-tree-model
回到您的案例,您可以按以下方式进行:

val eventStream : DataStream [ObjectNode] = 
oneMinuteAgg
    .addSource(source)
    .windowAll()
    .TimeWindow(Time.minutes(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)

将输出1分钟的聚合流

[     
      {
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }
]

然后将另一个操作符链接到“oneminuteagg”,该操作符将把1分钟的聚合添加到1天的聚合中:

[...]
oneMinuteAgg
        .windowAll()
        .TimeWindow(Time.days(1))
        .trigger(new Whatever)
        .aggregation(new YourDayAggF)

这将输出您所需要的

{
    "aggregationType" : "day"
    "days before" : 4
    "aggregates : [{
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }]
}

我曾经 windowAll() 假设您不需要为流设置密钥。

相关问题