我在google数据融合中有一个管道,它在google云存储桶的目标目录中生成一个名为“part-00000-”的csv文件(以及一个名为“\u success”的文件)。“part-00000”后面的文件名的其余部分总是不同的和随机的。
管道通过解析、处理和连接输入文件(都来自一些google云存储位置)来生成新的输出,然后将新的输出与旧的现有输出文件连接起来,并将“part-00000”文件放在与名为“internal\u dashboard.csv”的旧输出文件相同的位置。
无论如何,我需要做的是手动将“part-00000”文件重命名为“internal\u dashboard.csv”并替换旧文件。
以下是我在spark sink中的尝试(我从这里,这里,这里,这里,这里)。我们的想法是首先找到一个文件名中有“part-00000”的文件,然后重命名它并覆盖旧文件。到目前为止,我所有的尝试都失败了:
尝试1
import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.util.matching.Regex
def recursiveListFiles(f: File, r: Regex): Array[File] = {
val these = f.listFiles
val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}
def moveRenameFile(source: String, destination: String): Unit = {
val path = Files.move(
Paths.get(source),
Paths.get(destination),
StandardCopyOption.REPLACE_EXISTING
)
// could return `path`
}
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
df.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("append") // "overwrite" "append"
.save(fullpath)
val existingfilename = recursiveListFiles(new File(fullpath), "part-00000-.*")
moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}
尝试2:
import java.io.File
def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
dir.listFiles.filter(_.isFile).toList.filter { file =>
extensions.exists(file.getName.startsWith(_))
}
}
def moveRenameFile(source: String, destination: String): Unit = {
val path = Files.move(
Paths.get(source),
Paths.get(destination),
StandardCopyOption.REPLACE_EXISTING
)
// could return `path`
}
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
df.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("append") // "overwrite" "append"
.save(fullpath)
val suffixList = List("part-00000")
val existingfilename = getListOfFiles(new File(fullpath), suffixList )
moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}
尝试3:
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
val pathandfile = fullpath + "/" + targefilename
df.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("append") // "overwrite" "append"
.save(pathandfile )
dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile ")
dbutils.fs.rm(pathandfile,true)
}
我需要scala或其他方面的帮助,将“part-00000”文件重命名为“internal\u dashboard.csv”并覆盖旧版本。
对于那些没有使用过数据融合的人,我可以使用的工具有:
Spark槽:
scala spark程序(可以在sink之前或之后):
Description
Executes user-provided Spark code in Scala.
Use Case
This plugin can be used when you want arbitrary Spark code.
Properties
mainClass: The fully qualified class name for the Spark application. It must either be an object that has a main method define inside, with the method signature as def main(args: Array[String]): Unit; or it is a class that extends from the CDAP co.cask.cdap.api.spark.SparkMain trait that implements the run method, with the method signature as def run(implicit sec: SparkExecutionContext): Unit
pyspark程序(可以在sink之前或之后出现):
Description
Executes user-provided Spark code in Python.
Use Case
This plugin can be used when you want to run arbitrary Spark code.
编辑:
(2020年11月2日)我刚刚了解到,还有一些googlecloud函数可以用python(或java)编写,并在它所在的bucket中发生更改时触发。如果有人知道如何使这样一个功能,可以重命名和覆盖'第00000部分'文件时触发,请让我知道。如果其他都失败了,我就试试看。
1条答案
按热度按时间m0rkklqb1#
避免在aws s3上重命名对象。没有这种东西,它所做的只是“剪切粘贴”=>非常昂贵的操作。
您可以尝试:
如果您坚持使用“重命名”,请使用hadoop库,而不是java:
注意:使用awss3时,两个路径必须在同一个bucket中(它们具有不同的文件系统对象,在使用rename(…)时适用)。