sql - how to efficiently identify records that differ in specific columns

hwazgwia · asked 2021-05-27 · in Spark

I have two datasets, df1 and df2, where I need to detect any records in df2 that differ from df1 and produce a result dataset with an additional column flagging the differing records. Below is an example.

package playground

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object sample4 {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  val sc = spark.sparkContext

  final case class Owner(a: Long,
                         b: String,
                         c: Long,
                         d: Short,
                         e: String,
                         f: String,
                         o_qtty: Double)

  final case class Result(a: Long,
                          b: String,
                          c: Long,
                          d: Short,
                          e: String,
                          f: String,
                          o_qtty: Double,
                          isDiff: Boolean)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    import spark.implicits._

    val data1 = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 120),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val data2 = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 55),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val expected = Seq(
      Result(11, "A", 666, 2017, "x", "y", 50, isDiff = false),
      Result(11, "A", 222, 2018, "x", "y", 20, isDiff = false),
      Result(33, "C", 444, 2018, "x", "y", 20, isDiff = false),
      Result(33, "C", 555, 2018, "x", "y", 55, isDiff = true),
      Result(22, "B", 555, 2018, "x", "y", 20, isDiff = false),
      Result(99, "D", 888, 2018, "x", "y", 100, isDiff = false),
      Result(11, "A", 888, 2018, "x", "y", 100, isDiff = false),
      Result(11, "A", 666, 2018, "x", "y", 80, isDiff = false),
      Result(33, "C", 666, 2018, "x", "y", 80, isDiff = false),
      Result(11, "A", 444, 2018, "x", "y", 50, isDiff = false),
    )

    val df1 = spark
      .createDataset(data1)
      .as[Owner]
      .cache()

    val df2 = spark
      .createDataset(data2)
      .as[Owner]
      .cache()
  }

}

What is the most efficient way to do this?


58wvjzkj #1

I think this code can help you find the answer:

import org.apache.spark.sql.functions.lit

val intersectDF = df1.intersect(df2)
val unionDF = df1.union(df2).dropDuplicates()
val diffDF = unionDF.except(intersectDF)

val intersectDF2 = intersectDF.withColumn("isDiff", lit(false))
val diffDF2 = diffDF.withColumn("isDiff", lit(true))
val answer = intersectDF2.union(diffDF2)

// Rows common to both DataFrames
intersectDF2.show()
// Rows that differ between the two DataFrames
diffDF2.show()
// Your answer
answer.show()
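
If you only need df2's view of the data, as in the question's expected output, a small variant of the same idea avoids also flagging the stale df1 version of a changed row (except on the union emits both versions). A minimal sketch, assuming the question's df1, df2, Result, and import spark.implicits._ are in scope:

import org.apache.spark.sql.functions.lit

// df2 rows with no exact match in df1 are the changed ones
val changed = df2.except(df1).withColumn("isDiff", lit(true))
// df2 rows that also appear in df1 are unchanged
val unchanged = df2.intersect(df1).withColumn("isDiff", lit(false))

// The schema now matches Result, so the union can go back to a typed Dataset
val result = unchanged.union(changed).as[Result]
result.show(false)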

5jvtdoz2 #2

Maybe this helps:

Do a left join against df1 and flag the df2 rows that have no match as different.

import org.apache.spark.sql.functions.{col, lit, when}

val df1_hash = df1.withColumn("x", lit(0))
df2.join(df1_hash, df2.columns, "left")
  .select(when(col("x").isNull, true).otherwise(false).as("isDiff") +: df2.columns.map(df2(_)): _*)
  .show(false)

/**
  * +------+---+---+---+----+---+---+------+
  * |isDiff|a  |b  |c  |d   |e  |f  |o_qtty|
  * +------+---+---+---+----+---+---+------+
  * |false |11 |A  |666|2017|x  |y  |50.0  |
  * |false |11 |A  |222|2018|x  |y  |20.0  |
  * |false |33 |C  |444|2018|x  |y  |20.0  |
  * |true  |33 |C  |555|2018|x  |y  |55.0  |
  * |false |22 |B  |555|2018|x  |y  |20.0  |
  * |false |99 |D  |888|2018|x  |y  |100.0 |
  * |false |11 |A  |888|2018|x  |y  |100.0 |
  * |false |11 |A  |666|2018|x  |y  |80.0  |
  * |false |33 |C  |666|2018|x  |y  |80.0  |
  * |false |11 |A  |444|2018|x  |y  |50.0  |
  * +------+---+---+---+----+---+---+------+
  */
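
If only the differing rows themselves are needed, the same matching can also be written as an anti-join; a small sketch, again assuming the question's df1 and df2:

// left_anti keeps only the df2 rows that have no exact match in df1
val onlyDiffs = df2.join(df1, df2.columns.toSeq, "left_anti")
onlyDiffs.show(false)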

sqyvllje #3

I think the other two answers are inefficient, because join and intersect build hash tables for all records across all partitions and compare all of them. At the very least you could try the simplest solution:

// Requires df1 and df2 to be identically ordered and partitioned:
// RDD.zip needs the same number of partitions and elements per partition.
df1.rdd.zip(df2.rdd).map {
  case (x, y) => (x, x != y)
}

and compare the speed on your real dataset.
Also, it would be better to replace the one-character strings with Char, since Char comparisons are very fast.
I don't have a real dataset, so I can't prove that my answer is faster, but I believe it is, based on tests on small datasets and on the documentation for join and intersect: zip does not shuffle partitions, whereas join and intersect do. Sorry for my English.
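
To turn the zipped RDD back into the typed result the question asks for, each pair can be mapped into a Result. A minimal sketch, assuming the question's setup and import spark.implicits._, and keeping in mind the RDD.zip precondition above (df1 and df2 must be identically ordered and not reshuffled):

val resultDS = df1.rdd
  .zip(df2.rdd)
  .map { case (o1, o2) =>
    // flag the df2 row as different whenever the pair does not match
    Result(o2.a, o2.b, o2.c, o2.d, o2.e, o2.f, o2.o_qtty, isDiff = o1 != o2)
  }
  .toDS()

resultDS.show(false)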
