python 如何将一个pyspark Dataframe 分割为两行

ntjbwcob  于 2023-01-01  发布在  Python
关注(0)|答案(7)|浏览(230)

我在数据积木工作。
我有一个包含500行的 Dataframe ,我想创建两个包含100行的 Dataframe ,另一个包含剩余的400行。

+--------------------+----------+
|              userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|

我已尝试以下操作,但收到错误

df1 = df[:99]
df2 = df[100:499]

TypeError: unexpected item type: <type 'slice'>
bzzcjhmw

bzzcjhmw1#

一开始我误解了,以为你想对列进行切片,如果你想选择行的子集,一种方法是使用monotonically_increasing_id()创建一个索引列。
生成的ID保证单调递增且唯一,但不连续。
您可以使用此ID对 Dataframe 进行排序,并使用limit()将其划分为子集,以确保您得到的正是所需的行。
例如:

import pyspark.sql.functions as f
import string

# create a dummy df with 500 rows and 2 columns
N = 500
numbers = [i%26 for i in range(N)]
letters = [string.ascii_uppercase[n] for n in numbers]

df = sqlCtx.createDataFrame(
    zip(numbers, letters),
    ('numbers', 'letters')
)

# add an index column
df = df.withColumn('index', f.monotonically_increasing_id())

# sort ascending and take first 100 rows for df1
df1 = df.sort('index').limit(100)

# sort descending and take 400 rows for df2
df2 = df.sort('index', ascending=False).limit(400)

只是为了验证这是否符合您的要求:

df1.count()
#100
df2.count()
#400

我们还可以验证索引列是否不重叠:

df1.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+---+
#|min|max|
#+---+---+
#|  0| 99|
#+---+---+

df2.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+----------+
#|min|       max|
#+---+----------+
#|100|8589934841|
#+---+----------+
uxhixvfz

uxhixvfz2#

Spark Dataframe 不能像你写的那样被索引。你可以使用head方法来创建n个最前面的行。这将返回一个Row()对象的列表,而不是一个 Dataframe 。所以你可以把它们转换回 Dataframe ,然后使用subtract从原始 Dataframe 中获取其余的行。

#Take the 100 top rows convert them to dataframe 
#Also you need to provide the schema also to avoid errors
df1 = sqlContext.createDataFrame(df.head(100), df.schema)

#Take the rest of the rows
df2 = df.subtract(df1)

如果你使用spark 2.0+,你也可以使用SparkSession代替spark sqlContext。如果你对前100行不感兴趣,你想随机拆分,你可以这样使用randomSplit:

df1,df2 = df.randomSplit([0.20, 0.80],seed=1234)
goucqfw6

goucqfw63#

如果我不介意两个 Dataframe 中有相同的行,那么我可以使用sample。例如,我有一个354行的 Dataframe 。

>>> df.count()
354

>>> df.sample(False,0.5,0).count() //approx. 50%
179

>>> df.sample(False,0.1,0).count() //approx. 10%
34

或者,如果我想严格拆分而不出现重复项,我可以

df1 = df.limit(100)     //100 rows
df2 = df.subtract(df1)  //Remaining rows
sauutmhj

sauutmhj4#

试着这样做:

df1_list = df.collect()[:99] #this will return list    
df1 = spark.createDataFrame(df1) #convert it to spark dataframe

对此也类似:

df2_list = df.collect()[100:499]
df2 = spark.createDataFrame(df2)
yyyllmsg

yyyllmsg5#

在这两种解决方案中,我认为我们需要将第二句中的df1更改为df1_list,并将df2更改为df2_list

nfeuvbwi

nfeuvbwi6#

下面是我按行对 Dataframe 进行切片的解决方案:

def slice_df(df,start,end):
    return spark.createDataFrame(df.limit(end).tail(end - start))
h79rfbju

h79rfbju7#

在此提供一个简单得多的解决方案,更接近于所要求的解决方案:
(在Spark 2.4 +中工作)

# Starting
print('Starting row count:',df.count())
print('Starting column count:',len(df.columns))

# Slice rows
df2 = df.limit(3)
print('Sliced row count:',df2.count())

# Slice columns
cols_list = df.columns[0:1]
df3 = df.select(cols_list)
print('Sliced column count:',len(df3.columns))

相关问题