scala - Processing ~1 billion records locally in Spark from the Hive metastore (Parquet format) takes forever (6 hours). How can I speed it up?

2o7dmzc5 asked on 2021-06-25 in Hive

I'm running everything from IntelliJ. The application is written in Scala.
scalaVersion := "2.12.10", Spark version 2.4.4
So I'm working with the New York City taxi data: nearly 1.1 billion records, about 120 GB. I read it, drop the unnecessary data, clean it, and write it to the Hive metastore in Parquet format with the default compression, partitioned by day, month and year. So far so good; that part runs fairly quickly, in a few minutes (2-3 minutes). Now I read the data back from the metastore and do some processing: basically I want to count all the rides from Manhattan to JFK airport, ending with a couple of UDFs and a count. This takes a very long time.
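For reference, here is a minimal sketch of the kind of partitioned Parquet write described above; the DataFrame, table and partition column names are assumptions for illustration, since the question's own write code (a custom DataWriter) isn't shown until later:

// Sketch only: cleanedTaxiDF stands for the cleaned taxi DataFrame.
cleanedTaxiDF.write
  .mode("overwrite")
  .format("parquet")                   // Parquet with the default compression codec
  .partitionBy("year", "month", "day") // partitioned by day, month and year as described
  .saveAsTable("nyc_taxi_trips")       // registers the table in the Hive metastore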
Then I have a weather dataset of less than 1 MB. It has close to 50 columns, and I build a new DataFrame from it with only 7 of them. I need to find some correlations between the rides, the weather and so on, so I join on the date and then use the spark.ml Correlation function to get the correlation matrices. This also takes a long time.
Altogether all of this takes about 6 hours to run (I was 4-5 hours in when I messed something up and had to restart). I can't wait that long just to check whether the results are even correct.
From what I've read and understood, Parquet should be much faster to read and work with than CSV, but I'm finding the opposite, so I think I'm doing something wrong. Maybe some configuration or setting? I'm a beginner with Spark and teaching myself, so please forgive me if I've made a mistake. Any help would be greatly appreciated.
Let me know if I should post any updates or more information; I can edit the question and add it.
Thanks.

def analysis() = {
    var parquetDF = SparkObject.spark.read.parquet("spark-warehouse/location")
//      .cache()
    val manhattanTojfkDF = countManhattanToJKF(parquetDF)
    findCorrelation(manhattanTojfkDF)
  }

  def countManhattanToJKF(df:DataFrame):DataFrame = {
    var parquetDF = df
    //  val geojson = scala.io.Source.fromURL(this.getClass.getResource("/nyc-borough-boundaries-polygon.geojson")).mkString
    val geojson = scala.io.Source.fromURL(this.getClass.getResource("/NYC Taxi Zones.geojson")).mkString
    val features = geojson.parseJson.convertTo[FeatureCollection]
    val broadcastFeatures = SparkObject.spark.sparkContext.broadcast(features)
    val lonlatToZoneID = (longitude: Double, latitude: Double) => {
      val feature: Option[Feature] = broadcastFeatures.value.find(f => {
        f.geometry.contains(new Point(longitude, latitude))
      })
      feature.map(f => {
        f("location_id").convertTo[String]
      }).getOrElse("NA")
    }
    val latlonToZoneIDUDF = udf(lonlatToZoneID)

    parquetDF = parquetDF.withColumn("pickupZoneID", when(parquetDF("pickupZoneID") === "NA",
      latlonToZoneIDUDF(parquetDF("pickup_longitude"), parquetDF("pickup_latitude")))
      .otherwise(parquetDF("pickup_longitude")))

    parquetDF = parquetDF.withColumn("dropoffZoneID", when(parquetDF("dropoffZoneID") === "NA",
      latlonToZoneIDUDF(parquetDF("dropoff_longitude"), parquetDF("dropoff_latitude")))
      .otherwise(parquetDF("dropoff_longitude")))

    val boroughLookupID = (pickupID:String) => {
      val feature: Option[Feature] = broadcastFeatures.value.find(f => {
        f.properties("location_id").convertTo[String] == pickupID
      })
      feature.map(f => {
        f("borough").convertTo[String]
      }).getOrElse("NA")
    }

    val boroughUDF = udf(boroughLookupID)
    parquetDF = parquetDF.withColumn("pickupBorough", boroughUDF(parquetDF("pickupZoneID")))
    parquetDF = parquetDF.withColumn("dropoffBorough", boroughUDF(parquetDF("dropoffZoneID")))

    val manhattanToJFK = (borough:String, dropOffID:String) => {
      (borough == "Manhattan" && dropOffID == "132")
    }

    val manhattanToJFKUDF = udf(manhattanToJFK)
    parquetDF = parquetDF.withColumn("manhattanToJFK",
      manhattanToJFKUDF(parquetDF("pickupBorough"), parquetDF("dropoffZoneID")))

    val filteredDF =  parquetDF.filter(parquetDF("ManhattanToJFK") === true)
    val totalRidesFromManhattanTOJFK = filteredDF.count()
    println(totalRidesFromManhattanTOJFK)
    print(parquetDF.show())
    filteredDF
  }

  def findCorrelation(filteredDF:DataFrame) = {
    var weatherDF = SparkObject.spark.read.format("csv")
      .option("header", true)
      .load(URLs.weatherData:_*)

    weatherDF = weatherDF.select(weatherDF("DATE").cast("date"), weatherDF("AWND").cast("float"),
      weatherDF("SNOW").cast("float"), weatherDF("SNWD").cast("float"), weatherDF("TMIN").cast("float"),
      weatherDF("TMAX").cast("float"), weatherDF("PRCP").cast("float"))

     val joinedDF = weatherDF.join(filteredDF, weatherDF("DATE") === filteredDF("pickupDate"))
      .select(weatherDF("DATE"), weatherDF("AWND"), weatherDF("SNOW"), weatherDF("SNWD"), weatherDF("TMIN"),
        weatherDF("TMAX"), weatherDF("PRCP"))
    //    .cache()

    val ridesPerDay = joinedDF.groupBy("DATE").agg(count("DATE").alias("rides_per_day"))
    val cleanedDF =  ridesPerDay.join(joinedDF, "DATE").dropDuplicates().drop("DATE")
    cleanedDF.printSchema()

    val assembler = new VectorAssembler()
      .setInputCols(cleanedDF.columns)
      .setOutputCol("features")

    val corrFeatures = assembler.transform(cleanedDF)

    val Row(coeff1: Matrix) = Correlation.corr(corrFeatures, "features").head
    println(s"Pearson correlation matrix:\n $coeff1")

    val Row(coeff2: Matrix) = Correlation.corr(corrFeatures, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")
  }

The SparkSession looks like this:

lazy val spark = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("NYCTaxiDataKlarna")
      .getOrCreate()
  }

I pass -Xms4g -Xmx4g as VM options, so about 4.1 GB each.
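For context, a sketch of the same local session with one configuration knob that is often worth checking in single-machine runs, spark.sql.shuffle.partitions (it defaults to 200); the value below is an arbitrary placeholder, not a recommendation. The -Xms/-Xmx VM options mentioned above remain the usual way to size the heap in this setup, since the driver JVM is already running by the time the builder executes.

import org.apache.spark.sql.SparkSession

// Sketch only: the same local session as above, with an explicit shuffle
// partition count. 16 is a placeholder value, not a recommendation.
lazy val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("NYCTaxiDataKlarna")
  .config("spark.sql.shuffle.partitions", "16") // defaults to 200
  .getOrCreate()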
Edit: I'm now running only the countManhattanToJKF part, with a small change at the end: basically I persist the data to Hive, so that next time I can pick up from there. It has been running for almost 5 hours and still hasn't finished.

val dw = new DataWriter()
      dw.writeToHive(parquetDF, "parquet", "location_with_borough", "pickupDate")
      print(parquetDF.count())

      val filteredDF =  parquetDF.filter(parquetDF("ManhattanToJFK") === true)
      dw.writeToHive(parquetDF, "parquet", "manhattan_to_jfk", "pickupDate")
//      val totalRidesFromManhattanTOJFK = filteredDF.count()
//      println(totalRidesFromManhattanTOJFK)
//      print(parquetDF.show())
//      filteredDF

dced5bon #1

Most likely you are not IO-bound, so it doesn't matter much which format you read. What you really need to do is get hold of the query plan (i.e. the actual work Spark is doing) and decipher it. Without the query plan you can't tell whether the problem is the initial read, the join, the groupBy, or the correlation itself. A good first place to look is the SQL tab of the Spark History Server. Also, it would be a good idea to dump the pre-processed data (i.e. cleanedDF) to storage before doing the ML step, so you don't keep re-running the pre-processing.
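As a hedged illustration of both suggestions (look at the plan, and persist the pre-processed data before the ML step), using the cleanedDF from the question; the output path is a placeholder:

// Print the extended query plan without triggering the job, to see what Spark
// will actually execute.
cleanedDF.explain(true)

// Write the pre-processed data out once, then run the correlation against the
// saved copy, so the UDF-heavy pipeline is not re-executed on every attempt.
cleanedDF.write.mode("overwrite").parquet("spark-warehouse/cleaned_for_corr")
val corrInputDF = SparkObject.spark.read.parquet("spark-warehouse/cleaned_for_corr")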
"I'm running everything from IntelliJ."
How is Spark configured? Are you running with master = local[*]?
Edit: I'd suggest starting a spark-shell and doing some REPL-style exploration of your code. If you have Spark installed on your local machine, you'll see it print the Spark History Server URL when the shell starts. Run your code from there, then open the Spark History Server and work out where Spark is spending its time.
To my eye, your code looks more complicated than it needs to be (a sketch of one possible simplification follows the shell transcript below). You may be shooting yourself in the foot without realizing it, and you won't know until you dig a bit deeper.

➜  ~ spark-shell 
20/02/23 04:06:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.43.210:4040
Spark context available as 'sc' (master = local[*], app id = local-1582459622890).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

//enter code
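
Not part of the original answer, but as one hedged sketch of the kind of simplification hinted at above: the per-row boroughUDF scans the broadcast feature list linearly for every record, and it could probably be replaced by a small lookup DataFrame plus a broadcast join. This assumes the parsed FeatureCollection can be traversed the same way the find(...) calls in the question traverse it, and that SparkObject.spark is in scope for the implicits:

import org.apache.spark.sql.functions.broadcast
import SparkObject.spark.implicits._

// Build a tiny (zone id -> borough) lookup once on the driver, using the same
// accessors the question's UDFs use on each Feature.
val zoneToBoroughDF = features
  .map(f => (f("location_id").convertTo[String], f("borough").convertTo[String]))
  .toSeq
  .toDF("pickupZoneID", "pickupBorough")

// A broadcast join replaces the row-by-row boroughUDF lookup.
val withPickupBorough = parquetDF.join(broadcast(zoneToBoroughDF), Seq("pickupZoneID"), "left")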
