我一直在给我们的s3存储桶写协议记录。我想用flink数据集api来读取它。所以我实现了一个定制的fileinputformat来实现这一点。代码如下。
public class ProtobufInputFormat extends FileInputFormat<StandardLog.Pageview> {
public ProtobufInputFormat() {
}
private transient boolean reachedEnd = false;
@Override
public boolean reachedEnd() throws IOException {
return reachedEnd;
}
@Override
public StandardLog.Pageview nextRecord(StandardLog.Pageview reuse) throws IOException {
StandardLog.Pageview pageview = StandardLog.Pageview.parseDelimitedFrom(stream);
if (pageview == null) {
reachedEnd = true;
}
return pageview;
}
@Override
public boolean supportsMultiPaths() {
return true;
}
}
public class BatchReadJob {
public static void main(String... args) throws Exception {
String readPath1 = args[0];
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ProtobufInputFormat inputFormat = new ProtobufInputFormat();
inputFormat.setNestedFileEnumeration(true);
inputFormat.setFilePaths(readPath1);
DataSet<StandardLog.Pageview> dataSource = env.createInput(inputFormat);
dataSource.map(new MapFunction<StandardLog.Pageview, String>() {
@Override
public String map(StandardLog.Pageview value) throws Exception {
return value.getId();
}
}).writeAsText("s3://xxx", FileSystem.WriteMode.OVERWRITE);
env.execute();
}
}
问题是flink总是将一个filesplit分配给一个并行槽。换句话说,它总是处理与并行数相同的文件分割数。
我想知道实现自定义fileinputformat的正确方法是什么。
谢谢。
1条答案
按热度按时间00jrzges1#
我相信你看到的行为是因为
ExecutionJobVertex
呼叫FileInputFormat. createInputSplits()
方法与minNumSplits
等于顶点(数据源)平行度的参数。所以如果你想要一个不同的行为,那么你必须重写createInputSplits
方法。虽然你没说你想要什么样的行为。例如,如果您只想对每个文件进行一次拆分,则可以重写
testForUnsplittable()
方法的子类中FileInputFormat
总是回到真实;它还应该设置(受保护的)unsplittable
布尔值为真。