How to integrate over rows using Spark and Java?

aiazj4mn · posted 2021-06-02 in Hadoop

I am currently trying to write a Spark job in Java that computes the integral over one column of a dataset.
The data looks like this:

DateTime                velocity (in km/h)      vehicle
2016-03-28 11:00:45     80                      A
2016-03-28 11:00:45     75                      A
2016-03-28 11:00:46     70                      A
2016-03-28 11:00:47     68                      A
2016-03-28 11:00:48     72                      A
2016-03-28 11:00:48     75                      A
...
2016-03-28 11:00:47     68                      B
2016-03-28 11:00:48     72                      B
2016-03-28 11:00:48     75                      B

To calculate the distance travelled (in km) for each row, I have to determine the time difference between the current row and the next one and multiply it by the velocity. That result then has to be added to the previous row's result to get the "total distance" travelled up to that point in time.
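For example, for vehicle A the step from 11:00:47 (68 km/h) to 11:00:48 spans one second, so it contributes 68 × 1/3600 ≈ 0.0189 km to vehicle A's running total.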
This is what I have come up with so far. But it ends up processing one vehicle per map task, and a single vehicle can have millions of records....

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

final JavaRDD<String[]> input = sc.parallelize(Arrays.asList(
        new String[]{"2016-03-28", "11:00", "80", "VIN1"},
        new String[]{"2016-03-28", "11:00", "60", "VIN1"},
        new String[]{"2016-03-28", "11:00", "50", "VIN1"},
        new String[]{"2016-03-28", "11:01", "80", "VIN1"},
        new String[]{"2016-03-28", "11:05", "80", "VIN1"},
        new String[]{"2016-03-28", "11:09", "80", "VIN1"},
        new String[]{"2016-03-28", "11:00", "80", "VIN2"},
        new String[]{"2016-03-28", "11:01", "80", "VIN2"}
));

// grouping by vehicle and date:
final JavaPairRDD<String, Iterable<String[]>> byVinAndDate = input.groupBy(new Function<String[], String>() {
    @Override
    public String call(String[] record) throws Exception {
        return record[0] + record[3]; // date + vin
    }
});

// mapping each "value" (all records matching the key) to a result
final JavaRDD<String[]> result = byVinAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
    @Override
    public String[] call(Iterable<String[]> records) throws Exception {
        final Iterator<String[]> iterator = records.iterator();

        String[] previousRecord = iterator.next();

        while (iterator.hasNext()) {
            final String[] record = iterator.next();

            // Calculate difference current <-> previous record
            // Add result to new list

            previousRecord = record;
        }

        return new String[]{
                previousRecord[0],
                previousRecord[1],
                previousRecord[2],
                previousRecord[3],
                // NewList: placeholder for the accumulated distance per record
                NewList.get(previousRecord[0] + previousRecord[1] + previousRecord[2] + previousRecord[3])
        };
    }
}).values();

I am completely at a loss as to how to translate this problem into map/reduce transformations without losing the benefits of distributed computation.
I know this somewhat goes against the nature of MapReduce and Spark, but any suggestion on how to link rows of data, or on how to solve this in an elegant way, would be very helpful :)
Thanks!

xj3cbfub

xj3cbfub20#

Using the DataFrame analytics API: compute the time frames based on a partitioning over the data and id columns, and keep the number of seconds between those time frames.
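The snippet below references a DataFrame df with columns data, id, time and velocity that is never shown in the answer. A minimal sketch of how it could be built from the question's sample data, assuming a Spark 1.x SQLContext and an existing SparkContext sc (the names sqlContext and df are my own):

// Assumed setup (not part of the original answer): build `df` from the sample rows.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  ("2016-03-28", "VIN1", "11:00", 80.0),
  ("2016-03-28", "VIN1", "11:00", 60.0),
  ("2016-03-28", "VIN1", "11:00", 50.0),
  ("2016-03-28", "VIN1", "11:01", 80.0),
  ("2016-03-28", "VIN1", "11:05", 80.0),
  ("2016-03-28", "VIN1", "11:09", 80.0),
  ("2016-03-28", "VIN2", "11:00", 80.0),
  ("2016-03-28", "VIN2", "11:01", 80.0)
)).toDF("data", "id", "time", "velocity")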

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, denseRank, lead, udf}
import org.joda.time
import org.joda.time.Seconds

// average velocity per day, vehicle and timestamp, converted from km/h to km/s
val velocities = df.groupBy(df("data"), df("id"), df("time")).agg((avg("velocity") / 3600).as("avg_velocity"))

// window over one day and one vehicle, ordered by time
val overDataAndId = Window.partitionBy(df("data"), df("id")).orderBy(df("time"))

val rank = denseRank.over(overDataAndId)
val nextTime = lead(df("time"), 1).over(overDataAndId)

// seconds between a timestamp and the following one (0 when there is no next record)
val secondsBetween = udf((start: String, end: String) => {
  val sStart = time.LocalTime.parse(start)
  val sEnd = end match {
    case null => sStart
    case t: String if t.isEmpty => sStart
    case t: String if t.equalsIgnoreCase("null") => sStart
    case t: String => time.LocalTime.parse(end)
  }
  Seconds.secondsBetween(sStart, sEnd).getSeconds
})

velocities.withColumn("rank", rank).show()
velocities.withColumn("nextTime", nextTime).show()

val seconds = velocities.withColumn("seconds", secondsBetween(df("time"), nextTime))
seconds.show()

It will output the rank, nextTime and seconds tables. Of the original output only fragments remain, among them the intermediate columns

    +----------+----+-----+------------------+----+
    |      data|  id| time|      avg_velocity|rank|

and the final per-vehicle totals in km:

    +----------+----+------------------+
    |2016-03-28|VIN1|11.722222222222223|
    |2016-03-28|VIN2|1.3333333333333335|
    +----------+----+------------------+
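The code as shown does not yet produce those totals; presumably one more step multiplies each interval's avg_velocity (km/s) by its seconds and sums per data and id. A minimal sketch of that step, under that assumption (the column name total_distance is mine):

import org.apache.spark.sql.functions.sum

// Assumed final step: per-interval distance = avg_velocity (km/s) * seconds,
// summed per day (data) and vehicle (id); this should reproduce the totals
// shown above (about 11.72 km for VIN1 and 1.33 km for VIN2).
val totals = seconds
  .withColumn("distance", seconds("avg_velocity") * seconds("seconds"))
  .groupBy("data", "id")
  .agg(sum("distance").as("total_distance"))

totals.show()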
