在google data fusion中使用spark工具重命名输出文件

a1o7rhls  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(464)

我在google数据融合中有一个管道,它在google云存储桶的目标目录中生成一个名为“part-00000-”的csv文件(以及一个名为“\u success”的文件)。“part-00000”后面的文件名的其余部分总是不同的和随机的。

管道通过解析、处理和连接输入文件(都来自一些google云存储位置)来生成新的输出,然后将新的输出与旧的现有输出文件连接起来,并将“part-00000”文件放在与名为“internal\u dashboard.csv”的旧输出文件相同的位置。
无论如何,我需要做的是手动将“part-00000”文件重命名为“internal\u dashboard.csv”并替换旧文件。
以下是我在spark sink中的尝试(我从这里,这里,这里,这里,这里)。我们的想法是首先找到一个文件名中有“part-00000”的文件,然后重命名它并覆盖旧文件。到目前为止,我所有的尝试都失败了:
尝试1

import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.util.matching.Regex

def recursiveListFiles(f: File, r: Regex): Array[File] = {
  val these = f.listFiles
  val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
  good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}

def moveRenameFile(source: String, destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),
        Paths.get(destination),
        StandardCopyOption.REPLACE_EXISTING
    )
    // could return `path`
}

def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)

  val existingfilename = recursiveListFiles(new File(fullpath), "part-00000-.*")
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

尝试2:

import java.io.File

def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
    dir.listFiles.filter(_.isFile).toList.filter { file =>
        extensions.exists(file.getName.startsWith(_))
    }
}

def moveRenameFile(source: String, destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),
        Paths.get(destination),
        StandardCopyOption.REPLACE_EXISTING
    )
    // could return `path`
}

def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)

  val suffixList = List("part-00000")
  val existingfilename = getListOfFiles(new File(fullpath), suffixList )
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

尝试3:

def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"
  val pathandfile = fullpath + "/" + targefilename

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(pathandfile )

dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile ")
dbutils.fs.rm(pathandfile,true)
}

我需要scala或其他方面的帮助,将“part-00000”文件重命名为“internal\u dashboard.csv”并覆盖旧版本。
对于那些没有使用过数据融合的人,我可以使用的工具有:
Spark槽:

scala spark程序(可以在sink之前或之后):

Description
Executes user-provided Spark code in Scala.

Use Case
This plugin can be used when you want arbitrary Spark code.

Properties
mainClass: The fully qualified class name for the Spark application. It must either be an object that has a main method define inside, with the method signature as def main(args: Array[String]): Unit; or it is a class that extends from the CDAP co.cask.cdap.api.spark.SparkMain trait that implements the run method, with the method signature as def run(implicit sec: SparkExecutionContext): Unit

pyspark程序(可以在sink之前或之后出现):

Description
Executes user-provided Spark code in Python.

Use Case
This plugin can be used when you want to run arbitrary Spark code.

编辑:

(2020年11月2日)我刚刚了解到,还有一些googlecloud函数可以用python(或java)编写,并在它所在的bucket中发生更改时触发。如果有人知道如何使这样一个功能,可以重命名和覆盖'第00000部分'文件时触发,请让我知道。如果其他都失败了,我就试试看。

m0rkklqb

m0rkklqb1#

避免在aws s3上重命名对象。没有这种东西,它所做的只是“剪切粘贴”=>非常昂贵的操作。
您可以尝试:

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet(outputBasePath)

如果您坚持使用“重命名”,请使用hadoop库,而不是java:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val srcPath = new Path("source/...")
val destPath = new Path("dest/...")

srcPath.getFileSystem(new Configuration()).rename(srcPath, destPath)

注意:使用awss3时,两个路径必须在同一个bucket中(它们具有不同的文件系统对象,在使用rename(…)时适用)。

相关问题