Spark performance: local is faster than the cluster

cnh2zyt3  asked on 2021-05-27 in Spark
Follow (0) | Answers (2) | Views (562)

I'm trying to set up a Spark cluster on my home network, but I'm not seeing any performance gain compared to running standalone; in fact, it's slightly slower than running with local[*]. Can anyone help explain why?

Here's what I've done:

I'm using the MovieLens 10M dataset: http://files.grouplens.org/datasets/movielens/ml-10m.zip

For my local cluster I have two identically specced, modern, high-performance Macs, each with 32 GB of RAM.

When I run a standalone Spark instance (i.e. local[*]), my time is 30.82 minutes.

When I run against a Spark cluster connecting the two Macs, my time is 35 minutes.

My spark-submit parameters for the Spark cluster are as follows:

spark-submit --class com.sundogsoftware.spark.MovieSimilarities10MDataset \
  --deploy-mode cluster --master spark://mbp2.lan:7077 \
  --driver-memory 1g --num-executors 2 --executor-cores 8 --executor-memory 28g

This results in 8 cores and 28 GB of RAM being used on each Mac (confirmed via the Spark web UI).

Naturally, I expected that two machines with identical hardware specs, joined to the same Spark cluster and running against the same dataset (MovieLens 10M), would give better performance.

Any suggestions would be greatly appreciated. I've adjusted the number of executors/cores/memory many times, but it made no difference.

Thanks.

The class is as follows:

package com.sundogsoftware.spark

import java.util.concurrent.TimeUnit

import org.apache.log4j._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{Dataset, SparkSession}

// To run on EMR successfully + output results for Star Wars:
// aws s3 cp s3://sundog-spark/MovieSimilarities1MDataset.jar ./
// aws s3 cp s3://sundog-spark/ml-10M100K/movies.dat ./
// spark-submit --executor-memory 1g MovieSimilarities1MDataset.jar 260

object MovieSimilarities10MDataset {

  case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)
  case class MoviesNames(movieID: Int, movieTitle: String)
  case class MoviePairs(movie1: Int, movie2: Int, rating1: Int, rating2: Int)
  case class MoviePairsSimilarity(movie1: Int, movie2: Int, score: Double, numPairs: Long)

  def computeCosineSimilarity(spark: SparkSession, data: Dataset[MoviePairs]): Dataset[MoviePairsSimilarity] = {
    // Compute xx, xy and yy columns
    val pairScores = data
      .withColumn("xx", col("rating1") * col("rating1"))
      .withColumn("yy", col("rating2") * col("rating2"))
      .withColumn("xy", col("rating1") * col("rating2"))

    // Compute numerator, denominator and numPairs columns
    val calculateSimilarity = pairScores
      .groupBy("movie1", "movie2")
      .agg(
        sum(col("xy")).alias("numerator"),
        (sqrt(sum(col("xx"))) * sqrt(sum(col("yy")))).alias("denominator"),
        count(col("xy")).alias("numPairs")
      )

    // Calculate score and select only needed columns (movie1, movie2, score, numPairs)
    import spark.implicits._
    val result = calculateSimilarity
      .withColumn("score",
        when(col("denominator") =!= 0, col("numerator") / col("denominator"))
          .otherwise(null)
      ).select("movie1", "movie2", "score", "numPairs").as[MoviePairsSimilarity]

    result
  }

  /** Get the movie name for a given movie ID */
  def getMovieName(movieNames: Dataset[MoviesNames], movieId: Int): String = {
    val result = movieNames.filter(col("movieID") === movieId)
      .select("movieTitle").collect()(0)

    result(0).toString
  }
  /**Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val startTime = System.nanoTime

    // Create a SparkSession without specifying master
    val spark = SparkSession
      .builder
      .appName("MovieSimilarities10M")
      .getOrCreate()

    // Create schema when reading u.item
    val moviesNamesSchema = new StructType()
      .add("movieID", IntegerType, nullable = true)
      .add("movieTitle", StringType, nullable = true)

    // Create schema when reading u.data
    val moviesSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    println("\nLoading movie names...")
    import spark.implicits._
    // Create a broadcast dataset of movieID and movieTitle.
    // Apply ISO-8859-1 charset
    val movieNames = spark.read
      .option("sep", "::")
      .option("charset", "ISO-8859-1")
      .schema(moviesNamesSchema)
      .csv("movies.dat")
      .as[MoviesNames]

    // Load up movie data as dataset
    val movies = spark.read
      .option("sep", "::")
      .schema(moviesSchema)
      .csv("ratings.dat")
      .as[Movies]

    val ratings = movies.select("userId", "movieId", "rating")

    // Emit every movie rated together by the same user.
    // Self-join to find every combination.
    // Select movie pairs and rating pairs
    val moviePairs = ratings.as("ratings1")
      .join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
      .select($"ratings1.movieId".alias("movie1"),
        $"ratings2.movieId".alias("movie2"),
        $"ratings1.rating".alias("rating1"),
        $"ratings2.rating".alias("rating2")
      ).repartition(100).as[MoviePairs]

    val moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()

    if (args.length > 0) {
      val scoreThreshold = 0.88
      val coOccurenceThreshold = 1000.0

      val movieID: Int = args(0).toInt

      // Filter for movies with this sim that are "good" as defined by
      // our quality thresholds above
      val filteredResults = moviePairSimilarities.filter(
        (col("movie1") === movieID || col("movie2") === movieID) &&
          col("score") > scoreThreshold && col("numPairs") > coOccurenceThreshold)

      // Sort by quality score.
      val results = filteredResults.sort(col("score").desc).take(50)

      println("\nTop 50 similar movies for " + getMovieName(movieNames, movieID))
      for (result <- results) {
        // Display the similarity result that isn't the movie we're looking at
        var similarMovieID = result.movie1
        if (similarMovieID == movieID) {
          similarMovieID = result.movie2
        }
        println(getMovieName(movieNames, similarMovieID) + "\tscore: " + result.score + "\tstrength: " + result.numPairs)
      }

      val stopTime = System.nanoTime
      val elapsedTime = (stopTime - startTime)

      // TimeUnit
      val convert = TimeUnit.SECONDS.convert(elapsedTime, TimeUnit.NANOSECONDS)

      //    System.out.println(convert + " seconds")
      println(s"elapsedTime sec=$convert")
    }
  }
}

9q78igpj — Answer 1

Distributed Spark carries a lot of overhead that you don't pay when running standalone: communication, shuffling data over the network, and so on. I think you'd need more than two workers before the cluster pulls ahead of standalone.
Make sure the data is in a format that can easily be split among the workers. Consider making the first step of your workflow one that loads the .dat files and saves them in Parquet format, or at least Avro, compressed with Snappy (not gzip).
You'll need a shared filesystem for all of this to work; with two machines, you're using NFS, right? Put the (converted) source files at exactly the same path on each machine's local filesystem, and use the shared NFS for everything else. That way there's no network I/O just to load the data.
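A minimal sketch of that one-time conversion step, reusing the `::` separator and ratings schema from the question's code (the input/output paths here are illustrative):

```scala
// One-time preprocessing job: convert the ::-separated ratings.dat file to
// Snappy-compressed Parquet, which is splittable across workers (gzip is not).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

object ConvertRatingsToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ConvertRatingsToParquet").getOrCreate()

    // Same schema as moviesSchema in the question
    val ratingsSchema = new StructType()
      .add("userID", IntegerType, nullable = true)
      .add("movieID", IntegerType, nullable = true)
      .add("rating", IntegerType, nullable = true)
      .add("timestamp", LongType, nullable = true)

    spark.read
      .option("sep", "::")
      .schema(ratingsSchema)
      .csv("ratings.dat")
      .write
      .option("compression", "snappy") // splittable, unlike gzip
      .parquet("ratings.parquet")

    spark.stop()
  }
}
```

The main job would then replace its `csv("ratings.dat")` read with `spark.read.parquet("ratings.parquet").as[Movies]`, so each executor reads only its own slices.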


qlvxas9a — Answer 2

You're downloading the same dataset from the internet twice instead of once. That takes longer than downloading it once.
By the way, you're doing numeric processing here. You'd probably get more of a speedup by using a GPU and doing this as a bulk matrix operation on the GPU.
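To make the "bulk numeric operation" point concrete: the per-pair score the Spark job aggregates is just the cosine of two co-rated rating vectors, i.e. numerator / (sqrt(xx) * sqrt(yy)). A plain-Scala sketch of that quantity (a hypothetical helper, not part of the question's code):

```scala
// Cosine similarity of two rating vectors, indexed by the users who rated
// both movies — the same value computeCosineSimilarity derives per movie pair.
def cosineSimilarity(x: Array[Double], y: Array[Double]): Double = {
  require(x.length == y.length, "vectors must be co-indexed by user")
  val xy = x.zip(y).map { case (a, b) => a * b }.sum // sum of rating1 * rating2
  val xx = x.map(a => a * a).sum                     // sum of rating1^2
  val yy = y.map(b => b * b).sum                     // sum of rating2^2
  if (xx == 0.0 || yy == 0.0) 0.0
  else xy / (math.sqrt(xx) * math.sqrt(yy))
}
```

Computed this way over all pairs at once, it is a dense matrix product (R · Rᵀ over the user-by-movie rating matrix), which is exactly the shape of workload a GPU handles well.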
