apache beam textio读取文件,行号为

ohfgkhjo  于 2021-07-12  发布在  Java
关注(0)|答案(0)|浏览(222)

是否可以从textio.read()获取行号?如果不是的话,您能指出一个从java中的textio继承的自定义i/o的例子吗
我的问题是,我需要读取一个大的csv文件(150gb+),并在30分钟内生成一个行号。
使用以下代码 FileIO 并且打开一个文件然后放序列号,这个数据流作业不能做伸缩,它只阻塞了一个worker,在数据流上花费的时间大约是5个小时 n1-standard-2 ```
@Slf4j
public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {

protected final PCollectionView<TransformSideInput> transformFnInput;

public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
    this.transformFnInput = transformFnInput;
}

@ProcessElement
public void processElement(ProcessContext context) {
    TransformSideInput input = context.sideInput(transformFnInput);
    String header = input.getHeader();
    try (Reader reader = Channels.newReader(
            FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
         BufferedReader buffer = new BufferedReader(reader)) {
        String row;
        int counter = 1;
        while ((row = buffer.readLine()) != null) {
            if (!row.equals(header)) {
                context.output(KV.of(counter, row));
                counter = counter + 1;
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题