Scala: how to fix HDFS append exception "is already the current lease holder"

c8ib6hqw · posted 2023-02-04 in Scala

I pull data from Kafka and append it to a file on HDFS, but an exception is raised while appending.
Here is the exception message:

Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE T_LOG_RECORD_CS/web-Thread-3 for DFSClient_NONMAPREDUCE_-558981961_18 on <IP address> because DFSClient_NONMAPREDUCE_-558981961_18 is already the current lease holder.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2970)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2766)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3073)
        at ...
        at com.sharing.hdfs.HdfsHelper$.getFSDataOutputSteam(HdfsHelper.scala:225)
        at com.sharing.hdfs.HdfsHelper$.writeLines(HdfsHelper.scala:149)

Here is my code; the writeLines method is what appends the data to HDFS.

/**
    * Create an FSDataOutputStream for the given path
    *
    * @param path
    * @return
    */
  def getFSDataOutputSteam(path: String): FSDataOutputStream = {
    val pathIsExist = isExist(path)
    if (pathIsExist) {
      // Append if the file already exists, otherwise create it
      fileSystem.append(path)
    } else {
      fileSystem.create(path)
    }
  }
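For reference, fileSystem is an initialized org.apache.hadoop.fs.FileSystem and isExist is a small existence check that is not shown here; a minimal sketch of what it could look like (names and details are assumptions):

import org.apache.hadoop.fs.Path

  // Hypothetical helper: true if the file already exists on HDFS,
  // using the same fileSystem field as getFSDataOutputSteam
  def isExist(path: String): Boolean = fileSystem.exists(new Path(path))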

// Write the lines to HDFS, appending to the file at `path`
def writeLines(lines: java.util.List[String], path: String): Unit = {

    var outputStream: FSDataOutputStream = null
    try {

      outputStream = getFSDataOutputSteam(path)
      // foreach on a java.util.List assumes a Java-to-Scala collection
      // conversion (e.g. scala.collection.JavaConversions._) is in scope
      lines.foreach({
        line =>
          outputStream.write(line.trim.getBytes("UTF-8"))
          outputStream.write("\n".getBytes("UTF-8"))
      })
      LOGGER.info("Write to HDFS succeeded!")
    }
    catch {
      case e: IOException =>
        LOGGER.error(e.getMessage, e)
        updateLease(path)
        throw new IOException(e)
    } finally {
      if (Objects.nonNull(outputStream)) {
        outputStream.close()
      }
    }
  }
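The updateLease helper called in the catch block is not shown either; a minimal sketch of what a lease-recovery helper could look like, assuming the underlying FileSystem is a DistributedFileSystem (recoverLease and isFileClosed are HDFS client APIs; the one-second polling interval is just an illustrative choice):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.DistributedFileSystem

  // Hypothetical lease-recovery helper: asks the NameNode to release the
  // previous writer's lease so a new append can be opened on the file
  def updateLease(path: String): Unit = fileSystem match {
    case dfs: DistributedFileSystem =>
      val p = new Path(path)
      var closed = dfs.recoverLease(p)
      // recoverLease returns immediately; poll until the file is closed
      while (!closed) {
        Thread.sleep(1000L)
        closed = dfs.isFileClosed(p)
      }
    case _ => // not an HDFS filesystem, nothing to recover
  }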

Please help, thanks!


oxf4rvwz1#

When creating the FileSystem (more precisely, when building the Configuration object), add the following property settings:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", HDFS_PATH);
conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
