hdfs读取

6za6bjd0  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(474)

我正在使用生产者-消费者模型,利用blockingqueue,使用多线程从hdfs目录读取文件。
这是我的密码;
制片人级别:

public void readURLS() {
    final int capacity = Integer.MAX_VALUE;

    BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
    try {
        FileSystem hdfs = FileSystem.get(hadoopConf);
        FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));

        int i = 0;

       for (FileStatus file : status) {
            LOG.info("Thread {} started: ", i++);
            LOG.info("Reading file {} ", file.getPath().getName());
            new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
       }
    } catch (IOException e) {
        LOG.error("IOException occured while listing files from HDFS directory");
    }

}

获取数据:

@Override
    public void run() {
        LOG.info("Inside reader to start reading the files ");

        try (BufferedReader bufferedReader =
                new BufferedReader(new InputStreamReader
                        (FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {

            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                LOG.info("Line is :{}", line);
                queue.put(line);

            }

        } catch (IOException e) {
            LOG.error("file : {} ", file.toString());
            throw new IOException(e);
        } catch (InterruptedException e) {
            LOG.error("An error has occurred: ", e);
            Thread.currentThread().interrupt();

        }

在执行代码时,它会引发interruptedioexception:

java.io.IOException: Failed on local exception: java.io.**InterruptedIOException**: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected

知道为什么吗。我的想法是循环每个文件并使用单独的线程读取每个文件。

wbgh16ku

wbgh16ku1#

在使用多个(多个)的hdfs时,我也得到了相同的行为线程,并且不知道问题“为什么?”的答案,但是保持并发访问hdfs的线程数似乎有帮助。
在您的情况下,我建议使用具有有限线程数的executorservice,并在没有异常时将该数量微调到限制。
因此,创建executorservice(以10个线程为起点):

final ExecutorService executorService = Executors.newFixedThreadPool(10);

而不是你的

new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();

executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));

另一个改进是 org.apache.hadoop.fs.FileSystem 工具 Closeable ,你应该把它关上。在代码中,每个线程都会创建 FileSystem ,但不关闭它。所以我会把它提取到 try :

try (FileSystem fileSystem = FileSystem.get(hadoopConf);
     BufferedReader bufferedReader =
             new BufferedReader(new InputStreamReader
                     (fileSystem.open(file), StandardCharsets.UTF_8))) {

更新:
尽管上面的代码似乎是 Closeable 对象,默认情况下 FileSystem.get 将从

/**FileSystem cache */
static final Cache CACHE = new Cache();

因此,当 close() 他们将被召唤。
您可以通过设置 fs.hdfs.impl.disable.cache 配置参数到 true ,或确保 FileSystem 示例仅在所有工作进程完成时关闭。您似乎可以为所有的工作人员使用一个文件系统示例,尽管我在javadocs中找不到任何确认,即在没有额外同步的情况下,它可以正常工作。

相关问题