我负责intellij的一切。应用程序是scala。
scalaversion:=“2.12.10”sparkversion=“2.4.4”
所以我在处理纽约出租车的数据。近11亿条记录,120gb数据。我读取它,删除不必要的数据,清理它,然后以parquet格式以默认压缩方式写入分区(日、月、年)中的hive meta store。到目前为止还不错,它在几分钟(2-3分钟)内工作得相当快。现在我再次从metastore中读取数据并进行一些操作,基本上,我想计算所有来自曼哈顿肯尼迪机场的车程,最后计算几个udf和一个计数。这需要很长时间。
然后我有一个小于1mb的天气数据集。差不多有50列,我用它做了一个新的df,只有7列。必须找到乘车和天气等之间的一些相关性,所以对日期进行连接,然后使用spark.ml相关函数来获得相关矩阵。这也需要时间。
总的来说,所有这些都需要6个小时来运行(我在4-5个小时,我搞砸了一些东西,不得不重新启动)。我等不及要检查它是否准确。
从我阅读和理解Parquet应该比阅读和使用csv快得多,但我发现恰恰相反。所以我觉得我做错了什么。可能是一些配置或设置等?我是一个初学者与Spark和学习它自己。所以如果我犯了一个错误,请原谅我。任何帮助都会大有帮助。
如果我应该张贴任何更新或信息让我知道。我可以编辑和发布它。
谢谢
def analysis() = {
var parquetDF = SparkObject.spark.read.parquet("spark-warehouse/location")
// .cache()
val manhattanTojfkDF = countManhattanToJKF(parquetDF)
findCorrelation(manhattanTojfkDF)
}
def countManhattanToJKF(df:DataFrame):DataFrame = {
var parquetDF = df
// val geojson = scala.io.Source.fromURL(this.getClass.getResource("/nyc-borough-boundaries-polygon.geojson")).mkString
val geojson = scala.io.Source.fromURL(this.getClass.getResource("/NYC Taxi Zones.geojson")).mkString
val features = geojson.parseJson.convertTo[FeatureCollection]
val broadcastFeatures = SparkObject.spark.sparkContext.broadcast(features)
val lonlatToZoneID = (longitude: Double, latitude: Double) => {
val feature: Option[Feature] = broadcastFeatures.value.find(f => {
f.geometry.contains(new Point(longitude, latitude))
})
feature.map(f => {
f("location_id").convertTo[String]
}).getOrElse("NA")
}
val latlonToZoneIDUDF = udf(lonlatToZoneID)
parquetDF = parquetDF.withColumn("pickupZoneID", when(parquetDF("pickupZoneID") === "NA",
latlonToZoneIDUDF(parquetDF("pickup_longitude"), parquetDF("pickup_latitude")))
.otherwise(parquetDF("pickup_longitude")))
parquetDF = parquetDF.withColumn("dropoffZoneID", when(parquetDF("dropoffZoneID") === "NA",
latlonToZoneIDUDF(parquetDF("dropoff_longitude"), parquetDF("dropoff_latitude")))
.otherwise(parquetDF("dropoff_longitude")))
val boroughLookupID = (pickupID:String) => {
val feature: Option[Feature] = broadcastFeatures.value.find(f => {
f.properties("location_id").convertTo[String] == pickupID
})
feature.map(f => {
f("borough").convertTo[String]
}).getOrElse("NA")
}
val boroughUDF = udf(boroughLookupID)
parquetDF = parquetDF.withColumn("pickupBorough", boroughUDF(parquetDF("pickupZoneID")))
parquetDF = parquetDF.withColumn("dropoffBorough", boroughUDF(parquetDF("dropoffZoneID")))
val manhattanToJFK = (borough:String, dropOffID:String) => {
(borough == "Manhattan" && dropOffID == "132")
}
val manhattanToJFKUDF = udf(manhattanToJFK)
parquetDF = parquetDF.withColumn("manhattanToJFK",
manhattanToJFKUDF(parquetDF("pickupBorough"), parquetDF("dropoffZoneID")))
val filteredDF = parquetDF.filter(parquetDF("ManhattanToJFK") === true)
val totalRidesFromManhattanTOJFK = filteredDF.count()
println(totalRidesFromManhattanTOJFK)
print(parquetDF.show())
filteredDF
}
def findCorrelation(filteredDF:DataFrame) = {
var weatherDF = SparkObject.spark.read.format("csv")
.option("header", true)
.load(URLs.weatherData:_*)
weatherDF = weatherDF.select(weatherDF("DATE").cast("date"), weatherDF("AWND").cast("float"),
weatherDF("SNOW").cast("float"), weatherDF("SNWD").cast("float"), weatherDF("TMIN").cast("float"),
weatherDF("TMAX").cast("float"), weatherDF("PRCP").cast("float"))
val joinedDF = weatherDF.join(filteredDF, weatherDF("DATE") === filteredDF("pickupDate"))
.select(weatherDF("DATE"), weatherDF("AWND"), weatherDF("SNOW"), weatherDF("SNWD"), weatherDF("TMIN"),
weatherDF("TMAX"), weatherDF("PRCP"))
// .cache()
val ridesPerDay = joinedDF.groupBy("DATE").agg(count("DATE").alias("rides_per_day"))
val cleanedDF = ridesPerDay.join(joinedDF, "DATE").dropDuplicates().drop("DATE")
cleanedDF.printSchema()
val assembler = new VectorAssembler()
.setInputCols(cleanedDF.columns)
.setOutputCol("features")
val corrFeatures = assembler.transform(cleanedDF)
val Row(coeff1: Matrix) = Correlation.corr(corrFeatures, "features").head
println(s"Pearson correlation matrix:\n $coeff1")
val Row(coeff2: Matrix) = Correlation.corr(corrFeatures, "features", "spearman").head
println(s"Spearman correlation matrix:\n $coeff2")
}
sparksession看起来像
lazy val spark = {
SparkSession
.builder()
.master("local[*]")
.appName("NYCTaxiDataKlarna")
.getOrCreate()
}
我将-xms4g-xmx4g作为vm选项传递,因此每个内存为4.14.1gb。
编辑:所以我现在只运行manhatantojfk函数,在最后做了一点修改,基本上将数据持久化到hive中。下次我可以从那里开始。它已经运行了将近5个小时,还没有完成。
val dw = new DataWriter()
dw.writeToHive(parquetDF, "parquet", "location_with_borough", "pickupDate")
print(parquetDF.count())
val filteredDF = parquetDF.filter(parquetDF("ManhattanToJFK") === true)
dw.writeToHive(parquetDF, "parquet", "manhattan_to_jfk", "pickupDate")
// val totalRidesFromManhattanTOJFK = filteredDF.count()
// println(totalRidesFromManhattanTOJFK)
// print(parquetDF.show())
// filteredDF
1条答案
按热度按时间dced5bon1#
很可能你没有io限制,所以你读什么格式都不重要。您真正需要做的是找到查询计划(也就是spark正在做的实际工作)并对其进行解密。如果没有查询计划,您就无法判断问题是初始读取还是连接、组或关联本身。第一个好的地方是spark历史服务器的sql选项卡。另外,在执行ml(又称cleaneddf)之前,最好先将预处理的数据转储到存储器中,这样就不会经常重新运行预处理。
“我在用intellij管理一切。”
spark是如何配置的?您是否使用master=local[*]运行?
编辑:所以我建议启动spark shell并进行一些repl风格的代码探索。如果您的本地计算机上安装了spark,那么在启动spark shell时,您将看到它打印出spark历史服务器url。从这里运行您的代码,然后打开spark历史服务器,找出spark花费的时间。
在我看来,你的代码看起来比它需要的复杂。你可能无意中击中了自己的脚,你不会知道,直到你潜得更深一点。