我有一个Spark数据框,大约有一百万条记录。我尝试将此 Dataframe 拆分成多个小 Dataframe ,其中每个 Dataframe 的最大行数为20,000(除了最后一个 Dataframe ,行数可能为20,000,也可能不是20,000)。你能帮我弄一下这个吗?谢谢。
ylamdve61#
好吧,也许不是最有效的方法,但在这里。您可以创建一个计算每一行的新列(以防您没有唯一的ID列)。在这里,我们基本上迭代整个 Dataframe 并选择大小为20k的批,将它们添加到DataFrame列表中。
import org.apache.spark.sql.functions._ import spark.implicits._ import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.monotonically_increasing_id var index = 0 val subsetSize = 20000 var listOfDF: List[DataFrame] = List() // withColumn optional if you already have a unique id per row val df = spark.table("your_table").withColumn("rowNum", monotonically_increasing_id()) def returnSubDF(fromIndex: Int, toIndex: Int) = { df.filter($"rowNum" >= fromIndex && $"rowNum" < toIndex) } while (index <= 1000000){ listOfDF = listOfDF :+ returnSubDF(index, index+subsetSize) index += subsetSize } listOfDF.head.show()
1条答案
按热度按时间ylamdve61#
好吧,也许不是最有效的方法,但在这里。您可以创建一个计算每一行的新列(以防您没有唯一的ID列)。在这里,我们基本上迭代整个 Dataframe 并选择大小为20k的批,将它们添加到DataFrame列表中。