使用字典中的值过滤sparkDataframe

niwlg2el  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(332)

我需要使用dict来过滤Dataframe,dict的构造方式是,key是列名,value是我要过滤的值:

filter = {'column_1' = 'Y', 'column_2' = 'N'}

我理解如何通过使用下面的函数来使用panda的Dataframe

def filter(df, filters):
    for i in filters:
         filtered_df = df.loc[(df[list(filters)] == pd.Series(filters)).all(axis=1)]
  return filtered_df

然而, .loc 不是Pandas之外使用的方法,我还没有完全掌握spark复制品将是什么。我知道他们有 .locate 方法,但语法似乎完全不同。
非常感谢

aydmsdu9

aydmsdu91#

下面是一个例子。您可以构造一个sql表达式字符串来过滤sparkDataframe。

filter = {'column_1': 'Y', 'column_2': 'N'}

df = spark.createDataFrame([['Y', 'N'], ['Y', 'Y']], ['column_1', 'column_2'])
df.show()
+--------+--------+
|column_1|column_2|
+--------+--------+
|       Y|       N|
|       Y|       Y|
+--------+--------+

filter_string = ' and '.join([f"{k} = '{v}'" for (k, v) in filter.items()])
print(filter_string)

# column_1 = 'Y' and column_2 = 'N'

filtered_df = df.filter(filter_string)
filtered_df.show()
+--------+--------+
|column_1|column_2|
+--------+--------+
|       Y|       N|
+--------+--------+

或者可以将过滤条件构造为Spark柱:

from functools import reduce

filter_col = reduce(lambda x, y: x & y, [F.col(k) == v for (k, v) in filter.items()])

filtered_df = df.filter(filter_col)

相关问题