Remove duplicates (treating the key as the flight number) and keep only the latest record w.r.t. the timestamp

Posted by axzmvihb on 2021-06-04 in Kafka

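I have data in Kafka that uses the Avro schema below.
I want to read records from the topic and filter the flights: for example, if two records have the same flight number, we only need to keep the latest one, based on the timestamp field in the Avro schema (RecordDateTime).
How can I remove duplicates for the same flight number?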

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }

The output stream should look like this:

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
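
My current code (it counts records per flight status; it does not remove duplicates yet):

builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
        // Re-key by flight status; Avro string getters may return CharSequence (e.g. Utf8), so call toString() instead of casting
        .map((k, v) -> new KeyValue<>(v.getFlightStatus().toString(), v.getFlightNumber()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // Apply COUNT method: number of records per flight status
        .count()
        // Write to stream specified by outputTopic
        .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));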

Avro schema:

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "FlightData",
  "fields": [
    {"name": "FlightNumber", "type": "int"},
    {"name": "OriginAirport", "type": "string"},
    {"name": "DestinationAirport", "type": "string"},
    {"name": "OriginDate", "type": "string"},
    {"name": "OriginTime", "type": "string"},
    {"name": "DestinationDate", "type": "string"},
    {"name": "DestinationTime", "type": "string"},
    {"name": "FlightStatus", "type": "string"},
    {"name": "GateOut", "type": "string"},
    {"name": "GateIn", "type": "string"},
    {"name": "RecordDateTime", "type": "string"}
  ]
}
Answer from mitkmikd:

"...by considering the timestamp mentioned in the Avro schema"
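That is what the TimestampExtractor interface is for. Alternatively, you can change the upstream producer so that this value becomes the actual record timestamp.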
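A minimal sketch of the parsing step, assuming RecordDateTime holds an ISO-8601 instant such as 2020-07-26T11:00:00Z (the sample values in the question are placeholders, so the real format is an assumption). In Kafka Streams this logic would sit inside your own TimestampExtractor implementation, whose extract(ConsumerRecord<Object, Object> record, long partitionTime) method returns the timestamp, registered through the default.timestamp.extractor setting. The class and method names below are hypothetical, and plain Java is used so the example runs without Kafka on the classpath.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

final class RecordDateTimeParser {

    // Hypothetical helper that a custom TimestampExtractor could delegate to.
    // Assumption: RecordDateTime is an ISO-8601 instant, e.g. "2020-07-26T11:00:00Z".
    static long toEpochMillis(String recordDateTime, long fallbackMillis) {
        if (recordDateTime == null) {
            return fallbackMillis;
        }
        try {
            return Instant.parse(recordDateTime).toEpochMilli();
        } catch (DateTimeParseException e) {
            // Unparseable value: fall back, e.g. to the partitionTime Kafka passes to extract()
            return fallbackMillis;
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2020-07-26T11:00:00Z", -1L)); // 1595761200000
        System.out.println(toEpochMillis("qwer", -1L));                 // -1
    }
}
```

Falling back instead of throwing keeps one malformed record from stopping the stream thread; returning a negative value from a real extractor would instead cause Kafka Streams to drop or reject the record, depending on the version.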
"two records have the same flight number. We only need to pick the latest one"
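That is the default behavior when records with the same key arrive in order in the source topic: the last value per key wins. You do need logic for late-arriving data, however, and should skip any record that arrives later but carries an older timestamp than the one already stored. This is easier to do with the Processor API than with the Streams DSL, and you would need the Processor API anyway to inspect the table (state store) contents.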
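The keep-latest logic described above can be sketched in plain Java as follows. A HashMap stands in for the KeyValueStore you would attach to a Processor API processor, and Flight is a hypothetical, trimmed-down stand-in for the Avro FlightData class; timestamps are assumed to be epoch milliseconds already extracted from RecordDateTime. A record is forwarded only if its timestamp is not older than the one stored for that flight number, so late arrivals carrying older data are dropped.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class LatestPerFlight {

    // Hypothetical, trimmed-down stand-in for the Avro FlightData record.
    record Flight(int flightNumber, String flightStatus, long timestampMillis) {}

    // Stand-in for a KeyValueStore<Integer, FlightData> keyed by flight number.
    private final Map<Integer, Flight> store = new HashMap<>();

    // Returns the record to forward downstream, or empty if it is stale.
    Optional<Flight> process(Flight incoming) {
        Flight current = store.get(incoming.flightNumber());
        if (current != null && incoming.timestampMillis() < current.timestampMillis()) {
            // Late-arriving record with an older timestamp: skip it.
            return Optional.empty();
        }
        // Newer (or equally new) record: it replaces the stored one and is forwarded.
        store.put(incoming.flightNumber(), incoming);
        return Optional.of(incoming);
    }

    // Latest record per flight number, ordered by flight number.
    List<Flight> snapshot() {
        List<Flight> out = new ArrayList<>(store.values());
        out.sort(Comparator.comparingInt(Flight::flightNumber));
        return out;
    }

    public static void main(String[] args) {
        LatestPerFlight dedup = new LatestPerFlight();
        dedup.process(new Flight(1, "Scheduled", 2_000L)); // forwarded
        dedup.process(new Flight(2, "Delayed", 1_500L));   // forwarded
        dedup.process(new Flight(1, "Scheduled", 1_000L)); // older duplicate: skipped
        dedup.snapshot().forEach(System.out::println);
    }
}
```

In a real topology the body of process() would live in a Processor's process method, calling context.forward(...) for records that pass the check; a persistent state store is backed by a changelog topic by default, so the "latest seen" timestamps survive restarts.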
