我正在使用从JSON事件流转换而来的Dataframe来处理事件,这些事件流最终会被写成Parquet格式。
然而,一些JSON事件在键中包含空格,我希望在将其转换为Parquet之前记录并过滤/删除 Dataframe 中的此类事件,因为;{}()\n\t=
被视为Parquet模式(CatalystSchemaConverter)中的特殊字符,如下方的[1]中所列,因此不允许出现在列名中。
我怎样才能在Dataframe中对列名进行这样的验证,并完全删除这样的事件,而不会在Spark Streaming作业中出错。
**[1]**Spark的催化剂示意图转换器
def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
checkConversionRequirement(
!name.matches(".*[ ,;{}()\n\t=].*"),
s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
|Please use alias to rename it.
""".stripMargin.split("\n").mkString(" ").trim
)
}
7条答案
按热度按时间hrirmatl1#
对于***pyspark***的所有体验者:这甚至发生在我重命名列之后。2一个我可以在一些迭代之后让它工作的方法是:
ca1c2owp2#
在写parquet之前,你可以使用正则表达式将所有无效字符替换为下划线。另外,也可以去掉列名中的重音符号。
下面是一个函数
normalize
,它可以在Scala和Python中实现这一点:斯卡拉
巨蟒
tmb3ates3#
这是我使用Regex的解决方案,以便按照parquet约定重命名所有 Dataframe 的列:
希望能有所帮助,
liwlm1x94#
列名中包含空格时我也遇到了同样的问题。
解决方案的第一部分是将名称放在反引号中。
解决方案的第二部分是用下划线替换空格。
对不起,但我只有pyspark代码准备好了:
jdzmm42g5#
使用
alias
更改字段名称,但不使用这些特殊字符。xwbd5t1u6#
我遇到了此错误"SQL语句中的错误:分析异常:在","中发现无效字符;{}()\n\t ="在架构的列名中。请通过将表属性" delta. columnMapping. mode "设置为" name "来启用列Map。有关详细信息,请参阅https://learn.microsoft.com/azure/databricks/delta/delta-column-mapping,或者可以使用别名对其进行重命名。"
这个问题是因为我在创建一个基于 parquet /Delta表的表时使用了MAX(COLUM_NAME),而新表的新名称是"MAX(COLUM_NAME)",因为忘记使用别名,而且 parquet 文件不支持括号'()'
通过使用别名解决(删除括号)
sy5wg1nm7#
至少对于 parquet 文件(我测试过),它在Spark 3.3.0 release中得到了修复,它也可以与JSON一起工作。