是否可以从textio.read()获取行号?如果不是的话,您能指出一个从java中的textio继承的自定义i/o的例子吗
我的问题是,我需要读取一个大的csv文件(150gb+),并在30分钟内生成一个行号。
使用以下代码 FileIO
并且打开一个文件然后放序列号,这个数据流作业不能做伸缩,它只阻塞了一个worker,在数据流上花费的时间大约是5个小时 n1-standard-2
```
@Slf4j
public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {
protected final PCollectionView<TransformSideInput> transformFnInput;
public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
this.transformFnInput = transformFnInput;
}
@ProcessElement
public void processElement(ProcessContext context) {
TransformSideInput input = context.sideInput(transformFnInput);
String header = input.getHeader();
try (Reader reader = Channels.newReader(
FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
BufferedReader buffer = new BufferedReader(reader)) {
String row;
int counter = 1;
while ((row = buffer.readLine()) != null) {
if (!row.equals(header)) {
context.output(KV.of(counter, row));
counter = counter + 1;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!