Java: reading data from an FTP server in Hadoop/Cascading

Asked by jdg4fx2g on 2021-06-02 in Hadoop

I want to read data from an FTP server. I am providing the path of the file residing on the FTP server in the format ftp://username:password@host/path. When I read the file with a plain MapReduce program, it works fine. Now I want to read the same file through the Cascading framework, using Cascading's Hfs tap. It throws the following exception:

java.io.IOException: Stream closed
    at org.apache.hadoop.fs.ftp.FTPInputStream.close(FTPInputStream.java:98)
    at java.io.FilterInputStream.close(Unknown Source)
    at org.apache.hadoop.util.LineReader.close(LineReader.java:83)
    at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:254)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:440)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
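
For reference, this is a rough, untested sketch of the kind of plain (old-API) MapReduce driver I mean above; the host, credentials, paths and class name are placeholders, and the mapper just copies input lines through:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class FTPMapReduceDemo {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FTPMapReduceDemo.class);
        conf.setJobName("ftp-read-demo");

        // Input is addressed directly on the FTP server (placeholder host/credentials)
        FileInputFormat.setInputPaths(conf, new Path("ftp://user:pwd@xx.xx.xx.xx/input1"));
        FileOutputFormat.setOutputPath(conf, new Path("output"));

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Identity mapper, no reducers: each input line is written straight to the output
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}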

Below is my Cascading code that reads the file:

public class FTPWithHadoopDemo {
    public static void main(String args[]) {
        // Source tap pointing directly at the file on the FTP server
        Tap source = new Hfs(new TextLine(new Fields("line")), "ftp://user:pwd@xx.xx.xx.xx//input1");
        // Sink for the token-count output
        Tap sink = new Hfs(new TextLine(new Fields("line1")), "OP\\op", SinkMode.REPLACE);
        Pipe pipe = new Pipe("First");
        pipe = new Each(pipe, new RegexSplitGenerator("\\s+")); // split each line on whitespace
        pipe = new GroupBy(pipe);                               // group identical tokens
        Pipe tailpipe = new Every(pipe, new Count());           // count occurrences per group
        FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(tailpipe, sink);
        new HadoopFlowConnector().connect(flowDef).complete();
    }
}

I tried to trace the same exception in the Hadoop source code. I found that the MapTask class has a method runOldMapper that handles the stream. In that method there is a finally block that closes the stream (in.close()). When I remove that line from the finally block, it works fine. Here is the code:

private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex,
            final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
                    throws IOException, InterruptedException, ClassNotFoundException {
        InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());

        updateJobWithSplit(job, inputSplit);
        reporter.setInputSplit(inputSplit);

        RecordReader<INKEY, INVALUE> in = isSkipping()
                ? new SkippingRecordReader<INKEY, INVALUE>(inputSplit, umbilical, reporter)
                : new TrackedRecordReader<INKEY, INVALUE>(inputSplit, job, reporter);
        job.setBoolean("mapred.skip.on", isSkipping());

        int numReduceTasks = conf.getNumReduceTasks();
        LOG.info("numReduceTasks: " + numReduceTasks);
        MapOutputCollector collector = null;
        if (numReduceTasks > 0) {
            collector = new MapOutputBuffer(umbilical, job, reporter);
        } else {
            collector = new DirectMapOutputCollector(umbilical, job, reporter);
        }
        MapRunnable<INKEY, INVALUE, OUTKEY, OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(),
                job);

        try {
            runner.run(in, new OldOutputCollector(collector, conf), reporter);
            collector.flush();
        } finally {
            // close
            in.close(); // close input
            collector.close();
        }
    }

Please help me resolve this problem.
Thanks, Arshadali

ltskdhd11#

After some digging, I found that Hadoop uses the org.apache.hadoop.fs.ftp.FTPFileSystem class for FTP URIs.
This class does not support seek, i.e. seeking from the beginning of the file to a given offset. Data is read one block at a time, and then the filesystem seeks to the next block to read. The default block size of FTPFileSystem is 4 KB. Since seek is not supported, it can only read files of 4 KB or less.
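
One possible workaround (an untested sketch, not something prescribed by FTPFileSystem itself) is to copy the file off the FTP server into the job's default filesystem first, and then point the Cascading source tap at the staged copy. The class name, staging path, host and credentials below are placeholders:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class FtpToDefaultFsCopy {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Filesystem backed by the FTP server (placeholder host/credentials)
        FileSystem ftpFs = FileSystem.get(URI.create("ftp://user:pwd@xx.xx.xx.xx/"), conf);
        // Default filesystem of the cluster (HDFS, or the local FS in local mode)
        FileSystem defaultFs = FileSystem.get(conf);

        Path src = new Path("/input1");          // path on the FTP server
        Path dst = new Path("staging/input1");   // staging location for the Cascading flow

        // Stream the whole file across; a sequential copy never needs seek
        FileUtil.copy(ftpFs, src, defaultFs, dst, false, conf);

        // The Cascading source tap can then be built against "staging/input1"
    }
}

Whether this is acceptable depends on the file sizes involved, since the copy runs on the client before the flow starts.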
