我正在使用pyspark,我需要比较两个文件的内容-做一个diffcheck。所以我把测试分为两部分:
我在文件之间取连接列,并检查记录是否相同
我从每个文件中获取唯一的列,并保存它们(手动测试)
第一步是采取行动
joined_columns = set(tbl1.columns).intersection(set(tbl2.columns))
joined_columns_str = str(joined_columns)[1:-1].replace("'", "")
tbl1_set = spark.sql("SELECT " + joined_columns_str + " FROM tbl1")
tbl2_set = spark.sql("SELECT " + joined_columns_str + " FROM tbl2")
sql = "SELECT * FROM tbl1_set " \
"EXCEPT " \
"SELECT * FROM tbl2_set "
different_records = spark.sql(sql)
但是,在这些文件中,我有Map列-因此我得到以下错误:
org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.)
有没有人知道如何解决这个问题,只得到不完全相同的记录?我考虑过使用自定义项,但在“select*”上找不到如何使用它,只有在引用特定字段时。。。
任何帮助都将不胜感激。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!