如何从eclipse/intellijide运行简单的spark应用程序?

zaqlnxep  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(386)

为了在实际将任务部署到hadoop之前简化在hadoop上运行的map reduce任务的开发,我使用一个简单的map reducer进行了测试,我编写了:

object mapreduce {
  import scala.collection.JavaConversions._

  val intermediate = new java.util.HashMap[String, java.util.List[Int]]
                                                  //> intermediate  : java.util.HashMap[String,java.util.List[Int]] = {}
  val result = new java.util.ArrayList[Int]       //> result  : java.util.ArrayList[Int] = []

  def emitIntermediate(key: String, value: Int) {
    if (!intermediate.containsKey(key)) {
      intermediate.put(key, new java.util.ArrayList)
    }
    intermediate.get(key).add(value)
  }                                               //> emitIntermediate: (key: String, value: Int)Unit

  def emit(value: Int) {
    println("value is " + value)
    result.add(value)
  }                                               //> emit: (value: Int)Unit

  def execute(data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit) {

    for (line <- data) {
      mapper(line)
    }

    for (keyVal <- intermediate) {
      reducer(keyVal._1, intermediate.get(keyVal._1))
    }

    for (item <- result) {
      println(item)
    }
  }                                               //> execute: (data: java.util.List[String], mapper: String => Unit, reducer: (St
                                                  //| ring, java.util.List[Int]) => Unit)Unit

  def mapper(record: String) {
    var jsonAttributes = com.nebhale.jsonpath.JsonPath.read("$", record, classOf[java.util.ArrayList[String]])
    println("jsonAttributes are " + jsonAttributes)
    var key = jsonAttributes.get(0)
    var value = jsonAttributes.get(1)

    println("key is " + key)
    var delims = "[ ]+";
    var words = value.split(delims);
    for (w <- words) {
      emitIntermediate(w, 1)
    }
  }                                               //> mapper: (record: String)Unit

  def reducer(key: String, listOfValues: java.util.List[Int]) = {
    var total = 0
    for (value <- listOfValues) {
      total += value;
    }

    emit(total)
  }                                               //> reducer: (key: String, listOfValues: java.util.List[Int])Unit
  var dataToProcess = new java.util.ArrayList[String]
                                                  //> dataToProcess  : java.util.ArrayList[String] = []
  dataToProcess.add("[\"test1\" , \"test1 here is another test1 test1 \"]")
                                                  //> res0: Boolean = true
  dataToProcess.add("[\"test2\" , \"test2 here is another test2 test1 \"]")
                                                  //> res1: Boolean = true

  execute(dataToProcess, mapper, reducer)         //> jsonAttributes are [test1, test1 here is another test1 test1 ]
                                                  //| key is test1
                                                  //| jsonAttributes are [test2, test2 here is another test2 test1 ]
                                                  //| key is test2
                                                  //| value is 2
                                                  //| value is 2
                                                  //| value is 4
                                                  //| value is 2
                                                  //| value is 2
                                                  //| 2
                                                  //| 2
                                                  //| 4
                                                  //| 2
                                                  //| 2

  for (keyValue <- intermediate) {
      println(keyValue._1 + "->"+keyValue._2.size)//> another->2
                                                  //| is->2
                                                  //| test1->4
                                                  //| here->2
                                                  //| test2->2
   }

}

这允许我在部署到实际的hadoop集群之前,在windows上的eclipseide中运行mapreduce任务。我希望为spark执行类似的操作,或者能够在部署到spark集群之前从eclipse中编写spark代码进行测试。Spark有可能吗?既然spark运行在hadoop之上,这是否意味着我不能在没有安装hadoop的情况下运行spark?换句话说,我可以只用spark库来运行代码吗

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

取自https://spark.apache.org/docs/0.9.0/quick-start.html#a-scala中的独立应用程序
如果是这样的话,我需要在我的项目中包括哪些spark库?

uqdfh47h

uqdfh47h1#

将以下内容添加到build.sbt libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1" 确保你的 scalaVersion 已设置(如。 scalaVersion := "2.10.3" )
另外,如果您只是在本地运行程序,可以跳过sparkcontext的最后两个参数,如下所示 val sc = new SparkContext("local", "Simple App") 最后,spark可以在hadoop上运行,但也可以在独立模式下运行。请参见:https://spark.apache.org/docs/0.9.1/spark-standalone.html

相关问题