我在pandas中有一个处理时间序列的简单函数。我的问题是:我想应用它的时间序列的数量非常大。所以我想用pyspark来扩展它。
我面临两个问题:
必须显式地传递模式这能使隐式传递更加平滑吗?
代码失败: Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 2 Actual: 3
--如何确保模式自动匹配?注意:我不想手动指定它(理想情况下)。在最坏的情况下-我可以将我的函数应用于单个时间序列,并将pandas dataframes数据类型的输出转换为预期的模式吗?
Pandas功能
import pandas as pd
from pandas import Timestamp
df = pd.DataFrame({'time':['2020-01-01 00:00', '2020-01-01 03:00', '2020-01-01 04:00', '2020-01-06 00:00'], 'category':['1','1','1','1'], 'value':[5, 8, 7, 2]})
df['time'] = pd.to_datetime(df['time'])
display(df)
print(df.shape)
def my_complex_function(pdf):
# fill missing hours
pdf = pdf.set_index('time').resample('H').mean().fillna(0)
# some complex computation/business logic which is adding columns/simplified here:
pdf['foo'] = 'bar'
pdf['baz'] = 123
return pdf
print(df.groupby(['category']).apply(my_complex_function).reset_index().shape)
Pypark函数
注意:spark的版本是:v3.0.1
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("anomlydetection").master("local[4]").config("spark.driver.memory", "2G").getOrCreate()
sdf = spark.createDataFrame(df)
sdf.printSchema()
def my_complex_function_spark(pdf: pd.DataFrame)-> pd.DataFrame:
# fill missing hours
pdf = pdf.set_index('time').resample('H').mean().fillna(0)
# some complex computation/business logic which is adding columns/simplified here:
pdf['foo'] = 'bar'
pdf['baz'] = 123
return pdf
# 1) can I somehow implicitly get a reference to the schema?
# 2) this function fails due to schema mismatch
sdf.groupby("category").applyInPandas(my_complex_function_spark, schema=df.schema).show()
2条答案
按热度按时间f3temu5u1#
为定义返回模式
ApplyInPandas
,一种更健壮的方法是使用p.s.t.structtype.fromjson方法和df.schema.jsonvalue方法,它可以从原始列保留所有现有的列属性(可空、元数据等)(注:不匹配nullable
设置通常会导致不易检测的错误)一些例子:
使用structtype.add方法追加新列,这是最常见的用例(示例):
删除现有列并附加新列:
设置与新列混合的列的任意顺序(仅对于主数据类型,请注意array、map、struct和嵌套数据类型):
新列的注意事项,
nullable
可以通过指定null或非null zero\u值来推断,请参见以下示例:所以如果
metadata
不需要为所有新列指定属性,此方法很好,并提供了灵活性。一种特殊的情况是,返回模式没有改变,我们可以使用它return_schema = df.schema
. 上面没有元数据的第一个示例的模式可以是:对于复杂的数据类型,下面是一个带有
nullable
设置:我的建议是:不需要手动指定return\u模式,这可能会很繁琐并且容易出错,使用来自现有模式的尽可能多的信息,并且不完全依赖于动态推断。
svgewumm2#
pyspark中udf的隐式模式?为解决方案提供了一个很好的提示。
这种方法在这里转换为以下内容(参见下面的代码)。我还需要为我的真实数据支持嵌套结构-并且需要执行更多的测试,以使这些工作以及。