如何在flink中使用hadoop的mapfileoutputformat?

yrwegjxp  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(412)

我在用apache flink编写程序时被卡住了。问题是我试图生成hadoop的mapfile作为计算的结果,但是scala编译器抱怨类型不匹配。
为了说明这个问题,让我向您展示下面的代码片段,它尝试生成两种输出:一种是hadoop的sequencefile,另一种是mapfile。

val dataSet: DataSet[(IntWritable, BytesWritable)] =
  env.readSequenceFile(classOf[Text], classOf[BytesWritable], inputSequenceFile.toString)
    .map(mapper(_))
    .partitionCustom(partitioner, 0)
    .sortPartition(0, Order.ASCENDING)

val seqOF = new HadoopOutputFormat(
  new SequenceFileOutputFormat[IntWritable, BytesWritable](), Job.getInstance(hadoopConf)
)

val mapfileOF = new HadoopOutputFormat(
  new MapFileOutputFormat(), Job.getInstance(hadoopConf)
)

val dataSink1 = dataSet.output(seqOF)  // it typechecks!
val dataSink2 = dataSet.output(mapfileOF) // syntax error

如上所述,dataset.output(mapfileof)导致scala编译器抱怨如下:

仅供参考,与sequencefile相比,mapfile需要一个更强的条件,即键必须是可写的。
在使用flink编写应用程序之前,我使用spark实现了它,如下所示,它工作正常(没有编译错误,运行正常,没有任何错误)。

val rdd = sc
  .sequenceFile(inputSequenceFile.toString, classOf[Text], classOf[BytesWritable])
  .map(mapper(_))
  .repartitionAndSortWithinPartitions(partitioner)

rdd.saveAsNewAPIHadoopFile(
  outputPath.toString,
  classOf[IntWritable],
  classOf[BytesWritable],
  classOf[MapFileOutputFormat]
)
mmvthczy

mmvthczy1#

你检查过:https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/hadoop_compatibility.html#using-hadoop输出格式
它包含以下示例:

// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)

相关问题