如何使用spark生成大量随机整数?

7d7tgy0s  于 2021-05-30  发布在  Hadoop
关注(0)|答案(4)|浏览(1252)

我需要很多随机数,每行一个。结果应该是这样的:

24324 24324
4234234 4234234
1310313 1310313
...

所以我写了这个spark代码(对不起,我是spark和scala的新手):

import util.Random

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object RandomIntegerWriter {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: RandomIntegerWriter <num Integers> <outDir>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark RandomIntegerWriter")
    val spark = new SparkContext(conf)
    val distData = spark.parallelize(Seq.fill(args(0).toInt)(Random.nextInt))
    distData.saveAsTextFile(args(1))
    spark.stop()
  }
}

注意:现在我只想每行生成一个数字。
但似乎当数字的数量变大时,程序会报告一个错误。你知道这段代码吗?
谢谢您。

hlswsv35

hlswsv351#

在spark 1.4中,可以使用dataframe api执行以下操作:

In [1]: from pyspark.sql.functions import rand, randn
In [2]: # Create a DataFrame with one int column and 10 rows.
In [3]: df = sqlContext.range(0, 10)
In [4]: df.show()
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

In [4]: # Generate two other columns using uniform distribution and normal distribution.
In [5]: df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
+--+-------------------+--------------------+
|id|            uniform|              normal|
+--+-------------------+--------------------+
| 0| 0.7224977951905031| -0.1875348803463305|
| 1| 0.2953174992603351|-0.26525647952450265|
| 2| 0.4536856090041318| -0.7195024130068081|
| 3| 0.9970412477032209|  0.5181478766595276|
| 4|0.19657711634539565|  0.7316273979766378|
| 5|0.48533720635534006| 0.07724879367590629|
| 6| 0.7369825278894753| -0.5462256961278941|
| 7| 0.5241113627472694| -0.2542275002421211|
| 8| 0.2977697066654349| -0.5752237580095868|
| 9| 0.5060159582230856|  1.0900096472044518|
+--+-------------------+--------------------+
ippsafx7

ippsafx72#

尝试

val distData = spark.parallelize(Seq[Int](), numPartitions)
  .mapPartitions { _ => {
    (1 to recordsPerPartition).map{_ => Random.nextInt}.iterator
  }}

它将在驱动端创建一个空集合,但在工作端生成许多随机整数。记录总数为: numPartitions * recordsPerPartition

brvekthn

brvekthn3#

在spark 2.3.0中工作

Python  
df = spark.range(0, 10)

Scala
val df = spark.range(0, 10)
vsdwdz23

vsdwdz234#

在Spark簇上运行

当前版本是在驱动程序的内存中具体化随机数的集合。如果该集合非常大,则驱动程序将耗尽内存。请注意,该版本没有使用spark的处理功能,因为它只在创建数据后使用spark保存数据。
假设我们在集群上工作,我们需要做的是在执行者之间分配生成数据所需的工作。一种方法是将原始算法转换为一个版本,通过在执行者之间分配工作,该版本可以跨集群工作:

val numRecords:Int = ???
val partitions:Int = ???
val recordsPerPartition = numRecords / partitions // we are assuming here that numRecords is divisible by partitions, otherwise we need to compensate for the residual 

val seedRdd = sparkContext.parallelize(Seq.fill(partitions)(recordsPerPartition),partitions)
val randomNrs = seedRdd.flatMap(records => Seq.fill(records)(Random.nextInt))
randomNrs.saveAsTextFile(...)

在一台机器上运行

如果我们没有一个集群,而这个集群是在一台机器上运行的,那么问题是“为什么要使用spark?”。这个随机生成器过程基本上是i/o绑定的,可以在内存的o(1)中通过顺序地将随机数写入文件来完成。

import java.io._
def randomFileWriter(file:String, records:Long):Unit = {
    val pw = new PrintWriter(new BufferedWriter(new FileWriter(file)))
    def loop(count:Int):Unit = {
        if (count <= 0) () else {    
          pw.println(Random.nextInt)
          writeRandom(writer, count-1)
        }
    }
    loop(records)
    pw.close
}

相关问题