java.lang.IllegalArgumentException: Wrong FS: , expected: hdfs://localhost:9000

z5btuh9x, posted on 2021-06-02 in Hadoop

I am trying to implement a reduce-side join and use a MapFile reader to look up side data from the distributed cache, but the lookup is not returning any values when I check stderr. It shows the error below. The lookup file is already present in HDFS and appears to be loaded into the cache correctly, as seen in stdout.
java.lang.IllegalArgumentException: Wrong FS: file:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_u1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, expected: hdfs://localhost:9000
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554)
    at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1479)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1474)
    at org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302)
    at org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:273)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:260)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:253)
    at mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59)
    at mr_poc.reducerrsj.setup(reducerrsj.java:42)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

    at mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:127)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.
This is my driver code:

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
        if (arg.length != 3) {
            System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
            return -1;
        }
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
        System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
        job.setJarByClass(driverrsj.class);
        conf.setInt("cust_info", 1);
        conf.setInt("status", 2);
        StringBuilder inputPaths = new StringBuilder();
        inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
        FileInputFormat.setInputPaths(job, inputPaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(arg[2]));
        job.setJarByClass(driverrsj.class);
        job.setMapperClass(mappperRSJ.class);
        job.setReducerClass(reducerrsj.class);
        job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
        job.setMapOutputValueClass(Text.class);
        //job.setPartitionerClass(partinonrsj.class);
        job.setSortComparatorClass(secondarysortcomp.class);
        job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);

    }

}

This is my reducer code:

package mr_poc;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        for (Path eachPath : cacheFiles) {
            System.out.println(eachPath.toString());
            System.out.println(eachPath.getName());
            if (eachPath.getName().toString().contains("delivery_status")) {
                URI uriUncompressedFile = new File(eachPath.toString() + "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);
            }
        }
    }

    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
        try {
            deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {

        if (key.getsourceindex() == 2) {
            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);

            try {
                deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            } catch (Exception e) {
                txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
                txtMapFileLookupValue
                        .set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
                                .equals("")) ? "NOT-FOUND"
                                : txtMapFileLookupValue.toString());
            }

            reduceValueBuilder.append(txtMapFileLookupValue.toString());

        } else if (key.getsourceindex() == 1) {
            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(
                    strSeparator);
        }

        txtMapFileLookupKey.set("");
        txtMapFileLookupValue.set("");

        return reduceValueBuilder;
    }

    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            buildOutputValue(key, reduceValueBuilder, value);
        }

        // Drop last comma, set value, and emit output
        if (reduceValueBuilder.length() > 1) {
            //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
            // Emit output
            reduceOutputValue.set(reduceValueBuilder.toString());
            context.write(nullWritableKey, reduceOutputValue);
        } else {
            System.out.println("Key=" + key.getjoinkey() + "src="
                    + key.getsourceindex());
        }
        // Reset variables
        reduceValueBuilder.setLength(0);
        reduceOutputValue.set("");
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}

This is my core-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

Any help would be greatly appreciated. Thanks in advance!


gdx19jrr1#

I had the same problem, and I solved it by adding

FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);

in the driver class.
You also have to import java.net.URI.
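
For illustration, a minimal sketch of where that line could sit inside run() of the driver above, before the cache file is registered. The NameNode address is taken from the core-site.xml in the question; the exact placement is an assumption rather than the answerer's verbatim code, and it needs import java.net.URI and import org.apache.hadoop.fs.FileSystem.

Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
// Obtain a FileSystem handle for the HDFS NameNode named in core-site.xml,
// as this answer suggests, before registering the cache file.
FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);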


gudnpqoy2#

I think I ran into a similar problem. The key point is that you are reading a SequenceFile out of the DistributedCache, and that file lives on your local file system. Your log contains this line:

"org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)"

If you look at the source code of SequenceFile.Reader, you will see that this log entry comes from this code:

fs.getFileStatus(filename).getLen()

Here "fs" should be a LocalFileSystem, not a DistributedFileSystem.
My fix was to replace

deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());

with:

Configuration conf = context.getConfiguration();
String originalFS = conf.get("fs.default.name");   // back up the original configuration
conf.set("fs.default.name", "file:///");           // point the configuration at the local file system
deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), conf);
conf.set("fs.default.name", originalFS);           // restore the original configuration

After this change, the SequenceFile.Reader object can access the cached file on the local file system.
I think the problem occurs because the SequenceFile API changed, and some constructors such as MapFile.Reader(fs, path, conf) are deprecated in this case.
This solution worked well for me.
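
A related sketch of the same idea (an alternative, not the answerer's code): since the DistributedCache materialises the MapFile on local disk, the reader can also be opened against an explicit LocalFileSystem via FileSystem.getLocal, which avoids temporarily rewriting fs.default.name. Variable names follow the reducer code above.

// Inside initializeDepartmentsMap(): open the cached MapFile with a local
// file system handle, because DistributedCache files live on local disk.
Configuration conf = context.getConfiguration();
FileSystem localFs = FileSystem.getLocal(conf);
deptMapReader = new MapFile.Reader(localFs, uriUncompressedFile.toString(), conf);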


cvxl0en23#

Include the following line in your job runner: DistributedCache.addCacheFile(new URI(""), conf);
Below is the code in the mapper's setup method:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = null;
    try {
         fileSystem = FileSystem.get(new URI("<File location>"), configuration);
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }

    String location = <S3 file location>;
    FSDataInputStream fsDataInputStream =fileSystem.open(new Path(location));
    Scanner scanner = new Scanner(fsDataInputStream);
    int i = 1;
    while(scanner.hasNextLine()) {
        String str[] = scanner.nextLine().split(",");
        LOG.info("keys are \t" + str[0] + str[1]);
        stickerMap.put(str[0] + str[1], i);
        ++i;
    }
}

jgzswidk4#

You should set conf according to your core-site.xml file, as follows:

conf.set("fs.defaultFS", "hdfs://host:port");
conf.set("mapreduce.jobtracker.address", "host:port");
