groupby与spark-java

gcxthw6b  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(346)

我可以用spark从csv读取数据,但我不知道如何使用特定数组进行分组。我想 groupBy '名称'。这是我的密码:

public class readspark {
public static void main(String[] args) {
    final ObjectMapper om = new ObjectMapper();
    System.setProperty("hadoop.home.dir", "D:\\Task\\winutils-master\\hadoop-3.0.0");
    SparkConf conf = new SparkConf()
            .setMaster("local[3]")
            .setAppName("Read Spark CSV")
            .set("spark.driver.host", "localhost");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> lines = jsc.textFile("D:\\Task\\data.csv");
    JavaRDD<DataModel> rdd = lines.map(new Function<String, DataModel>() {
        @Override
        public DataModel call(String s) throws Exception {
            String[] dataArray = s.split(",");
            DataModel dataModel = new DataModel();

            dataModel.Name(dataArray[0]);
            dataModel.ID(dataArray[1]);
            dataModel.Addres(dataArray[2]);
            dataModel.Salary(dataArray[3]);

            return dataModel;
        }
    });
    rdd.foreach(new VoidFunction<DataModel>() {
                    @Override
                    public void call(DataModel stringObjectMap) throws Exception {
                        System.out.println(om.writeValueAsString(stringObjectMap));
                    }
                }
    );
}
7lrncoxx

7lrncoxx1#

spark直接提供分组功能:

JavaPairRDD<String, Iterable<DataModel>> groupedRdd = rdd.groupBy(dataModel -> dataModel.getName());

这将返回一对rdd,其中键是名称(由提供给groupby的lambda确定),值是具有该名称的数据模型。
如果要按逻辑更改组,则只需提供相应的lambda。

相关问题