How do I specify type information in Flink's readHadoopFile?

eiee3dmh · asked 2021-05-29 in Hadoop

I can't work out how to specify the type information in Flink when reading an input file from Hadoop. I tried this:

DataSet<Tuple2<LongWritable, Text>> data =
            env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0], job, ); // <- what goes in the last argument?

The documentation says it needs type information as the last argument, but I don't understand how to provide it. Can anyone help?
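
For reference, explicit type information in Flink is normally built from a TypeHint. A minimal sketch of that general pattern (assuming hadoopFile is a Hadoop input format and env an ExecutionEnvironment, as constructed in the answers below):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Spell out the generic type, which erasure would otherwise hide, by
// subclassing TypeHint with the concrete type parameters filled in.
TypeInformation<Tuple2<LongWritable, Text>> typeInfo =
        TypeInformation.of(new TypeHint<Tuple2<LongWritable, Text>>() {});

// createInput has an overload that takes an explicit TypeInformation.
DataSet<Tuple2<LongWritable, Text>> data = env.createInput(hadoopFile, typeInfo);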

suzh9iv8 · Answer 1

The code looks like this:

// Fully-qualified names to make clear which Flink/Hadoop classes are meant
// (note the "mapreduce" packages, not the older "mapred" ones):
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> hadoopFile =
        org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile(
                new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(), // extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat
                org.apache.hadoop.io.LongWritable.class,
                org.apache.hadoop.io.Text.class,
                inputPath);

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);
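
A common next step (a sketch, not part of the original answer) is to map the Writable pair into plain Java types before further processing:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;

// Convert (LongWritable, Text) into (Long, String); returns(...) restores the
// result type that erasure removes when a lambda is used.
DataSet<Tuple2<Long, String>> lines = input
        .map(pair -> Tuple2.of(pair.f0.get(), pair.f1.toString()))
        .returns(new TypeHint<Tuple2<Long, String>>() {});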

Besides that, the Maven POM needs this dependency (the _2.10 suffix is the Scala version of your Flink build; match it to yours):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
qgzx9mmu · Answer 2

Here is a short example of how to use a HadoopInputFormat with Flink. Note that you don't pass type information yourself: the HadoopInputFormat reports the type it produces, so env.createInput can infer DataSet<Tuple2<LongWritable, Text>> from the format alone:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final String inputPath = ... // set to your input path
final HadoopInputFormat<LongWritable, Text> hadoopFile = HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        inputPath);
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);
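
To run the job end to end you still need a sink; a minimal sketch using the DataSet API's eager print():

// print() acts as a sink and immediately triggers execution of the program.
input.first(10).print();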
