Redundant(?) multiple reads of the same files when reading them into a Spark DataFrame and then using randomSplit()

bqjvbblv · posted 2021-05-27 · Spark

I'm reading a set of files into a PySpark DataFrame and then using randomSplit() to split it into 3 subsets. I noticed from the logs that it appears to read every file in the dataset 4 times. My question is: are the multiple reads necessary (or are they redundant), and if not, is there a way to cache the data so that each file is read as few times as possible?
Example:
My data files are laid out like this:

/somedir/data/ID=123/123_0.jsonl
/somedir/data/ID=123/123_1.jsonl
/somedir/data/ID=123/123_2.jsonl

/somedir/data/ID=456/456_0.jsonl
/somedir/data/ID=456/456_1.jsonl
/somedir/data/ID=456/456_2.jsonl

My script:

import os

# `spark` here is an existing SparkSession (e.g. the one provided by pyspark).
df = spark.read.json('/somedir/data')
df = df.repartition('ID')
splits = df.randomSplit(weights=[0.6, 0.2, 0.2])

for df_split, label in zip(splits, ['train', 'test', 'validation']):
    df_split.write.partitionBy('ID').json(os.path.join('/somedir/split', label))

Here is what I noticed in the logs:

2020-06-01 00:48:33 INFO  FileScanRDD:54 - Reading File path: file:/somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [empty row]
2020-06-01 00:48:33 INFO  FileScanRDD:54 - Reading File path: file:/somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [empty row]
2020-06-01 00:48:33 INFO  FileScanRDD:54 - Reading File path: file:/somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [empty row]
...some other lines...
2020-06-01 00:48:34 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:34 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:34 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [123]
...some other lines...
2020-06-01 00:48:35 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:35 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:35 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [123]
...some other lines...
2020-06-01 00:48:36 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:36 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:36 INFO  FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [123]

...repeat for ID=456...

I tried adding the line splits = [df_split.persist() for df_split in splits] right after the randomSplit() call and before the write loop, but it didn't seem to help.
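Concretely, that attempt looked like this (just a sketch of the placement; everything else is unchanged from the script above):

splits = df.randomSplit(weights=[0.6, 0.2, 0.2])

# Persist each split before writing -- this did NOT reduce the number of file reads.
splits = [df_split.persist() for df_split in splits]

for df_split, label in zip(splits, ['train', 'test', 'validation']):
    df_split.write.partitionBy('ID').json(os.path.join('/somedir/split', label))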
Any ideas? Thanks!

wgx48brx


It turns out I had put persist() in the wrong place. If you call it right after the spark.read line instead, the number of reads of each file drops to 2 (no matter how many randomSplit() subsets there are).

df = spark.read.json('/somedir/data')
df = df.persist()
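
For completeness, here is a minimal sketch of the whole pipeline with persist() in that position (same paths and weights as in the question; the SparkSession setup and the final unpersist() are my additions):

import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the source files once and mark the result for caching; the first job
# that materializes it fills the cache, and later jobs reuse the cached rows.
df = spark.read.json('/somedir/data')
df = df.persist()
df = df.repartition('ID')

# All three splits are derived from the same cached parent DataFrame.
splits = df.randomSplit(weights=[0.6, 0.2, 0.2])

for df_split, label in zip(splits, ['train', 'test', 'validation']):
    df_split.write.partitionBy('ID').json(os.path.join('/somedir/split', label))

# Release the cached data once all splits have been written.
df.unpersist()

Without the cache, each split's write job goes back to the source files, which matches the logs above: one scan per split, plus what looks like an initial schema-inference pass (the read with empty partition values).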
