Map a Spark DataFrame to (row_num, col_num, value) format

Posted by 9o685dep on 2021-05-27 in Spark

I have a DataFrame of the following shape:

1 2
5 9

How can I convert it to (row_num, col_num, value) format?

0 0 1
0 1 2
1 0 5
1 1 9

Is there a way to do this by applying some function or mapper? Thanks in advance.
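Since the question asks about applying a mapper, one option the answers below do not use is the RDD API: RDD.zipWithIndex() pairs each row with a 0-based index, and a flatMap can then emit one triple per cell. The sketch below shows only the per-row mapper as plain Python so it can be checked without a cluster. The function name to_triples is hypothetical, and the sketch assumes each Spark Row behaves like a tuple (Row subclasses tuple) and that all columns share one type.

```python
def to_triples(pair):
    """Map one (row, row_index) pair, as produced by RDD.zipWithIndex(),
    to a list of (row_num, col_num, value) triples."""
    row, row_num = pair
    # enumerate() over the row gives the 0-based column position of each value
    return [(row_num, col_num, value) for col_num, value in enumerate(row)]


# Plain tuples stand in for Spark Row objects here.
pairs = [((1, 2), 0), ((5, 9), 1)]
triples = [t for p in pairs for t in to_triples(p)]
print(triples)  # [(0, 0, 1), (0, 1, 2), (1, 0, 5), (1, 1, 9)]
```

With a live SparkSession the mapper would be applied as df.rdd.zipWithIndex().flatMap(to_triples).toDF(['row_num', 'col_num', 'value']). Unlike row_number() over Window.orderBy(lit(1)), zipWithIndex does not move all data into a single partition; it numbers rows by partition index and then by position within each partition, and it triggers an extra Spark job when the RDD has more than one partition.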

Answer #1 (aor9mmx1)

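Check the following code (run in spark-shell, which already imports org.apache.spark.sql.functions._ and spark.implicits._):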

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> val colExpr = array(df.columns.zipWithIndex.map(c => struct(lit(c._2).as("col_name"),col(c._1).as("value"))):_*)
colExpr: org.apache.spark.sql.Column = array(named_struct(col_name, 0 AS `col_name`, NamePlaceholder(), a AS `value`), named_struct(col_name, 1 AS `col_name`, NamePlaceholder(), b AS `value`))

scala> df.withColumn("row_number", row_number().over(Window.orderBy(lit(1))) - 1).withColumn("data", explode(colExpr)).select($"row_number", $"data.*").show(false)
+----------+--------+-----+
|row_number|col_name|value|
+----------+--------+-----+
|0         |0       |1    |
|0         |1       |2    |
|1         |0       |5    |
|1         |1       |9    |
+----------+--------+-----+

Answer #2 (0yg35tkg)

You can transform the data like this:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import expr, lit, row_number

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 2), (5, 9)], ['col1', 'col2'])

# Rename the columns to their positions: 'col1' -> '0', 'col2' -> '1'
df = df.toDF(*[str(i) for i in range(len(df.columns))])

# Build the stack() arguments: the column index, then the backtick-quoted column
col_list = ','.join([f'{i},`{i}`' for i in df.columns])
n_cols = len(df.columns)  # stack() emits this many rows per input row

# row_number() starts at 1, so subtract 1 for a 0-based row id
df.withColumn('row_id', row_number().over(Window.orderBy(lit(1))) - 1).select(
    'row_id',
    expr(f'stack({n_cols},{col_list}) as (col_id,col_value)')).show()

+------+------+---------+
|row_id|col_id|col_value|
+------+------+---------+
|     0|     0|        1|
|     0|     1|        2|
|     1|     0|        5|
|     1|     1|        9|
+------+------+---------+
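The stack() call is the only non-obvious part of this answer: stack(n, ...) splits its remaining arguments into n rows, here one (column index, column value) pair per row, so for the renamed columns 0 and 1 the generated SQL is stack(2,0,`0`,1,`1`) as (col_id,col_value). A small helper that builds the same string for any column list is sketched below. The name stack_expr is hypothetical; it assumes all value columns share one data type (stack checks that values placed in the same output column have matching types) and that column names contain no backticks.

```python
def stack_expr(columns, id_alias="col_id", value_alias="col_value"):
    """Build a Spark SQL stack() expression string that turns the given
    columns into (column index, column value) rows.

    Column names are backtick-quoted so that names such as `0` are read as
    identifiers rather than integer literals. Assumes no backticks in names.
    """
    # The index comes from the position in the list, not from the name,
    # so the columns do not need to be renamed first.
    pairs = ",".join(f"{i},`{name}`" for i, name in enumerate(columns))
    return f"stack({len(columns)},{pairs}) as ({id_alias},{value_alias})"


print(stack_expr(["0", "1"]))
print(stack_expr(["col1", "col2"]))
```

Because the index is taken from the column's position, the renaming step becomes optional: capture value_cols = df.columns before adding row_id, then select('row_id', expr(stack_expr(value_cols))) on the DataFrame that has the row_id column.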

Answer #3 (qhhrdooz)

In PySpark, row_number() and posexplode() are helpful here. Try this:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
tst = spark.createDataFrame([(1,7,80),(1,8,40),(1,5,100),(5,8,90),(7,6,50),(0,3,60)], schema=['col1','col2','col3'])
# 0-based row index (row_number() starts at 1)
tst1 = tst.withColumn("row_number", F.row_number().over(Window.orderBy(F.lit(1))) - 1)

# Pack each row's values into an array, then posexplode it into (pos, col) rows
tst_arr = tst1.withColumn("arr", F.array(tst.columns))
tst_new = tst_arr.select('row_number', F.posexplode('arr'))

Result:

In [47]: tst_new.show()
+----------+---+---+
|row_number|pos|col|
+----------+---+---+
|         0|  0|  1|
|         0|  1|  7|
|         0|  2| 80|
|         1|  0|  1|
|         1|  1|  8|
|         1|  2| 40|
|         2|  0|  1|
|         2|  1|  5|
|         2|  2|100|
|         3|  0|  5|
|         3|  1|  8|
|         3|  2| 90|
|         4|  0|  7|
|         4|  1|  6|
|         4|  2| 50|
|         5|  0|  0|
|         5|  1|  3|
|         5|  2| 60|
+----------+---+---+
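To check output like the above without a cluster, the per-row logic can be emulated in plain Python. posexplode emits one (pos, col) row per array element with a 0-based pos and, like explode, emits nothing for a null or empty array (posexplode_outer would keep such rows with nulls). Because row_number is assigned before the explode, a row whose array is empty still consumes a row number. The function name posexplode_rows is hypothetical; it mirrors only those semantics and is not part of PySpark.

```python
def posexplode_rows(arrays):
    """Emulate row_number() - 1 followed by posexplode() on an array column.

    `arrays` holds one list per input row. None or [] produces no output
    rows, but the row number still advances, as it does in Spark.
    """
    out = []
    for row_number, arr in enumerate(arrays):
        for pos, col in enumerate(arr or []):
            out.append((row_number, pos, col))
    return out


# Same input as the tst DataFrame above, one array per row.
data = [(1, 7, 80), (1, 8, 40), (1, 5, 100), (5, 8, 90), (7, 6, 50), (0, 3, 60)]
result = posexplode_rows([list(r) for r in data])
print(result[:3])   # [(0, 0, 1), (0, 1, 7), (0, 2, 80)]
print(len(result))  # 18
```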
