I am trying to save a DataFrame to an HDFS location, but the save takes a very long time. The operation right before it joins two tables using Spark SQL. I need to know why the save runs as four stages and how to improve its performance. I have attached the job's stage list here.
I have also attached my code snippets below.
Spark code:
This function is called from the main class; the models variable carries the table metadata read from an XML config. It first loads the source table's data and then retrieves data from the other joined (parent) tables.
import java.time.LocalDateTime

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, functions}
import org.apache.spark.sql.functions.col

def sourceGen(spark: SparkSession,
              minBatchLdNbr: Int,
              maxBatchLdNbr: Int,
              batchLdNbrList: String,
              models: (GModel, TModel, NModel)): Unit = {
  val gblJson = models._1
  val configJson = models._3
  val sourceDetails = configJson.transformationJob.sourceDetails
  println("Source Loading started")

  // Build the select list from the non-key ("nak") columns of the source schema.
  val sourceColumns = sourceDetails.sourceSchema
  val query = new StringBuilder("select ")
  sourceColumns.foreach { srcColumn => // foreach, not map: appending is a side effect
    if (srcColumn.isKey == "nak") {
      query.append(
        "cast(" + srcColumn.columnExpression + " as " + srcColumn.columnDataType +
          ") as " + srcColumn.columnName + ",")
    }
  }

  // Use the table-level schema name when configured, otherwise the global one.
  val schemaName =
    if (sourceDetails.sourceTableSchemaName.nonEmpty) sourceDetails.sourceTableSchemaName
    else gblJson.gParams.sourceTableSchemaName
  val fromClause = " from " + schemaName + "." + sourceDetails.sourceTableName

  // No filter when both batch numbers are 0. Note the space before "or" and the
  // parentheses around the range: the original concatenation produced
  // "...<= 20or batch_ld_nbr in (...)", which is invalid SQL.
  val whereClause =
    if (minBatchLdNbr == 0 && maxBatchLdNbr == 0) ""
    else if (batchLdNbrList.trim.nonEmpty)
      " where (batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr +
        ") or batch_ld_nbr in ( " + batchLdNbrList + " )"
    else
      " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr

  val tableQuery = query.dropRight(1).toString + fromClause + whereClause
  println("Time " + LocalDateTime.now())
  val tableQueryDf: DataFrame = spark.sql(tableQuery)
  println("tableQueryDf " + tableQueryDf)
  println("Time " + LocalDateTime.now())
  println("Source Loading ended")

  println("Parent Loading Started")
  val parentColumns = sourceDetails.parentTables
  val parentSourceJoinDF: DataFrame =
    if (parentColumns.nonEmpty)
      parentChildJoin(tableQueryDf, parentColumns, spark, gblJson.gParams.pSchemaName)
    else
      tableQueryDf
  println("parentSourceJoinDF " + parentSourceJoinDF)
  println("Parent Loading ended")

  println("Key Column Generation Started")
  println("Time " + LocalDateTime.now())
  // Key columns are computed from configured expressions; cast before aliasing
  // so the configured column name survives as the output name.
  val arrOfCustomExprs = sourceColumns
    .filter(_.isKey != "nak")
    .map(f => functions.expr(f.columnExpression).cast(f.columnDataType).as(f.columnName))
  val colWithExpr = parentSourceJoinDF.columns.map(parentSourceJoinDF.col) ++ arrOfCustomExprs
  val finalQueryDF = parentSourceJoinDF.select(colWithExpr: _*)
  println("finalQueryDF " + finalQueryDF)
  println("Time " + LocalDateTime.now())
  keyGenUtils.writeParquetTemp(
    finalQueryDF,
    configJson.transformationJob.globalParams.hdfsInterimPath +
      sourceDetails.sourceTableName + "/temp_" + sourceDetails.sourceTableName
  )
  println("Time " + LocalDateTime.now())
  println("Key Column Generation Ended")
}
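For concreteness, with hypothetical inputs minBatchLdNbr = 10, maxBatchLdNbr = 20 and batchLdNbrList = "5,6" (the schema, table, and column names below are placeholders, not values from the real XML config), the builder above yields a tableQuery of this shape:

select cast(src_col as string) as src_col from my_schema.my_source_table where (batch_ld_nbr > 10 and batch_ld_nbr <= 20) or batch_ld_nbr in ( 5,6 )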
The code below retrieves the data from the joined parent tables.
// Recursively joins the source DataFrame with each configured parent table.
@scala.annotation.tailrec
private def parentChildJoin(tableQueryDf: DataFrame,
                            ptJoin: Array[ParentTables],
                            sparkSession: SparkSession,
                            gParentSchema: String): DataFrame = {
  if (ptJoin.isEmpty) {
    tableQueryDf
  } else {
    val parentJoin = ptJoin.head

    // Build "select <expr> as <name>, ... from <schema>.<table>" for this parent.
    val columns = new StringBuilder("select ")
    for (ptCols <- parentJoin.columns) {
      columns.append(ptCols.columnExpression + " as " + ptCols.columnName + ",")
    }
    val statement = columns.dropRight(1)
    if (parentJoin.pSchemaName.nonEmpty) {
      statement.append(" from " + parentJoin.pSchemaName + "." + parentJoin.pTableName)
    } else {
      statement.append(" from " + gParentSchema + "." + parentJoin.pTableName)
    }
    println("Time " + LocalDateTime.now())
    println("parentJoin.pTableName " + parentJoin.pTableName)

    // Special case: restrict the fact table to the order dates actually present
    // in the source before joining; every other parent is just selected distinct.
    val pQueryDF =
      if (parentJoin.pTableName.equalsIgnoreCase("order_summary_si_fact_t")) {
        val ordCalDt = "ord_cal_dt"
        val distinctDates = tableQueryDf
          .selectExpr(ordCalDt)
          .distinct()
          .collect()
          .map(_.getAs[String](0))
        sparkSession.sql(statement.toString).where(col(ordCalDt).isin(distinctDates: _*)).distinct()
      } else {
        sparkSession.sql(statement.toString).distinct()
      }
    println("Time " + LocalDateTime.now())
    println("statement-" + parentJoin.pTableName + "-" + statement)

    // Join on the configured source keys, then recurse over the remaining parents.
    parentChildJoin(
      tableQueryDf.join(pQueryDF,
                        parentJoin.pJoinCondition.map(_.sourceKey).toSeq,
                        parentJoin.joinType),
      ptJoin.tail,
      sparkSession,
      gParentSchema)
  }
}
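To make the recursion concrete: for two configured parent tables p1 and p2, with pQueryDF1 and pQueryDF2 denoting the DataFrames built for them (names invented here for illustration), the tail call above unrolls into plain chained joins:

val step1 = tableQueryDf.join(pQueryDF1, p1.pJoinCondition.map(_.sourceKey).toSeq, p1.joinType)
val step2 = step1.join(pQueryDF2, p2.pJoinCondition.map(_.sourceKey).toSeq, p2.joinType)
// step2 is what parentChildJoin returns for ptJoin = Array(p1, p2)

Each join on a non-broadcast parent introduces a shuffle, so every extra parent table can add stages to the job that eventually runs at write time.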
This is the function that writes to HDFS.
def writeParquetTemp(df: DataFrame, hdfsPath: String): Unit = {
  // Uncompressed parquet, overwriting any previous temp output.
  df.write
    .format("parquet")
    .option("compression", "none")
    .mode(SaveMode.Overwrite)
    .save(hdfsPath)
}
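Note that this save is the action that triggers execution of the full upstream plan (apart from the collect of distinct dates in parentChildJoin): the source-table scan, the parent-table scans, and the shuffle or broadcast exchanges for the joins each become their own stage, which is likely why a single save shows up as several stages. The plan the save will run can be inspected with the stock explain API just before writing:

// Prints the parsed, analyzed, optimized, and physical plans; each Exchange
// in the physical plan corresponds to a stage boundary in the save job.
finalQueryDF.explain(true)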
spark-submit configuration:
/usr/hdp/2.6.3.0-235/spark2/bin/spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 30G \
  --executor-memory 25G \
  --executor-cores 6 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.autoBroadcastJoinThreshold=774857600 \
  --conf spark.kryoserializer.buffer.max.mb=512 \
  --conf spark.dynamicAllocation.maxExecutors=40 \
  --conf spark.eventLog.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.parquet.binaryAsString=true \
  --conf spark.sql.broadcastTimeout=36000 \
  --conf spark.sql.shuffle.partitions=500
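For reference, most of the same Spark SQL settings can also be applied when building the session in code; a sketch with the values copied from the command above (the appName is made up, and resource flags such as executor memory and cores still have to be given at submit time):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sourceGen") // hypothetical application name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.maxExecutors", "40")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.autoBroadcastJoinThreshold", "774857600")
  .config("spark.sql.broadcastTimeout", "36000")
  .config("spark.sql.parquet.binaryAsString", "true")
  .config("spark.sql.shuffle.partitions", "500")
  .enableHiveSupport() // assumes the schema.table queries target Hive tables
  .getOrCreate()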