我正在使用生产者-消费者模型,利用blockingqueue,使用多线程从hdfs目录读取文件。
这是我的密码;
制片人级别:
public void readURLS() {
final int capacity = Integer.MAX_VALUE;
BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
try {
FileSystem hdfs = FileSystem.get(hadoopConf);
FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));
int i = 0;
for (FileStatus file : status) {
LOG.info("Thread {} started: ", i++);
LOG.info("Reading file {} ", file.getPath().getName());
new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
}
} catch (IOException e) {
LOG.error("IOException occured while listing files from HDFS directory");
}
}
获取数据:
@Override
public void run() {
LOG.info("Inside reader to start reading the files ");
try (BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader
(FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {
String line;
while ((line = bufferedReader.readLine()) != null) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
LOG.info("Line is :{}", line);
queue.put(line);
}
} catch (IOException e) {
LOG.error("file : {} ", file.toString());
throw new IOException(e);
} catch (InterruptedException e) {
LOG.error("An error has occurred: ", e);
Thread.currentThread().interrupt();
}
在执行代码时,它会引发interruptedioexception:
java.io.IOException: Failed on local exception: java.io.**InterruptedIOException**: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected
知道为什么吗。我的想法是循环每个文件并使用单独的线程读取每个文件。
1条答案
按热度按时间wbgh16ku1#
在使用多个(多个)的hdfs时,我也得到了相同的行为线程,并且不知道问题“为什么?”的答案,但是保持并发访问hdfs的线程数似乎有帮助。
在您的情况下,我建议使用具有有限线程数的executorservice,并在没有异常时将该数量微调到限制。
因此,创建executorservice(以10个线程为起点):
而不是你的
做
另一个改进是
org.apache.hadoop.fs.FileSystem
工具Closeable
,你应该把它关上。在代码中,每个线程都会创建FileSystem
,但不关闭它。所以我会把它提取到try
:更新:
尽管上面的代码似乎是
Closeable
对象,默认情况下FileSystem.get
将从因此,当
close()
他们将被召唤。您可以通过设置
fs.hdfs.impl.disable.cache
配置参数到true
,或确保FileSystem
示例仅在所有工作进程完成时关闭。您似乎可以为所有的工作人员使用一个文件系统示例,尽管我在javadocs中找不到任何确认,即在没有额外同步的情况下,它可以正常工作。