在带有docker容器的Spark集群上运行脚本

41zrol4v  于 2023-04-20  发布在  Docker
关注(0)|答案(1)|浏览(111)

我已经遵循Bitnami Apache Spark指南,能够使用Docker容器启动一个Spark集群,其中包含一个master和几个worker,并且运行良好。现在,我想在这些容器上运行Scala脚本。我有一个道路网络图,我在上面运行Pregel算法。
我在设置容器方面没有问题,但我在如何在容器上部署应用程序方面遇到了困难。图是分区的,我的想法是,当运行应用程序时,不同的分区将在容器之间划分。
我已经克隆了Bitnami Apache Spark repo并运行IntelliJ。
在哪里定义如何部署应用程序?在一个指南中,SparkSession是在定义SPARK_MASTER_URL和SPARK_DRIVER_HOST的地方设置的,但是当我尝试这种方法时,我得到以下错误“Exception in thread“main”java.lang.NullPointerException”,它指向getSparkSession函数中的“SparkSession.builder.config(conf = conf).getOrCreate()”行。
我附加了getSparkSesson和主脚本中的一个片段。

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.util.Properties

object RepeatedPregelUtils {

  def getSparkSessions(appName: String): SparkSession = {

    val conf = new SparkConf

    conf.set("spark.master", Properties.envOrElse("SPARK_MASTER_URL", "spark://spark-master:7077"))
    conf.set("spark.driver.host", Properties.envOrElse("SPARK_DRIVER_HOST", "local[*]"))
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.driver.bindAddress", "0.0.0.0")

    conf.set("spark.app.name", appName)

    SparkSession.builder.config(conf = conf).getOrCreate()
  }
}

下面是我使用getSparkSession函数的地方:

def loadNetwork(): Graph[String,Double] = {

    val spark: SparkSession = RepeatedPregelUtils.getSparkSessions("RepeatedRun")
    var csvFile = spark.read.csv("src/main/scala/NewUpdatedSwedenNetwork.csv")
    for (i <- 1 until csvFile.columns.length) {
      csvFile = csvFile.withColumnRenamed(csvFile.columns(i), csvFile.head()(i).toString)
    }
    csvFile = csvFile.withColumnRenamed(csvFile.columns(0), "null")
    val first_row = csvFile.first()
    csvFile = csvFile.filter(row => row != first_row)

    csvFile = csvFile.filter(col("from_y") < 55.7 && col("from_y") > 55.379 && col("from_x") > 11.975 && col("from_x") < 14.46)
    val edges: RDD[Edge[Double]] = csvFile.repartition(2)
      .select("from_id", "to_id", "mean_time").rdd.map(line => Edge(line.getAs("from_id").toString.toLong, line.getAs("to_id").toString.toLong, line.getAs("mean_time").toString.toFloat))
    val graph = Graph.fromEdges(edges, "defaultname")
    return graph
  }

我对这一切都很陌生,所以我可能错过了一些明显的东西。

lokaqttq

lokaqttq1#

我想明白了。我在代码中做了很多修改,最后归结为两点:
1.将SparkSession中的master-url设置为spark://localhost:7077对我来说很有用
1.在IDE和Docker容器中设置相同版本的scala。显然,经过大量的谷歌搜索,BitnamiDocker使用Spark版本2.12。
这些变化对我很有效!

相关问题