scala Spark Dataframe:如何添加索引列:分布式数据索引

izj3ouym  于 2023-04-12  发布在  Scala
关注(0)|答案(8)|浏览(109)

我从csv文件中读取数据,但没有索引。
我想添加一个从1到行号的列。
我该怎么办,谢谢(scala)

wf82jlnq

wf82jlnq1#

在Scala中,你可以用途:

import org.apache.spark.sql.functions._ 

df.withColumn("id",monotonicallyIncreasingId)

你可以参考exemple和scala文档。
使用Pyspark,您可以用途:

from pyspark.sql.functions import monotonically_increasing_id 

df_index = df.select("*").withColumn("id", monotonically_increasing_id())
syqv5f0l

syqv5f0l2#

monotonically_increasing_id-生成的ID保证单调递增且唯一,但不连续。

  • “我想从1到行号添加一列。"*

假设我们有以下DF

+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+

生成从1开始的ID

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))

这将添加一个索引列,该列按count的递增值排序。

+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+
g6ll5ycj

g6ll5ycj3#

如何获取sequential id columnid[1,2,3,4...n]:

from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

注意,row_number()从1开始,因此如果需要索引为0的列,则减去1

u0njafvf

u0njafvf4#

:上面的方法没有给予序列号,但它确实给出了递增的id。

简单的方法来做到这一点,并确保索引的顺序如下.. zipWithIndex
样本数据。

+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Jackela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+
package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
  * DistributedDataIndex : Program to index an RDD  with
  */
object DistributedDataIndex extends App with Logging {

  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
      , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show
  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
    * Add Column Index to dataframe to each row
    */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}

结果:

+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Jackela Kowski     |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+
a0zr77ik

a0zr77ik5#

正如Ram所说,zippedwithindex比单调递增id更好,id你需要连续的行号。试试这个(PySpark环境):

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

其中original_dataframe是您必须添加索引的 Dataframe ,row_with_index是具有列索引的新模式,可以写成

row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)

在这里,calendar_dateyear_week_numberyear_period_number和realization是我原始 Dataframe 的列。您可以将名称替换为列名。index是您必须为行号添加的新列名。

sy5wg1nm

sy5wg1nm6#

如果您需要为每一行指定一个唯一的序列号,我有一个稍微不同的方法,即添加一个静态列,并使用该列计算行号。

val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
srcData.show(5)

+--------+--------------------+
|     Job|                Name|
+--------+--------------------+
|Morpheus|       HR Specialist|
|   Kayla|              Lawyer|
|  Trisha|          Bus Driver|
|  Robert|Elementary School...|
|    Ober|               Judge|
+--------+--------------------+

val srcDataModf = srcData.withColumn("sl_no",lit("1"))
val windowSpecRowNum =  Window.partitionBy("sl_no").orderBy("sl_no")

srcDataModf.withColumn("row_num",row_number.over(windowSpecRowNum)).drop("sl_no").select("row_num","Name","Job")show(5)

+-------+--------------------+--------+
|row_num|                Name|     Job|
+-------+--------------------+--------+
|      1|       HR Specialist|Morpheus|
|      2|              Lawyer|   Kayla|
|      3|          Bus Driver|  Trisha|
|      4|Elementary School...|  Robert|
|      5|               Judge|    Ober|
+-------+--------------------+--------+
wfsdck30

wfsdck307#

不要使用这个monotonically_increasing_id(),它会成为你的噩梦,数据偏斜会发生。

yx2lnoni

yx2lnoni8#

对于SparkR:
(假设sdf是某种spark Dataframe )
sdf<- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())

相关问题