I wrote a Hadoop program, and when I try to run it (using the command: hadoop jar kmeans-1.0-SNAPSHOT.jar it.kurapika.Kmeans dataset.txt output) I get the following error:
Error: java.io.FileNotFoundException: File does not exist: /user/hadoop/centroids.seq (inode 17689) [Lease. Holder: DFSClient_attempt_1685688288161_0036_r_000002_0_2097879656_1, pending creates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2840)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:599)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:171)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2719)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:568)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:527)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1036)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1000)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:928)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2916)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1084)
at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1866)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1668)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)
When I check the Hadoop web UI in the browser, I can see that the file exists.
The centroids.seq file is created in the driver (Kmeans.java) like this:
public class Kmeans {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("configuration.xml"));
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // set parameters
        final String INPUT = otherArgs[0];
        final String OUTPUT = otherArgs[1] + "/temp";
        final int DATASET_SIZE = conf.getInt("dataset.size", 100);
        final String CENTROIDS_PATH = conf.get("centroids.path", "centroids.seq");
        final int K = conf.getInt("k", 3);
        final int MAX_ITERATIONS = conf.getInt("iterations", 20);
        Point[] newCentroids = new Point[K];
        // generate initial centroids
        newCentroids = Utility.generateCentroids(conf, INPUT, K, DATASET_SIZE);
        Utility.writeCentroids(conf, new Path(CENTROIDS_PATH), newCentroids);
        ...
        // write final centroids in output file
        Utility.writeOutput(conf, new Path(CENTROIDS_PATH), new Path(otherArgs[1]));
        System.exit(0);
    }
}
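For reference, CENTROIDS_PATH defaults to the relative path "centroids.seq", which HDFS resolves against the user's home directory, so it ends up at /user/hadoop/centroids.seq, the same path reported in the error. Where the file lands (and whether it exists) can also be checked with a couple of lines like the following; this is just a diagnostic sketch, not part of my actual driver:

    // Diagnostic sketch only: print the fully qualified HDFS location
    // of the centroids file and whether it currently exists.
    FileSystem fs = FileSystem.get(conf);
    Path centroids = new Path(CENTROIDS_PATH);
    System.out.println(fs.makeQualified(centroids) + " exists: " + fs.exists(centroids));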
Here is Utility.java:
public class Utility {
    public static Point[] generateCentroids(Configuration conf, String pathString, int k, int dataSetSize)
            throws IOException {
        Point[] points = new Point[k];
        // Create a sorted list of positions without duplicates.
        // Positions are the line indexes of the randomly selected centroids.
        List<Integer> positions = new ArrayList<Integer>();
        Random random = new Random();
        int pos;
        while (positions.size() < k) {
            pos = random.nextInt(dataSetSize);
            if (!positions.contains(pos)) {
                positions.add(pos);
            }
        }
        Collections.sort(positions);
        // File-reading utilities
        Path dataPath = new Path(pathString);
        FileSystem hdfs = FileSystem.get(conf);
        FSDataInputStream in = hdfs.open(dataPath);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        // Get the centroids from the file
        int row = 0;
        int i = 0;
        int position;
        while (i < positions.size()) {
            position = positions.get(i);
            String point = br.readLine();
            if (row == position) {
                points[i] = new Point();
                points[i].parse(point);
                i++;
            }
            row++;
        }
        br.close();
        return points;
    }

    public static void writeCentroids(Configuration conf, Path center, Point[] points) throws IOException {
        try (SequenceFile.Writer centerWriter = SequenceFile.createWriter(conf, SequenceFile.Writer.file(center),
                SequenceFile.Writer.keyClass(Point.class), SequenceFile.Writer.valueClass(IntWritable.class))) {
            final IntWritable value = new IntWritable(0);
            for (Point point : points) {
                centerWriter.append(point, value);
            }
        }
    }

    public static void writeOutput(Configuration conf, Path centroidsPath, Path outpath) throws IOException {
        List<Centroid> centroids = new ArrayList<>(); // list of centroids
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(centroidsPath))) {
            Centroid key = new Centroid();
            while (reader.next(key)) { // iterate over the records
                Centroid center = new Centroid(key); // create a new centroid
                centroids.add(center); // add the new Centroid to the list
            }
            FileSystem hdfs = FileSystem.get(conf);
            FSDataOutputStream dos = hdfs.create(outpath, true);
            BufferedWriter br = new BufferedWriter(new OutputStreamWriter(dos));
            // write the result to the output file
            for (int i = 0; i < centroids.size(); i++) {
                br.write(centroids.get(i).toString());
                br.newLine();
            }
            br.close();
            hdfs.close();
        }
    }
}
The reducer:
public class KmeansReducer extends Reducer<Centroid, Point, Centroid, NullWritable> {

    public static enum Counter {
        // Global counter: incremented every time a new centroid is more than epsilon away from the previous one
        CONVERGED
    }

    private final List<Centroid> centers = new ArrayList<>(); // list containing the new centroids
    private Double epsilon = 0.; // convergence parameter

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        epsilon = conf.getDouble("epsilon", 0.0001); // initialize the convergence parameter from the configuration file
    }

    // for each cluster, calculate the new centroid
    @Override
    protected void reduce(Centroid key, Iterable<Point> partialSums, Context context) throws IOException, InterruptedException {
        Centroid newKey = new Centroid(); // new centroid
        for (Point point : partialSums) { // summation of the partial sums
            newKey.getPoint().sum(point);
        }
        newKey.getPoint().compress(); // divide by the number of points in the cluster
        newKey.setIndex(key); // assign the old centroid's index to the new centroid
        centers.add(newKey); // add the new centroid to the list
        context.write(newKey, NullWritable.get()); // write the output record (key: centroid, value: null)
        // calculate the distance between the new centroid and the old centroid
        double distance = key.getPoint().getDistance(newKey.getPoint());
        if (distance > epsilon) { // if the distance is greater than epsilon
            context.getCounter(Counter.CONVERGED).increment(1); // increment the global counter
        }
    }

    // write the new centroids to the sequence file
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path outPath = new Path(conf.get("centroids.path", "centroids.seq")); // path of the centroids sequence file
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outPath, true); // delete the path if it exists
        try (SequenceFile.Writer out = SequenceFile.createWriter(conf, SequenceFile.Writer.file(outPath),
                SequenceFile.Writer.keyClass(Centroid.class), SequenceFile.Writer.valueClass(IntWritable.class))) {
            final IntWritable value = new IntWritable(0);
            for (Centroid center : centers) {
                out.append(center, value); // write the new centroids to the sequence file
            }
        }
    }
}
I tried changing the permissions of the centroids.seq file: hadoop fs -chmod +wx /user/hadoop/centroids.seq
It made no difference: running the program again produces the same error.
Any help would be appreciated.
1 Answer
Did you close this file? You should close the file as well.
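For example, generateCentroids in your Utility class only closes its reader on the happy path; if readLine() or parse() throws, the HDFS input stream is never closed. A try-with-resources version of the same method (just a sketch, keeping your original logic and your Point class unchanged) closes the file in every case:

    public static Point[] generateCentroids(Configuration conf, String pathString, int k, int dataSetSize)
            throws IOException {
        Point[] points = new Point[k];
        // Pick k distinct random line indexes, then sort them
        List<Integer> positions = new ArrayList<>();
        Random random = new Random();
        while (positions.size() < k) {
            int pos = random.nextInt(dataSetSize);
            if (!positions.contains(pos)) {
                positions.add(pos);
            }
        }
        Collections.sort(positions);
        Path dataPath = new Path(pathString);
        FileSystem hdfs = FileSystem.get(conf);
        // try-with-resources guarantees the stream is closed even if parsing fails
        try (BufferedReader br = new BufferedReader(new InputStreamReader(hdfs.open(dataPath)))) {
            int row = 0;
            int i = 0;
            while (i < positions.size()) {
                int position = positions.get(i);
                String point = br.readLine();
                if (row == position) {
                    points[i] = new Point();
                    points[i].parse(point);
                    i++;
                }
                row++;
            }
        }
        return points;
    }

The same applies to writeOutput: the BufferedWriter is only closed if no exception occurs, and hdfs.close() closes the FileSystem instance that FileSystem.get(conf) caches and shares across the whole process, which can make later reads and writes fail. Moving the writer into the try-with-resources header and dropping the hdfs.close() call avoids both problems.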