pyspark在分组applyinpandas中添加多列(更改架构)

mbzjlibv  于 2021-05-17  发布在  Spark
关注(0)|答案(2)|浏览(1099)

我在pandas中有一个处理时间序列的简单函数。我的问题是:我想应用它的时间序列的数量非常大。所以我想用pyspark来扩展它。
我面临两个问题:
必须显式地传递模式这能使隐式传递更加平滑吗?
代码失败: Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 2 Actual: 3 --如何确保模式自动匹配?注意:我不想手动指定它(理想情况下)。在最坏的情况下-我可以将我的函数应用于单个时间序列,并将pandas dataframes数据类型的输出转换为预期的模式吗?

Pandas功能

import pandas as pd
from pandas import Timestamp

df = pd.DataFrame({'time':['2020-01-01 00:00', '2020-01-01 03:00', '2020-01-01 04:00', '2020-01-06 00:00'], 'category':['1','1','1','1'], 'value':[5, 8, 7, 2]})
df['time'] = pd.to_datetime(df['time'])
display(df)
print(df.shape)

def my_complex_function(pdf):
    # fill missing hours
    pdf = pdf.set_index('time').resample('H').mean().fillna(0)

    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf

print(df.groupby(['category']).apply(my_complex_function).reset_index().shape)

Pypark函数

注意:spark的版本是:v3.0.1

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("anomlydetection").master("local[4]").config("spark.driver.memory", "2G").getOrCreate()
sdf = spark.createDataFrame(df)
sdf.printSchema()

def my_complex_function_spark(pdf: pd.DataFrame)-> pd.DataFrame:
    # fill missing hours
    pdf = pdf.set_index('time').resample('H').mean().fillna(0)

    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf

# 1) can I somehow implicitly get a reference to the schema?

# 2) this function fails due to schema mismatch

sdf.groupby("category").applyInPandas(my_complex_function_spark, schema=df.schema).show()
f3temu5u

f3temu5u1#

为定义返回模式 ApplyInPandas ,一种更健壮的方法是使用p.s.t.structtype.fromjson方法和df.schema.jsonvalue方法,它可以从原始列保留所有现有的列属性(可空、元数据等)(注:不匹配 nullable 设置通常会导致不易检测的错误)
一些例子:
使用structtype.add方法追加新列,这是最常见的用例(示例):

from pyspark.sql.types import StructType

return_schema = StructType.fromJson(df.schema.jsonValue()) \
    .add('foo', 'string', False, "dummu string field") \
    .add('bar', 'integer')

删除现有列并附加新列:

return_schema = StructType.fromJson(df.drop("col1", "col2").schema.jsonValue()) \
    .add('foo', 'string')

设置与新列混合的列的任意顺序(仅对于主数据类型,请注意array、map、struct和嵌套数据类型):

return_schema = df.selectExpr("col1", "cast('1' as string) as foo", "nested_col", "int(NULL) as bar").schema

新列的注意事项, nullable 可以通过指定null或非null zero\u值来推断,请参见以下示例:

cast('1' as string) as foo -> nullable is False
string(NULL) as foo        -> nullable is True

int(123) as bar            -> nullable is False
cast(NULL as int) as bar   -> nullable is True

所以如果 metadata 不需要为所有新列指定属性,此方法很好,并提供了灵活性。一种特殊的情况是,返回模式没有改变,我们可以使用它 return_schema = df.schema . 上面没有元数据的第一个示例的模式可以是:

return_schema = df.selectExpr("*", "string('') as foo", "int(NULL) as bar").schema

对于复杂的数据类型,下面是一个带有 nullable 设置:


# specify `nullable` attribute of the StructField

my_structs = StructType().add('f1', 'int', False).add('f2', 'string')

# specify `nullable=False` for the StructType

return_schema = df.select("*", F.struct(F.lit(0),F.lit('1')).cast(my_structs).alias('s1')).schema

# specify `nullable=True` for the StructType

return_schema = df.select("*", F.lit(None).cast(my_structs).alias('s2')).schema

我的建议是:不需要手动指定return\u模式,这可能会很繁琐并且容易出错,使用来自现有模式的尽可能多的信息,并且不完全依赖于动态推断。

svgewumm

svgewumm2#

pyspark中udf的隐式模式?为解决方案提供了一个很好的提示。
这种方法在这里转换为以下内容(参见下面的代码)。我还需要为我的真实数据支持嵌套结构-并且需要执行更多的测试,以使这些工作以及。

def my_complex_function_spark(pdf: pd.DataFrame)-> pd.DataFrame:
    # fill missing hours

    pdf = pdf.set_index('time').resample('H').mean().fillna(0).reset_index()

    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object":StringType,
           "datetime64[ns]":TimestampType,
           "int64":IntegerType} # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
  column_types  = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema

# certainly use some kind of limit here for real data

df_pd = sdf.toPandas()
df_return = my_complex_function_spark(df_pd)
schema = createUDFSchemaFromPandas(df_return)
sdf.groupby("category").applyInPandas(my_complex_function_spark, schema=schema).show()

相关问题