用于pyspark Dataframe 比较pytestAssert

esyap4oy  于 2022-11-01  发布在  Spark
关注(0)|答案(7)|浏览(131)

我有2个pyspark Dataframe ,如附件中所示。expected_df和actual_df

在我的单元测试中,我试图检查两者是否相等。
我的代码是

expected = map(lambda row: row.asDict(), expected_df.collect()) 
actual = map(lambda row: row.asDict(), actaual_df.collect()) 
assert expected = actual

因为两个DFS相同,但是行顺序不同,所以Assert在这里失败比较这样DFS最好方法是什么

gdx19jrr

gdx19jrr1#

你可以试试pyspark-test
https://pypi.org/project/pyspark-test/
这是受pyspark的panadas测试模块的启发。
使用简单

from pyspark_test import assert_pyspark_df_equal

assert_pyspark_df_equal(df_1, df_2)

此外,除了比较 Dataframe ,就像Pandas测试模块一样,它还接受许多可选参数,您可以在文档中检查这些参数。
注意事项:

  1. panda和pysaprk中的数据类型有点不同,这就是为什么直接转换为.toPandas并使用panadas测试模块可能不是正确的方法。
    1.此软件包用于单元/集成测试,因此适用于小型DFS
6psbrbz9

6psbrbz92#

这在某些pyspark文档中已完成:
assert sorted(expected_df.collect()) == sorted(actaual_df.collect())

rmbxnbpk

rmbxnbpk3#

不幸的是,如果不对任何列(特别是键列)进行排序,就无法实现这一点,原因是没有guarantee for ordering of records in a DataFrame。您无法预测记录在 Dataframe 中的出现顺序。下面的方法对我来说很有效:

expected = expected_df.orderBy('period_start_time').collect()
actual = actaual_df.orderBy('period_start_time').collect() 
assert expected == actual
k4emjkb1

k4emjkb14#

我们用Spark的散列函数对每一行进行散列,然后对结果列求和,从而解决了这个问题。

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def hash_df(df):
    """Hashes a DataFrame for comparison.

    Arguments:
        df (DataFrame): A dataframe to generate a hash from

    Returns:
        int: Summed value of hashed rows of an input DataFrame
    """
    # Hash every row into a new hash column
    df = df.withColumn('hash_value', F.hash(*sorted(df.columns))).select('hash_value')

    # Sum the hashes, see https://shortest.link/28YE
    value = df.agg(F.sum('hash_value')).collect()[0][0]

    return value

expected_hash = hash_df(expected_df)
actual_hash = hash_df(actual_df)
assert expected_hash == actual_hash
ercv8c1e

ercv8c1e5#

尝试使用“==”而不是“="。assert预期==实际

wj8zmpe1

wj8zmpe16#

我有两个顺序相同的 Dataframe 。比较这两个我用途:

def test_df(df1, df2):
    assert df1.values.tolist() == df2.values.tolist()
eqqqjvef

eqqqjvef7#

另一种确保排序顺序的方法是:

from pandas.testing import assert_frame_equal

def assert_frame_with_sort(results, expected, key_columns):
    results_sorted = results.sort_values(by=key_columns).reset_index(drop=True)
    expected_sorted = expected.sort_values(by=key_columns).reset_index(drop=True)
    assert_frame_equal(results_sorted, expected_sorted)

相关问题