对于spark和rdd来说是非常新的,所以我希望我能很好地解释我想要的东西,以便有人能够理解和帮助:)
我有两个非常大的数据集,比如300万行50列,存储在hadoop hdfs中。我想做的是将这两个都读入rdd,以便它使用并行性&我想返回第三个rdd,其中包含所有不匹配的记录(来自任一rdd)。
下面希望有助于显示我要做什么。。。只是想以最快最有效的方式找到所有不同的记录。。。
数据的顺序不一定相同—rdd1的第1行可能是rdd2的第4行。
非常感谢!!
所以。。。这似乎是做什么,我想它,但它似乎远容易被正确。。。
%spark
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD
import sqlContext.implicits._
import org.apache.spark.sql._
//create the tab1 rdd.
val rdd1 = sqlContext.sql("select * FROM table1").withColumn("source",lit("tab1"))
//create the tab2 rdd.
val rdd2 = sqlContext.sql("select * FROM table2").withColumn("source",lit("tab2"))
//create the rdd of all misaligned records between table1 and the table2.
val rdd3 = rdd1.except(rdd2).unionAll(rdd2.except(rdd1))
//rdd3.printSchema()
//val rdd3 = rdd1.except(rdd2)
//drop the temporary table that was used to create a hive compatible table from the last run.
sqlContext.dropTempTable("table3")
//register the new temporary table.
rdd3.toDF().registerTempTable("table3")
//drop the old compare table.
sqlContext.sql("drop table if exists data_base.compare_table")
//create the new version of the s_asset compare table.
sqlContext.sql("create table data_base.compare_table as select * from table3")
这是我到目前为止完成的最后一段代码,它似乎在做这项工作-不确定在整个数据集上的性能,我会祈祷。。。
非常感谢所有花时间帮助这个可怜的人:)
p、 如果有人有一个更高性能的解决方案,我很乐意听到!或者如果你能看到一些问题,这可能意味着它将返回错误的结果。
3条答案
按热度按时间e4eetjau1#
两者都可以用“full\ u outer”连接,然后应用过滤器,其中字段值在两者中进行比较:
输出:
83qze16e2#
将两个Dataframe加载为
df1
,df2
添加source
默认值为的列rdd1
以及rdd2
分别地工会
df1
以及df2
分组依据"rowid", "name", "status", "lastupdated"
把它的来源收集起来筛选具有单个源的所有行
1wnzp6jl3#
您可以将数据读入dataframes而不是rdd,然后使用union和groupby来实现结果