Get all non-null columns of a Spark DataFrame into a single column

dgiusagp asked on 2021-05-29 in Spark

I need to select all non-null columns from a Hive table and insert them into HBase. For example, consider the following table:

Name     | Place       | Department | Experience
=================================================
Ram      | Ramgarh     | Sales      | 14
Lakshman | Lakshmanpur | Operations |
Sita     | Sitapur     |            | 14
Ravan    |             |            | 25

I have to write all the non-null columns from the table above into HBase, so I wrote some logic to collect the non-null columns into a single column of the DataFrame, as shown below (a sketch of one way to do this follows the table). The Name column is mandatory.

Name     | Place       | Department | Experience | Not_null_columns
====================================================================
Ram      | Ramgarh     | Sales      | 14         | Name, Place, Department, Experience
Lakshman | Lakshmanpur | Operations |            | Name, Place, Department
Sita     | Sitapur     |            | 14         | Name, Place, Experience
Ravan    |             |            | 25         | Name, Experience
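For reference, a minimal sketch of how such a column could be built, assuming the Hive table has already been loaded into a DataFrame named df (the variable name is an assumption). concat_ws skips nulls, so only the names of non-null columns survive:

import org.apache.spark.sql.functions.{col, concat_ws, lit, when}

// Sketch only: "df" is assumed to hold the Hive table.
// when(...) yields the column name for non-null cells and null otherwise;
// concat_ws silently drops the nulls, leaving only the non-null column names.
val withNames = df.withColumn(
  "Not_null_columns",
  concat_ws(", ", df.columns.map(c => when(col(c).isNotNull, lit(c))): _*)
)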

Now my requirement is to create a column in the DataFrame that holds the values of all the non-null columns in a single column, as shown below.

Name     | Place       | Department | Experience | Not_null_columns_values
===========================================================================
Ram      | Ramgarh     | Sales      | 14         | Name: Ram, Place: Ramgarh, Department: Sales, Experience: 14
Lakshman | Lakshmanpur | Operations |            | Name: Lakshman, Place: Lakshmanpur, Department: Operations
Sita     | Sitapur     |            | 14         | Name: Sita, Place: Sitapur, Experience: 14
Ravan    |             |            | 25         | Name: Ravan, Experience: 25

Once I have the above DataFrame, I will write it to HBase with Name as the key and the last column as the value (a rough sketch of that write follows).
Please let me know if there is a better way to do this.
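For illustration, a rough sketch of what that write could look like with the plain HBase Java client. The table name "employee", column family "cf", qualifier "data", and the DataFrame name resultDf are all placeholders, not part of the original question:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Sketch only: resultDf is the DataFrame carrying the combined column;
// table/family/qualifier names below are made up for the example.
resultDf.select("Name", "Not_null_columns_values").rdd.foreachPartition { rows =>
  val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("employee"))
  try {
    rows.foreach { r =>
      val put = new Put(Bytes.toBytes(r.getString(0)))      // Name as the row key
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("data"),
        Bytes.toBytes(r.getString(1)))                       // combined column as the value
      table.put(put)
    }
  } finally {
    table.close()
    conn.close()
  }
}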


eit6fx6z (answer 1):

Try this:

Load the test data provided in the question:

import spark.implicits._   // needed for .toDS() below; assumes an in-scope SparkSession named spark

val data =
  """
    |Name    |  Place    |     Department | Experience
    |
    |Ram      | Ramgarh      |  Sales      |  14
    |
    |Lakshman | Lakshmanpur  |Operations   |
    |
    |Sita     | Sitapur      |             |  14
    |
    |Ravan   |              |              |  25
  """.stripMargin

// Turn the pipe-delimited test data into comma-separated lines and let
// spark.read infer the schema; empty fields come through as nulls.
val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
//  .option("nullValue", "null")
  .csv(stringDS)

df.show(false)
df.printSchema()
    /**
      * +--------+-----------+----------+----------+
      * |Name    |Place      |Department|Experience|
      * +--------+-----------+----------+----------+
      * |Ram     |Ramgarh    |Sales     |14        |
      * |Lakshman|Lakshmanpur|Operations|null      |
      * |Sita    |Sitapur    |null      |14        |
      * |Ravan   |null       |null      |25        |
      * +--------+-----------+----------+----------+
      *
      * root
      * |-- Name: string (nullable = true)
      * |-- Place: string (nullable = true)
      * |-- Department: string (nullable = true)
      * |-- Experience: integer (nullable = true)
      */

Convert to a struct first, then convert to JSON:

import org.apache.spark.sql.functions.{col, struct, to_json}

// struct(...) packs all columns into one struct; to_json omits null fields,
// so only the non-null column/value pairs end up in the JSON string.
val x = df.withColumn("Not_null_columns_values",
  to_json(struct(df.columns.map(col): _*)))
x.show(false)
x.printSchema()

    /**
      * +--------+-----------+----------+----------+---------------------------------------------------------------------+
      * |Name    |Place      |Department|Experience|Not_null_columns_values                                              |
      * +--------+-----------+----------+----------+---------------------------------------------------------------------+
      * |Ram     |Ramgarh    |Sales     |14        |{"Name":"Ram","Place":"Ramgarh","Department":"Sales","Experience":14}|
      * |Lakshman|Lakshmanpur|Operations|null      |{"Name":"Lakshman","Place":"Lakshmanpur","Department":"Operations"}  |
      * |Sita    |Sitapur    |null      |14        |{"Name":"Sita","Place":"Sitapur","Experience":14}                    |
      * |Ravan   |null       |null      |25        |{"Name":"Ravan","Experience":25}                                     |
      * +--------+-----------+----------+----------+---------------------------------------------------------------------+
      */
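If the exact "Name: Ram, Place: Ramgarh, ..." text from the question is preferred over JSON, a similar sketch (same assumptions as above) keeps the null-skipping behaviour by building one "column: value" pair per column and joining them with concat_ws:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit, when}

// Sketch only: null cells yield null pairs, and concat_ws drops them,
// so only the non-null columns appear in the result string.
val pairs = df.columns.map(c =>
  when(col(c).isNotNull, concat(lit(s"$c: "), col(c).cast("string"))))
val y = df.withColumn("Not_null_columns_values", concat_ws(", ", pairs: _*))
y.show(false)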
