Scala UDF to operate on an array column and return a custom value

a14dhokn posted on 2021-05-29 in Spark

I have two datasets like this:

val jsonStr = """{
  "TransactionId": 1,
  "TransactionName": "Name",
  "Order": 12,
  "ReplaceStrings": [
    "UNDEFINED", "INVALID"
  ],
  "Country": "China"
}"""

import spark.implicits._ // provides .toDS/.toDF; spark-shell imports this automatically
val configurations = spark.read.json(Seq(jsonStr).toDS)

This holds all my configurations and filters.

My data:
val data = Seq(
  (1, "Mindy", "Devaney", "mdevaney0@cnbc.com", "Female", "United States", "UTF-8"),
  (2, "Charmain", "Clear", "candriolli1@miitbeian.gov.cn", "Female", "**China**", "UTF-8"),
  (3, "Dilan", "**UNDEFINED**", "dphilipeaux2@jalbum.net", "Male", "**China**", "Windows-1252")
).toDF("id", "Fname", "LName", "mailid", "Gender", "Country", "Codepage")

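Now my task is to join the configuration data, including its filters, with the data above and retrieve the matching rows when the filter applies to the country China; every LName whose value is UNDEFINED should be replaced with an empty string.
I tried to define this with a UDF, but I'm stuck on how to pass the JSON array value, which arrives as a WrappedArray, or whether to use a Seq data type instead.
If anyone has seen a similar case or has an idea, please share it.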
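The sticking point is the parameter type. Spark hands an `array<string>` column to a Scala UDF as a `Seq[String]` (concretely a `WrappedArray` on Scala 2.12, which is where that name comes from), so the UDF parameter should be declared as `Seq[String]`; declaring it as `Array[String]` typically fails with a `ClassCastException` at runtime. A minimal sketch of the UDF body as a plain Scala function, assuming LName holds the bare value (e.g. `UNDEFINED`) and matching is exact and case-sensitive; `ReplaceSketch` and `blankIfListed` are hypothetical names:

```scala
object ReplaceSketch {
  // Hypothetical helper: the logic a UDF over (LName, ReplaceStrings) could use.
  // The array column arrives as a Seq[String], so the parameter is a Seq, not an Array.
  // Null inputs are passed through unchanged instead of throwing.
  def blankIfListed(lname: String, replaceStrings: Seq[String]): String =
    if (lname == null || replaceStrings == null) lname
    else if (replaceStrings.contains(lname)) ""
    else lname

  def main(args: Array[String]): Unit = {
    val placeholders = Seq("UNDEFINED", "INVALID")
    println(blankIfListed("UNDEFINED", placeholders)) // prints an empty line
    println(blankIfListed("Clear", placeholders))     // prints Clear
  }
}
```

In Spark it could then be wrapped as `udf((l: String, r: Seq[String]) => ReplaceSketch.blankIfListed(l, r))` using `org.apache.spark.sql.functions.udf`. If the data literally contains the asterisks (`**UNDEFINED**`), they have to be accounted for in the comparison.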

Answer #1 (tkclm6bt)

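Check the code below. Spark passes the `ReplaceStrings` array column into the UDF as a `Seq[String]`, so the UDF can take it directly. Because the sample values literally contain `**` (for example `**China**` and `**UNDEFINED**`), the join uses `contains` instead of equality, and the UDF wraps each replace string in `**` before comparing.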

scala> data.show(false)
+---+--------+-------------+----------------------------+------+-------------+------------+
|id |Fname   |LName        |mailid                      |Gender|Country      |Codepage    |
+---+--------+-------------+----------------------------+------+-------------+------------+
|1  |Mindy   |Devaney      |mdevaney0@cnbc.com          |Female|United States|UTF-8       |
|2  |Charmain|Clear        |candriolli1@miitbeian.gov.cn|Female|**China**    |UTF-8       |
|3  |Dilan   |**UNDEFINED**|dphilipeaux2@jalbum.net     |Male  |**China**    |Windows-1252|
+---+--------+-------------+----------------------------+------+-------------+------------+
scala> configurations.show(false)
+-------+-----+--------------------+-------------+---------------+
|Country|Order|ReplaceStrings      |TransactionId|TransactionName|
+-------+-----+--------------------+-------------+---------------+
|China  |12   |[UNDEFINED, INVALID]|1            |Name           |
+-------+-----+--------------------+-------------+---------------+
scala> val check = udf((lname:String,replaceStrings:Seq[String]) => if(replaceStrings.map(d => s"**${d}**").contains(lname)) "" else lname )
scala> data.join(configurations,data("Country").contains(configurations("Country")),"inner").withColumn("LName",check($"LName",$"ReplaceStrings")).drop(configurations("Country")).show(false)
+---+--------+-----+----------------------------+------+---------+------------+-----+--------------------+-------------+---------------+
|id |Fname   |LName|mailid                      |Gender|Country  |Codepage    |Order|ReplaceStrings      |TransactionId|TransactionName|
+---+--------+-----+----------------------------+------+---------+------------+-----+--------------------+-------------+---------------+
|2  |Charmain|Clear|candriolli1@miitbeian.gov.cn|Female|**China**|UTF-8       |12   |[UNDEFINED, INVALID]|1            |Name           |
|3  |Dilan   |     |dphilipeaux2@jalbum.net     |Male  |**China**|Windows-1252|12   |[UNDEFINED, INVALID]|1            |Name           |
+---+--------+-----+----------------------------+------+---------+------------+-----+--------------------+-------------+---------------+
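To make the two steps of this answer explicit without a Spark session, here is a plain-Scala model on ordinary collections: an inner join on "data country contains config country", followed by the same `**`-wrapping check the UDF performs. The case classes `Person` and `Config` and the object `JoinReplaceModel` are hypothetical stand-ins for rows of `data` and `configurations`, trimmed to the relevant columns; this is a sketch of the logic, not a Spark API.

```scala
object JoinReplaceModel {
  // Hypothetical stand-ins for one row of `data` and one row of `configurations`.
  final case class Person(id: Int, fname: String, lname: String, country: String)
  final case class Config(country: String, replaceStrings: Seq[String])

  // Same check as the answer's UDF: the data wraps values in literal "**",
  // so each replace string is wrapped the same way before comparing.
  def check(lname: String, replaceStrings: Seq[String]): String =
    if (replaceStrings.map(d => s"**$d**").contains(lname)) "" else lname

  // Inner join on "person country contains config country", then rewrite lname.
  def joinAndReplace(people: Seq[Person], configs: Seq[Config]): Seq[Person] =
    for {
      p <- people
      c <- configs
      if p.country.contains(c.country)
    } yield p.copy(lname = check(p.lname, c.replaceStrings))

  def main(args: Array[String]): Unit = {
    val people = Seq(
      Person(1, "Mindy", "Devaney", "United States"),
      Person(2, "Charmain", "Clear", "**China**"),
      Person(3, "Dilan", "**UNDEFINED**", "**China**")
    )
    val configs = Seq(Config("China", Seq("UNDEFINED", "INVALID")))
    // Only the two China rows survive the join; Dilan's LName becomes "".
    joinAndReplace(people, configs).foreach(println)
  }
}
```

A `contains` join is a substring match, so a config country such as `China` would also match any data value that merely includes that text. If the real data holds clean values without the asterisks, an equality join and a plain `contains(lname)` check on the unwrapped replace strings are the tighter choice.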
