spark驱动程序中hive记录的处理

shyt4zoc  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(352)

在我的用例中,我有一个包含100000条记录的配置单元表。每条记录表示一个必须处理的原始数据文件。对每个原始数据文件的处理将生成一个csv文件,其大小将在10mb到500mb之间变化。最终,这些csv文件将作为一个单独的进程填充到配置单元表中。在我的企业集群中,在hdfs中生成大量数据仍然是不可取的。因此,我更喜欢将这两个独立的进程合并到一个进程中,这样它们就可以处理5000条记录乘以5000条记录。
我的question:-
假设我的rdd引用了整个配置单元表,那么如何对每5000条记录执行原始数据处理步骤(类似于for循环,每次增加5000条记录)

kkbh8khc

kkbh8khc1#

一种方法是使用rdd的滑动功能。您可以在apachespark的mllib包中找到它。下面是你如何使用它。假设我们有一个包含1000个元素的rdd

val rdd = sc.parallelize(1 to 1000)
import org.apache.spark.mllib.rdd._
val newRdd = RDDFunctions.fromRDD(rdd)

// sliding by 10 (instead use 5000 or what you need)
val rddSlidedBy10 = newRdd.sliding(10, 10)

结果是这样的

Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60), Array(61, 62, 63, 64, 65, 66, 67, 68, 69, 70), Array(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)

您可以在数组上创建foreach并将原始数据处理为csv

相关问题