如何在pyspark上按行分组并创建新列

1tu0hz3e  于 2022-11-21  发布在  Spark
关注(0)|答案(3)|浏览(176)

原始 Dataframe
| 标识符|电邮|姓名|
| - -|- -|- -|
| 一个|id1@first.com|约翰|
| 2个|id2@first.com|迈科|
| 2个|id2 @秒|迈科|
| 一个|id1@second.com|约翰|
我想转换成这个
| 标识符|电邮|电子邮件1|姓名|
| - -|- -|- -|- -|
| 一个|id1@first.com | id1@second.com|约翰|
| 2个|id2@first.com |id2 @秒|迈科|
这只是一个示例,我有非常大文件和超过60个列
即时消息使用

df = spark.read.option("header",True) \
        .csv("contatcs.csv", sep =',')

但与pyspark合作。

import pyspark.pandas as ps    

df = ps.read_csv('contacts.csv', sep=',')
df.head()

但我更喜欢spark.read,因为它是一个惰性评估,而PandasAPI不是

bgibtngc

bgibtngc1#

为了 在 Spark 中 做到 这 一 点 , 你 必须 有 一些 规则 来 确定 哪个 电子 邮件 是 第 一 个 , 哪个 是 第 二 个 。 当 你 使用 Spark 时 , CSV 文件 中 的 行 顺序 ( 没有 指定 行号 的 列 ) 是 一 个 不 好 的 规则 , 因为 每 一 行 都 可能 转到 不同 的 节点 , 这样 你 就 看不到 哪 一 行 是 第 一 个 还是 第 二 个 。
在 下面 的 示例 中 , 我 假设 规则 是 字母 顺序 , 所以 我 使用 collect_set 将 所有 电子 邮件 收集 到 一 个 数组 中 , 然后 使用 array_sort 对 它们 进行 排序 。
输入 :

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('1', 'id1@first.com', 'john'),
     ('2', 'id2@first.com', 'Maike'),
     ('2', 'id2@second', 'Maike'),
     ('1', 'id1@second.com', 'john')],
    ['id', 'email', 'name'])

中 的 每 一 个
脚本 :

emails = F.array_sort(F.collect_set('email'))
df = df.groupBy('id', 'name').agg(
    emails[0].alias('email0'),
    emails[1].alias('email1'),
)
df.show()
# +---+-----+-------------+--------------+
# | id| name|       email0|        email1|
# +---+-----+-------------+--------------+
# |  2|Maike|id2@first.com|    id2@second|
# |  1| john|id1@first.com|id1@second.com|
# +---+-----+-------------+--------------+

格式
如果 你 有 一 个 行号 , 比如 ...

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('1', '1', 'id1@first.com', 'john'),
     ('2', '2', 'id2@first.com', 'Maike'),
     ('3', '2', 'id2@second', 'Maike'),
     ('4', '1', 'id1@second.com', 'john')],
    ['row_number', 'id', 'email', 'name'])

格式
您 可以 使用 以下 选项 :
第 一 个

ngynwnxp

ngynwnxp2#

如果您希望使其动态化,以便根据最大电子邮件计数创建新的电子邮件计数,您可以尝试以下逻辑和代码

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('1', 'id1@first.com', 'john'),
     ('2', 'id2@first.com', 'Maike'),
     ('2', 'id2_3@first.com', 'Maike'),
     ('2', 'id2@second', 'Maike'),
     ('1', 'id1@second.com', 'john')],
    ['id', 'email', 'name'])

df.show()


+---+---------------+-----+
| id|          email| name|
+---+---------------+-----+
|  1|  id1@first.com| john|
|  2|  id2@first.com|Maike|
|  2|id2_3@first.com|Maike|
|  2|     id2@second|Maike|
|  1| id1@second.com| john|

解决方案

new = (   df.groupBy('id','name').agg(collect_set('email').alias('email') )#Collect unique emails
        .withColumn('x',max(size('email')).over(Window.partitionBy()))#Find the group with maximum emails, for use in email column count
    )
     
new = (new.withColumn('email',F.struct(*[ F.col("email")[i].alias(f"email{i+1}") for i in range(new.select('x').collect()[0][0])]))#Convert email column to struct type
      .selectExpr('x','id','name','email.*') #Select all columns
     )
new.show(truncate=False)

结果

+---+---+-----+-------------+--------------+---------------+
|x  |id |name |email1       |email2        |email3         |
+---+---+-----+-------------+--------------+---------------+
|3  |1  |john |id1@first.com|id1@second.com|null           |
|3  |2  |Maike|id2@second   |id2@first.com |id2_3@first.com|
+---+---+-----+-------------+--------------+---------------+
dwbf0jvd

dwbf0jvd3#

物理公园

我已经包括了一个极端的情况,当有奇数个电子邮件id。为此,找到最大长度,并迭代提取每个索引的电子邮件:

from pyspark.sql import functions as F
df = spark.createDataFrame([(1, 'id1@first.com', 'john'),(2, 'id2@first.com', 'Maike'),(2, 'id2@second', 'Maike'),(1, 'id1@second.com', 'john'),(3, 'id3@third.com', 'amy'),], ['id', 'email', 'name'])

df = df.groupby("id", "name").agg(F.collect_list("email").alias("email"))
max_len = df.select(F.size("email").alias("size")).collect()[0]["size"]
for i in range(1, max_len + 1):
  df = df.withColumn(f"email{i}", F.when(F.size("email") >= i, F.element_at("email", i)).otherwise(F.lit("")))
df = df.drop("email")

输出量:

+---+-----+-------------+--------------+
|id |name |email1       |email2        |
+---+-----+-------------+--------------+
|2  |Maike|id2@first.com|id2@second    |
|3  |amy  |id3@third.com|              |
|1  |john |id1@first.com|id1@second.com|
+---+-----+-------------+--------------+

"Pandas"
既然您在标签中提到了Pandas,那么以下是Pandas的解决方案:

df = pd.DataFrame(data=[(1, 'id1@first.com', 'john'),(2, 'id2@first.com', 'Maike'),(2, 'id2@second', 'Maike'),(1, 'id1@second.com', 'john'),(3, 'id3@third.com', 'amy'),], columns=["id","email","name"])

df = df.groupby("id").agg(email=("email",list), name=("name",pd.unique))
df2 = df.apply(lambda row: pd.Series(data={f"email{i+1}":v for i,v in enumerate(row["email"])}, dtype="object"), axis=1)
df = df.drop("email", axis=1).merge(df2, on="id")

输出量:

name         email1          email2
id                                      
1    john  id1@first.com  id1@second.com
2   Maike  id2@first.com      id2@second
3     amy  id3@third.com             NaN

相关问题