我一直试图列出所有SparkDataframe从Parquet文件目录除了元数据目录。目录的结构如下所示:
dumped_data/
- time=19424145
- time=19424146
- time=19424147
- _spark_metadata
主要目标是避免从元数据目录读取数据。我创建了一个解决方案,但由于某些原因,它不断返回空值。原因可能是什么?
以下是解决方案:
val dirNamesRegex: Regex = s"\\_spark\\_metadata*".r
def transformDf: Option[DataFrame] = {
val filesDf = listPath(new Path(feedPath))(fsConfig)
.map(_.getName)
.filter(name => !dirNamesRegex.pattern.matcher(name).matches)
.flatMap(path => sparkSession.parquet(Some(feedSchema))(path))
if (!filesDf.isEmpty)
Some(filesDf.reduce(_ union _))
else None
}
listpath—在hdfs中列出数据文件的自定义方法。feedschema是structtype
如果没有if,我会得到这个例外:
java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:137)
at scala.collection.immutable.List.reduceLeft(List.scala:84)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:208)
at scala.collection.AbstractTraversable.reduce(Traversable.scala:104)
1条答案
按热度按时间pn9klfpd1#
在代码中有3个问题:
看来你可以用
==
运算符而不是正则表达式匹配。您知道要过滤的目录的具体名称,只需使用按名称过滤即可。我拿到你的密码了,
filesDf
有点像Traversable[DataFrame]
. 如果你想降低它的安全性,即使这个集合是空的,你也可以使用reduceLeftOption
而不是reduce
.在你的
transformDf
方法您试图使用spark筛选目录名和读取数据,它可能太重,无法使用spark进行调试。我建议您将逻辑分为两种不同的方法:先读取目录并对其进行过滤,再读取数据并union
把他们变成一个将军DataFrame
.我建议这样的代码示例:
不分逻辑的情况:
逻辑分割的情况:
第二种方法是编写一些轻量级的单元测试来调试路径提取,当您拥有正确的路径时,您可以轻松地从目录中读取数据并合并它们。