以下面的dataframe为例:
val df = Seq(Seq("xxx")).toDF("a")
架构:
root
|-- a: array (nullable = true)
| |-- element: string (containsNull = true)
如何修改 df
使生成的Dataframe在任何地方都不可为空,即具有以下架构:
root
|-- a: array (nullable = false)
| |-- element: string (containsNull = false)
我知道我可以重新创建另一个Dataframe来强制一个不可为null的模式,比如在sparkDataframe中更改column的nullable属性
spark.createDataFrame(df.rdd, StructType(StructField("a", ArrayType(StringType, false), false) :: Nil))
但这不是结构化流媒体下的选项,所以我希望它是某种就地修改。
1条答案
按热度按时间wr98u20j1#
因此,实现这一目标的方法是
UserDefinedFunction
```// Problem setup
val df = Seq(Seq("xxx")).toDF("a")
df.printSchema
root
|-- a: array (nullable = true)
| |-- element: string (containsNull = true)
import org.apache.spark.sql.types.{ArrayType, StringType}
import org.apache.spark.sql.functions.{udf, col}
// We define a sub schema with the appropriate data type and null condition
val subSchema = ArrayType(StringType, containsNull = false)
// We create a UDF that applies this sub schema
// while specifying the output of the UDF to be non-nullable
val applyNonNullableSchemaUdf = udf((x:Seq[String]) => x, subSchema).asNonNullable
// We apply the UDF
val newSchemaDF = df.withColumn("a", applyNonNullableSchemaUdf(col("a")))
// Check new schema
newSchemaDF.printSchema
root
|-- a: array (nullable = false)
| |-- element: string (containsNull = false)
// Check that it actually works
newSchemaDF.show
+-----+
| a|
+-----+
|[xxx]|
+-----+