I need to run an aggregation with Spark Structured Streaming specifically when my batch size reaches n.
For example, I run the following code:
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# maxFilesPerTrigger caps how many input files each micro-batch picks up
df = spark.readStream.option('cleanSource', 'delete')\
    .option('maxFilesPerTrigger', n)\
    .json('data/stream/')

def func(batch_df, batch_id):
    # aggregation_code is a placeholder for my aggregation expressions
    results = batch_df.select(aggregation_code).collect()[0]
    with open(f'data/aggregates/{batch_id}.json', 'w') as f:
        json.dump(results.asDict(), f)

df.writeStream.foreachBatch(func).start()
I need each batch to be exactly n in size, but with maxFilesPerTrigger I can only guarantee that a batch will be no larger than n.
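To make the behaviour I want concrete, here is a rough, assumption-laden sketch (not a working solution): it buffers collected rows in driver memory across micro-batches and only aggregates once exactly n rows are available. exact_batch_func is a name I made up; spark, df, n and aggregation_code are assumed from the snippet above, and the buffer is not checkpointed, so this is not fault-tolerant.

import json

buffer = []  # rows carried over between micro-batches (driver memory only)

def exact_batch_func(batch_df, batch_id):
    global buffer
    buffer.extend(batch_df.collect())        # add this micro-batch's rows
    part = 0
    while len(buffer) >= n:                  # aggregate only full groups of n
        chunk, buffer = buffer[:n], buffer[n:]
        chunk_df = spark.createDataFrame(chunk, schema=batch_df.schema)
        results = chunk_df.select(aggregation_code).collect()[0]
        with open(f'data/aggregates/{batch_id}_{part}.json', 'w') as f:
            json.dump(results.asDict(), f)
        part += 1

df.writeStream.foreachBatch(exact_batch_func).start()

Is there a cleaner way to get exactly-n-sized batches than buffering like this?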