我有两个数据集 df1 和 df2。
我需要将 df1 与 df2 进行比较，检测出任何不同的记录，
并创建一个带有附加列的结果数据集，该列标记不同的记录。下面是一个例子。
package playground
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}
object sample4 {

  // Local Spark session for the demo.
  // NOTE(review): building the session in a top-level val means it is created
  // at object initialisation, before main() gets a chance to silence logging.
  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  val sc = spark.sparkContext

  /** One record of the input datasets; o_qtty is the quantity being compared. */
  final case class Owner(a: Long,
                         b: String,
                         c: Long,
                         d: Short,
                         e: String,
                         f: String,
                         o_qtty: Double)

  /** An Owner row plus a flag marking whether df1 and df2 disagree on o_qtty. */
  final case class Result(a: Long,
                          b: String,
                          c: Long,
                          d: Short,
                          e: String,
                          f: String,
                          o_qtty: Double,
                          isDiff: Boolean)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    import spark.implicits._

    val data1 = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 120),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val data2 = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 55),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val expected = Seq(
      Result(11, "A", 666, 2017, "x", "y", 50, isDiff = false),
      Result(11, "A", 222, 2018, "x", "y", 20, isDiff = false),
      Result(33, "C", 444, 2018, "x", "y", 20, isDiff = false),
      Result(33, "C", 555, 2018, "x", "y", 55, isDiff = true),
      Result(22, "B", 555, 2018, "x", "y", 20, isDiff = false),
      Result(99, "D", 888, 2018, "x", "y", 100, isDiff = false),
      Result(11, "A", 888, 2018, "x", "y", 100, isDiff = false),
      Result(11, "A", 666, 2018, "x", "y", 80, isDiff = false),
      Result(33, "C", 666, 2018, "x", "y", 80, isDiff = false),
      Result(11, "A", 444, 2018, "x", "y", 50, isDiff = false),
    )

    val df1 = spark
      .createDataset(data1)
      .as[Owner]
      .cache()

    val df2 = spark
      .createDataset(data2)
      .as[Owner]
      .cache()

    // --- The missing piece: the original code built df1/df2 and `expected`
    // --- but never computed the diff the question asks for.

    // Columns that identify a record; o_qtty is the value being compared.
    val keyCols = Seq("a", "b", "c", "d", "e", "f")

    // Left-join df1's quantity onto df2 by the key columns and flag rows whose
    // quantities disagree. Rows of df2 with no matching key in df1 keep
    // isDiff = false (isNotNull guards the null produced by the left join).
    val result = df2
      .join(df1.withColumnRenamed("o_qtty", "qtty_1"), keyCols, "left")
      .withColumn("isDiff", col("qtty_1").isNotNull && col("qtty_1") =!= col("o_qtty"))
      .drop("qtty_1")
      .as[Result]

    result.show(truncate = false)

    // Sanity check against the expected output (order-insensitive).
    assert(result.collect().toSet == expected.toSet)

    // Release local Spark resources before exiting.
    spark.stop()
  }
}
最有效的方法是什么?
3条答案
（按热度 / 按时间排序）
58wvjzkj1#
我想这段代码可以帮你找到答案:
5jvtdoz22#
也许这是有帮助的——
执行左连接，并将不匹配的记录标记为 false。
sqyvllje3#
我认为另外两个答案效率不高，因为 `join` 和 `intersect` 会为所有记录、所有分区建立哈希表并做全量比较。至少你可以尝试最简单的解决方案 `zip`，并在真实数据集上比较速度。
另外，最好把单字符的字符串替换为 char，因为 char 的比较非常快。
我没有真实数据集，所以无法证明我的答案更快；但根据小数据集上的测试以及文档，我认为是这样的——与 `join` 或 `intersect` 不同，`zip` 不会在分区之间交换（shuffle）数据。对不起，我的英语不好。