如何在PySpark DF中使用来自statmodels的seasonal_decompose/如何将DataFrame转换为时间序列数组

dgiusagp  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(145)

我有一个包含两列的spark Dataframe(time_key,日期间隔7天,数量为7),如下所示:
| 时间键|数量|
| --|--|
| 2023年10月26日星期五| 10 |
| 2023年10月19日| 12 |
| 2023年10月12日| 14 |
我尝试在PySpark中使用“statmodels”库中的函数“seasonary_decomposition”。我知道如何在Pandas中使用,我可以转换索引中的time_key列,函数“seasonary_decomposition”工作得很好。但是我尝试在PySpark中使用它。我想我有一个选择(我这样说是因为函数的文档:https://shorturl.at/ovC35):将DF转换为array_like时间序列?
我尝试了以下方法:
1.我将time_key列转换为日期类型(使用函数“to_date”,最初该列为字符串类型),然后尝试创建如下数组:

time_series_array = [(row.time_key, row.qty_total) for row in df_iter.collect()]

字符串
我得到了这个:

[(datetime.date(2020, 11, 1), 0.0), (datetime.date(2020, 8, 30), 0.0), ... ]


然后我应用了这个函数:

seasonal_decompose(time_series_array)


但是我得到了这个错误:

TypeError: float() argument must be a string or a number, not 'datetime.date'


我想“好吧,也许问题出在转换成数据类型上,那就让我把它保存成字符串吧”。
1.把它保存为字符串,得到的数组是这样的:
[(“2020年11月1日”,0.0),(“2020年8月30日”,0.0),...]
但是在将数组应用于函数之后,我得到了这个错误:

ValueError: could not convert string to float: '2020-11-01'


我该如何解决这个问题?如何将原始DF转换为函数“seasmonary_decomposition”?或者,如何将原始DF转换为时间序列数组?PySpark中是否有一个函数可以执行相同的操作?

ztyzrc3y

ztyzrc3y1#

下面是一个如何使用statsmodels的工作示例。

import numpy as np
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
import matplotlib.pyplot as plt

def print_pandas(dataframe_given):
    with pd.option_context('display.max_rows', None,'display.max_columns', None, 'expand_frame_repr', False):
        print("Given pandas dataframe name")
        print(dataframe_given)

np.random.seed(42)
time = np.arange(122)
trend = time * 0.1
seasonal = 5 * np.sin(time * 2 * np.pi / 12)
residual = np.random.randn(time.shape[0]) * 2
data = trend + seasonal + residual
date_rng = pd.date_range(start='2020-01-01', end='2020-05-01', freq='D')
ts = pd.Series(data, date_rng)

print_pandas(ts)

result = seasonal_decompose(ts, model='additive')

result.plot()
plt.show()

字符串
将绘图输出为图像:


的数据

相关问题