我试图保存一个带有分区的parquet sparkDataframe到临时目录中,以便进行单元测试,但是由于某些原因,没有创建分区。数据本身保存在目录中,可用于测试。以下是我为此创建的方法:
def saveParquet(df: DataFrame, partitions: String*): String = {
val path = createTempDir()
df.repartition(1).parquet(path)(partitions: _*)
path
}
val feedPath: String = saveParquet(feedDF.select(feed.schema), "processing_time")
此方法适用于具有各种模式的各种Dataframe,但由于某些原因,不会为此方法生成分区。我已经注销了生成的路径,它如下所示:
/var/folders/xg/fur_diuhg83b2ba15ih2rt822000dhst/T/testutils-samples8512758291/jf81n7bsj-95hs-573n-b73h-7531ug04515
但应该是这样的:
/var/folders/xg/fur_diuhg83b2ba15ih2rt822000dhst/T/testutils-samples8512758291/jf81n7bsj-95hs-573n-b73h-7531ug04515/processing_time=1591714800000/part-some-random-numbersnappy.parquet
我已经检查了数据和所有的列在分区之前都读取得很好,一旦创建了分区调用,就会出现这个问题。另外,我在目录上运行了一个regex,但在测试样本上出现匹配错误- s".*processing_time=([0-9]+)/.*parquet".r
那么这个问题的原因是什么呢?我还能怎样划分Dataframe呢?
Dataframe架构如下所示:
val schema: StructType = StructType(
Seq(
StructField("field1", StringType),
StructField("field2", LongType),
StructField("field3", StringType),
StructField("field4Id", IntegerType, nullable = true),
StructField("field4", FloatType, nullable = true),
StructField("field5Id", IntegerType, nullable = true),
StructField("field5", FloatType, nullable = true),
StructField("field6Id", IntegerType, nullable = true),
StructField("field6", FloatType, nullable = true),
//partition keys
StructField("processing_time", LongType)
)
)
暂无答案!
目前还没有任何答案,快来回答吧!