我有一个初始 Dataframe df
,如下所示:
+-------+---+-----+------------------+----+-------------------+
|gender| pro|share| prediction|week| forecast_units|
+------+----+-----+------------------+----+-------------------+
| Male|Polo| 0.01| 258.4054260253906| 37| 1809.0|
| Male|Polo| 0.1| 332.4026794433594| 38| 2327.0|
| Male|Polo| 0.15|425.97430419921875| 39| 2982.0|
| Male|Polo| 0.2| 508.3385314941406| 40| 3558.0|
....
我有下面的代码,尝试通过应用一些演算从原始 Dataframe 创建多个 Dataframe 。最初我创建了四个空的 Dataframe ,然后我想在四个不同的星期c_weeks
中循环,并将演算的结果保存到list_dfs
上的每个 Dataframe 中。
schema = StructType([\
StructField("gender", StringType(),True), \
StructField("pro",StringType(),True), \
StructField("units_1_tpr",DoubleType(),True), \
StructField("units_1'_tpr",DoubleType(),True), \
StructField("units_15_tpr",DoubleType(),True), \
StructField("units_20_tpr",DoubleType(),True)])
df_wk1 = spark.createDataFrame([],schema=schema)
df_wk2 = spark.createDataFrame([],schema=schema)
df_wk3 = spark.createDataFrame([],schema=schema)
df_wk4 = spark.createDataFrame([],schema=schema)
list_dfs = [df_wk1, df_wk2, df_wk3, df_wk4]
c_weeks = [37, 38, 39, 40]
for data,weeknum in zip(list_dfs, campaign_weeks):
data = df.filter(df.week == weeknum).groupBy(['gender', 'pro']).pivot("share").agg(first('forecast_units'))
最后, Dataframe 仍然是空的。如何解决这个问题?如果这种方法不可行,我如何实现我想要的?
1条答案
按热度按时间fhg3lkii1#
如果你把
df.filter(...
的结果赋给data
,它将会丢失(实际上,那一行没有任何作用)。但是,
df.filter(df.week == weeknum).groupBy(['gender', 'pro']).pivot("share").agg(first('forecast_units'))
创建的DataFrame
模式与您可能需要的模式不同(查看您的问题)。下面是您获得的DataFrame的一个示例:
这是它的模式: