Spark Dataframe验证parquet写入的列名

fhity93d  于 2023-02-05  发布在  Apache
关注(0)|答案(7)|浏览(139)

我正在使用从JSON事件流转换而来的Dataframe来处理事件,这些事件流最终会被写成Parquet格式。
然而,一些JSON事件在键中包含空格,我希望在将其转换为Parquet之前记录并过滤/删除 Dataframe 中的此类事件,因为;{}()\n\t=被视为Parquet模式(CatalystSchemaConverter)中的特殊字符,如下方的[1]中所列,因此不允许出现在列名中。
我怎样才能在Dataframe中对列名进行这样的验证,并完全删除这样的事件,而不会在Spark Streaming作业中出错。

**[1]**Spark的催化剂示意图转换器

def checkFieldName(name: String): Unit = {
  // ,;{}()\n\t= and space are special characters in Parquet schema
  checkConversionRequirement(
    !name.matches(".*[ ,;{}()\n\t=].*"),
    s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
             |Please use alias to rename it.
           """.stripMargin.split("\n").mkString(" ").trim
  )
}
hrirmatl

hrirmatl1#

对于***pyspark***的所有体验者:这甚至发生在我重命名列之后。2一个我可以在一些迭代之后让它工作的方法是:

file = "/opt/myfile.parquet"
df = spark.read.parquet(file)
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", ""))

df = spark.read.schema(df.schema).parquet(file)
ca1c2owp

ca1c2owp2#

在写parquet之前,你可以使用正则表达式将所有无效字符替换为下划线。另外,也可以去掉列名中的重音符号。
下面是一个函数normalize,它可以在Scala和Python中实现这一点:

斯卡拉

/**
  * Normalize column name by replacing invalid characters with underscore
  * and strips accents
  *
  * @param columns dataframe column names list
  * @return the list of normalized column names
  */
def normalize(columns: Seq[String]): Seq[String] = {
  columns.map { c =>
    org.apache.commons.lang3.StringUtils.stripAccents(c.replaceAll("[ ,;{}()\n\t=]+", "_"))
  }
}

// using the function
val df2 = df.toDF(normalize(df.columns):_*)

巨蟒

import unicodedata
import re

def normalize(column: str) -> str:
    """
    Normalize column name by replacing invalid characters with underscore
    strips accents and make lowercase
    :param column: column name
    :return: normalized column name
    """
    n = re.sub(r"[ ,;{}()\n\t=]+", '_', column.lower())
    return unicodedata.normalize('NFKD', n).encode('ASCII', 'ignore').decode()

# using the function
df = df.toDF(*map(normalize, df.columns))
tmb3ates

tmb3ates3#

这是我使用Regex的解决方案,以便按照parquet约定重命名所有 Dataframe 的列:

df.columns.foldLeft(df){
  case (currentDf,  oldColumnName) => currentDf.withColumnRenamed(oldColumnName, oldColumnName.replaceAll("[ ,;{}()\n\t=]", ""))
}

希望能有所帮助,

liwlm1x9

liwlm1x94#

列名中包含空格时我也遇到了同样的问题。
解决方案的第一部分是将名称放在反引号中。
解决方案的第二部分是用下划线替换空格。
对不起,但我只有pyspark代码准备好了:

from pyspark.sql import functions as F

df_tmp.select(*(F.col("`" + c+ "`").alias(c.replace(' ', '_')) for c in df_tmp.columns)
jdzmm42g

jdzmm42g5#

使用alias更改字段名称,但不使用这些特殊字符。

xwbd5t1u

xwbd5t1u6#

我遇到了此错误"SQL语句中的错误:分析异常:在","中发现无效字符;{}()\n\t ="在架构的列名中。请通过将表属性" delta. columnMapping. mode "设置为" name "来启用列Map。有关详细信息,请参阅https://learn.microsoft.com/azure/databricks/delta/delta-column-mapping,或者可以使用别名对其进行重命名。"
这个问题是因为我在创建一个基于 parquet /Delta表的表时使用了MAX(COLUM_NAME),而新表的新名称是"MAX(COLUM_NAME)",因为忘记使用别名,而且 parquet 文件不支持括号'()'
通过使用别名解决(删除括号)

sy5wg1nm

sy5wg1nm7#

至少对于 parquet 文件(我测试过),它在Spark 3.3.0 release中得到了修复,它也可以与JSON一起工作。

相关问题