使用apachespark自动运行任务

bttbmeg0  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(269)

我是apachespark的新手,我尝试使用spark运行一个测试应用程序。我面临的问题是,当我使用要处理的数据集合创建rdd时,它会被创建,但它不会开始处理它,除非调用rdd类中的.collect方法。这样我就必须等待spark处理rdd。有没有什么方法可以让spark在我形成rdd后立即自动处理收集,然后我可以调用.collect方法随时获取处理后的数据,而不必等待spark?
此外,有没有什么方法可以使用spark将处理后的数据放入数据库而不是返回给我?
我使用的代码如下:

object appMain extends App {
    val spark = new SparkContext("local", "SparkTest")

    val list = List(1,2,3,4,5)

    // i want this rdd to be processed as soon as it is created
    val rdd = spark.parallelize(list.toSeq, 1).map{ i =>
    i%2 // checking if the number is even or odd
    }

    // some more functionality here

    // the job above starts when the line below is executed
    val result = rdd.collect

    result.foreach(println)
}
ki0zmccv

ki0zmccv1#

马萨格已经详细解释了事情。您可以在创建rdd之后应用count。
rdd.count()
另一种方法是打印几行rdd,就像
rdd.take(10)foreach(打印)

h7wcgrx3

h7wcgrx32#

spark使用了一个惰性的评估模型,在这个模型中,只有在rdd上调用了一个“action”之后,才会应用转换操作。该模型适用于大数据集的批处理操作。可以使用rdd.cache()来“缓存”计算的某些部分,但这并不强制计算,只表示一旦rdd可用,就应该缓存它。
评论的进一步澄清表明,使用流式模型(传入数据以“微批量”处理)可能更好地服务于op
这是一个“紧急事件计数”流式作业的示例(未测试,仅用于说明目的)(基于网络wordcountexample)

object UrgentEventCount {
  def main(args: Array[String]) {

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("UrgentEventCount").setMaster(SPARK_MASTER)

    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val dbConnection = db.connect(dbHost, dbPort)
    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    //we assume events are tab separated
    val events = lines.flatMap(_.split("\t"))
    val urgentEventCount = events.filter(_.contains("URGENT"))
    dbConnection.execute("Insert into UrgentEvents ...)
    ssc.start()
}

如您所见,如果您需要连接到数据库,您所需要做的就是提供执行db交互所需的驱动程序和代码。确保在jobjar文件中包含驱动程序的依赖项。

相关问题