scala — exception when calling .getAs on a Spark Row in production, but it works fine locally

ie3xauqp · posted 2021-05-27 · in Spark

I have a generic piece of code that, given a set of key columns and a DataFrame, finds duplicates of that key set in the DataFrame.
The code that does not work:

case class DuplicateRecord(
    datasetName: String,
    duplicateKeys: String,
    duplicateCount: Long
  )

def findDuplicatesInDF(
    spark: SparkSession,
    inputName: String,
    inputDataFrame: DataFrame,
    groupColumns: Seq[String]): Dataset[DuplicateRecord] = {

    import spark.implicits._

    val keys = groupColumns.map(x => col(x))
    val idToCounts = inputDataFrame
      .groupBy(keys: _*)
      .agg(count(keys(0)).as("duplicateKeyCount"))

    idToCounts
      .filter(col("duplicateKeyCount") > 1)
      .map { idToCount =>
        DuplicateRecord(
          inputName,
          groupColumns.map(x => idToCount.getAs(x).toString).mkString(","),
          idToCount.getAs("duplicateKeyCount").toString.toLong)
      }
  }

The code above runs fine locally. However, it fails in production with:

Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:375)

The code that works:

case class DuplicateRecord(
    datasetName: String,
    duplicateKeys: String,
    duplicateCount: Long
  )

case class IdToCounts(
    mergedKey: String,
    duplicateKeyCount: Long
  )

def findDuplicatesInDF(
    spark: SparkSession,
    inputName: String,
    inputDataFrame: DataFrame,
    groupColumns: Seq[String]): Dataset[DuplicateRecord] = {

    import spark.implicits._

    val keys = groupColumns.map(x => col(x))
    val idToCounts = inputDataFrame
      .withColumn("mergedKey", concat_ws(",", keys: _*))
      .groupBy(col("mergedKey"))
      .agg(count(col("mergedKey")).as("duplicateKeyCount"))
      .as[IdToCounts]

    idToCounts
      .filter(idToCount => idToCount.duplicateKeyCount > 1)
      .map { idToCount =>
        DuplicateRecord(inputName, idToCount.mergedKey, idToCount.duplicateKeyCount)
      }
  }

I understand this has something to do with Spark running on a single JVM instance in local mode, whereas in production there are multiple executors and the data is partitioned, leading to non-deterministic behavior where Spark cannot work out where to pull the data from to complete the operation. However, I would like to understand the exact problem, and the existing Stack Overflow questions related to this do not have a convincing answer. Any insight into this would be very helpful. Thanks!
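One detail worth noting (an observation about the two snippets, not a confirmed diagnosis of the production failure): in the failing version, `getAs(x)` is called without an explicit type parameter, so the Scala compiler infers `T = Nothing` and the field access relies entirely on erased casts, which can behave differently depending on how the result is used. A minimal sketch of the same map step with explicit type parameters, assuming the surrounding `findDuplicatesInDF` definition from above:

```scala
// Hypothetical variant of the failing map step: naming the type parameter
// explicitly (getAs[Any] / getAs[Long]) avoids the inferred-Nothing casts.
// This is a sketch only, not a verified fix for the SparkContext error above.
idToCounts
  .filter(col("duplicateKeyCount") > 1)
  .map { idToCount =>
    DuplicateRecord(
      inputName,
      groupColumns.map(x => idToCount.getAs[Any](x).toString).mkString(","),
      idToCount.getAs[Long]("duplicateKeyCount"))
  }
```

The working version sidesteps row access entirely by mapping onto the typed `IdToCounts` case class, which may be why it is not affected.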

No answers yet.
