Apache Spark 将DataFrame中的空字符串替换为None/null值

laawzig2  于 2023-05-01  发布在  Apache
关注(0)|答案(9)|浏览(327)

我有Spark 1。5.0 DataFrame在同一列中混合了null和空字符串。我想将所有列中的所有空字符串转换为nullNone,在Python中)。DataFrame可能有数百个列,所以我试图避免对每一列进行硬编码操作。
请参阅下面的尝试,结果出现错误。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Create a test DataFrame
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')])
testDF.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |    |   2|
## |null|null|
## +----+----+

## Try to replace an empty string with None/null
testDF.replace('', None).show()
## ValueError: value should be a float, int, long, string, list, or tuple

## A string value of null (obviously) doesn't work...
testDF.replace('', 'null').na.drop(subset='col1').show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |null|   2|
## +----+----+
myzjeezk

myzjeezk1#

就这么简单:

from pyspark.sql.functions import col, when

def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1"))

dfWithEmptyReplaced.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |null|   2|
## |null|null|
## +----+----+

dfWithEmptyReplaced.na.drop().show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## +----+----+

如果您想填充多个列,您可以例如减少:

to_convert = set([...]) # Some set of columns

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF)

或使用理解:

exprs = [
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns]

testDF.select(*exprs)

如果你想具体操作字符串字段,请检查the answer by robin-loxley

voj3qocg

voj3qocg2#

UDF的效率不是很高。使用内置方法的正确方法是:

df = df.withColumn('myCol', when(col('myCol') == '', None).otherwise(col('myCol')))
kzipqqlq

kzipqqlq3#

简单地加上zero323和soulmachine的答案。转换所有StringType字段。

from pyspark.sql.types import StringType
string_fields = []
for i, f in enumerate(test_df.schema.fields):
    if isinstance(f.dataType, StringType):
        string_fields.append(f.name)
r6vfmomb

r6vfmomb4#

我的解决方案比我迄今为止看到的所有解决方案都要好得多,它可以处理尽可能多的字段,请参阅以下小函数:

// Replace empty Strings with null values
  private def setEmptyToNull(df: DataFrame): DataFrame = {
    val exprs = df.schema.map { f =>
      f.dataType match {
        case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name)
        case _ => col(f.name)
      }
    }

    df.select(exprs: _*)
  }

你可以很容易地在Python中重写上面的函数。
这一招是跟@连城学的

kqhtkvqz

kqhtkvqz5#

如果你使用的是Python,你可以检查以下内容。

+----+-----+----+
|  id| name| age|
+----+-----+----+
|null|name1|  50|
|   2|     |    |
|    |name3|null|
+----+-----+----+

def convertToNull(dfa):
   for i in dfa.columns:
    dfa = dfa.withColumn(i , when(col(i) == '', None ).otherwise(col(i)))
  return dfa

convertToNull(dfa).show()

+----+-----+----+
|  id| name| age|
+----+-----+----+
|null|name1|  50|
|   2| null|null|
|null|name3|null|
+----+-----+----+
hk8txs48

hk8txs486#

我会在@zero323的solution中添加一个trim来处理多白色的情况:

def blank_as_null(x):
    return when(trim(col(x)) != "", col(x))
2sbarzqh

2sbarzqh7#

感谢@zero323、@Tomerikoo和@Robin Loxley
即用功能:

def convert_blank_to_null(df, cols=None):
    from pyspark.sql.functions import col, when, trim
    from pyspark.sql.types import StringType

    def blank_as_null(x):
        return when(trim(col(x)) == "", None).otherwise(col(x))
    # Don't know how to parallel
    for f in (df.select(cols) if cols else df).schema.fields:
        if isinstance(f.dataType, StringType):
            df = df.withColumn(f.name, blank_as_null(f.name))
    return df
ktecyv1j

ktecyv1j8#

这有助于我净化价值观。
对于所有列:

address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in address_df.columns]).distinct()
address_sanitize_df.show()

对于特定列:

sanitize_cols=["address_line2","zip4"]
address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in sanitize_cols])
address_sanitize_df.show()
czq61nw1

czq61nw19#

这是soulmachine解决方案的不同版本,但我不认为你可以很容易地将其翻译成Python:

def emptyStringsToNone(df: DataFrame): DataFrame = {
  df.schema.foldLeft(df)(
    (current, field) =>
      field.dataType match {
        case DataTypes.StringType =>
          current.withColumn(
            field.name,
            when(length(col(field.name)) === 0, lit(null: String)).otherwise(col(field.name))
          )
        case _ => current
      }
  )
}

相关问题