Slicing data with sliding windows in Spark (Python)

Posted by w51jfk4q on 2021-05-16 in Spark

I have the following data:

id | capacity | timestamp
-----------------------------------
1  |   35     |  2020-12-01 13:28:..
2  |   47     |  2020-12-01 13:28:..
3  |   101    |  2020-12-01 13:28:..

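I need to compute the average capacity of each id using a sliding window: every 2 minutes, I need the average capacity of each id over the last 5 minutes. My code is below, but it doesn't work. I defined the window, but the resulting data is wrong. Can anyone help?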
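To make the requirement concrete: with a 5-minute window that slides every 2 minutes, windows overlap, so each event belongs to two or three of them. The sketch below lists the windows that contain a timestamp given in seconds. It assumes Spark's default alignment for `window()` without a `startTime`, where window starts are multiples of the slide duration counted from the Unix epoch; `windows_for` is a hypothetical helper, not a Spark API.

```python
def windows_for(ts, window_s=300, slide_s=120):
    """Return the [start, end) sliding windows (in seconds) that contain ts.

    Hypothetical helper: assumes window starts are aligned to multiples of
    slide_s counted from 0 (the epoch), like Spark's window() with no startTime.
    """
    # The latest window that can contain ts starts at the last slide boundary <= ts.
    start = (ts // slide_s) * slide_s
    result = []
    # Step back one slide at a time while the window still covers ts.
    while start + window_s > ts:
        result.append((start, start + window_s))
        start -= slide_s
    return sorted(result)


# An event at t = 600 s lands in three overlapping 300 s windows.
print(windows_for(600))  # -> [(360, 660), (480, 780), (600, 900)]
```

Because 300 is not a multiple of 120, some timestamps fall into only two windows (for example `windows_for(660)` gives `(480, 780)` and `(600, 900)`). Each row contributes to the average of every window that contains it.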

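from pyspark.sql import SparkSession, functions
from pyspark.sql.functions import current_timestamp, window
from pyspark.sql.types import (IntegerType, StringType, StructField,
                               StructType, TimestampType)


def main():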

    directory = '/Users/jose/sizes'

    spark_session = SparkSession \
        .builder \
        .master("local[2]") \
        .appName("StreamingAveragePlazasLibresV2") \
        .getOrCreate()

    # Reduce console log noise (assumed intent of the original log4j call,
    # which was truncated)
    spark_session.sparkContext.setLogLevel("ERROR")

    fields = [StructField("nombre", StringType(), True),
              StructField("capacidad", IntegerType(), True),
              StructField("libres", IntegerType(), True),
              StructField("plazas_ocupadas", IntegerType(), True),
              StructField("timestamp", TimestampType(), True)]

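    # includeTimestamp has no effect on the CSV source (it is used with the socket
    # source), and withColumn() replaces the CSV timestamp with the processing time
    lines = spark_session \
        .readStream \
        .format("csv") \
        .option('includeTimestamp', 'true') \
        .schema(StructType(fields)) \
        .load(directory) \
        .withColumn("timestamp", current_timestamp())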

    lines.printSchema()

    # First aggregation: mean of plazas_ocupadas per (nombre, capacidad, timestamp), no window
    values = lines \
        .groupBy(lines.nombre, lines.capacidad, lines.timestamp) \
        .agg(functions.mean("plazas_ocupadas").alias("mean"))

    #values.printSchema()

    windowSize = 300
    slideSize  = 120

    windowDuration = '{} seconds'.format(windowSize)
    slideDuration = '{} seconds'.format(slideSize)

    # Second aggregation, chained on top of the already aggregated `values`
    windowedCounts = values.groupBy(
        window(values.timestamp, windowDuration, slideDuration),
        values.mean
    ).count().orderBy('window')

    windowedCounts.printSchema()

    # Note: this starts `values`, not `windowedCounts`, so no windowed result is printed
    query = values \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

if __name__ == '__main__':
    main()

Answer #1, by smdnsysy:

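I don't see why you use groupBy twice.
I can't test it without sample data, but I'd suggest merging the two groupBy calls (the code block between the two printSchema() statements) into one: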

windowSize = 300
slideSize = 120

windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)

# A single aggregation: mean occupancy per 5-minute window (sliding every
# 2 minutes), per nombre and capacidad
windowedCounts = lines \
    .groupBy(
        window(lines.timestamp, windowDuration, slideDuration),
        lines.nombre,
        lines.capacidad,
    ) \
    .agg(functions.mean("plazas_ocupadas").alias("mean"))
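To see what the single `groupBy` computes, here is a plain-Python emulation (no Spark required) of grouping by (window, nombre, capacidad) and averaging `plazas_ocupadas`. The rows, the integer-second timestamps and the helper names `sliding_windows` and `windowed_mean` are made up for illustration. It assumes the same epoch-aligned windows that Spark's `window()` uses when no `startTime` is given.

```python
from collections import defaultdict


def sliding_windows(ts, window_s=300, slide_s=120):
    """Hypothetical helper: [start, end) windows containing ts, aligned to slide_s from 0."""
    start = (ts // slide_s) * slide_s
    out = []
    while start + window_s > ts:
        out.append((start, start + window_s))
        start -= slide_s
    return out


def windowed_mean(rows, window_s=300, slide_s=120):
    """Mean of plazas_ocupadas per (window, nombre, capacidad), like the merged groupBy."""
    sums = defaultdict(lambda: [0, 0])  # key -> [total, count]
    for nombre, capacidad, plazas_ocupadas, ts in rows:
        # Each row is added to every overlapping window that contains it.
        for win in sliding_windows(ts, window_s, slide_s):
            acc = sums[(win, nombre, capacidad)]
            acc[0] += plazas_ocupadas
            acc[1] += 1
    return {key: total / count for key, (total, count) in sorted(sums.items())}


# Made-up rows: (nombre, capacidad, plazas_ocupadas, timestamp in seconds)
rows = [
    ("A", 35, 10, 600),
    ("A", 35, 20, 700),
    ("B", 47, 5, 610),
]
for key, mean in windowed_mean(rows).items():
    print(key, mean)
```

Here the two "A" rows share the windows `(480, 780)` and `(600, 900)`, so those windows report a mean of 15.0, while `(360, 660)` only sees the first row and reports 10.0. In the streaming job, the writeStream call would presumably need to be started on `windowedCounts` instead of `values` for this windowed result to reach the console.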
