这个问题在这里已经有答案了:
分解sparkDataframe中的嵌套结构(3个答案)
六个月前关门了。
我想删除数据集的重复值
前任:
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| e_key|f_timestamp_day| key| value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| Tryout| 2020-04-01| item_guid_list| a^a^a^b | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| c^c^d^e^f^f| FR| iOS| 2020-04-01|
到
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| e_key|f_timestamp_day| key| value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
| Tryout| 2020-04-01| item_guid_list| a | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| item_guid_list| b | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| c | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| d | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| e | FR| iOS| 2020-04-01|
| Tryout| 2020-04-01| sku_list| f | FR| iOS| 2020-04-01|
但是
当我使用平面图时
结果是
++
||
++
||
||
||
||
||
我的密码是
StructType structType = new StructType();
structType.add("e_key", DataTypes.StringType);
structType.add("f_timestamp_day", DataTypes.StringType);
structType.add("key", DataTypes.StringType);
structType.add("value", DataTypes.StringType);
structType.add("f_country", DataTypes.StringType);
structType.add("f_os", DataTypes.StringType);
structType.add("received_date", DataTypes.StringType);
Dataset<Row> drop_duplicate_feature =
explode_events.flatMap(
(FlatMapFunction<Row, Row>)row->{
List<Row> list = new ArrayList<Row>();
String value = row.getString(3);
String[] array_of_value = value.split("\\^");
array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
for(int index = 0; index < array_of_value.length; index++){
list.add(
RowFactory.create(row.get(0),row.get(1),row.get(2),array_of_value[index],row.get(4),row.get(5),row.get(6))
);
}
return list.iterator();
}
, RowEncoder.apply(structType)
);
我使用flatmap生成不同的行并将其添加到列表中
为什么rowencoder.apply()不起作用?
3条答案
按热度按时间zaqlnxep1#
试试这个-
1. 加载提供的测试数据
从数组中移除distinct并将其分解
hfsqlsce2#
使用spark sql和org.apache.spark.sql.functions可以获得相同的结果:
blmhpbnm3#
使用split函数拆分值列,使用array\u disticnct和explode函数实现结果。