How to use a broadcast variable in a filter in PySpark

qlvxas9a posted on 2023-10-15 in Spark

When I try to filter on the broadcast value with the command below, it does not work. Please help me find the correct approach.

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("broadcast variable").getOrCreate()

states={"CA":"California" , "NY":"Newyork" , "FL":"Florida"}
broadcaststates = spark.sparkContext.broadcast(states)
print(broadcaststates.value)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns=["firstname","lastname","country","statename"]

df=spark.createDataFrame(data=data,schema=columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcaststates.value[code]

result=df.rdd.map(lambda x : (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)

result.show(truncate=False)

# the filter that does not work
filterDF = df.where(df['statename'].isin(broadcaststates.value))

Answer #1, by eanckbw9

There are basically two ways to use your broadcast dictionary here: unpack its key set with *, or convert the keys to a list with list():

df.where(df['statename'].isin(*broadcaststates.value.keys())).show()
df.where(df['statename'].isin(list(broadcaststates.value.keys()))).show()
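Both forms hand isin() the individual keys. The difference from passing the dict itself comes down to how isin() normalizes its arguments. The sketch below is a simplified pure-Python model, assuming PySpark 3.x's Column.isin behaves this way: a single list or set argument is unpacked, and anything else is treated as one candidate value per argument. normalize_isin_args is a hypothetical helper, not a PySpark API.

```python
# Simplified model of how PySpark's Column.isin(*cols) normalizes its
# arguments (assumption: mirrors the 3.x source; normalize_isin_args is
# hypothetical and not part of PySpark).
def normalize_isin_args(*cols):
    # A single list or set argument is unpacked into its elements.
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        return tuple(cols[0])
    # Anything else is kept as-is: one candidate value per argument.
    return cols


states = {"CA": "California", "NY": "Newyork", "FL": "Florida"}

unpacked = normalize_isin_args(*states.keys())      # ('CA', 'NY', 'FL')
as_list = normalize_isin_args(list(states.keys()))  # ('CA', 'NY', 'FL')
as_dict = normalize_isin_args(states)               # (the whole dict,)
as_view = normalize_isin_args(states.keys())        # (a dict_keys view,)

print(unpacked == as_list)         # True
print(len(as_dict), len(as_view))  # 1 1
```

In this model, * or list() is what turns the keys into separate values. A bare dict or dict_keys view would instead arrive as a single argument that is not a plain string.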
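Note that you could just as well pass list(states.keys()) directly, without the broadcast, and Spark would do the same thing. In fact, look at the output of explain():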
df.where(df['statename'].isin(list(broadcaststates.value.keys()))).explain()
== Physical Plan ==
*(1) Filter statename#3 IN (CA,NY,FL)
+- *(1) Scan ExistingRDD[firstname#0,lastname#1,country#2,statename#3]

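The key set is not broadcast to the executors; it is embedded in the execution plan. That is perfectly fine as long as states is not too large. If states is very large, this can lead to an OutOfMemoryError. In that case it may be more efficient to actually broadcast states and use it inside a UDF, as follows: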

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

isin_states = F.udf(lambda x : x in broadcaststates.value, BooleanType())
# the dict is broadcast to the executors instead of being embedded in the execution plan
df.where(isin_states(df['statename'])).show()
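Inside the UDF, x in broadcaststates.value is an ordinary Python dict membership test, so it checks only the keys (the state codes), not the full names. Below is a pure-Python check of that predicate. It assumes the same states dict as in the question, and the hypothetical in_states function stands in for the UDF body.

```python
states = {"CA": "California", "NY": "Newyork", "FL": "Florida"}


def in_states(code, lookup=states):
    # Same test the UDF runs for each row: dict membership looks at keys only.
    return code in lookup


rows = ["CA", "NY", "TX", "California"]
print([r for r in rows if in_states(r)])  # ['CA', 'NY']
```

If only the codes matter for filtering, broadcasting a set of the keys (for example set(states)) instead of the whole dict would drop the unused values. This is a design alternative, not something the answer benchmarked.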
