如何使用pyspark对sparkDataframe中的一列进行排序?

thigvfpy  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(596)

我有一个sparkDataframe,看起来像这样:

|  time  | col1 | col2 |
|----------------------|
| 123456 |   2  |  A   |
| 123457 |   4  |  B   |
| 123458 |   7  |  C   |
| 123459 |   5  |  D   |
| 123460 |   3  |  E   |
| 123461 |   1  |  F   |
| 123462 |   9  |  G   |
| 123463 |   8  |  H   |
| 123464 |   6  |  I   |

现在我需要对“col1”列进行排序,但其他列必须保持相同的顺序:(使用pyspark)

|  time  | col1 | col2 | col1_sorted |
|-----------------------------------|
|  same  | same | same |   sorted   |
|-----------------------------------|
| 123456 |   2  |  A   |     1      |
| 123457 |   4  |  B   |     2      |
| 123458 |   7  |  C   |     3      |
| 123459 |   5  |  D   |     4      |
| 123460 |   3  |  E   |     5      |
| 123461 |   1  |  F   |     6      |
| 123462 |   9  |  G   |     7      |
| 123463 |   8  |  H   |     8      |
| 123464 |   6  |  I   |     9      |

提前感谢您的帮助!

tp5buhyn

tp5buhyn1#

对于spark 2.3.1,您可以尝试使用udf,如下所示(假设原始Dataframe按 time (列)

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType

schema = StructType.fromJson(df.schema.jsonValue()).add('col1_sorted', 'integer')

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_col1_sorted(pdf):
  return pdf.sort_values(['time']).assign(col1_sorted=sorted(pdf["col1"]))

df.groupby().apply(get_col1_sorted).show()
+------+----+----+-----------+
|  time|col1|col2|col1_sorted|
+------+----+----+-----------+
|123456|   2|   A|          1|
|123457|   4|   B|          2|
|123458|   7|   C|          3|
|123459|   5|   D|          4|
|123460|   3|   E|          5|
|123461|   1|   F|          6|
|123462|   9|   G|          7|
|123463|   8|   H|          8|
|123464|   6|   I|          9|
+------+----+----+-----------+
vecaoik1

vecaoik12#

假设df是具有实际值的Dataframe:

import copy
df_schema = copy.deepcopy(df.schema)
new_df = X.rdd.zipWithIndex().toDF(df_schema)
new_df = new_df.orderBy("col1")
df = df.withColumn("col1_sorted", new_df["col1"])
df.show()
zpgglvta

zpgglvta3#

我自己的解决方案如下:
首先用col1选择并排序的col1复制df:

df_copy = df.select("col1").orderBy("col1")

对两个Dataframe进行第二次索引:(对于df\u copy相同,只是使用windoworderby(“col1”))

w = Window.orderBy("time").rowsBetween(-sys.maxsize, 0)

df = df\
            .withColumn("helper", lit(1))\
            .withColumn("index", lit(0))\
            .withColumn("index", F.col("index")+F.sum(F.col("helper")).over(w))

最后一步,将col1重命名为col1\u排序并加入Dataframe

df_copy = df_copy.withColumnRenamed("col1", "col1_sorted")

df = df.join(df_copy, df.index == df_copy.index, how="inner")

相关问题