我试图通过读取Dataframe来比较两个表()。对于这些表中的每个公共列,使用主键(如order\u id)与其他列(如order\u date、order\u name、order\u event)串联。
我使用的scala代码
val primary_key=order_id
for (i <- commonColumnsList){
val column_name = i
val tempDataFrameForNew = newDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")
val tempDataFrameOld = oldDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")
//Get those records which aren common in both old/new tables
matchCountCalculated = tempDataFrameForNew.intersect(tempDataFrameOld)
//Get those records which aren't common in both old/new tables
nonMatchCountCalculated = tempDataFrameOld.unionAll(tempDataFrameForNew).except(matchCountCalculated)
//Total Null/Non-Null Counts in both old and new tables.
nullsCountInNewDataFrame = newDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
nullsCountInOldDataFrame = oldDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
nonNullsCountInNewDataFrame = newDFCount - nullsCountInNewDataFrame
nonNullsCountInOldDataFrame = oldDFCount - nullsCountInOldDataFrame
//Put the result for a given column in a Seq variable, later convert it to Dataframe.
tempSeq = tempSeq :+ Row(column_name, matchCountCalculated.toString, nonMatchCountCalculated.toString, (nullsCountInNewDataFrame - nullsCountInOldDataFrame).toString,
(nonNullsCountInNewDataFrame - nonNullsCountInOldDataFrame).toString)
}
// Final Step: Create DataFrame using Seq and some Schema.
spark.createDataFrame(spark.sparkContext.parallelize(tempSeq), schema)
上面的代码对于中等数据集来说运行良好,但是随着新旧表中列和记录数量的增加,执行时间也在增加。任何建议都很感激。先谢谢你。
1条答案
按热度按时间j5fpnvbx1#
您可以执行以下操作:
1外部连接priamary键上的新旧Dataframe
joined_df = df_old.join(df_new, primary_key, "outer")
2如果可能的话,把它藏起来。这会节省你很多时间三。现在可以使用spark函数对列进行迭代和比较(
.isNull
对于不匹配,==
用于匹配等)这应该要快得多,尤其是当您可以缓存Dataframe时。如果不能,那么最好将加入的df保存到磁盘,以避免每次洗牌