我需要很多随机数,每行一个。结果应该是这样的:
24324 24324
4234234 4234234
1310313 1310313
...
所以我写了这个spark代码(对不起,我是spark和scala的新手):
import util.Random
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object RandomIntegerWriter {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: RandomIntegerWriter <num Integers> <outDir>")
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark RandomIntegerWriter")
val spark = new SparkContext(conf)
val distData = spark.parallelize(Seq.fill(args(0).toInt)(Random.nextInt))
distData.saveAsTextFile(args(1))
spark.stop()
}
}
注意:现在我只想每行生成一个数字。
但似乎当数字的数量变大时,程序会报告一个错误。你知道这段代码吗?
谢谢您。
4条答案
按热度按时间hlswsv351#
在spark 1.4中,可以使用dataframe api执行以下操作:
ippsafx72#
尝试
它将在驱动端创建一个空集合,但在工作端生成许多随机整数。记录总数为:
numPartitions * recordsPerPartition
brvekthn3#
在spark 2.3.0中工作
vsdwdz234#
在Spark簇上运行
当前版本是在驱动程序的内存中具体化随机数的集合。如果该集合非常大,则驱动程序将耗尽内存。请注意,该版本没有使用spark的处理功能,因为它只在创建数据后使用spark保存数据。
假设我们在集群上工作,我们需要做的是在执行者之间分配生成数据所需的工作。一种方法是将原始算法转换为一个版本,通过在执行者之间分配工作,该版本可以跨集群工作:
在一台机器上运行
如果我们没有一个集群,而这个集群是在一台机器上运行的,那么问题是“为什么要使用spark?”。这个随机生成器过程基本上是i/o绑定的,可以在内存的o(1)中通过顺序地将随机数写入文件来完成。