HDFS: Hadoop cannot see a file even though it exists: "Error: java.io.FileNotFoundException: File does not exist: /user/hadoop/centroids.seq"

piztneat, posted 2023-06-20 in HDFS

I wrote a Hadoop program, and when I try to run it (using the command: hadoop jar kmeans-1.0-SNAPSHOT.jar it.kurapika.Kmeans dataset.txt output) I get the following error:

Error: java.io.FileNotFoundException: File does not exist: /user/hadoop/centroids.seq (inode 17689) [Lease.  Holder: DFSClient_attempt_1685688288161_0036_r_000002_0_2097879656_1, pending creates: 1]
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2840)
        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:599)
        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:171)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2719)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:568)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:527)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1036)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1000)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:928)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2916)

        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
        at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1084)
        at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1866)
        at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1668)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)

When I check the Hadoop web UI in the browser, I can see that the file exists.
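
For reference, the same existence check can be made from the shell (assuming the default /user/hadoop home directory shown in the error message):

hdfs dfs -ls /user/hadoop/centroids.seq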

The 'centroids.seq' file is created in the driver class (Kmeans.java) as follows:

public class Kmeans {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("configuration.xml"));

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // set parameters
        final String INPUT = otherArgs[0];
        final String OUTPUT = otherArgs[1] + "/temp";
        final int DATASET_SIZE = conf.getInt("dataset.size", 100);

        final String CENTROIDS_PATH = conf.get("centroids.path", "centroids.seq");
        final int K = conf.getInt("k", 3);
        final int MAX_ITERATIONS = conf.getInt("iterations", 20);

        // generate initial centroids
        Point[] newCentroids = Utility.generateCentroids(conf, INPUT, K, DATASET_SIZE);
        Utility.writeCentroids(conf, new Path(CENTROIDS_PATH), newCentroids);

        ...

        // write final centroids to the output file
        Utility.writeOutput(conf, new Path(CENTROIDS_PATH), new Path(otherArgs[1]));

        System.exit(0);
    }
}

And here is Utility.java:

public class Utility {

    public static Point[] generateCentroids(Configuration conf, String pathString, int k, int dataSetSize)
            throws IOException {
        Point[] points = new Point[k];

        // Create a sorted list of positions without duplicates.
        // Positions are the line indexes of the randomly selected centroids.
        List<Integer> positions = new ArrayList<Integer>();
        Random random = new Random();
        int pos;
        while (positions.size() < k) {
            pos = random.nextInt(dataSetSize);
            if (!positions.contains(pos)) {
                positions.add(pos);
            }
        }
        Collections.sort(positions);

        // File reading utils
        Path dataPath = new Path(pathString);
        FileSystem hdfs = FileSystem.get(conf);
        FSDataInputStream in = hdfs.open(dataPath);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));

        // Get centroids from the file
        int row = 0;
        int i = 0;
        int position;
        while (i < positions.size()) {
            position = positions.get(i);
            String point = br.readLine();
            if (row == position) {
                points[i] = new Point();
                points[i].parse(point);
                i++;
            }
            row++;
        }
        br.close();

        return points;
    }

    public static void writeCentroids(Configuration conf, Path center, Point[] points) throws IOException {
        try (SequenceFile.Writer centerWriter = SequenceFile.createWriter(conf, SequenceFile.Writer.file(center),
                SequenceFile.Writer.keyClass(Point.class), SequenceFile.Writer.valueClass(IntWritable.class))) {
            final IntWritable value = new IntWritable(0);
            for (Point point : points) {
                centerWriter.append(point, value);
            }
        }
    }

    public static void writeOutput(Configuration conf, Path centroidsPath, Path outpath) throws IOException {

        List<Centroid> centroids = new ArrayList<>();           // list of centroids

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(centroidsPath))) {

            Centroid key = new Centroid();

            while (reader.next(key)) {                          // iterate over records
                Centroid center = new Centroid(key);            // create new centroid
                centroids.add(center);                          // add new Centroid to list
            }

            FileSystem hdfs = FileSystem.get(conf);
            FSDataOutputStream dos = hdfs.create(outpath, true);
            BufferedWriter br = new BufferedWriter(new OutputStreamWriter(dos));

            // write the result to the output file
            for (int i = 0; i < centroids.size(); i++) {
                br.write(centroids.get(i).toString());
                br.newLine();
            }

            br.close();
            hdfs.close();
        }
    }

}

And the reducer:

public class KmeansReducer extends Reducer<Centroid, Point, Centroid, NullWritable> {

    public static enum Counter {
        // Global counter: incremented every time a new centroid is more than epsilon away from the previous one
        CONVERGED
    }

    private final List<Centroid> centers = new ArrayList<>();   // list containing the new centroids

    private Double epsilon = 0.;                                // convergence parameter

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        epsilon = conf.getDouble("epsilon", 0.0001);            // initialize convergence parameter from the configuration file
    }

    // for each cluster, calculate the new centroid
    @Override
    protected void reduce(Centroid key, Iterable<Point> partialSums, Context context) throws IOException, InterruptedException {

        Centroid newKey = new Centroid();                       // new centroid

        for (Point point : partialSums) {                       // summation of partial sums
            newKey.getPoint().sum(point);
        }
        newKey.getPoint().compress();                           // divide by the number of points in the cluster
        newKey.setIndex(key);                                   // assign the old centroid's index to the new centroid

        centers.add(newKey);                                    // add the new centroid to the list
        context.write(newKey, NullWritable.get());              // write output record (key: centroid, value: null)

        // calculate the distance between the new centroid and the old centroid
        double distance = key.getPoint().getDistance(newKey.getPoint());
        if (distance > epsilon) {                               // if the distance is greater than epsilon
            context.getCounter(Counter.CONVERGED).increment(1); // increment the global counter
        }
    }

    // write the new centroids to the sequence file
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        Configuration conf = context.getConfiguration();
        Path outPath = new Path(conf.get("centroids.path", "centroids.seq"));  // path of the centroids sequence file
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outPath, true);                               // if the path exists, delete it
        try (SequenceFile.Writer out = SequenceFile.createWriter(conf, SequenceFile.Writer.file(outPath),
                SequenceFile.Writer.keyClass(Centroid.class), SequenceFile.Writer.valueClass(IntWritable.class))) {
            final IntWritable value = new IntWritable(0);
            for (Centroid center : centers) {
                out.append(center, value);                      // write the new centroids to the sequence file
            }
        }
    }

}

I tried changing the permissions on centroids.seq with hadoop fs -chmod +wx /user/hadoop/centroids.seq, but it had no effect: running the program again produces the same error.
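
Since the exception is a lease error rather than a permission error, it may also be worth checking whether some task still holds the file open for write; hdfs fsck can report that (path taken from the error message):

hdfs fsck /user/hadoop -openforwrite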
Any help would be greatly appreciated.

tcomlyy6 1#

Did you close this file?

FSDataOutputStream dos = hdfs.create(outpath, true);

You should close that file as well:

out.close();
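
A minimal sketch of that suggestion, keeping the question's writeOutput logic but letting try-with-resources close both the SequenceFile reader and the output stream (Centroid and the imports are assumed to be the same as in the question):

public static void writeOutput(Configuration conf, Path centroidsPath, Path outpath) throws IOException {

    List<Centroid> centroids = new ArrayList<>();

    // read the centroids back; try-with-resources closes the reader
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(centroidsPath))) {
        Centroid key = new Centroid();
        while (reader.next(key)) {
            centroids.add(new Centroid(key));
        }
    }

    FileSystem hdfs = FileSystem.get(conf);
    // closing the BufferedWriter flushes and closes the underlying FSDataOutputStream,
    // which is what releases the lease on the output file
    try (BufferedWriter br = new BufferedWriter(new OutputStreamWriter(hdfs.create(outpath, true)))) {
        for (Centroid centroid : centroids) {
            br.write(centroid.toString());
            br.newLine();
        }
    }
    // note: FileSystem.get(conf) returns a cached, JVM-wide instance, so hdfs.close()
    // is deliberately not called here; closing it would invalidate the instance for
    // every other user of the same configuration
}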
