在spark流中随机获取leaseexpiredexception

siv3szwd  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(429)

我有一个spark流(cloudera5.12的2.1.1版本)。输入kafka和输出hdfs(Parquet格式)的问题是,我随机得到leaseexpiredexception(不是所有的小批量)
org.apache.hadoop.ipc.remoteexception(org.apache.hadoop.hdfs.server.namenode.leaseexpiredexception):无租赁/user/qoe\u fixe/data\u tv/tmp/cleandata/\u temporary/0/\u temporary/attement\u 20180629132202\u 0215\u m\u0000000/year=2018/month=6/day=29/hour=11/source=lyo2/part-00000-c6f21a40-4088-4d97-ae0c-24fa463550ab.snapy.parquet(inode 135532024):文件不存在不存在。持有者dfsclient\u尝试\u 20180629132202\u 0215\u m\u000000\u 0\u1048963677\u 900没有任何打开的文件。
我正在使用dataset api来编写hdfs

if (!InputWithDatePartition.rdd.isEmpty() ) InputWithDatePartition.repartition(1).write.partitionBy("year", "month", "day","hour","source").mode("append").parquet(cleanPath)

因为这个错误,我的工作几个小时后就失败了

mpbci0fu

mpbci0fu1#

写入同一目录的两个作业共享同一个 _temporary 文件夹。
因此,当第一个作业完成时,将执行此代码(fileoutputcommitter类):

public void cleanupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
      FileSystem fs = pendingJobAttemptsPath
          .getFileSystem(context.getConfiguration());
      // if job allow repeatable commit and pendingJobAttemptsPath could be
      // deleted by previous AM, we should tolerate FileNotFoundException in
      // this case.
      try {
        fs.delete(pendingJobAttemptsPath, true);
      } catch (FileNotFoundException e) {
        if (!isCommitJobRepeatable(context)) {
          throw e;
        }
      }
    } else {
      LOG.warn("Output Path is null in cleanupJob()");
    }
  }

它会在第二个作业仍在运行时删除pendingjobattemptspath(\u temporary)。这可能会有所帮助:
通过分区将Parquet数据附加到同一基本路径的多个spark作业

相关问题