I have the following data:
id | capacity | timestamp
---+----------+---------------------
 1 |       35 | 2020-12-01 13:28:..
 2 |       47 | 2020-12-01 13:28:..
 3 |      101 | 2020-12-01 13:28:..
I need to compute the average capacity per id over a sliding window: every 2 minutes I need the average capacity of each id over the last 5 minutes. My code is below, but it does not work: the window is defined, but the data it produces is not correct. Can anyone help me?
from pyspark.sql import SparkSession, functions
from pyspark.sql.functions import current_timestamp, window
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, TimestampType)


def main():
    directory = '/Users/jose/sizes'

    spark_session = SparkSession \
        .builder \
        .master("local[2]") \
        .appName("StreamingAveragePlazasLibresV2") \
        .getOrCreate()

    logger = spark_session._jvm.org.apache.log4j
    # (this line was truncated in the original post; presumably it quiets Spark's logging)
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    # Schema of the incoming CSV rows.
    fields = [StructField("nombre", StringType(), True),
              StructField("capacidad", IntegerType(), True),
              StructField("libres", IntegerType(), True),
              StructField("plazas_ocupadas", IntegerType(), True),
              StructField("timestamp", TimestampType(), True)]

    # Read the CSV files as a stream and stamp each row with its arrival time.
    lines = spark_session \
        .readStream \
        .format("csv") \
        .option('includeTimestamp', 'true') \
        .schema(StructType(fields)) \
        .load(directory) \
        .withColumn("timestamp", current_timestamp())

    lines.printSchema()

    # First aggregation: mean of plazas_ocupadas per (nombre, capacidad, timestamp).
    values = lines \
        .groupBy(lines.nombre, lines.capacidad, lines.timestamp) \
        .agg(functions.mean("plazas_ocupadas").alias("mean"))
    # values.printSchema()

    # 5-minute window sliding every 2 minutes.
    windowSize = 300
    slideSize = 120
    windowDuration = '{} seconds'.format(windowSize)
    slideDuration = '{} seconds'.format(slideSize)

    # Second aggregation: count per (window, mean).
    windowedCounts = values.groupBy(
        window(values.timestamp, windowDuration, slideDuration),
        values.mean
    ).count().orderBy('window')

    windowedCounts.printSchema()

    query = values \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()


if __name__ == '__main__':
    main()
1 Answer

smdnsysy #1
I don't see why you use groupBy twice. Without any test data to try it on, my suggestion would be to merge the two groupBys (the block of code between the two printSchema() statements):
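A minimal sketch of that merged aggregation, untested since there is no sample data to run it against (windowedAvg is a name I picked; lines, windowDuration, slideDuration, window and functions.mean are the ones already defined in the question):

# Single streaming groupBy: assign each row to its sliding window(s)
# and to its id (nombre), then average plazas_ocupadas per group.
windowedAvg = lines \
    .groupBy(
        window(lines.timestamp, windowDuration, slideDuration),
        lines.nombre) \
    .agg(functions.mean("plazas_ocupadas").alias("mean"))

query = windowedAvg \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Note also that in the posted code the query starts from values rather than windowedCounts, so the windowed result is never written to the console; whichever frame holds the windowed aggregation is the one that has to go into writeStream.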