Spark save job takes a very long time

mm9b1k5b · posted 2021-05-27 in Hadoop

I am trying to save a DataFrame to an HDFS location, but the save is taking a very long time. The operation right before it joins two tables using Spark SQL. I need to know why the save runs as four stages and how to improve its performance. I have attached the list of the job's stages here.
I have also attached my code snippets.
Spark code:
This function receives its inputs from the main class; the models variable holds the table metadata read from XML. It first loads the source table's data and then retrieves data from the other joined tables.

import java.time.LocalDateTime

import org.apache.spark.sql.{DataFrame, SparkSession, functions}

def sourceGen(spark: SparkSession,
                minBatchLdNbr: Int,
                maxBatchLdNbr: Int,
                batchLdNbrList: String,
                models: (GModel, TModel, NModel)): Unit = {
    val configJson = models._3
    val gblJson = models._1
    println("Source Loading started")
    val sourceColumns = configJson.transformationJob.sourceDetails.sourceSchema
    val query = new StringBuilder("select ")
    sourceColumns.foreach { SrcColumn =>
      if (SrcColumn.isKey == "nak") {
        query.append(
          "cast(" + SrcColumn.columnExpression + " as " + SrcColumn.columnDataType + ") as " + SrcColumn.columnName + ",")
      }
    }
    var tableQuery: String =
      if (!configJson.transformationJob.sourceDetails.sourceTableSchemaName.isEmpty) {
        if (!batchLdNbrList.trim.isEmpty)
          query.dropRight(1) + " from " + configJson.transformationJob.sourceDetails.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr + " or batch_ld_nbr in ( " + batchLdNbrList + " )"
        else
          query.dropRight(1) + " from " + configJson.transformationJob.sourceDetails.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr
      } else {
        if (!batchLdNbrList.trim.isEmpty)
          query.dropRight(1) + " from " + gblJson.gParams.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr + " or batch_ld_nbr in ( " + batchLdNbrList + " )"
        else
          query.dropRight(1) + " from " + gblJson.gParams.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr
      }
    if (minBatchLdNbr == 0 && maxBatchLdNbr == 0) {
      tableQuery = tableQuery.split("where")(0)
    }
    println("Time"+LocalDateTime.now());
    val tableQueryDf: DataFrame = spark.sql(tableQuery)
    println("tableQueryDf"+tableQueryDf);
    println("Time"+LocalDateTime.now());
    println("Source Loading ended")
    println("Parent Loading Started")
    val parentColumns = configJson.transformationJob.sourceDetails.parentTables
    val parentSourceJoinDF: DataFrame = if (!parentColumns.isEmpty) {
      parentChildJoin(tableQueryDf,
                      parentColumns,
                      spark,
                      gblJson.gParams.pSchemaName)
    } else {
      tableQueryDf
    }
    println("tableQueryDf"+tableQueryDf);
    println("Parent Loading ended")
    println("Key Column Generation Started")
    println("Time"+LocalDateTime.now());
    val arrOfCustomExprs = sourceColumns
      .filter(_.isKey.toString != "nak")
      .map(
        f =>
          functions
            .expr(f.columnExpression)
            .as(f.columnName)
            .cast(f.columnDataType))
    val colWithExpr = parentSourceJoinDF.columns.map(f =>
      parentSourceJoinDF.col(f)) ++ arrOfCustomExprs
    val finalQueryDF = parentSourceJoinDF.select(colWithExpr: _*)
    println("finalQueryDF"+finalQueryDF);
    println("Time"+LocalDateTime.now());
    keyGenUtils.writeParquetTemp(
      finalQueryDF,
      configJson.transformationJob.globalParams.hdfsInterimPath + configJson.transformationJob.sourceDetails.sourceTableName + "/temp_" + configJson.transformationJob.sourceDetails.sourceTableName
    )
    println("PrintedTime"+LocalDateTime.now());
    println("Key Column Generation Ended")
  }
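
Because Spark evaluates lazily, the joins and column expressions built above only execute when writeParquetTemp triggers the write, so the "save" job usually shows several stages that are really the upstream work. A minimal diagnostic sketch, assuming it is acceptable to cache the result once; the cache()/count() calls and the debug output path are illustrative additions, not part of the original job:

    // Diagnostic sketch: materialize the joined result first so the parquet write
    // can be timed separately from the joins that feed it.
    val materialized = finalQueryDF.cache()
    println("Joined row count: " + materialized.count()) // forces the joins/expressions to run
    val writeStart = System.nanoTime()
    keyGenUtils.writeParquetTemp(materialized, "/tmp/debug_write_timing") // placeholder path
    println("Write-only seconds: " + (System.nanoTime() - writeStart) / 1e9)
    materialized.unpersist()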

The code below retrieves the data from the joined tables.

private def parentChildJoin(tableQueryDf: DataFrame,
                              ptJoin: Array[ParentTables],
                              sparkSession: SparkSession,
                              gParentSchema: String): DataFrame = {
    if (ptJoin.isEmpty) {
      tableQueryDf
    } else {
      val parentJoin = ptJoin.head
      val columns = new StringBuilder("select ")
      for (ptCols <- parentJoin.columns) {
        columns.append(
          ptCols.columnExpression + " as " + ptCols.columnName + ",")
      }
      val statement = columns.dropRight(1)

      if (!parentJoin.pSchemaName.isEmpty) {
        statement.append(
          " from " + parentJoin.pSchemaName + "." + parentJoin.pTableName)
      } else {
        statement.append(" from " + gParentSchema + "." + parentJoin.pTableName)
      }
      println("Time"+LocalDateTime.now());
      println("parentJoin.pTableName"+parentJoin.pTableName);
      val pQueryDF =
        if (parentJoin.pTableName.equalsIgnoreCase("order_summary_si_fact_t")) {
          val ordCalDt = "ord_cal_dt"
          val distinctDates = tableQueryDf
            .selectExpr(ordCalDt)
            .distinct
            .collect
            .map(_.getAs[String](0))
          sparkSession.sql(statement.toString).where(col(ordCalDt).isin(distinctDates: _*)).distinct
        } else {
          sparkSession.sql(statement.toString).distinct
        }
      println("Time"+LocalDateTime.now());
      //val pQueryDF = sparkSession.sql(statement.toString).distinct
      println("statement-"+parentJoin.pTableName+"-"+statement);
      parentChildJoin(
        tableQueryDf.join(pQueryDF,
                          parentJoin.pJoinCondition.map(_.sourceKey).toSeq,
                          parentJoin.joinType),
        ptJoin.tail,
        sparkSession,
        gParentSchema)
    }
  }
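
On the order_summary_si_fact_t branch above, the distinct ord_cal_dt values are collected to the driver and pushed back into an isin filter. A left-semi join keeps that date filter on the executors instead; the sketch below is an alternative, not the original code, and it reuses the variable names from the surrounding method (col comes from org.apache.spark.sql.functions).

      // Sketch: replace collect + isin with a left-semi join so the date filter
      // stays distributed instead of round-tripping through the driver.
      val distinctDates = tableQueryDf.select(col("ord_cal_dt")).distinct()
      val pQueryDF = sparkSession
        .sql(statement.toString)
        .join(distinctDates, Seq("ord_cal_dt"), "left_semi")
        .distinct()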

This is the function that writes to HDFS.

def writeParquetTemp(df: DataFrame, hdfsPath: String): Unit = {
    df.write.format("parquet").option("compression", "none").mode(SaveMode.Overwrite).save(hdfsPath)
  }
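
The write above produces one parquet file per partition of the DataFrame, which can be up to 500 files after a shuffled join with the spark.sql.shuffle.partitions=500 setting shown below. The variant here is a sketch that caps the number of output files by coalescing first; the value of 50 is an illustrative assumption, not taken from the job.

  import org.apache.spark.sql.{DataFrame, SaveMode}

  // Sketch: same write, but coalesce first to limit the number of output files.
  // numFiles = 50 is only an example value; tune it to the actual data volume.
  def writeParquetTempCoalesced(df: DataFrame, hdfsPath: String, numFiles: Int = 50): Unit = {
    df.coalesce(numFiles)
      .write
      .format("parquet")
      .option("compression", "none")
      .mode(SaveMode.Overwrite)
      .save(hdfsPath)
  }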

spark-submit configuration:

/usr/hdp/2.6.3.0-235/spark2/bin//spark-submit --master yarn --deploy-mode client --driver-memory 30G --executor-memory 25G --executor-cores 6 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=774857600 --conf spark.kryoserializer.buffer.max.mb=512 --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.eventLog.enabled=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.parquet.binaryAsString=true  --conf spark.sql.broadcastTimeout=36000 --conf spark.sql.shuffle.partitions=500
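
For reference, the SQL-related settings in that command can equivalently be applied when the SparkSession is created. This is only a sketch assuming the job builds its own session; the application name is a placeholder and the values are copied from the command line above.

import org.apache.spark.sql.SparkSession

// Sketch: the spark.sql.* and serializer settings from the spark-submit line,
// set programmatically on the session builder.
val spark = SparkSession.builder()
  .appName("source-gen-job") // hypothetical name
  .config("spark.sql.autoBroadcastJoinThreshold", "774857600")
  .config("spark.sql.shuffle.partitions", "500")
  .config("spark.sql.broadcastTimeout", "36000")
  .config("spark.sql.parquet.binaryAsString", "true")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()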
