我正在努力解决将序列号添加到数据集这个古老的问题。我正在使用DataFrame,但似乎没有与RDD.zipWithIndex
等价的DataFrame。另一方面,以下内容或多或少地按照我希望的方式运行:
val origDF = sqlContext.load(...)
val seqDF= sqlContext.createDataFrame(
origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
在我的实际应用程序中,OrigDF不会直接从文件中加载--它将通过将2-3个其他DataFrame连接在一起来创建,它将包含超过1亿行。
有没有更好的方法来做这件事?我能做些什么来优化它?
9条答案
按热度按时间vsnjm48y1#
全唱全舞的dfZipWithIndex方法。您可以设置起始偏移量(默认为1)、索引列名称(默认为“id”),并将列放在前面或后面:
ia2d9nvy2#
从Spark 1.6开始,有一个名为*Monotonally_increating_id()的函数
它为每行生成一个具有唯一64位单调索引的新列
但这并不重要,每个分区开始一个新的范围,所以我们必须在使用之前计算每个分区的偏移量。
为了提供一个“无RDD”的解决方案,我得到了一些Collect(),但它只收集偏移量,每个分区一个值,所以它不会导致OOM
该解决方案不会重新打包原始行,也不会对原始庞大的 Dataframe 进行重新分区,因此它在现实世界中相当快:在240个核心上,在2分钟内读取200 GB的CSV数据(4300万行,150列),对其进行索引并打包到拼图上
在测试我的解决方案之后,我运行了Kirk Broadhurst's solution,它慢了20秒
您可能想要或不想使用
dfWithPartitionId.cache()
,这取决于任务bakd9h0s3#
从Spark 1.5开始,Spark中添加了
Window
表达式。现在可以使用org.apache.spark.sql.expressions.row_number
,而不必将DataFrame
转换为RDD
。请注意,我发现上面的dfZipWithIndex
算法的性能比下面的算法快得多。但我把它贴出来是因为:1.其他人会忍不住想试一试
1.也许有人可以优化下面的表达方式
无论如何,以下是对我有效的方法:
请注意,我使用
lit(1)
进行分区和排序--这使得所有内容都在同一个分区中,并且似乎保留了DataFrame
的原始顺序,但我认为这就是它速度变慢的原因。我在具有7,000,000行的4列
DataFrame
上测试了它,它与上面的dfZipWithIndex
之间的速度差异非常显著(正如我所说的,RDD
函数的速度要快得多)。b4lqfgs44#
PySpark版本:
我还创建了一个JIRA来在Spark中本地添加此功能:https://issues.apache.org/jira/browse/SPARK-23074
cetgtptt5#
@Evgeny,your solution很有趣。注意,当您有空分区时有一个错误(数组缺少这些分区索引,至少在我使用Spark 1.6时是这样),所以我将数组转换为Map(artitionId->Offsets)。
另外,我去掉了单调递增id的源代码,使每个分区中的“incid”从0开始。
以下是更新后的版本:
lhcgjxsq6#
我已经修改了@Tagar的版本,以便在Python3.7上运行,我想分享一下:
myzjeezk7#
Spark Java API版本:
我用Java实现了@Evgeny的solution,用于在DataFrames上执行zipWithIndex,并想分享代码。
它还包含@fylb在他的solution中提供的改进。我可以为Spark 2.4确认,当spark_artition_id()返回的条目不是以0开头或不按顺序递增时,执行失败。由于此函数被记录为不确定的,因此很可能会发生上述情况之一。一个例子是通过增加分区计数来触发。
Java实现如下:
7nbnzgx98#
以下是我的建议,其好处是:
DataFrame
的InternalRow
的任何序列化/反序列化[1]。RDD.zipWithIndex
。它的主要缺点是:
package org.apache.spark.sql;
以下。进口:
[1]:(从/到
InternalRow
的底层字节数组<-->GenericRow
的底层JVM对象集合Array[Any]
)。eeq64g8w9#
我已经将@canberker suggest移植到了Python3(Pyspark)。
此外,我没有使用带有散列Map的UDF,而是使用了Broadcast Join,这在测试期间略微提高了性能。
注意:此解决方案仍然存在由于空分区造成的缺口。