I'm running a fat JAR (built with sbt assembly) via spark-submit. This is the main part of the Scala file:
package antarctic
import antarctic.comparison.TablesComparison
import antarctic.utils.Utils.withSpark
import antarctic.inputs.Inputs
import antarctic.outputs.{ValOutput, ComparisonOutput}
import antarctic.parser.Parser
import antarctic.profiling.Profiling
import com.amazon.deequ.VerificationSuite
import org.apache.spark.sql.SparkSession
object DataQuality {

  def main(args: Array[String]): Unit = {
    val filename = args(0)
    val parsedConfigs = new Parser(filename)
    val processType = args(1)
    val processID = args(2)
    val confFilePath = args(3)

    withSpark { spark =>
      // Run the process selected by processType.
      // Data Profiling runs in both cases.
      if (processType == "comparison") {
        runProfiling(parsedConfigs, spark, processID, confFilePath, filename)
        runTablesComparison(parsedConfigs, spark, processID, confFilePath)
      } else if (processType == "validations") {
        runProfiling(parsedConfigs, spark, processID, confFilePath, filename)
        runValidations(parsedConfigs, spark, processID, confFilePath)
      }
    }
  }

  def runValidations(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String): Unit = {
    val inputParams = parsedConfigs.input(parsedConfigs.getValidationInputId)
    if (inputParams.url != "" && inputParams.driver != ""
        && inputParams.query != "" && inputParams.user != "" && inputParams.fetchSize > 0) {
      val inputData = new Inputs(spark).readJDBC(inputParams)
      val parsedJSON = parsedConfigs.validations(processID, inputData)
      val verificationResult = VerificationSuite()
        .onData(inputData)
        .addChecks(parsedJSON.checks)
        .run()
      ValOutput.store(spark, verificationResult, confFilePath)
      ValOutput.print(verificationResult)
    }
  }

  def runProfiling(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String, filename: String): Unit = {
    val inputsList = parsedConfigs.inputsList
    inputsList.foreach { inputID =>
      new Profiling(filename, inputID, processID, spark, confFilePath).exportToDB()
    }
  }

  def runTablesComparison(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String): Unit = {
    val parsedConfig = parsedConfigs.tablesComparison()
    val comparisonType = parsedConfig._1
    val table1Conf = parsedConfig._2
    val table2Conf = parsedConfig._3
    val table1InputId = table1Conf._1
    val table2InputId = table2Conf._1
    val table1Column = table1Conf._2
    val table2Column = table2Conf._2
    if (table1InputId != "" && table2InputId != "" && table1Column != "" && table2Column != "") {
      val inputConfig1 = parsedConfigs.input(table1InputId)
      val inputConfig2 = parsedConfigs.input(table2InputId)
      val input = new Inputs(spark)
      val tablesComparison = new TablesComparison(
        (table1InputId, table1Column, input.readJDBC(inputConfig1)),
        (table2InputId, table2Column, input.readJDBC(inputConfig2)),
        spark
      )
      val comparisonResults = tablesComparison.run(comparisonType)
      val keys = comparisonResults.keys.toSeq
      if (keys.contains("areEquals")) {
        ComparisonOutput.storeAreEquals(comparisonResults("areEquals"), confFilePath)
      }
      if (keys.contains("countDistinct")) {
        ComparisonOutput.storeCountDistinct(comparisonResults("countDistinct"), confFilePath)
      }
      if (keys.contains("count")) {
        ComparisonOutput.storeCount(comparisonResults("count"), confFilePath)
      }
    }
  }
}
This is the command I'm running:
spark-submit --class antarctic.DataQuality --master local[*] --deploy-mode client --jars "path\to\jar\DataQuality.jar" "path\to\json\mongoDocs.json" "validations" "process-id-1" "path\to\conf\application.conf"
This is the stack trace I get:
Exception in thread "main" java.io.FileNotFoundException: validations (El sistema no puede encontrar el archivo especificado)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at scala.io.Source$.fromFile(Source.scala:94)
at scala.io.Source$.fromFile(Source.scala:79)
at scala.io.Source$.fromFile(Source.scala:57)
at antarctic.parser.Parser.fileToText(Parser.scala:23)
at antarctic.parser.Parser.<init>(Parser.scala:13)
at antarctic.DataQuality$.main(DataQuality.scala:22)
at antarctic.DataQuality.main(DataQuality.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/07/02 11:23:00 INFO ShutdownHookManager: Shutdown hook called
20/07/02 11:23:00 INFO ShutdownHookManager: Deleting directory C:\Users\AKAINIX ANALYTICS\appdata\local\temp\spark-7da12975-bd26-4728-a1e0-941252d9ab86
The file paths are fine; I think the problem is in how Scala is receiving the arguments.
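To confirm that, the argument vector the driver actually receives can be printed at the top of main (a temporary debug snippet, not part of the original code):

    // Temporary debug output: shows exactly which strings spark-submit
    // forwarded to main as application arguments.
    args.zipWithIndex.foreach { case (arg, i) => println(s"args($i) = $arg") }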
1 Answer
It worked once I removed the --jars option from spark-submit; Scala then read the arguments correctly.
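The reason: spark-submit treats the first non-option argument as the application jar, and only the arguments after it are passed to main. With --jars "path\to\jar\DataQuality.jar", the JSON path was consumed as the application jar, so the argument list shifted by one and args(0) became "validations", which Parser then tried to open as a file, producing exactly the FileNotFoundException in the trace. Passing the jar as the positional application argument instead (same placeholder paths as in the question) should work:

spark-submit --class antarctic.DataQuality --master local[*] --deploy-mode client "path\to\jar\DataQuality.jar" "path\to\json\mongoDocs.json" "validations" "process-id-1" "path\to\conf\application.conf"

--jars is meant for additional dependency jars shipped alongside the application, not for the application jar itself; with a fat jar built by sbt assembly it isn't needed at all.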