在Spark SQL的Java API中对Map进行并行化

z9smfwbn  于 2023-05-27  发布在  Java
关注(0)|答案(1)|浏览(241)

Java 11在这里使用Spark SQL的Java API(不是Scala!)。我正在尝试将Map<String,String>转换为具有2个String列的Dataset<Row>。我最好的尝试是:

public Dataset<Row> createKeyValueDataFrame(Map<String,String> dfMap, String keyColName, String valueColName) {

    JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());

    JavaRDD<Row> rdd = sparkContext.parallelize(Arrays.asList(dfMap.entrySet().toArray()))
        .map(entry -> RowFactory.create(entry.getKey(), entry.getValue()));

    StructType schema = new StructType()
        .add(keyColName, DataTypes.StringType)
        .add(valueColName, DataTypes.StringType);

    return sparkSession.createDataFrame(rdd, schema);

}

但是我在entry.getKey()entry.getValue()上得到编译器错误。有人能指出为什么我得到这些错误,并帮助我找出修复是什么?先谢谢你了!

pvcm50d1

pvcm50d11#

问题是当你在做

Arrays.asList(
  dfMap         // Map<String,String>
    .entrySet() // Set<Map.Entry<String,String>>
    .toArray()  // Object[]
)

所以,当你尝试

sparkContext.parallelize(Arrays.asList(dfMap.entrySet().toArray()))

该调用的返回值是JavaRDD<Object>。这就是编译器显示Cannot resolve method 'getKey' in 'Object'的原因,因为当您调用map时,它推断lambda参数是Object
不知道你到底想用这段代码做什么。方法entrySet()将返回一个Map.EntrySet,它表示一个键值对。如果您只需要将其转换为List的键值对,则应该替换

Arrays.asList(dfMap.entrySet().toArray())

new ArrayList<>(dfMap.entrySet())

相关问题