q:dataset< row>flatmap到空数据集spark java

hsvhsicv  于 2021-05-29  发布在  Spark
关注(0)|答案(3)|浏览(573)

这个问题在这里已经有答案了

分解sparkDataframe中的嵌套结构(3个答案)
六个月前关门了。
我想删除数据集的重复值
前任:

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01|

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|                  a |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|      item_guid_list|                  b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  c |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  d |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  e |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  f |       FR| iOS|   2020-04-01|

但是
当我使用平面图时
结果是

++
||
++
||
||
||
||
||

我的密码是

StructType structType = new StructType();
            structType.add("e_key", DataTypes.StringType);
            structType.add("f_timestamp_day", DataTypes.StringType);
            structType.add("key", DataTypes.StringType);
            structType.add("value", DataTypes.StringType);
            structType.add("f_country", DataTypes.StringType);
            structType.add("f_os", DataTypes.StringType);
            structType.add("received_date", DataTypes.StringType);

            Dataset<Row> drop_duplicate_feature = 
            explode_events.flatMap(
                (FlatMapFunction<Row, Row>)row->{
                    List<Row> list = new ArrayList<Row>();
                    String value = row.getString(3);
                    String[] array_of_value = value.split("\\^");
                    array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
                    for(int index = 0; index < array_of_value.length; index++){
                        list.add(
                            RowFactory.create(row.get(0),row.get(1),row.get(2),array_of_value[index],row.get(4),row.get(5),row.get(6))
                        );
                    }
                    return list.iterator();
                }
                , RowEncoder.apply(structType)
            );

我使用flatmap生成不同的行并将其添加到列表中
为什么rowencoder.apply()不起作用?

zaqlnxep

zaqlnxep1#

试试这个-

1. 加载提供的测试数据

String data = "   e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date\n" +
                "  Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01\n" +
                "  Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01";

        List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
                .map(s -> Arrays.stream(s.split("\\|"))
                        .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                        .collect(Collectors.joining(","))
                )
                .collect(Collectors.toList());

        Dataset<Row> dataset = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .option("sep", ",")
                .option("nullValue", "null")
                .csv(spark.createDataset(list1, Encoders.STRING()));
        dataset.show(false);
        dataset.printSchema();
        /**
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         * |e_key |f_timestamp_day    |key           |value      |f_country|f_os|received_date      |
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         * |Tryout|2020-04-01 00:00:00|item_guid_list|a^a^a^b    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |c^c^d^e^f^f|FR       |iOS |2020-04-01 00:00:00|
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         *
         * root
         *  |-- e_key: string (nullable = true)
         *  |-- f_timestamp_day: timestamp (nullable = true)
         *  |-- key: string (nullable = true)
         *  |-- value: string (nullable = true)
         *  |-- f_country: string (nullable = true)
         *  |-- f_os: string (nullable = true)
         *  |-- received_date: timestamp (nullable = true)
         */

从数组中移除distinct并将其分解

dataset.withColumn("value", explode(array_distinct(split(col("value"), "\\^"))))
                .show(false);
        /**
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         * |e_key |f_timestamp_day    |key           |value|f_country|f_os|received_date      |
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         * |Tryout|2020-04-01 00:00:00|item_guid_list|a    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|item_guid_list|b    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |c    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |d    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |e    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |f    |FR       |iOS |2020-04-01 00:00:00|
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         */
hfsqlsce

hfsqlsce2#

使用spark sql和org.apache.spark.sql.functions可以获得相同的结果:

explode_events.select(
    $"e_key",
    $"f_timestamp_day",
    $"key",
    explode(split($"value","\\^")),
    $"f_country",
    $"f_os",
    $"received_date"
).show()
blmhpbnm

blmhpbnm3#

使用split函数拆分值列,使用array\u disticnct和explode函数实现结果。

from pyspark.sql.functions import *

# create df1

df1= df.withColumn("value",explode(array_distinct((split("VALUES","\\^")))))

相关问题