I read data from a CSV file, but it has no index. I want to add a column that runs from 1 up to the number of rows. How can I do this? Thanks. (Scala)
wf82jlnq1#
In Scala you can use:
import org.apache.spark.sql.functions._

df.withColumn("id", monotonically_increasing_id())
You can refer to the example and the Scala documentation. With PySpark you can use:
from pyspark.sql.functions import monotonically_increasing_id

df_index = df.select("*").withColumn("id", monotonically_increasing_id())
syqv5f0l2#
monotonically_increasing_id - the generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive.
Suppose we have the following DataFrame:
+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|   25   |     6001    |   2   |
|   11   |     5001    |   8   |
|   23   |     123     |   5   |
+--------+-------------+-------+
To generate IDs starting from 1:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))
This adds an index column that is ordered by increasing values of count.
+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|   25   |     6001    |   2   |   1   |
|   23   |     123     |   5   |   2   |
|   11   |     5001    |   8   |   3   |
+--------+-------------+-------+-------+
g6ll5ycj3#
How to get a sequential id column id [1, 2, 3, 4 ... n]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

df_with_seq_id = df.withColumn(
    'index_column_name',
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
)
Note that row_number() starts at 1, so subtract 1 if you need a 0-based index column.
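Since the question asks for Scala, here is a minimal Scala sketch of the same row_number-over-monotonically_increasing_id trick (assuming a DataFrame df; the column name index is just an example):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Order by the monotonically increasing id to preserve the original row order,
// then number the rows 1..n with row_number().
val w = Window.orderBy(monotonically_increasing_id())
val dfWithSeqId = df.withColumn("index", row_number().over(w))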
u0njafvf4#
Note: the approaches above do not give a sequential number, but they do give increasing ids.
A simple way to do this, and to guarantee the order of the index, is zipWithIndex. Sample data:
+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Jackela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+
package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
 * DistributedDataIndex : Program to index an RDD with zipWithIndex
 */
object DistributedDataIndex extends App with Logging {

  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick",
        "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski",
        "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show

  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
   * Add Column Index to dataframe to each row
   */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}
Result:
+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Jackela Kowski     |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+
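The question asks for an index starting at 1, while zipWithIndex is 0-based. A small sketch on top of the listing above (assuming df1WithIndex is in scope) would be to shift the column by one:

import org.apache.spark.sql.functions.col

// zipWithIndex numbers rows from 0, so add 1 to get 1..n
val df1From1 = df1WithIndex.withColumn("index", col("index") + 1)
df1From1.show(false)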
a0zr77ik5#
As Ram says, zipWithIndex is better than monotonically_increasing_id if you need consecutive row numbers. Try this (PySpark environment):
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(original_dataframe.schema.fields[:] +
                        [StructField("index", LongType(), False)])
zipped_rdd = original_dataframe.rdd.zipWithIndex()
indexed = (zipped_rdd
           .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]]))
           .toDF(new_schema))
where original_dataframe is the DataFrame you have to add the index to, and row_with_index is the new schema with the column index, which can be written as
row_with_index = Row(
    "calendar_date",
    "year_week_number",
    "year_period_number",
    "realization",
    "index"
)
Here, calendar_date, year_week_number, year_period_number and realization are the columns of my original DataFrame; you can replace these names with your own column names. index is the new column name you have to add for the row numbers.
sy5wg1nm6#
If you need a unique sequence number for each row, I have a slightly different approach: add a static column and use it to compute the row number.
val srcData = spark.read.option("header", "true").csv("/FileStore/sample.csv")
srcData.show(5)

+--------+--------------------+
|     Job|                Name|
+--------+--------------------+
|Morpheus|       HR Specialist|
|   Kayla|              Lawyer|
|  Trisha|          Bus Driver|
|  Robert|Elementary School...|
|    Ober|               Judge|
+--------+--------------------+

val srcDataModf = srcData.withColumn("sl_no", lit("1"))
val windowSpecRowNum = Window.partitionBy("sl_no").orderBy("sl_no")

srcDataModf.withColumn("row_num", row_number.over(windowSpecRowNum))
  .drop("sl_no")
  .select("row_num", "Name", "Job")
  .show(5)

+-------+--------------------+--------+
|row_num|                Name|     Job|
+-------+--------------------+--------+
|      1|       HR Specialist|Morpheus|
|      2|              Lawyer|   Kayla|
|      3|          Bus Driver|  Trisha|
|      4|Elementary School...|  Robert|
|      5|               Judge|    Ober|
+-------+--------------------+--------+
wfsdck307#
Don't use monotonically_increasing_id(); it will become your nightmare, and data skew will occur.
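For illustration only, a minimal sketch of the partition-dependent behaviour behind this warning (assuming a local SparkSession named spark; the exact id values depend on how the data is partitioned):

import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

// With more than one partition the ids are increasing but far from consecutive,
// e.g. 0, 1, 8589934592, 8589934593, 17179869184, ... because the partition id
// is encoded in the upper bits of each generated value.
val sample = (1 to 6).toDF("n").repartition(3)
sample.withColumn("id", monotonically_increasing_id()).show()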
yx2lnoni8#
For SparkR (assuming sdf is some Spark DataFrame):

sdf <- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())