如何删除pyspark中不明确的列?

fzwojiic  于 2021-05-29  发布在  Spark
关注(0)|答案(3)|浏览(491)

有许多类似的问题,这是问一个不同的问题,以避免重复列连接;这不是我要问的。
假设我已经有了一个列不明确的Dataframe,如何删除一个特定的列?
例如,给定:

df = spark.createDataFrame(
    spark.sparkContext.parallelize([
        [1, 0.0, "ext-0.0"],
        [1, 1.0, "ext-1.0"],
        [2, 1.0, "ext-2.0"],
        [3, 2.0, "ext-3.0"],
        [4, 3.0, "ext-4.0"],
    ]),
    StructType([
        StructField("id", IntegerType(), True),
        StructField("shared", DoubleType(), True),
        StructField("shared", StringType(), True),
    ])
)

我只希望保留数字列。
但是,尝试 df.select("id", "shared").show() 结果:

raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Reference 'shared' is ambiguous, could be: shared, shared.;"

这个问题的许多相关解决方案都只是“避免陷入这种情况”,例如使用 ['joinkey'] 而不是 a.joinkey = b.joinkey 在连接上。我重申,这里的情况并非如此;这与已经转换成这种形式的Dataframe有关。
df中的元数据消除了这些列的歧义:

$ df.dtypes
[('id', 'int'), ('shared', 'double'), ('shared', 'string')]

$ df.schema
StructType(List(StructField(id,IntegerType,true),StructField(shared,DoubleType,true),StructField(shared,StringType,true)))

所以数据被保存在内部。。。我就是不知道怎么用。
我如何选择一列而不是另一列?
我希望能使用。 col('shared#11') 或者类似的。。。但我看不到这样的东西?
这在spark中根本不可能吗?
为了回答这个问题,我想问,请发布a)一个解决上述问题的工作代码片段,或者b)链接到spark开发人员提供的一些官方信息,这些信息根本不受支持?

dz6r00yl

dz6r00yl1#

这似乎可以通过使用替换模式来实现 .rdd.toDf() 在Dataframe上。
不过,我还是会接受任何比下面的答案更复杂、更烦人的答案:

import random
import string
from pyspark.sql.types import DoubleType, LongType

def makeId():
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def makeUnique(column):
    return "%s---%s" % (column.name, makeId())

def makeNormal(column):
    return column.name.split("---")[0]

unique_schema = list(map(makeUnique, df.schema))
df_unique = df.rdd.toDF(schema=unique_schema)
df_unique.show()

numeric_cols = filter(lambda c: c.dataType.__class__ in [LongType, DoubleType], df_unique.schema)
numeric_col_names = list(map(lambda c: c.name, numeric_cols))
df_filtered = df_unique.select(*numeric_col_names)
df_filtered.show()

normal_schema = list(map(makeNormal, df_filtered.schema))
df_fixed = df_filtered.rdd.toDF(schema=normal_schema)
df_fixed.show()

给予:

+-----------+---------------+---------------+
|id---chjruu|shared---aqboua|shared---ehjxor|
+-----------+---------------+---------------+
|          1|            0.0|        ext-0.0|
|          1|            1.0|        ext-1.0|
|          2|            1.0|        ext-2.0|
|          3|            2.0|        ext-3.0|
|          4|            3.0|        ext-4.0|
+-----------+---------------+---------------+

+-----------+---------------+
|id---chjruu|shared---aqboua|
+-----------+---------------+
|          1|            0.0|
|          1|            1.0|
|          2|            1.0|
|          3|            2.0|
|          4|            3.0|
+-----------+---------------+

+---+------+
| id|shared|
+---+------+
|  1|   0.0|
|  1|   1.0|
|  2|   1.0|
|  3|   2.0|
|  4|   3.0|
+---+------+
72qzrwbm

72qzrwbm2#

解决方法:只需重命名列(按顺序),然后执行您想执行的操作!

renamed_df = df.toDF("id", "shared_double", "shared_string")
nhhxz33t

nhhxz33t3#

解决这个问题最简单的方法是使用 df.toDF(...<new-col-names>...) ,但如果不想更改列名,则按类型将重复的列分组为 struct<type1, type2> 如下所示-
请注意,下面的解决方案是用scala编写的,但逻辑上类似的代码可以用python实现。此外,此解决方案还适用于Dataframe中的所有重复列-

1. 加载测试数据

val df = Seq((1, 2.0, "shared")).toDF("id", "shared", "shared")
    df.show(false)
    df.printSchema()
    /**
      * +---+------+------+
      * |id |shared|shared|
      * +---+------+------+
      * |1  |2.0   |shared|
      * +---+------+------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- shared: double (nullable = false)
      * |-- shared: string (nullable = true)
      */

2. 获取所有重复的列名

// 1. get all the duplicated column names
    val findDupCols = (cols: Array[String]) => cols.map((_ , 1)).groupBy(_._1).filter(_._2.length > 1).keys.toSeq
    val dupCols = findDupCols(df.columns)
    println(dupCols.mkString(", "))
    // shared

3. 将重复列重命名为shared=>shared:string, shared:int,而不触及其他列名

val renamedDF = df
      // 2 rename duplicate cols like shared => shared:string, shared:int
      .toDF(df.schema
        .map{case StructField(name, dt, _, _) =>
          if(dupCols.contains(name)) s"$name:${dt.simpleString}" else name}: _*)

3. 创建所有列的结构

// 3. create struct of all cols
    val structCols = df.schema.map(f => f.name -> f  ).groupBy(_._1)
      .map{case(name, seq) =>
        if (seq.length > 1)
          struct(
            seq.map { case (_, StructField(fName, dt, _, _)) =>
              expr(s"`$fName:${dt.simpleString}` as ${dt.simpleString}")
            }: _*
          ).as(name)
        else col(name)
      }.toSeq
     val structDF = renamedDF.select(structCols: _*)

    structDF.show(false)
    structDF.printSchema()

    /**
      * +-------------+---+
      * |shared       |id |
      * +-------------+---+
      * |[2.0, shared]|1  |
      * +-------------+---+
      *
      * root
      * |-- shared: struct (nullable = false)
      * |    |-- double: double (nullable = false)
      * |    |-- string: string (nullable = true)
      * |-- id: integer (nullable = false)
      */

4. 使用<column\u name>

// Use the dataframe without losing any columns
    structDF.selectExpr("id", "shared.double as shared").show(false)
    /**
      * +---+------+
      * |id |shared|
      * +---+------+
      * |1  |2.0   |
      * +---+------+
      */

希望这对别人有用!

相关问题