pandas 当使用pyspark读入数据集时,是否有一种方法可以对数据集进行子集化

2w2cym1i  于 2023-03-16  发布在  Spark
关注(0)|答案(1)|浏览(181)

我知道你可以用use sample返回一个随机的项目样本,但是当把一个csv文件作为一个 Dataframe 读入时,比如我们只读入一个指定的随机选择的特定数量的行吗?
有没有办法读入csv,但从该csv中随机选择100行。
我需要读入完整的文件吗?还是有其他方法。

# Import the SparkSession library
from pyspark.sql import SparkSession
  
# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()
  
# Read the CSV file
data_frame=csv_file = spark_session.read.csv('/content/student_data.csv',
                            sep = ',', inferSchema = True, header = True)
  
# Extract random sample through sample function using 
# withReplacement (value=True) and fraction as arguments 
data_frame.sample(True,0.8).collect()
k3fezbri

k3fezbri1#

我想我的主要问题是你为什么要这样做?数据文件真实的大吗?就像你正在做的那样把它读入内存(RDD),然后采样。
spark背后的想法是不用担心文件I/O,是的,你可以写下面的python来读入一个行分隔符为CR/LF的文件。

#
# Not Recommended - read in file - line by line
#

with open('/content/student_data.csv') as f:
    while True:
        line = f.readline()
        if not line:
            break

下一个问题是,要随机抽取100行数据,需要多少行?是否每隔三行抽取一行?那么我们至少需要300行数据?如果只有250行,会发生什么情况?我建议不要走这条路,因为它确实不是随机抽取所有数据。
同样,如果数据大小是考虑的问题,那么我建议您将数据存储在分区文件中,我将向您展示如何选择分区,然后对分区进行采样并获取前100行。
Azure Databricks提供了示例数据集。我们将使用存储为分区CSV文件的航空公司数据集。

我们可以看到有1919个分区。

# pick a partition
n = random.randrange(1, 1919)
s = '{:05d}'.format(n)
print(f"This is the random partition = {s}")

# read data
path = "/databricks-datasets/airlines/part-" + s
df1 = spark.read.load(path,format="csv",sep=",",inferSchema="true",header="true" )

# get partition count
c = df1.count()
print(f"This is the whole partition count {c}")

# get sample count
df2 = df1.sample(True, 0.01)
c = df2.count()
print(f"This is the sample count {c}")

# grab first 100
df3 = df2.limit(100)
c = df3.count()
print(f"This is the first 100 rows {c}")

总而言之,对于非常大的数据集,您应该按数字N对数据进行分区,并随机选择一个分区,然后对数据进行采样和选择。这只需要8秒的时间。如果我们必须处理整个数据集,它将读取1919 x 600 K行,而不是600 K行。O会导致程序不能在执行器节点上分布和运行,简而言之,分区是你处理大数据集的朋友。

相关问题