import scala.annotation.tailrec
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession
  .builder()
  .appName("TenMillionsRows")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "4") // a more reasonable number of shuffle partitions for this data
  .config("spark.app.id", "TenMillionsRows")   // to silence the Metrics warning
  .getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
/**
 * Returns a list containing num copies of sentence.
 * @param sentence the sentence to repeat
 * @param num      how many copies to produce
 * @return a List[String] of length num
 */
def getList(sentence: String, num: Int): List[String] = {
  @tailrec
  def loop(st: String, n: Int, acc: List[String]): List[String] = {
    n match {
      case 0 => acc // matching on the literal 0 avoids shadowing the outer num
      case _ => loop(st, n - 1, st :: acc)
    }
  }
  loop(sentence, num, List())
}
/**
 * Returns a DataFrame that is the union of num DataFrames, each built from lst.
 * @param lst the list of sentences to parallelize
 * @param num how many times to union lst onto the accumulator
 * @return a DataFrame with lst.size * num + 1 rows
 */
def getDataFrame(lst: List[String], num: Int): DataFrame = {
  @tailrec
  def loop(ls: List[String], n: Int, acc: DataFrame): DataFrame = {
    n match {
      case 0 => acc
      case _ => loop(ls, n - 1, acc.union(sc.parallelize(ls).toDF("sentence")))
    }
  }
  // Seed the accumulator with a single row taken from lst itself, instead of
  // referencing the outer `sentence` val before it is defined. This one seed
  // row is why the final count is lst.size * num + 1.
  loop(lst, num, sc.parallelize(lst.take(1)).toDF("sentence"))
}
val sentence = "hope for the best but prepare for the worst"
val lSentence = getList(sentence, 100000)
val dfs = getDataFrame(lSentence, 100)
println(dfs.count())
// output: 10000001
dfs.write.orc("path_to_hdfs") // write the DataFrame to an ORC file
// You can also save it as Parquet, text, JSON, etc. via dataframe.write.
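For illustration, a few equivalent writes in other formats; this is a sketch only, and the paths are placeholders:

dfs.write.mode("overwrite").parquet("path_to_parquet")
dfs.write.mode("overwrite").json("path_to_json")
dfs.write.mode("overwrite").text("path_to_text") // text() requires a single string column, which "sentence" is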
import org.apache.spark.sql.{SaveMode, SparkSession}
object GenerateTenMils {
  def main(args: Array[String]): Unit = {
    // The original answer used a project-specific helper (Constant.getSparkSess);
    // a plain builder is substituted here so the snippet is self-contained.
    val spark = SparkSession.builder()
      .appName("GenerateTenMils")
      .master("local[*]")
      .getOrCreate()
    spark.conf.set("spark.sql.crossJoin.enabled", "true") // enable cross joins
    import spark.implicits._

    // Create a DataFrame with your sentence
    val df = List("each line has the same sentence").toDF
    // Create another Dataset with 10,000,000 records and cross join the two
    spark.range(10000000)
      .join(df)      // cross join: 10,000,000 x 1 rows
      .coalesce(1)   // output to a single file
      .drop("id")    // drop the extra column
      .write
      .mode(SaveMode.Overwrite)
      .text("src/main/resources/tenMils") // write as a text file
  }
}
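One design note on the join above: on Spark 2.1+ you can make the cartesian product explicit with crossJoin instead of enabling spark.sql.crossJoin.enabled. A minimal equivalent sketch, reusing df from the snippet:

// Equivalent, with no config flag needed:
spark.range(10000000)
  .crossJoin(df)
  .drop("id")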
3 Answers
hk8txs481#
You can try something like this: generate one column with values from 1 to 100k and another with values from 1 to 100, then explode both columns with explode(col). You can't generate a single column of 10 million values directly, because the Kryo buffer throws an error.
I don't know whether this is the most performant way to do it, but it's the fastest one I can think of right now.
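No code survived with this answer, so here is a minimal sketch of the idea, assuming the spark session from the first snippet, Spark 2.2+ for typedLit, and reusing the sentence from the code above:

import org.apache.spark.sql.functions._

// 100,000 exploded values x 100 exploded values = 10,000,000 rows,
// without ever materializing a 10-million-element literal.
val tenMil = spark.range(1)
  .select(explode(typedLit((1 to 100000).toArray)).as("i"))
  .select(col("i"), explode(typedLit((1 to 100).toArray)).as("j"))
  .select(lit("hope for the best but prepare for the worst").as("sentence"))
println(tenMil.count()) // 10000000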
slmsl1lt2#
You can take this approach, shown in the first code block above: a tail-recursive function generates the list of sentences and the DataFrames, and union combines them into one big DataFrame.
Hope this helps.
dgiusagp3#
You can do it by cross-joining two DataFrames, as shown in the second code block above; the explanation is inline in the code comments.