I pull data from Kafka and append it to a file in HDFS, but an exception is thrown during the append.
Here is the exception message:
Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE T_LOG_RECORD_CS/web-Thread-3 for DFSClient_NONMAPREDUCE_-558981961_18 on [IP address] because DFSClient_NONMAPREDUCE_-558981961_18 is already the current lease holder.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2970)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2766)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3073)
at ...
at com.sharing.hdfs.HdfsHelper$.getFSDataOutputSteam(HdfsHelper.scala:225)
at com.sharing.hdfs.HdfsHelper$.writeLines(HdfsHelper.scala:149)
Here is my code. I use the writeLines method to append data to HDFS.
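/**
 * Opens an FSDataOutputStream for the given path:
 * appends if the file already exists, otherwise creates it.
 *
 * @param path HDFS file path
 * @return the output stream; the caller is responsible for closing it
 */
def getFSDataOutputSteam(path: String): FSDataOutputStream = {
  val pathIsExist = isExist(path)
  if (pathIsExist) {
    fileSystem.append(path)
  } else {
    fileSystem.create(path)
  }
}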
// Write the lines to the file, one per line, UTF-8 encoded
def writeLines(lines: java.util.List[String], path: String): Unit = {
  var outputStream: FSDataOutputStream = null
  try {
    outputStream = getFSDataOutputSteam(path)
    lines.foreach({
      line =>
        outputStream.write(line.trim.getBytes("UTF-8"))
        outputStream.write("\n".getBytes("UTF-8"))
    })
    LOGGER.info("HDFS write succeeded!")
  }
  catch {
    case e: IOException =>
      LOGGER.error(e.getMessage, e)
      updateLease(path)
      throw new IOException(e)
  } finally {
    // Always close the stream so the client releases its lease on the file
    if (Objects.nonNull(outputStream)) {
      outputStream.close()
    }
  }
}
Any help would be appreciated. Thanks!
1 Answer
oxf4rvwz #1
When creating the FileSystem (more precisely, the Configuration object it is built from), add the following property settings:
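The property list itself is not included in the text above, so the keys in this sketch are not taken from the answer. They are an assumption: two real HDFS client settings (`dfs.client.block.write.replace-datanode-on-failure.policy` and `dfs.client.block.write.replace-datanode-on-failure.enable`) that are often suggested when appends fail on clusters with very few DataNodes. The helper name `buildFileSystem` and the `nameNodeUri` parameter are hypothetical, and the snippet needs `hadoop-client` on the classpath. It is only meant to show where such settings would go, which is on the Configuration before the FileSystem is obtained:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object HdfsClientFactory {

  // Hypothetical helper, not part of the original post.
  def buildFileSystem(nameNodeUri: String): FileSystem = {
    val conf = new Configuration()

    // ASSUMPTION: these keys are illustrative; the answer's own list is not shown.
    // They control whether the client tries to swap in a new DataNode when one
    // in the write pipeline fails, which may not be possible on a small cluster.
    conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true)

    // Set properties before this call: the client reads them when it is created.
    FileSystem.get(URI.create(nameNodeUri), conf)
  }
}
```

`FileSystem.get` may hand back a cached instance for the same URI and user, so the settings should be in place the first time the FileSystem is requested. Whether these particular values are appropriate depends on the cluster size and replication factor, so treat them as a starting point to verify rather than a confirmed fix.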