Find the ID for the max and average values of a Spark DataFrame column

vxbzzdmp · posted 2022-10-02 · Python

Using a PySpark DataFrame, how can I find the id associated with the maximum of another column, together with that column's maximum and average values?

df

+-------------+-------+----------+---------+----------+--------+------+------------------+
|ChargingEvent|   CPID| StartDate|StartTime|   EndDate| EndTime|Energy|    PluginDuration|
+-------------+-------+----------+---------+----------+--------+------+------------------+
|     16673806|AN11719|2017-12-31| 14:46:00|2017-12-31|18:00:00|   2.4|3.2333333333333334|
|     16670986|AN01706|2017-12-31| 11:25:00|2017-12-31|13:14:00|   6.1|1.8166666666666667|
|      3174961|AN18584|2017-12-31| 11:26:11|2018-01-01|12:54:11|    24|25.466666666666665|
+-------------+-------+----------+---------+----------+--------+------+------------------+

Current code:

df.agg({'PluginDuration': 'max'}).show()
df.agg({'PluginDuration': 'avg'}).show()

I then want to rename all columns so the expected result looks like this:

+-------------------+-------------------+-------------------+
|id                 |max_value          |avg_value          |
+-------------------+-------------------+-------------------+
| QWER              |96.26              |12.35              |
+-------------------+-------------------+-------------------+

Here id is just CPID renamed, and max_value and avg_value need to be rounded to two decimal places.
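For the rounding and renaming part, I assume the two aggregations can be combined into one call along these lines (just a sketch; attaching the matching CPID as id in the same row is the part I am missing):

from pyspark.sql import functions as F

# one aggregation instead of two, with rounding and renaming applied directly
df.agg(
    F.round(F.max('PluginDuration'), 2).alias('max_value'),
    F.round(F.avg('PluginDuration'), 2).alias('avg_value'),
).show()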

8yparm6h · Answer 1

I basically adapted a SQL-style approach to the DataFrame API; it works and answers the question.

from pyspark.sql import functions as F

# get the max and average values from the column

mx = df.agg({'PluginDuration':'max'}).collect()[0][0]
av = df.agg({'PluginDuration':'avg'}).collect()[0][0]

# add max and avg columns, then select columns with rename,

# sort by PluginDuration descending and keep only the top row

(
    df
    .withColumn('max_value', F.lit(round(mx, 2)))
    .withColumn('avg_value', F.lit(round(av, 2)))
    .sort('PluginDuration', ascending=False)
    .selectExpr('CPID as id', 'max_value', 'avg_value')
    .limit(1)
    .show()
)
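A note on the design choice above: the two collect() calls run as separate Spark jobs and the final sort adds another shuffle. The same result can also be sketched as a single aggregation, relying on max over a struct comparing its fields left to right (assumes PluginDuration is numeric; untested against the original data):

from pyspark.sql import functions as F

# the struct's first field drives the ordering, so `top` comes from the row
# with the largest PluginDuration and top.CPID is the id we want
result = df.agg(
    F.max(F.struct('PluginDuration', 'CPID')).alias('top'),
    F.round(F.max('PluginDuration'), 2).alias('max_value'),
    F.round(F.avg('PluginDuration'), 2).alias('avg_value'),
).select(F.col('top.CPID').alias('id'), 'max_value', 'avg_value')

result.show(truncate=False)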

nfs0ujit · Answer 2

def extract(self):
    df = self.spark_session.read.csv(self.input_path,header=True)

    return df

def transform(self, df):

    from pyspark.sql import functions as func
    from pyspark.sql.functions import avg, max, col

    # CSV columns are read as strings, so cast PluginDuration to double first
    df1 = df.withColumn('PluginDuration',
                        col('PluginDuration').cast('double'))

    # max and average duration per charge point
    df2 = df1.groupBy("CPID") \
        .agg(max("PluginDuration").alias("max_duration"),
             avg("PluginDuration").alias("avg_duration"))

    #df2.show(truncate=False)

    # rename the key column and round both aggregates to two decimal places
    df3 = df2.select(col("CPID").alias("chargepoint_id"),
                     func.round(df2["max_duration"], 2).alias("max_duration"),
                     func.round(df2["avg_duration"], 2).alias("avg_duration"))

    #df3.show(truncate=False)

    return df3

def load(self, df):
    df.write.parquet(self.output_path)
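
These three functions read as methods of a small ETL class; a hypothetical wrapper showing how they might be wired together (the class name, constructor and paths below are assumptions, not part of this answer):

from pyspark.sql import SparkSession

class ChargingEventsETL:
    # extract(), transform() and load() from above are assumed to be
    # pasted in here as methods of this class
    def __init__(self, spark_session, input_path, output_path):
        self.spark_session = spark_session
        self.input_path = input_path
        self.output_path = output_path

if __name__ == '__main__':
    spark = SparkSession.builder.appName('charging-events').getOrCreate()
    job = ChargingEventsETL(spark, 'data/charging_events.csv', 'data/duration_stats')
    job.load(job.transform(job.extract()))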
