Spark: add an index to a DataFrame and append another dataset that has no index

Posted by yeotifhr on 2021-05-27 in Spark

I have a dataset with a userid column and an index column.

+---------+--------+
|  userid |   index|
+---------+--------+
|    user1|       1|
|    user2|       2|
|    user3|       3|
|    user4|       4|
|    user5|       5|
|    user6|       6|
|    user7|       7|
|    user8|       8|
|    user9|       9|
|   user10|      10|
+---------+--------+

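I want to append a new DataFrame to it and assign index values to the newly added rows. The userid is unique, and the existing DataFrame does not contain any of the userids in the second DataFrame.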

+----------+
|  userid  |
+----------+
|    user11|
|    user21| 
|    user41| 
|    user51| 
|    user64|
+----------+

Expected output, with the newly added userids and their indexes:

+---------+--------+
|  userid |   index|
+---------+--------+
|    user1|       1|
|    user2|       2|
|    user3|       3|
|    user4|       4|
|    user5|       5|
|    user6|       6|
|    user7|       7|
|    user8|       8|
|    user9|       9|
|   user10|      10|
|   user11|      11|
|   user21|      12|
|   user41|      13|
|   user51|      14|
|   user64|      15|
+---------+--------+

Can this be achieved by passing in the maximum index value and starting the second DataFrame's index from that value?

Answer #1 (bvhaajcl)

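If userid has some kind of ordering, you can use the row_number function. Even if it does not, you can add an id with monotonically_increasing_id(). For now I am assuming that userid can be ordered. Then you can do this: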

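from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Stack the userids from both DataFrames
df_merge = df1.select('userid').union(df2.select('userid'))
# Number all rows in userid order (no partitioning, so a single partition does the work)
w = Window.orderBy('userid')
df_result = df_merge.withColumn('indexid', F.row_number().over(w))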

Edit: following the discussion in the comments.


# %% Test data and imports

import pyspark.sql.functions as F
from pyspark.sql import Window

df = sqlContext.createDataFrame([('a',100),('ab',50),('ba',300),('ced',60),('d',500)],schema=['userid','index'])
df1 = sqlContext.createDataFrame([('fgh',100),('ff',50),('fe',300),('er',60),('fi',500)],schema=['userid','dummy'])

# %%

# %% Merge the two dataframes, with a null column as the index

# Cast the null literal explicitly so the column type matches df's index column
df1 = df1.withColumn('index', F.lit(None).cast('long'))
df_merge = df.select(df.columns).union(df1.select(df.columns))

# %% Define a window that puts the newly added rows last and orders them by userid

# %% The userids, even though they are random strings, can be ordered

# If possible, add a partition column here; otherwise all the data ends up in one partition (consider salting)
w = Window.orderBy(F.col('index').asc_nulls_last(), F.col('userid'))

# %% For the newly added rows, index_new = last existing index + running row count (which includes the rows of the main dataframe)

df_final = df_merge.withColumn(
    'index_new',
    F.when(F.col('index').isNotNull(), F.col('index'))
     .otherwise(F.last(F.col('index'), ignorenulls=True).over(w) + F.sum(F.lit(1)).over(w)))

# %% If the main dataframe has many rows, subtract an offset (for example its row count) in the line above

df_final.show()
+------+-----+---------+
|userid|index|index_new|
+------+-----+---------+
|    ab|   50|       50|
|   ced|   60|       60|
|     a|  100|      100|
|    ba|  300|      300|
|     d|  500|      500|
|    er| null|      506|
|    fe| null|      507|
|    ff| null|      508|
|   fgh| null|      509|
|    fi| null|      510|
+------+-----+---------+
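
To see where 506 to 510 come from: for each new row the window returns the last non-null index (500) plus the running row count over the whole ordered set (6 through 10). Subtracting the number of existing rows, which is one way to read the "offset" mentioned in the comment, would give 501 to 505 instead. The following plain-Python sketch mirrors that arithmetic for illustration only; it is not Spark code, and the helper name `assign` is hypothetical.

```python
# Plain-Python mirror of the window arithmetic (illustration only, no Spark).
existing = [("ab", 50), ("ced", 60), ("a", 100), ("ba", 300), ("d", 500)]
new_userids = ["fgh", "ff", "fe", "er", "fi"]

# Same ordering as the window: existing rows by index, then new rows by userid.
ordered = sorted(existing, key=lambda r: r[1]) + [(u, None) for u in sorted(new_userids)]


def assign(rows, offset=0):
    """Return (userid, index_new) pairs; offset is subtracted for new rows only."""
    result, last_index = [], None
    for position, (userid, index) in enumerate(rows, start=1):
        if index is not None:
            last_index = index  # plays the role of F.last(..., ignorenulls=True)
            result.append((userid, index))
        else:
            # last non-null index + running row count (F.sum(F.lit(1)))
            result.append((userid, last_index + position - offset))
    return result


as_posted = assign(ordered)                          # no offset, as in the answer
contiguous = assign(ordered, offset=len(existing))   # offset = existing row count

print(as_posted[5:])   # [('er', 506), ('fe', 507), ('ff', 508), ('fgh', 509), ('fi', 510)]
print(contiguous[5:])  # [('er', 501), ('fe', 502), ('ff', 503), ('fgh', 504), ('fi', 505)]
```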
