dataframe转换产生空值

camsedfj  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(444)

我一直试图列出所有SparkDataframe从Parquet文件目录除了元数据目录。目录的结构如下所示:

dumped_data/
 - time=19424145
 - time=19424146
 - time=19424147
 - _spark_metadata

主要目标是避免从元数据目录读取数据。我创建了一个解决方案,但由于某些原因,它不断返回空值。原因可能是什么?
以下是解决方案:

val dirNamesRegex: Regex = s"\\_spark\\_metadata*".r

def transformDf: Option[DataFrame] = {
 val filesDf = listPath(new Path(feedPath))(fsConfig)
      .map(_.getName)
      .filter(name => !dirNamesRegex.pattern.matcher(name).matches)
      .flatMap(path => sparkSession.parquet(Some(feedSchema))(path))

    if (!filesDf.isEmpty)
      Some(filesDf.reduce(_ union _))
    else None
 }

listpath—在hdfs中列出数据文件的自定义方法。feedschema是structtype
如果没有if,我会得到这个例外:

java.lang.UnsupportedOperationException: empty.reduceLeft
    at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:137)
    at scala.collection.immutable.List.reduceLeft(List.scala:84)
    at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:208)
    at scala.collection.AbstractTraversable.reduce(Traversable.scala:104)
pn9klfpd

pn9klfpd1#

在代码中有3个问题:
看来你可以用 == 运算符而不是正则表达式匹配。您知道要过滤的目录的具体名称,只需使用按名称过滤即可。
我拿到你的密码了, filesDf 有点像 Traversable[DataFrame] . 如果你想降低它的安全性,即使这个集合是空的,你也可以使用 reduceLeftOption 而不是 reduce .
在你的 transformDf 方法您试图使用spark筛选目录名和读取数据,它可能太重,无法使用spark进行调试。我建议您将逻辑分为两种不同的方法:先读取目录并对其进行过滤,再读取数据并 union 把他们变成一个将军 DataFrame .
我建议这样的代码示例:
不分逻辑的情况:

def transformDf: Option[DataFrame] = {
  listPath(new Path(feedPath))(fsConfig)
    .map(_.getName)
    .filter(name => name == "_spark_metadata")
    .flatMap(path => sparkSession.parquet(Some(feedSchema))(path))
    .reduceLeftOption(_ union _)
}

逻辑分割的情况:

def getFilteredPaths: List[String] =
  listPath(new Path(feedPath))(fsConfig)
    .map(_.getName)
    .filter(name => name == "_spark_metadata")

def transformDf: Option[DataFrame] = {
  getFilteredPaths
    .flatMap(path => sparkSession.parquet(Some(feedSchema))(path))
    .reduceLeftOption(_ union _)
}

第二种方法是编写一些轻量级的单元测试来调试路径提取,当您拥有正确的路径时,您可以轻松地从目录中读取数据并合并它们。

相关问题