使用udf插值和填充pysparkDataframe

vq8itlhq  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(377)

我正在尝试使用pyspark udf在apachespark中插入和填充大量分组数据集中丢失的值。我需要一个数据集的所有天/秒在一个月里,缺失值要么插值或向前填充。这个问题的答案很接近,但我需要整个月的数据点,而不是时间戳序列的开始和结束。
方法可以是创建一个Dataframe,其中包含2018年2月的所有日期/秒,左键连接缺少数据的Dataframe,然后插入/ffill,但我不确定如何在pyspark udf中执行此操作。有什么建议吗?

from pyspark.sql.functions import to_timestamp
    from operator import attrgetter
    from pyspark.sql.types import StructType
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
        @pandas_udf(
            StructType(sorted(schema, key=attrgetter("name"))), 
            PandasUDFType.GROUPED_MAP)
        def _(pdf):
            pdf.set_index(timestamp_col, inplace=True)
            pdf = pdf.resample(freq).interpolate()
            pdf.ffill(inplace=True)
            pdf.reset_index(drop=False, inplace=True)
            pdf.sort_index(axis=1, inplace=True)
            return pdf
        return _

    df = spark.createDataFrame([
        ("John",   "2018-02-01 03:00:00", 60),  
        ("John",   "2018-02-10 12:03:00", 66),  
        ("John",   "2018-02-18 03:05:00", 70),  
        ("John",   "2018-02-23 14:08:00", 76),  
        ("Mo",     "2018-02-04 01:05:00", 10),  
        ("Mo",     "2018-02-08 14:07:00", 20),  
        ("Mo",     "2018-02-12 16:10:00", 35),  
        ("Mo",     "2018-02-23 21:11:00", 40),
    ], ("webID", "timestamp", "counts")).withColumn(
      "timestamp", to_timestamp("timestamp")
    )
df.groupBy("webID").apply(resample(df.schema, "1S")).show()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题