Scala: how to optimize a Dataset aggregation over a Dataset of Java objects

q5lcpyga  posted on 2021-05-27  in Spark

Could you help me with this silly question?
I have some Java classes:

public class ProbePoint implements Serializable, Cloneable {
    private long arrivalTimeMillis = 0;
    private long captureTimeMillis = 0;
//...
}
public class Trip implements Serializable, Cloneable {
    private ArrayList<ProbePoint> points = new ArrayList<>();
//...
}

I have a Dataset[Trip] and need to collect some min/max values. What would be a better implementation of the following:

public class DataRanges implements Serializable {
    private long minCaptureTs;
    private long maxArrivalTs;

}

val timesDs: Dataset[DataRanges] = trips.mapPartitions(t => {
  var minCaptTime = Long.MaxValue
  var maxArrTime = Long.MinValue
  t.foreach(f => {
    // Relies on the points of each trip being sorted by time
    if (f.points.head.getCaptureTimeMillis < minCaptTime) minCaptTime = f.points.head.getCaptureTimeMillis
    if (f.points.last.getArrivalTimeMillis > maxArrTime) maxArrTime = f.points.last.getArrivalTimeMillis
  })
  // Assumes DataRanges has a (minCaptureTs, maxArrivalTs) constructor
  Iterator[DataRanges](new DataRanges(minCaptTime, maxArrTime))
})(Encoders.bean(classOf[DataRanges]))
val times = timesDs.agg(min("minCaptureTs"), max("maxArrivalTs")).head()
k5hmc34c  #1

Judging from the Java classes, the schema of Dataset[Trip] should be:

root
 |-- points: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- arrivalTimeMillis: long (nullable = false)
 |    |    |-- captureTimeMillis: long (nullable = false)

You can explode the array first and then apply min and max, which simplifies the code:

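import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{explode, max, min}
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named spark

// One row per ProbePoint, with both timestamps as top-level columns
val df = tripsDF
  .withColumn("exploded", explode($"points"))
  .withColumn("arrivalTimeMillis", $"exploded.arrivalTimeMillis")
  .withColumn("captureTimeMillis", $"exploded.captureTimeMillis")

// Smallest capture time and largest arrival time over all points, matching DataRanges
val Row(minCaptureTimeMillis: Long, maxArrivalTimeMillis: Long) =
  df.agg(min("captureTimeMillis"), max("arrivalTimeMillis")).head

println(minCaptureTimeMillis)
println(maxArrivalTimeMillis)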

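The code in the question assumes that the points of a Trip are sorted: the minimum capture time is always taken from the first element of the array, and the maximum arrival time always from the last. This code takes the minimum and maximum over all ProbePoints, so the logic is slightly different.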
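To make that difference concrete, here is a minimal plain-Java sketch without Spark. The names TripRangeDemo, Point, rangeFromEnds and rangeFromScan are hypothetical and not part of the original classes; Point is only a simplified stand-in for ProbePoint. It compares reading the first and last element with scanning every point, on a sorted and an unsorted list.

```java
import java.util.Arrays;
import java.util.List;

public class TripRangeDemo {
    // Simplified, hypothetical stand-in for ProbePoint
    static final class Point {
        final long arrivalTimeMillis;
        final long captureTimeMillis;

        Point(long arrivalTimeMillis, long captureTimeMillis) {
            this.arrivalTimeMillis = arrivalTimeMillis;
            this.captureTimeMillis = captureTimeMillis;
        }
    }

    // First/last approach: trusts that the points are sorted by time.
    // Returns {capture time of the first point, arrival time of the last point}.
    static long[] rangeFromEnds(List<Point> points) {
        Point first = points.get(0);
        Point last = points.get(points.size() - 1);
        return new long[] { first.captureTimeMillis, last.arrivalTimeMillis };
    }

    // Scan approach: looks at every point, so ordering does not matter.
    // Returns {smallest capture time, largest arrival time}.
    static long[] rangeFromScan(List<Point> points) {
        long minCapture = Long.MAX_VALUE;
        long maxArrival = Long.MIN_VALUE;
        for (Point p : points) {
            minCapture = Math.min(minCapture, p.captureTimeMillis);
            maxArrival = Math.max(maxArrival, p.arrivalTimeMillis);
        }
        return new long[] { minCapture, maxArrival };
    }

    public static void main(String[] args) {
        List<Point> sorted = Arrays.asList(
            new Point(110, 100), new Point(210, 200), new Point(310, 300));
        List<Point> unsorted = Arrays.asList(
            new Point(210, 200), new Point(310, 300), new Point(110, 100));

        System.out.println(Arrays.toString(rangeFromEnds(sorted)));   // [100, 310]
        System.out.println(Arrays.toString(rangeFromScan(sorted)));   // [100, 310]
        System.out.println(Arrays.toString(rangeFromEnds(unsorted))); // [200, 110]
        System.out.println(Arrays.toString(rangeFromScan(unsorted))); // [100, 310]
    }
}
```

On sorted input both approaches agree, so the cheaper first/last lookup is enough if the ordering is guaranteed. If it is not, only the full scan returns the true range.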
