我正在编写一个sparkscala代码来将输出写入bq,下面是用于形成具有两列(id和关键字)的输出表的代码
val df1 = Seq("tamil", "telugu", "hindi").toDF("language")
val df2 = Seq(
(101, Seq("tamildiary", "tamilkeyboard", "telugumovie")),
(102, Seq("tamilmovie")),
(103, Seq("hindirhymes", "hindimovie"))
).toDF("id", "keywords")
val pattern = concat(lit("^"), df1("language"), lit(".*"))
import org.apache.spark.sql.Row
val arrayToMap = udf{ (arr: Seq[Row]) =>
arr.map{ case Row(k: String, v: Int) => (k, v) }.toMap
}
val final_df = df2.
withColumn("keyword", explode($"keywords")).as("df2").
join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword").
groupBy("id", "language").agg(size(collect_list($"language")).as("count")).
groupBy("id").agg(arrayToMap(collect_list(struct($"language", $"count"))).as("keywords"))
最终的输出是:
+---+--------------------+
| id| app_language|
+---+--------------------+
|101|Map(tamil -> 2, t...|
|103| Map(hindi -> 2)|
|102| Map(tamil -> 1)|
+---+--------------------+
我正在定义下面的函数来传递这个输出表的模式(因为bq不支持map字段,所以我使用struct数组。但这也不起作用)
def createTableIfNotExists(outputTable: String) = {
spark.createBigQueryTable(
s"""
|CREATE TABLE IF NOT EXISTS $outputTable(
|ds date,
|id int64,
|keywords ARRAY<STRUCT<key STRING, value INT64>>
|)
|PARTITION BY ds
|CLUSTER BY user_id
""".stripMargin)
}
谁能帮我写一个正确的模式,以便它在bq兼容。
1条答案
按热度按时间ifmq2ha21#
可以按如下方式收集结构数组:
然后你可以有一个类似