PySpark: how to process each row of a DataFrame

xvw2m8pv posted on 2021-05-27 in Spark

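I want to process each row of a DataFrame. The column `feat` contains many elements in the format `idx:value`, and I want to keep only the elements whose `idx` I am interested in.
For example, I want to keep the elements with `idx=1` or `idx=5`.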

```
df = spark.createDataFrame([("u1","1:a 2:k 5:c 6:i"),("u2","2:k 4:p 5:b 6:k")],["id","feat"])
```

`Input:`

```
+---+---------------+
| id|           feat|
+---+---------------+
| u1|1:a 2:k 5:c 6:i|
| u2|2:k 4:p 5:b 6:k|
+---+---------------+
```

`Expected`:

```
+---+-------+
| id|   feat|
+---+-------+
| u1|1:a 5:c|
| u2|    5:b|
+---+-------+
```
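To pin down the per-row rule being asked for, here is a minimal plain-Python sketch with no Spark involved. The function name `keep_indices` is hypothetical and not part of the question. It assumes that items are separated by whitespace and that the index is everything before the first `:`.

```python
def keep_indices(feat, wanted):
    """Keep only the idx:value items of one `feat` string whose idx is in `wanted`."""
    # Compare indices as strings, since they are parsed out of text.
    wanted = {str(i) for i in wanted}
    items = feat.split()  # items are separated by whitespace
    # partition(":") splits on the first colon only, so a value may itself contain ':'.
    kept = [item for item in items if item.partition(":")[0] in wanted]
    return " ".join(kept)


rows = [("u1", "1:a 2:k 5:c 6:i"), ("u2", "2:k 4:p 5:b 6:k")]
for row_id, feat in rows:
    print(row_id, keep_indices(feat, [1, 5]))
# u1 1:a 5:c
# u2 5:b
```

A row with no matching index yields an empty string under this rule. Whether such a row should be kept or dropped is not stated in the question.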

z2acfund


Here is my attempt, which chains a few built-in functions.

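```
from pyspark.sql import functions as f

# Assumes an existing SparkSession named `spark`, as in the pyspark shell.
df = spark.createDataFrame([("u1","1:a 2:k 5:c 6:i"),("u2","2:k 4:p 5:b 6:k")],["id","feat"])

select_idx = [1, 5]

# explode: one row per idx:value item; split on ':' turns each item into [idx, value].
# filter: keep the items whose idx is wanted (cast to int to match select_idx).
# groupBy + collect_list: rebuild one string per id; element order is not guaranteed.
df.withColumn('feat', f.explode(f.split('feat', ' '))) \
  .withColumn('feat', f.split('feat', ':')) \
  .filter(f.col('feat')[0].cast('int').isin(select_idx)) \
  .withColumn('feat', f.concat_ws(':', 'feat')) \
  .groupBy('id').agg(f.collect_list('feat').alias('feat')) \
  .withColumn('feat', f.concat_ws(' ', 'feat')) \
  .show(10, False)
```

```
+---+-------+
|id |feat   |
+---+-------+
|u1 |1:a 5:c|
|u2 |5:b    |
+---+-------+
```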
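As a possible alternative, not part of the original answer, the same filtering can be sketched without `explode` and `groupBy` by using Spark SQL's higher-order `filter` function. This assumes Spark 2.4 or later. The helper names `build_keep_expr` and `keep_selected` are hypothetical. The Spark import is deferred so that the string-building helper can be tried without a Spark installation.

```python
def build_keep_expr(column, select_idx):
    """Build a Spark SQL expression that keeps only the wanted idx:value items."""
    if not select_idx:
        # An empty IN () list is not valid SQL, so reject it early.
        raise ValueError("select_idx must not be empty")
    # split() yields strings, so the wanted indices are quoted as string literals.
    wanted = ", ".join("'{}'".format(int(i)) for i in select_idx)
    return (
        "concat_ws(' ', filter(split({col}, ' '), "
        "x -> split(x, ':')[0] IN ({wanted})))"
    ).format(col=column, wanted=wanted)


def keep_selected(df, select_idx, column="feat"):
    """Return df with `column` reduced to the wanted indices, one row per input row."""
    # Imported lazily so build_keep_expr can be used without Spark installed.
    from pyspark.sql import functions as f
    return df.withColumn(column, f.expr(build_keep_expr(column, select_idx)))


# Usage, assuming the `df` defined above:
# keep_selected(df, [1, 5]).show(10, False)
```

Two differences from the `explode` version are worth noting. The items stay in their original order because nothing is shuffled. A row with no matching index is kept with an empty `feat`, whereas the `explode` plus `filter` version drops that row entirely.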
