pyspark流:当你读stream.json时,你能设置一个最小的批大小吗?

6uxekuva  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(228)

当我的批量大小达到n时,我需要专门对spark streaming执行聚合:
例如,当我运行以下代码时:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.readStream.option('cleanSource', 'delete')\
                .option('maxFilesPerTrigger', n)\
                .json('data/stream/')

def func(batch_df, batch_id):
    results = batch_df.select(aggregation_code).collect()[0]  
    json.dump(results.asDict(), open(f'data/aggregates/{batch_id}.json', 'w'))

df.writeStream.foreachBatch(func)

我需要确保每个批量大小正好是n大时,我只能保证它不会大于n

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题