I have a Spark job that runs without any problems in spark-shell. I am currently trying to submit this job to YARN using Spark's Java API.

I am using the following class to run the Spark job:
import java.util.ResourceBundle;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SubmitSparkJobToYARNFromJavaCode {

    public static void main(String[] arguments) throws Exception {
        ResourceBundle bundle = ResourceBundle.getBundle("device_compare");
        String accessKey = bundle.getString("accessKey");
        String secretKey = bundle.getString("secretKey");

        String[] args = new String[] {
            // path to my application's JAR file (required in yarn-cluster mode)
            "--jar", "my_s3_path_to_jar",
            // name of my application's main class (required)
            "--class", "com.abc.SampleIdCount",
            // comma-separated list of local jars that SparkContext.addJar
            // should work with
            // "--addJars", arguments[1]
        };

        // create a Hadoop Configuration object
        Configuration config = new Configuration();

        // indicate that Spark is being run in YARN mode
        System.setProperty("SPARK_YARN_MODE", "true");
        System.setProperty("spark.local.dir", "/tmp");

        // create an instance of SparkConf
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("fs.s3n.awsAccessKeyId", accessKey);
        sparkConf.set("fs.s3n.awsSecretAccessKey", secretKey);
        sparkConf.set("spark.local.dir", "/tmp");

        // create ClientArguments, which will be passed to Client
        ClientArguments cArgs = new ClientArguments(args);

        // create an instance of the YARN Client
        Client client = new Client(cArgs, config, sparkConf);

        // submit the Spark job to YARN
        client.run();
    }
}
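As a side note on the configuration loading above: `ResourceBundle.getBundle("device_compare")` expects a `device_compare.properties` file on the classpath containing `accessKey` and `secretKey` entries. A minimal, self-contained illustration of the lookup (it uses `PropertyResourceBundle` fed from a string so the sketch runs without any classpath setup; the key values are placeholders, not real credentials):

```java
import java.io.StringReader;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

public class BundleDemo {
    public static void main(String[] args) throws Exception {
        // ResourceBundle.getBundle("device_compare") would resolve
        // device_compare.properties on the classpath; here the same
        // key=value format is supplied inline for a runnable sketch.
        ResourceBundle bundle = new PropertyResourceBundle(
                new StringReader("accessKey=AKIA_EXAMPLE\nsecretKey=secret_example"));
        System.out.println(bundle.getString("accessKey"));
        System.out.println(bundle.getString("secretKey"));
    }
}
```

If either key is missing from the properties file, `getString` throws a `MissingResourceException` before the job is ever submitted, which is worth ruling out early.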
This is the job I am trying to run:
package com.abc;

import java.util.ResourceBundle;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SampleIdCount {

    private static String accessKey;
    private static String secretKey;

    public SampleIdCount() {
        ResourceBundle bundle = ResourceBundle.getBundle("device_compare");
        accessKey = bundle.getString("accessKey");
        secretKey = bundle.getString("secretKey");
    }

    public static void main(String[] args) {
        System.out.println("Started execution");
        SampleIdCount sample = new SampleIdCount();

        System.setProperty("SPARK_YARN_MODE", "true");
        System.setProperty("spark.local.dir", "/tmp");

        SparkConf conf = new SparkConf().setAppName("SampleIdCount").setMaster("yarn-cluster");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", accessKey);
        sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secretKey);

        JavaRDD<String> installedDeviceIdsRDD = sc.textFile("my_s3_input_path");
        installedDeviceIdsRDD.saveAsTextFile("my_s3_output_path");
        sc.close();
    }
}
When I run the Java code, the Spark job is submitted to YARN, but it fails with the following error:
Diagnostics: File file:/mnt/tmp/spark-1b86d806-5c8f-4ae6-a486-7b68d46c759a/__spark_libs__8257948728364304288.zip does not exist
java.io.FileNotFoundException: File file:/mnt/tmp/spark-1b86d806-5c8f-4ae6-a486-7b68d46c759a/__spark_libs__8257948728364304288.zip does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:616)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:829)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:606)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:431)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I think the problem is that the /mnt folder is not available on the slave nodes, so I tried to change the Spark local directory to /tmp by doing the following:

1. Setting the spark.local.dir system property to "/tmp" in the Java code
2. Setting the environment variable via export LOCAL_DIRS=/tmp
3. Setting the environment variable via export SPARK_LOCAL_DIRS=/tmp
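One thing these attempts mix together (a side note on mechanics, not a fix for the YARN error): `System.setProperty` in the Java code sets a JVM system property in the submitting process only, while `export SPARK_LOCAL_DIRS=/tmp` sets an environment variable in the local shell; the two are read through entirely different mechanisms, and neither automatically propagates to the YARN containers on the slave nodes. A minimal, Spark-free sketch of the distinction:

```java
public class PropertyVsEnvDemo {
    public static void main(String[] args) {
        // Setting a JVM system property, as the submission code above does:
        System.setProperty("spark.local.dir", "/tmp");
        System.out.println(System.getProperty("spark.local.dir"));

        // A system property is not an environment variable: looking up the
        // same name in the process environment finds nothing.
        System.out.println(System.getenv("spark.local.dir"));
    }
}
```

So a shell `export` done before launching the Java program is visible to that JVM via `System.getenv`, but it still only affects the client side, not the containers that YARN starts on the worker nodes.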
None of these had any effect, and I still face the same error. The suggestions in other links did not help me either. I am really stuck here. Any help would be much appreciated. Thanks in advance. Cheers!