spark-submit throws FileNotFoundException when passing arguments

i2byvkas · posted 2021-05-27 in Spark

I am running a fat JAR (built with sbt assembly) via spark-submit. This is the main part of the Scala file:

package antarctic

import antarctic.comparison.TablesComparison
import antarctic.utils.Utils.withSpark
import antarctic.inputs.Inputs
import antarctic.outputs.{ValOutput, ComparisonOutput}
import antarctic.parser.Parser
import antarctic.profiling.Profiling
import com.amazon.deequ.VerificationSuite
import org.apache.spark.sql.SparkSession

object DataQuality  {

  def main(args: Array[String]): Unit = {

    val filename = args(0)
    val parsedConfigs = new Parser(filename)
    val processType = args(1)
    val processID = args(2)
    val confFilePath = args(3)
    withSpark { spark =>

      // Run the process defined in args(1) (processType).
      // Data Profiling runs in both cases.
      if (processType == "comparison") {
        runProfiling(parsedConfigs, spark, processID, confFilePath, filename)
        runTablesComparison(parsedConfigs, spark, processID, confFilePath)
      } else if (processType == "validations") {
        runProfiling(parsedConfigs, spark, processID, confFilePath, filename)
        runValidations(parsedConfigs, spark, processID, confFilePath)
      }
    }
  }

  def runValidations(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String): Unit = {

    val inputParams = parsedConfigs.input(parsedConfigs.getValidationInputId)

    if (inputParams.url != "" &&  inputParams.driver != ""
      && inputParams.query != "" && inputParams.user != "" && inputParams.fetchSize > 0) {

      val inputData = new Inputs(spark).readJDBC(inputParams)
      val parsedJSON = parsedConfigs.validations(processID, inputData)

      val verificationResult = VerificationSuite()
        .onData(inputData)
        .addChecks(parsedJSON.checks)
        .run()
      ValOutput.store(spark, verificationResult, confFilePath)
      ValOutput.print(verificationResult)

    }
  }

  def runProfiling(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String, filename: String): Unit = {
    val inputsList = parsedConfigs.inputsList

    inputsList.foreach { inputID =>
      new Profiling(filename, inputID, processID, spark, confFilePath).exportToDB()
    }
  }

  def runTablesComparison(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String): Unit = {
    val parsedConfig = parsedConfigs.tablesComparison()

    val comparisonType = parsedConfig._1
    val table1Conf = parsedConfig._2
    val table2Conf = parsedConfig._3
    val table1InputId = table1Conf._1
    val table2InputId = table2Conf._1
    val table1Column = table1Conf._2
    val table2Column = table2Conf._2

    if (table1InputId != "" && table2InputId != "" && table1Column != "" && table2Column != "") {
      val inputConfig1 = parsedConfigs.input(table1InputId)
      val inputConfig2 = parsedConfigs.input(table2InputId)

      val input = new Inputs(spark)
      val tablesComparison = new TablesComparison(
        (table1InputId, table1Column, input.readJDBC(inputConfig1)),
        (table2InputId, table2Column, input.readJDBC(inputConfig2)),
        spark
      )
      val comparisonResults = tablesComparison.run(comparisonType)
      val keys = comparisonResults.keys.toSeq
      if (keys.contains("areEquals")){
        ComparisonOutput.storeAreEquals(comparisonResults("areEquals"), confFilePath)
      }
      if (keys.contains("countDistinct")){
        ComparisonOutput.storeCountDistinct(comparisonResults("countDistinct"), confFilePath)
      }
      if (keys.contains("count")){
        ComparisonOutput.storeCount(comparisonResults("count"), confFilePath)
      }
    }

  }
}

This is the command I run:

spark-submit --class antarctic.DataQuality --master local[*] --deploy-mode client --jars "path\to\jar\DataQuality.jar" "path\to\json\mongoDocs.json" "validations" "process-id-1" "path\to\conf\application.conf"

And this is the stack trace I get:

Exception in thread "main" java.io.FileNotFoundException: validations (El sistema no puede encontrar el archivo especificado)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at scala.io.Source$.fromFile(Source.scala:94)
        at scala.io.Source$.fromFile(Source.scala:79)
        at scala.io.Source$.fromFile(Source.scala:57)
        at antarctic.parser.Parser.fileToText(Parser.scala:23)
        at antarctic.parser.Parser.<init>(Parser.scala:13)
        at antarctic.DataQuality$.main(DataQuality.scala:22)
        at antarctic.DataQuality.main(DataQuality.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/07/02 11:23:00 INFO ShutdownHookManager: Shutdown hook called
20/07/02 11:23:00 INFO ShutdownHookManager: Deleting directory C:\Users\AKAINIX ANALYTICS\appdata\local\temp\spark-7da12975-bd26-4728-a1e0-941252d9ab86
The path to the file is correct, so I think the problem is in how Scala is receiving the arguments.
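For reference, a minimal debugging sketch (hypothetical, not part of the original project) that prints exactly what the driver receives, which helps confirm how spark-submit split the command line into application arguments:

object ArgsDebug {
  // Hypothetical helper: print every argument spark-submit hands to the
  // driver, together with its index, before any parsing happens.
  def main(args: Array[String]): Unit = {
    args.zipWithIndex.foreach { case (arg, i) => println(s"args($i) = $arg") }
  }
}

Given the trace above (FileNotFoundException: validations), args(0) evidently arrived as "validations" rather than the JSON path.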


eblbsuwk1#

It worked once I removed the --jars option from spark-submit; Scala then reads the arguments correctly. --jars is meant for additional dependency JARs, while the application JAR itself must be the first positional argument. Because --jars consumed DataQuality.jar, spark-submit treated the next token, "path\to\json\mongoDocs.json", as the application JAR, so every program argument shifted left: args(0) became "validations", which Parser then tried to open as a file, hence the FileNotFoundException.

spark-submit --class antarctic.DataQuality --master local[*] --deploy-mode client "path\to\jar\DataQuality.jar" "path\to\json\mongoDocs.json" "validations" "process-id-1" "path\to\conf\application.conf"
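If additional dependency JARs are ever needed, --jars can still be used, as long as the application JAR also appears as the first positional argument. A hypothetical example (some-dependency.jar is a placeholder, not a real dependency of this project):

spark-submit --class antarctic.DataQuality --master local[*] --deploy-mode client --jars "path\to\deps\some-dependency.jar" "path\to\jar\DataQuality.jar" "path\to\json\mongoDocs.json" "validations" "process-id-1" "path\to\conf\application.conf"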
