nlineinputformat在spark中不起作用

hc8w905p  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(355)

我想要的基本上是让每个数据元素由10行组成。但是,在下面的代码中,每个元素仍然是一行。我在这里犯了什么错误?

val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array[Class[_]](classOf[NLineInputFormat], classOf[LongWritable], 
 classOf[Text]))
val sc = new SparkContext(conf)

val c = new Configuration(sc.hadoopConfiguration)
c.set("lineinputformat.linespermap", 10);
val data = sc.newAPIHadoopFile(fname, classOf[NLineInputFormat], classOf[LongWritable], 
 classOf[Text], c)
fzsnzjdm

fzsnzjdm1#

NLineInputFormat 按照设计,它不会执行您期望的操作:
nlineinputformat,将n行输入拆分为一个拆分。(…)将输入文件分割为默认值,将一行作为值提供给一个Map任务。
如您所见,它修改了分割(spark术语中的分区)的计算方式,而不是记录的确定方式。
如果描述不清楚,我们可以用下面的例子来说明:

def nline(n: Int, path: String) = {
  val sc = SparkContext.getOrCreate
  val conf = new Configuration(sc.hadoopConfiguration)
  conf.setInt("mapreduce.input.lineinputformat.linespermap", n);

  sc.newAPIHadoopFile(path,
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf
  )
}

require(nline(1, "README.md").glom.map(_.size).first == 1)
require(nline(2, "README.md").glom.map(_.size).first == 2)
require(nline(3, "README.md").glom.map(_.size).first == 3)

如上所示,每个分区(可能不包括最后一个分区)正好包含n行。
虽然您可以尝试对其进行改装以适合您的情况,但对于较小的 linespermap 参数。

相关问题