我已经遵循Bitnami Apache Spark指南,能够使用Docker容器启动一个Spark集群,其中包含一个master和几个worker,并且运行良好。现在,我想在这些容器上运行Scala脚本。我有一个道路网络图,我在上面运行Pregel算法。
我在设置容器方面没有问题,但我在如何在容器上部署应用程序方面遇到了困难。图是分区的,我的想法是,当运行应用程序时,不同的分区将在容器之间划分。
我已经克隆了Bitnami Apache Spark repo并运行IntelliJ。
在哪里定义如何部署应用程序?在一个指南中,SparkSession是在定义SPARK_MASTER_URL和SPARK_DRIVER_HOST的地方设置的,但是当我尝试这种方法时,我得到以下错误“Exception in thread“main”java.lang.NullPointerException”,它指向getSparkSession函数中的“SparkSession.builder.config(conf = conf).getOrCreate()”行。
我附加了getSparkSesson和主脚本中的一个片段。
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import scala.util.Properties
object RepeatedPregelUtils {
def getSparkSessions(appName: String): SparkSession = {
val conf = new SparkConf
conf.set("spark.master", Properties.envOrElse("SPARK_MASTER_URL", "spark://spark-master:7077"))
conf.set("spark.driver.host", Properties.envOrElse("SPARK_DRIVER_HOST", "local[*]"))
conf.set("spark.submit.deployMode", "client")
conf.set("spark.driver.bindAddress", "0.0.0.0")
conf.set("spark.app.name", appName)
SparkSession.builder.config(conf = conf).getOrCreate()
}
}
下面是我使用getSparkSession函数的地方:
def loadNetwork(): Graph[String,Double] = {
val spark: SparkSession = RepeatedPregelUtils.getSparkSessions("RepeatedRun")
var csvFile = spark.read.csv("src/main/scala/NewUpdatedSwedenNetwork.csv")
for (i <- 1 until csvFile.columns.length) {
csvFile = csvFile.withColumnRenamed(csvFile.columns(i), csvFile.head()(i).toString)
}
csvFile = csvFile.withColumnRenamed(csvFile.columns(0), "null")
val first_row = csvFile.first()
csvFile = csvFile.filter(row => row != first_row)
csvFile = csvFile.filter(col("from_y") < 55.7 && col("from_y") > 55.379 && col("from_x") > 11.975 && col("from_x") < 14.46)
val edges: RDD[Edge[Double]] = csvFile.repartition(2)
.select("from_id", "to_id", "mean_time").rdd.map(line => Edge(line.getAs("from_id").toString.toLong, line.getAs("to_id").toString.toLong, line.getAs("mean_time").toString.toFloat))
val graph = Graph.fromEdges(edges, "defaultname")
return graph
}
我对这一切都很陌生,所以我可能错过了一些明显的东西。
1条答案
按热度按时间lokaqttq1#
我想明白了。我在代码中做了很多修改,最后归结为两点:
1.将SparkSession中的master-url设置为spark://localhost:7077对我来说很有用
1.在IDE和Docker容器中设置相同版本的scala。显然,经过大量的谷歌搜索,BitnamiDocker使用Spark版本2.12。
这些变化对我很有效!