我有一个工作Pandas函数,但不知道如何翻译成dask在一个好的方式。
我在特定日期有某些产品的销售额,但我在数据集中缺少没有销售额的行:
import pandas as pd
data = {
"PRODUCT_ID": [1, 1, 1, 2, 2, 3],
"DATE": [
pd.Timestamp(year=2019, month=1, day=2),
pd.Timestamp(year=2019, month=1, day=4),
pd.Timestamp(year=2019, month=1, day=8),
pd.Timestamp(year=2019, month=1, day=3),
pd.Timestamp(year=2019, month=1, day=7),
pd.Timestamp(year=2019, month=1, day=5)
],
"SALES": [5, 3, 2, 8, 1, 7],
}
df = pd.DataFrame.from_dict(data)
开始数据框:
PRODUCT_ID DATE SALES
0 1 2019-01-02 5
1 1 2019-01-04 3
2 1 2019-01-08 2
3 2 2019-01-03 8
4 2 2019-01-07 1
5 3 2019-01-05 7
我的目标是填写2019年1月1日至2019年1月10日期间所有PRODUCT_ID的销售额。我发现. groupby(). reindex()可以在panda中为我完成这个任务:
min_date = pd.Timestamp(year=2019, month=1, day=1)
max_date = pd.Timestamp(year=2019, month=1, day=10)
dates = pd.date_range(start=min_date, end=max_date, name="DATE")
df2 = (df.set_index('DATE')
.groupby(['PRODUCT_ID'], as_index=True, sort=False)
.apply(lambda df: df.reindex(dates, fill_value=0)
.drop(columns=["PRODUCT_ID"]))
.reset_index(1, drop=False))
这将导致以下填充 Dataframe ,其中PRODUCT_ID作为索引:
DATE SALES
PRODUCT_ID
1 2019-01-01 0
1 2019-01-02 5
1 2019-01-03 0
1 2019-01-04 3
1 2019-01-05 0
1 2019-01-06 0
1 2019-01-07 0
1 2019-01-08 2
1 2019-01-09 0
1 2019-01-10 0
2 2019-01-01 0
2 2019-01-02 0
2 2019-01-03 8
2 2019-01-04 0
2 2019-01-05 0
2 2019-01-06 0
2 2019-01-07 1
2 2019-01-08 0
2 2019-01-09 0
2 2019-01-10 0
3 2019-01-01 0
3 2019-01-02 0
3 2019-01-03 0
3 2019-01-04 0
3 2019-01-05 7
3 2019-01-06 0
3 2019-01-07 0
3 2019-01-08 0
3 2019-01-09 0
3 2019-01-10 0
当然,这只是一个虚拟数据集。实际上,我有太多的数据无法使用Pandas,所以我使用了Dashk。然而,我不太能够在Dashk中复制上述行为。有什么建议吗?
我目前掌握的情况:
import dask.dataframe as dd
df = df.set_index("PRODUCT_ID")
ddf = dd.from_pandas(df, npartitions=2)
def custom_apply(df):
df = df.reset_index()
prd_id = df["PRODUCT_ID"].unique()[0]
df = df.set_index("DATE")
df = df.reindex(dates, fill_value=0)
df["PRODUCT_ID"] = prd_id
df = df.reset_index()
df = df.set_index("PRODUCT_ID")
print(df)
return df
ddf_group = ddf.groupby(["PRODUCT_ID"]).apply(custom_apply, meta=pd.DataFrame).compute()
通过custom_apply中的print语句,我看到创建的 Dataframe 具有所需的形状:
DATE SALES
PRODUCT_ID
1 2019-01-01 0
1 2019-01-02 5
1 2019-01-03 0
1 2019-01-04 3
1 2019-01-05 0
1 2019-01-06 0
1 2019-01-07 0
1 2019-01-08 2
1 2019-01-09 0
1 2019-01-10 0
然而,我得到了以下错误,我有点迷失了:AttributeError: 'Series' object has no attribute 'columns'
我也不知道apply函数是否可以扩展到更大的数据集,欢迎任何帮助。
1条答案
按热度按时间fd3cxomn1#
您应该使用重采样。最有效的解决方案是按产品ID进行分区,然后按map_partitions(lambda df:df.groupby(“产品标识”).重新取样(“D”).asfreq())