如何在PySpark中实现广播变量而不使用UDF函数?用于生成新列

ylamdve6  于 2023-08-02  发布在  Spark
关注(0)|答案(1)|浏览(128)

我一直在探索广播变量的实现,其中我在示例数据集中有一个名为“State_Code”的列。我想利用广播变量来使它像'CA':'California', 'NJ' : 'New Jersey'一样
我已经在Spark中实现了下面的代码,它完成了创建新列所需的工作

val = {"CA": "California", "NY": "New York", "NJ": "New Jersey"}

broad = sc.broadcast(val)

def broad_function(a):
    return broad.value[a]

broad_udf = udf(broad_function)

df.withColumn('State_Name',broad_udf('State_code')).show()

字符串
上面的代码是为了用State Names创建新的列,但是使用UDF不会使用spark Optimization。使用广播变量的全部目的是为了优化,我们如何使用广播变量来创建一个新的列,而不使用UDF,也不将其转换为RDD?
我尝试使用when,col,但是它们不会使用广播变量。期望使用广播变量创建新列,而不使用 Dataframe 中的UDF

bf1o4zei

bf1o4zei1#

有多种方法可以做到这一点,而UDF通常不是继续进行的选择。
我不是100%确定你是否想要这样的答案,如果你的目标不同,请让我知道。
因此,这种方法基于使用Spark Config autoBroadcastJoinThreshold

STEP 1:将HashMap/Dict转换为dataframe

import scala.collection.mutable.HashMap
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StringType,StructField,StructType}

spark.conf.set("autoBroadcastJoinThreshold",104857600)

def convertHashMapToDataframe(hash:HashMap[String,String],keyCol:String,valueCol:String):DataFrame = { 
      val rows = hash.toSeq.map { case (keyCol,valueCol) => Row(keyCol,valueCol) }
      val convertedRDD = spark.sparkContext.parallelize(rows)
      val schema = StructType(Seq(
        StructField(keyCol,StringType,true),
        StructField(valueCol,StringType,true)
      ))
      val df = spark.createDataFrame(convertedRDD,schema=schema)
      df
}

val states = HashMap("CA"->"California", "NY"-> "New York", "NJ"-> "New Jersey")

val state_df = convertHashMapToDataframe(state_dict,"code","name")
state_df.show()

+----+----------+
|code|      name|
+----+----------+
|  NJ|New Jersey|
|  NY|  New York|
|  CA|California|
+----+----------+

val data = Seq((11,"CA","Jose"),(21,"NJ","Shaun"),(113,"NY","Terry")).toDF("id","state_code","name")
data.show()

+---+----------+-----+
| id|state_code| name|
+---+----------+-----+
| 11|        CA| Jose|
| 21|        NJ|Shaun|
|113|        NY|Terry|
+---+----------+-----+

字符串

第二步:开启配置,加入

val result = data.join(
                      broadcast(state_df),
                      col("state_code")===col("code"),
                      "left"
                      )
result.show()

+---+----------+-----+----+----------+
| id|state_code| name|code|      name|
+---+----------+-----+----+----------+
| 11|        CA| Jose|  CA|California|
| 21|        NJ|Shaun|  NJ|New Jersey|
|113|        NY|Terry|  NY|  New York|
+---+----------+-----+----+----------+


我希望这能给你解决问题的另一个视角。我会更新这个答案,提出更多的方法。
参考文献-
SparkByExamplesLink

相关问题