spark数据集筛选器元素

5t7ly7z5  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(307)

我有两个spark数据集:lessonds和latestlessonds;
这是我的spark数据集pojo:

Lesson class:
    private List<SessionFilter> info;
    private lessonId;

LatestLesson class:

   private String id:

SessionFilter class:
   private String id;
   private String sessionName;

我想获取所有课程数据,其中info.id在lesson class中,而不是在latestlesson id中。
像这样:

lessonDS.filter(explode(col("info.id")).notEqual(latestLessonDS.col("value"))).show();

latestLessonDS contain:

100A
200C
300A
400A

lessonDS contain:

1,[[100A,jon],[200C,natalie]]
2,[[100A,jon]]
3,[[600A,jon],[400A,Kim]]

result:
3,[[600A,jon]
tvz2xvvm

tvz2xvvm1#

通常情况下,数组_contains函数可以用作连接时的连接条件 lessonDs 以及 latestLessonDs . 但是这个函数在这里不起作用,因为连接条件要求 lessonDs.info.id 出现在 latestLessonDS .
得到结果的方法是爆炸 lessonDs ,加入 latestLessonDs 然后检查 lessonDs.info 进入 latestLessonDs 通过比较联接前后的信息元素数而存在:

lessonDs
  .withColumn("noOfEntries", size('info))
  .withColumn("id", explode(col("info.id")))
  .join(latestLessonDs, "id" )
  .groupBy("lessonId", "info", "noOfEntries").count()
  .filter("noOfEntries = count")
  .drop("noOfEntries", "count")
  .show(false)

印刷品

+--------+------------------------------+
|lessonId|info                          |
+--------+------------------------------+
|1       |[[100A, jon], [200C, natalie]]|
|2       |[[100A, jon]]                 |
+--------+------------------------------+
voj3qocg

voj3qocg2#

如果你的数据集大小latestlessonds是合理的,你可以收集它,广播,然后简单的过滤变换的第二课将给你想要的结果。
喜欢

import scala.collection.JavaConversions._
 import spark.implicits._ 
 val bc = spark.sparkContext.broadcast(latestLessonDS.collectAsList().toSeq)
    lessonDS.mapPartitions(itr => {
      val cache = bc.value;
      itr.filter(x => {
        //check in cache 
      })
    })

相关问题