spark正在洗牌大量数据

pdsfdshx  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(503)

我写了一篇精彩的文章。如下所示:

public class TestClass {

public static void main(String[] args){
String masterIp = args[0];
String appName = args[1];
String inputFile = args[2];
String output = args[3];
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<String> rdd = sparkContext.textFile(inputFile);
Integer[] keyColumns = new Integer[] {0,1,2};
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns);

Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1);
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2;
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2;

JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
      private static final long serialVersionUID = -6293440291696487370L;
      @Override
      public Tuple2<String, Integer> call(String t) throws Exception {
        String[] record = t.split(",");
        Integer[] keyColumns = broadcastJob.value();
        StringBuilder key = new StringBuilder();
        for (int index = 0; index < keyColumns.length; index++) {
          key.append(record[keyColumns[index]]);
        }
        key.append("|id=1");
        Integer value = new Integer(record[4]);
        return new Tuple2<String, Integer>(key.toString(),value);
      }}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2);
      pairRDD.saveAsTextFile(output);
   }
}

程序计算每个键的值之和。根据我的理解,本地合并器应该在每个节点上运行,并将相同键的值相加,然后用少量数据进行洗牌。但在sparkui上,它显示了大量的无序读写(几乎58gb)。我做错什么了吗?如何知道本地合路器是否工作?
群集详细信息:-
20节点群集
每个节点有80gb硬盘,8gb ram,4核
hadoop-2.7.2版本
spark-2.0.2(预装配hadoop-2.7.x发行版)
输入文件详细信息:-
输入文件存储在hdfs上
输入文件大小:400gb
记录数:1612999990
记录列:string(2个字符)、int、int、string(2个字符)、int、int、string(2个字符)、string(2个字符)、string(2个字符)
注意:不同键的最大数目是1081600。
在spark日志中,我看到任务使用localtylevel node\u local运行。

ncecgwcz

ncecgwcz1#

让我们分解这个问题,看看能得到什么。为了简化计算,假设:
记录总数为1.6e8
唯一密钥数为1e6
分割大小是128mb(这似乎与ui中的任务数一致)。
有了这些值,数据将被放入3200个分区(在您的例子中是3125个分区)。这将为您提供大约51200条记录。此外,如果每个键的值数分布是均匀的,则每个键平均应该有160条记录。
如果数据是随机分布的(例如,它不是按键排序的),您可以预期每个分区每个键的平均记录数将接近1*。这基本上是最坏的情况,在这种情况下,map-side combine根本不会减少数据量。
此外,您必须记住,平面文件的大小通常会大大小于序列化对象的大小。
对于实际数据,您通常可以预期数据收集过程中会出现某种类型的顺序,因此情况应该比我们上面计算的要好,但底线是,如果数据尚未按分区分组,则map-side combine可能根本无法提供任何改进。
您可能可以通过使用更大一点的分割(256mb会使每个分区的容量超过100k)来减少无序数据的数量,但代价是gc暂停时间更长,并且可能会出现其他gc问题。

  • 您可以通过使用替换进行采样来模拟:
import pandas as pd
import numpy as np

(pd
    .DataFrame({"x": np.random.choice(np.arange(3200), size=160, replace=True)})
    .groupby("x")
    .x.count()
    .mean())

或者想想把160个球随机分配给3200个桶的问题。

相关问题