sqlcontext.read…load()和sqlcontext.write…save()代码在spark cluster上运行在哪里?

ubbxdtey  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(361)

我正在使用sparkDataframeapi从nfs共享加载/读取文件,然后将该文件的数据保存/写入hdfs。
我有一个三节点的spark集群,有一个主节点和两个工作节点。我的spark集群使用yarn作为集群管理器,因此两个工作节点是yarn nodemanager节点,主节点是yarn resourcemanager节点。
我有一个远程位置,比如/data/files,它安装到所有三个yarn/spark节点上,因为它是[/data/files],所有csv文件[不止一个]都存在,我想从中读取并最终写入hdfs。
我正在主节点上运行以下代码

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object TestMoreThan1CSV2DF {
  private val source: String = "file:///data/files/"
  private val destination = "hdfs://<myHostIP>:8020/raw/"
  private val fileFormat : String = "com.databricks.spark.csv"

  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

    for(file<-fileArray){
//  reading csv file from shared location and taking whole data in a dataframe
    var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

//      variable for holding destination location : HDFS Location
    var finalDestination: String = destination+file.getName

//  saving data into HDFS
    writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
    }
  }

 def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
   try{
       sqlContext.read.format(fileFormat)
                       .option("header", header) // Use first line of all files as header
                       .option("inferSchema", inferSchema) // Automatically infer data types
                       .load(source)
   }
   catch{
     case ex: OnboardingException => {
            throw ex;
        }
   }
 }

 def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
   try{
       df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
   }
   catch{
     Case ez : OnboardingException => {
            throw ez;
        }
   }
 }
}

这段代码读取共享位置/data/files/中存在的所有csv文件,并将每个文件写入hdfs。例如:/data/files/f1.csv将作为/raw/f1.csv/part-xx文件加载到hdfs中
运行此代码时,我看不出:
1) 整个代码在哪里运行?它在司机身上运行吗?或者同时使用两个工人?
2) load()和save()api是否在工作节点上运行,是否并行工作?如果是的话,那么两个工作人员如何跟踪它读或写的部分呢?
3) 到目前为止,我正在“for”循环中按顺序读取每个文件,并按顺序处理其中的每个文件,是否可以使其成为一个多线程应用程序,其中每个文件分配给一个线程,以便并行执行端到端读写。执行此操作时,磁盘io是否有任何限制?
如有任何快速回复/参考/提示,将不胜感激。
你好,布比什

xnifntxz

xnifntxz1#

很好的实验!!。
1) 你的代码总是在worker上运行。司机程序只是为了管理工人。
2) yes load()和save()api在工作节点上运行。它们按顺序工作。
3) 使用多线程应用程序:我还没有尝试。祝你好运“去试试吧!!”。但你为什么要把自己置于一个复杂的境地呢!!。spark知道如何处理这种情况。

raogr8fs

raogr8fs2#

从我的查询的另一个线程复制了非常好的解释:区分apachespark中的驱动程序代码和工作代码
在这里也复制它的某些部分:在转换创建的闭包中发生的所有事情都发生在一个worker上。这意味着,如果在map(…)、filter(…)、mappartitions(…)、groupby*(…)内部传递了某些内容,则在worker上执行aggregateby*(…)。它包括从持久存储或远程源读取数据。
像count、reduce(…)、fold(…)这样的操作通常在驱动程序和工作程序上执行。重型起重作业由工人并行进行,一些最后的步骤,如减少工人的输出,在司机上依次进行。
其他的一切,比如触发一个动作或者转换,都发生在驱动程序上。尤其是指需要访问sparkcontext的每个操作。
就我的查询而言:1)是的,main()方法的一部分在驱动程序上运行,但转换在驱动程序上发生
2) load()和save()在worker上运行,因为我们可以看到loading会创建dataframe[存储在分区中的内存中],save会在hdfs中创建部分x文件,这表明worker正在这样做
3) 仍然在努力做到这一点,一旦做到了就会回答这个问题。
谢谢

相关问题