自定义类型数据集如何调用groupby方法?

velaa5lx  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(369)

我通过spark创建了一个自定义类型的数据集。

public class KeyValuePair {
    String source;
    String target;
    int value;
    getter...
    setter...
}
...
List<KeyValuePair> list = generateList();
Dataset<KeyValuePair> dataset = spark.createDataset(list, Encoders.kryo(KeyValuePair.class));
Dataset<Row> agg = dataset.groupBy("source", "target").agg(avg("value"));

当数据集调用groupby方法时,它将抛出一个异常。线程“main”org.apache.spark.sql.analysisexception中出现异常:无法解析(值)中的列名“source”
我想知道自定义类型数据集是否可以调用groupby方法。行类型数据集是唯一可以调用此方法而不引发异常的数据集吗?
如何聚合自定义类型数据集?

yk9xbfzb

yk9xbfzb1#

出现异常的原因是:数据集没有聚合所需的列。在创建数据集时使用encoders.bean(class)可以得到预期的结果。
代码:

public class DatasetAggregation {
     public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        List<KeyValuePair> list = generateList();
        Dataset<KeyValuePair> dataset = spark.createDataset(list, Encoders.bean(KeyValuePair.class));
        Dataset<Row> agg = dataset.groupBy("source", "target").agg(avg("value"));
        agg.show();
    }

    public static List<KeyValuePair> generateList() {
        KeyValuePair k = new KeyValuePair();
        k.setSource("a");
        k.setTarget("b");
        k.setValue(10);
        return Arrays.asList(k, k,k);

    }
}

 /* output
 +------+------+----------+
 |source|target|avg(value)|
 +------+------+----------+
 |     a|     b|      10.0|
 +------+------+----------+

* /

相关问题