I'm trying to set up a Spark cluster on my home network, but compared to running standalone I see no performance gain at all — in fact it's slightly slower than running with local[*]. Can anyone help explain why?
Here's what I did:
I'm using the MovieLens 10M dataset: http://files.grouplens.org/datasets/movielens/ml-10m.zip
For my local cluster I have two modern, high-spec Macs with identical hardware, each with 32 GB of RAM.
When I run Spark standalone with local[*], my time is 30.82 minutes.
When I run against a Spark cluster connecting the two Macs, my time is 35 minutes.
My spark-submit arguments for the Spark cluster are as follows:
spark-submit --class com.sundogsoftware.spark.MovieSimilarities10MDataset --deploy-mode cluster --master spark://mbp2.lan:7077 --driver-memory 1g --num-executors 2 --executor-cores 8 --executor-memory 28g
This results in 8 cores and 28 GB of RAM being used on each Mac (confirmed via the Spark web UI).
Naturally, I expected that two machines with identical hardware, joined to the same Spark cluster and running against the same dataset (MovieLens 10M), would perform better.
Any suggestions would be much appreciated. I've tuned the number of executors/cores/memory many times, with no effect.
Thanks
The class is as follows:
package com.sundogsoftware.spark

import java.util.concurrent.TimeUnit

import org.apache.log4j._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{Dataset, SparkSession}

// To run on EMR successfully + output results for Star Wars:
// aws s3 cp s3://sundog-spark/MovieSimilarities1MDataset.jar ./
// aws s3 cp s3://sundog-spark/ml-10M100K/movies.dat ./
// spark-submit --executor-memory 1g MovieSimilarities1MDataset.jar 260

object MovieSimilarities10MDataset {

  case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)
  case class MoviesNames(movieID: Int, movieTitle: String)
  case class MoviePairs(movie1: Int, movie2: Int, rating1: Int, rating2: Int)
  case class MoviePairsSimilarity(movie1: Int, movie2: Int, score: Double, numPairs: Long)

  def computeCosineSimilarity(spark: SparkSession, data: Dataset[MoviePairs]): Dataset[MoviePairsSimilarity] = {
    // Compute xx, xy and yy columns
    val pairScores = data
      .withColumn("xx", col("rating1") * col("rating1"))
      .withColumn("yy", col("rating2") * col("rating2"))
      .withColumn("xy", col("rating1") * col("rating2"))

    // Compute numerator, denominator and numPairs columns
    val calculateSimilarity = pairScores
      .groupBy("movie1", "movie2")
      .agg(
        sum(col("xy")).alias("numerator"),
        (sqrt(sum(col("xx"))) * sqrt(sum(col("yy")))).alias("denominator"),
        count(col("xy")).alias("numPairs")
      )

    // Calculate score and select only needed columns (movie1, movie2, score, numPairs)
    import spark.implicits._
    val result = calculateSimilarity
      .withColumn("score",
        when(col("denominator") =!= 0, col("numerator") / col("denominator"))
          .otherwise(null)
      ).select("movie1", "movie2", "score", "numPairs").as[MoviePairsSimilarity]

    result
  }

  /** Get movie name by given movie id */
  def getMovieName(movieNames: Dataset[MoviesNames], movieId: Int): String = {
    val result = movieNames.filter(col("movieID") === movieId)
      .select("movieTitle").collect()(0)

    result(0).toString
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val startTime = System.nanoTime

    // Create a SparkSession without specifying master
    val spark = SparkSession
      .builder
      .appName("MovieSimilarities10M")
      .getOrCreate()

    // Create schema when reading movies.dat
    val moviesNamesSchema = new StructType()
      .add("movieID", IntegerType, nullable = true)
      .add("movieTitle", StringType, nullable = true)

    // Create schema when reading ratings.dat
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    println("\nLoading movie names...")

    import spark.implicits._

    // Create a broadcast dataset of movieID and movieTitle.
    // Apply the ISO-8859-1 charset
    val movieNames = spark.read
      .option("sep", "::")
      .option("charset", "ISO-8859-1")
      .schema(moviesNamesSchema)
      .csv("movies.dat")
      .as[MoviesNames]

    // Load up movie data as dataset
    val movies = spark.read
      .option("sep", "::")
      .schema(moviesSchema)
      .csv("ratings.dat")
      .as[Movies]

    val ratings = movies.select("userId", "movieId", "rating")

    // Emit every movie rated together by the same user.
    // Self-join to find every combination.
    // Select movie pairs and rating pairs
    val moviePairs = ratings.as("ratings1")
      .join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
      .select($"ratings1.movieId".alias("movie1"),
        $"ratings2.movieId".alias("movie2"),
        $"ratings1.rating".alias("rating1"),
        $"ratings2.rating".alias("rating2")
      ).repartition(100).as[MoviePairs]

    val moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()

    if (args.length > 0) {
      val scoreThreshold = 0.88
      val coOccurenceThreshold = 1000.0

      val movieID: Int = args(0).toInt

      // Filter for movies with this sim that are "good" as defined by
      // our quality thresholds above
      val filteredResults = moviePairSimilarities.filter(
        (col("movie1") === movieID || col("movie2") === movieID) &&
          col("score") > scoreThreshold && col("numPairs") > coOccurenceThreshold)

      // Sort by quality score.
      val results = filteredResults.sort(col("score").desc).take(50)

      println("\nTop 50 similar movies for " + getMovieName(movieNames, movieID))
      for (result <- results) {
        // Display the similarity result that isn't the movie we're looking at
        var similarMovieID = result.movie1
        if (similarMovieID == movieID) {
          similarMovieID = result.movie2
        }
        println(getMovieName(movieNames, similarMovieID) + "\tscore: " + result.score + "\tstrength: " + result.numPairs)
      }

      // Report elapsed wall-clock time in seconds
      val stopTime = System.nanoTime
      val elapsedTime = stopTime - startTime
      val elapsedSeconds = TimeUnit.SECONDS.convert(elapsedTime, TimeUnit.NANOSECONDS)
      println(s"elapsedTime sec=$elapsedSeconds")
    }
  }
}
2 Answers

Answer 1:
Distributed Spark carries a lot of overhead that you don't pay in standalone mode: communication, serializing and shuffling data across the network, and so on. I think you need more than two workers before leaving standalone mode pays off.
Make sure your data is in a format that can easily be split across workers. Consider making the loading of the .dat files the first step of your workflow, and save the result in Parquet format, or at least Avro, compressed with Snappy (not gzip).
You'll need a shared filesystem for all of this to work; with two machines, you're using NFS, right? Put the (converted) source files at exactly the same path on each machine's local FS, and use the shared NFS for everything else. That way: no network IO just to load the data.
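A minimal sketch of that one-time conversion step, assuming the .dat file sits in the working directory as in the question's code (the `ConvertRatingsToParquet` object name and the `ratings.parquet` output path are my own placeholders):

```scala
// One-time conversion job: read the ::-separated ratings.dat once, then
// persist it as Snappy-compressed Parquet so every later run gets a
// splittable, columnar, typed input instead of re-parsing text.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

object ConvertRatingsToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ConvertRatings").getOrCreate()

    // Same schema the question's job uses for ratings.dat
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    spark.read
      .option("sep", "::")
      .schema(moviesSchema)
      .csv("ratings.dat")
      .write
      .option("compression", "snappy") // snappy-compressed Parquet stays splittable
      .parquet("ratings.parquet")

    spark.stop()
  }
}
```

After this, the similarity job reads `ratings.parquet` with `spark.read.parquet(...)` and needs no separator option or charset handling.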
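For instance, once the converted files sit at an identical local path on both Macs (the `/data/ml-10m/...` path below is hypothetical), an explicit `file://` URI makes each executor read from its own disk; in spark-shell this would look like:

```scala
// Read the converted ratings from a path that exists identically on every
// node. The file:// scheme means executors load from their own local disk,
// so no network IO is spent just moving the input around.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("MovieSimilarities10M").getOrCreate()
val ratings = spark.read.parquet("file:///data/ml-10m/ratings.parquet")
```

Note this only works because the path and contents are the same on both machines; anything written as output should still go to the shared NFS.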
Answer 2:
You're downloading the same dataset from the internet twice instead of once. That takes longer than downloading it once.
By the way, you're doing numeric processing here. You'd likely get a bigger speedup by using a GPU and doing this as one large matrix operation on the GPU.