In Scala, I'm using .saveAsTable, but Glue doesn't seem to update itself properly with the right database location, format, and so on. For background: the incoming dataset is 1.5 TB of JSON and the target format is Parquet. All of the Parquet files get written, although the rename step is quite slow.
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val writeMode = "Overwrite"
val destinationFormatType = "parquet"
val s3PathBase = "s3://foo_bucket"
val currentDatabase = "bar"
val replaceTable = true
val jsonColumn = "json"
val partitionBy = Array("year", "month", "day", "hour")
val currentEvent = "fooBar"

// Normalize the event name into a Glue-friendly (lowercase) table name
val tableLowerCase = glueCatalog.fixTableName(currentEvent.asInstanceOf[String])
val s3Path = s"${s3PathBase}/${tableLowerCase}"
val tablePathInDb = s"${currentDatabase}.${tableLowerCase}"
println(tablePathInDb)

// The event payload is a JSON string column; re-parse it so Spark infers the real schema
val currentEventDf = spark.read.json(
  dfWithJson
    .filter(col("event") === lit(currentEvent))
    .select(jsonColumn)
    .as[String]
)

// Adds partitions to have input data retain the same paths as the output data, since this is Kinesis
val dfToWrite = s3Reader.addKinesisPartitionsToDataFrameRows(currentEventDf, inputBasePath)

val dfWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  .option("mode", "DROPMALFORMED")
  .mode(writeMode)
  .format(destinationFormatType)
  .option("path", s3Path)

if (replaceTable) {
  println("\t- .saveAsTable")
  dfWriter
    .partitionBy(partitionBy: _*)
    .saveAsTable(tablePathInDb)
} else {
  println("\t- .insertInto")
  dfWriter.insertInto(tablePathInDb)
}
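glueCatalog, s3Reader, dfWithJson, and inputBasePath above are the question's own helpers and inputs and are not shown. Purely as a hypothetical illustration of what fixTableName is likely doing (Glue stores table names in lowercase), something like:

object glueCatalog {
  // Hypothetical stand-in, not the question's real code:
  // Glue table names are lowercase, so "fooBar" becomes "foobar".
  def fixTableName(name: String): String =
    name.toLowerCase.replaceAll("[^a-z0-9_]", "_")
}

which is why the fooBar event shows up in the catalog below under the name foobar.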
When the data is written, it shows up correctly and is readable from S3 via Spark, but Glue registers the Hive table incorrectly:
Name: foobar
Description:
Database: bar
Classification: Unknown
Location: s3://foo_bucket/hive-metastore/bar.db/foobar-placeholder
Connection:
Deprecated: No
Last updated: Thu Jan 09 16:55:23 GMT-800 2020
Input format: org.apache.hadoop.mapred.SequenceFileInputFormat
Output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Serde serialization lib: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Serde parameters:
  mode: DROPMALFORMED
  path: s3://foo_bucket/foobar
  serialization.format: 1
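The same registration details can also be pulled from Spark itself; a quick check of this sort (assuming the SparkSession and the bar.foobar table created by the snippet above) shows what the metastore recorded:

// Dump the table metadata Spark sees via the Glue-backed Hive catalog,
// including location, input/output formats, serde, and provider.
spark.sql("DESCRIBE FORMATTED bar.foobar").show(200, truncate = false)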
1 Answer
To fix the data being treated as a sequence file, do the following:
In addition, add the following to dfWriter:
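A minimal sketch of one way to get Hive-compatible Parquet metadata into Glue, assuming the intended fix is to write through Spark's Hive data source: format("hive") and the fileFormat option are documented Spark options for Hive tables, but their use here is an assumption rather than the answer's verbatim code.

// Sketch: replace the generic datasource write with a Hive-format write so the
// catalog entry carries Parquet input/output formats and the real schema.
val hiveWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  .mode(writeMode)
  .format("hive")                   // instead of format("parquet")
  .option("fileFormat", "parquet")  // store the Hive table as Parquet
  .option("path", s3Path)           // assumption: keep the data at the same external S3 prefix

hiveWriter
  .partitionBy(partitionBy: _*)
  .saveAsTable(tablePathInDb)

If this matches what the answer intends, the Glue entry should then show a Parquet input/output format and SerDe instead of the SequenceFile placeholder, with the location pointing at s3://foo_bucket/foobar rather than the -placeholder path.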