自定义fileinputformat总是将一个filesplit分配给一个插槽

368yc8dk  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(414)

我一直在给我们的s3存储桶写协议记录。我想用flink数据集api来读取它。所以我实现了一个定制的fileinputformat来实现这一点。代码如下。

public class ProtobufInputFormat extends FileInputFormat<StandardLog.Pageview> {
    public ProtobufInputFormat() {
    }

    private transient boolean reachedEnd = false;

    @Override
    public boolean reachedEnd() throws IOException {
        return reachedEnd;
    }

    @Override
    public StandardLog.Pageview nextRecord(StandardLog.Pageview reuse) throws IOException {
        StandardLog.Pageview pageview = StandardLog.Pageview.parseDelimitedFrom(stream);
        if (pageview == null) {
            reachedEnd = true;
        }
        return pageview;
    }

    @Override
    public boolean supportsMultiPaths() {
        return true;
    }
}

public class BatchReadJob {

    public static void main(String... args) throws Exception {

        String readPath1 = args[0];

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ProtobufInputFormat inputFormat =  new ProtobufInputFormat();
        inputFormat.setNestedFileEnumeration(true);

        inputFormat.setFilePaths(readPath1);

        DataSet<StandardLog.Pageview> dataSource = env.createInput(inputFormat);

        dataSource.map(new MapFunction<StandardLog.Pageview, String>() {
            @Override
            public String map(StandardLog.Pageview value) throws Exception {
                return value.getId();
            }
        }).writeAsText("s3://xxx", FileSystem.WriteMode.OVERWRITE);
        env.execute();

    }

}

问题是flink总是将一个filesplit分配给一个并行槽。换句话说,它总是处理与并行数相同的文件分割数。
我想知道实现自定义fileinputformat的正确方法是什么。
谢谢。

00jrzges

00jrzges1#

我相信你看到的行为是因为 ExecutionJobVertex 呼叫 FileInputFormat. createInputSplits() 方法与 minNumSplits 等于顶点(数据源)平行度的参数。所以如果你想要一个不同的行为,那么你必须重写 createInputSplits 方法。
虽然你没说你想要什么样的行为。例如,如果您只想对每个文件进行一次拆分,则可以重写 testForUnsplittable() 方法的子类中 FileInputFormat 总是回到真实;它还应该设置(受保护的) unsplittable 布尔值为真。

相关问题