When I try to filter on the broadcast value with the command below, it does not work. Please help me with the correct approach.
from pyspark.sql import *
spark=SparkSession.builder.appName("broadcast variable").getOrCreate()
states={"CA":"California" , "NY":"Newyork" , "FL":"Florida"}
broadcaststates = spark.sparkContext.broadcast(states)
print(broadcaststates.value)
data = [("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
]
columns=["firstname","lastname","country","statename"]
df=spark.createDataFrame(data=data,schema=columns)
df.printSchema()
df.show(truncate=False)
def state_convert(code):
    return broadcaststates.value[code]
result=df.rdd.map(lambda x : (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)
fiterDF=df.where(df['states'].isin(broadcaststates.value)))
1 Answer
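There are basically two ways to use your broadcast dictionary in the filter: unpack the key set with *, or convert it to a list with list. Both are applied to states.keys() and do the same thing; passing the dictionary itself to isin does not work. In fact, if you look at the output of explain, the key set is not broadcast to the executors but embedded in the execution plan. If states is not too large, this is perfectly fine. If states is very large, it may cause an OutOfMemoryError. In that case, actually broadcasting states and using it inside a UDF may be more efficient.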