Spark saveAsTable not writing to the Glue catalog

ttygqcqt  posted 2021-06-25 in Hive
Follow (0) | Answers (1) | Views (317)

In Scala I am using .saveAsTable, but Glue does not seem to update itself properly with the database location, format, and so on. For background, the incoming dataset is 1.5 TB of JSON and the target format is Parquet; all of the Parquet files do get written, although the rename step is quite slow.

val writeMode = "Overwrite"
val destinationFormatType = "parquet"
val s3PathBase = "s3://foo_bucket"
val currentDatabase = "bar"
val replaceTable = true
val jsonColumn = "json"
val partitionBy = Array("year", "month", "day", "hour")

val currentEvent = "fooBar"
val tableLowerCase = glueCatalog.fixTableName(currentEvent.asInstanceOf[String])
val s3Path = s"${s3PathBase}/${tableLowerCase}"
val tablePathInDb = s"${currentDatabase}.${tableLowerCase}"
println(tablePathInDb)

val currentEventDf = spark.read.json(
  dfWithJson
    .filter(col("event") === lit(currentEvent))
    .select(jsonColumn)
    .as[String]
)
// Adds partitions to have input data retain the same paths as the output data, since this is Kinesis
val dfToWrite = s3Reader.addKinesisPartitionsToDataFrameRows(currentEventDf, inputBasePath)

val dfWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  .option("mode", "DROPMALFORMED")
  .mode(writeMode)
  .format(destinationFormatType)
  .option(
    "path",
    s3Path
  )
if (replaceTable) {
  println("\t- .saveAsTable")
  dfWriter
    .partitionBy(partitionBy: _*)
    .saveAsTable(tablePathInDb)
} else {
  println("\t- .insertInto")
  dfWriter.insertInto(tablePathInDb)
}

When the data is written, it shows up correctly in S3 and is readable through Spark, but Glue registers the Hive table incorrectly (a quick way to inspect this from Spark is sketched after the listing):
Name                      foobar
Description
Database                  bar
Classification            Unknown
Location                  s3://foo_bucket/hive-metastore/bar.db/foobar-placeholder
Connection
Deprecated                No
Last updated              Thu Jan 09 16:55:23 GMT-800 2020
Input format              org.apache.hadoop.mapred.SequenceFileInputFormat
Output format             org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Serde serialization lib   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Serde parameters
  mode                    DROPMALFORMED
  path                    s3://foo_bucket/foobar
  serialization.format    1
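
For reference, the same metadata can be checked directly from Spark SQL, which is a quick way to confirm what the metastore actually recorded (a minimal sketch; bar.foobar follows the example names above):

// Inspect the table as registered in the metastore (Glue, in this setup).
// Once the table is registered correctly, the Input/Output format and SerDe rows
// should show Parquet classes rather than the SequenceFile defaults shown above.
spark.sql("DESCRIBE FORMATTED bar.foobar").show(100, truncate = false)
spark.sql("SHOW CREATE TABLE bar.foobar").show(truncate = false)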


eni9jsuy  1#

To address the data being treated as a SequenceFile, change:

val destinationFormatType = "hive"

Additionally, add the following to dfWriter so the underlying files are still written as Parquet:

.option(
    "fileFormat",
    "parquet"
  )
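
Putting the two changes together, the writer section ends up roughly like this (a sketch assembled from the snippets above, reusing the question's variables; format("hive") routes the write through the Hive serde so the catalog entry gets real input/output formats, while the fileFormat option keeps the data files as Parquet):

import org.apache.spark.sql.functions.col

val dfWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  .mode(writeMode)
  // Register the table through the Hive serde path instead of as a plain
  // Spark datasource table, so Glue records proper formats and location.
  .format("hive")
  // Keep the underlying files in Parquet.
  .option("fileFormat", "parquet")
  .option("path", s3Path)

dfWriter
  .partitionBy(partitionBy: _*)
  .saveAsTable(tablePathInDb)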
