如何提高spark性能?

7jmck4yq  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(572)

我有一个java程序,可以处理大数据集。数据集存储在hdfs(csv)中。
这个程序运行得很好,但速度很慢。
程序的作用:
加载csv文件
将行分隔为字符串[]
筛选字符串数组
Map到myobject
将我的对象保存到cassandra
我的主要方法是:

public static void main(String[] args) {

        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
                .setMaster("local[*]")
                .set("spark.executor.memory", "4g");

        if (args.length > 1)
            sparkConf.set("spark.cassandra.connection.host", args[1]);

        // start a spark context
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // read text file to RDD
        JavaRDD<String> lines = sc.textFile(args[0]);

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .map(line -> line.split(","))
                .filter(someFilter)
                .map(MyObject::new);

        javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
    }

如何提高绩效?
谢谢你的回答。

rqqzpn5f

rqqzpn5f1#

您的代码没有无序问题(除非您必须写出到hdfs),默认分区是由输入格式定义的,在hadoop上,hdfs内核和filter或map不会更改分区。如果你能先过滤,你会看到一些改进

JavaRDD<MyObject> myObjectJavaRDD = lines
                .filter(someFilter)
                .map(line -> line.split(","))
                .map(MyObject::new);

spark只能为rdd的每个分区运行一个并发任务,最多可达集群中的核心数。因此,如果您有一个50核的集群,那么您希望您的rdd至少有50个分区。至于选择一个“好”数量的分区,您通常希望至少与并行执行器的数量相同。您可以通过调用

sc.defaultParallelism

或按编号检查rdd分区

someRDD.partitions.size

使用读取文件创建rdd时

rdd = SparkContext().textFile("hdfs://…/file.txt")

分区的数量可以更小。理想情况下,您将获得与hdfs中相同数量的块,但是如果文件中的行太长(长于块大小),则分区会更少。
为rdd设置分区数的首选方法是直接将其作为调用中的第二个输入参数传递,如

rdd = sc.textFile("hdfs://… /file.txt", 400)

其中400是分区数。在这种情况下,分区可以进行400次拆分,这将由hadoop的textinputformat完成,而不是spark,而且工作速度会快得多。代码还生成了400个并发任务,试图将file.txt直接加载到400个分区中。
重新分区:增加分区,在过滤器增加并行后重新平衡分区

repartition(numPartitions: Int)

合并:在输出到hdfs/external之前,减少分区而不进行无序合并

coalesce(numPartitions: Int, suffle: Boolean = false)

最后,同样重要的是,您可以使用不同的值和基准进行一些试验,看看这个过程需要多少时间

val start = System.nanoTime()

  // my process

  val end = System.nanoTime()

  val time = end - start
  println(s"My App takes: $time")

希望有帮助

相关问题