Usage and code examples for the org.apache.hadoop.hdfs.server.datanode.DataNode class


This article collects a number of Java code examples for the org.apache.hadoop.hdfs.server.datanode.DataNode class and shows how the class is used in practice. The examples are drawn from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the DataNode class:
Package: org.apache.hadoop.hdfs.server.datanode
Class name: DataNode

Introduction to DataNode

DataNode is a class (and program) that stores a set of blocks for a DFS deployment. A single deployment can have one or many DataNodes. Each DataNode communicates regularly with a single NameNode. It also communicates with client code and other DataNodes from time to time. DataNodes store a series of named blocks. The DataNode allows client code to read these blocks, or to write new block data. The DataNode may also, in response to instructions from its NameNode, delete blocks or copy blocks to/from other DataNodes. The DataNode maintains just one critical table: block -> stream of bytes (of BLOCK_SIZE or less). This info is stored on a local disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards. DataNodes spend their lives in an endless loop of asking the NameNode for something to do. A NameNode cannot connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode. DataNodes maintain an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.
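To make this lifecycle concrete, the sketch below starts an in-process HDFS cluster with a single DataNode, writes a small file (whose bytes the DataNode stores as a block), and shuts everything down. It is a minimal illustration only, assuming the hadoop-hdfs test artifact that provides MiniDFSCluster is on the classpath; the class name, file path, and printed message are made up for this sketch and do not come from the snippets collected below.

// Minimal sketch: run a NameNode plus one DataNode in-process and store a block.
// Assumes the hadoop-hdfs test jar (which provides MiniDFSCluster) is available.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class DataNodeQuickStart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)          // a single DataNode is enough for the demo
        .build();
    try {
      cluster.waitActive();       // wait until the DataNode has registered with the NameNode
      FileSystem fs = cluster.getFileSystem();
      Path file = new Path("/demo/hello.txt");    // hypothetical path used only in this sketch
      try (FSDataOutputStream out = fs.create(file)) {
        out.writeUTF("hello, DataNode");          // the bytes end up as a block on the DataNode
      }
      System.out.println("DataNodes running: " + cluster.getDataNodes().size());
    } finally {
      cluster.shutdown();         // stops the DataNode(s) and the NameNode
    }
  }
}

In production the same roles are played by the standalone NameNode and DataNode daemons; MiniDFSCluster simply wires them together inside one JVM, which is why it appears in several of the test-oriented examples below.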

Code examples

Code example source: apache/hbase

LOG.info("killing datanode " + name + " / " + lookup);
ipcPort = dn.ipcServer.getListenerAddress().getPort();
dn.shutdown();
LOG.info("killed datanode " + name + " / " + lookup);
break;

Code example source: org.apache.hadoop/hadoop-hdfs

/** Instantiate & Start a single datanode daemon and wait for it to finish.
 *  If this thread is specifically interrupted, it will stop waiting.
 */
@VisibleForTesting
public static DataNode createDataNode(String args[],
                Configuration conf) throws IOException {
 return createDataNode(args, conf, null);
}

Code example source: org.apache.hadoop/hadoop-hdfs

private DataXceiver(Peer peer, DataNode datanode,
  DataXceiverServer dataXceiverServer) throws IOException {
 super(datanode.getTracer());
 this.peer = peer;
 this.dnConf = datanode.getDnConf();
 this.socketIn = peer.getInputStream();
 this.socketOut = peer.getOutputStream();
 this.datanode = datanode;
 this.dataXceiverServer = dataXceiverServer;
 this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
 this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(datanode.getConf());
 this.smallBufferSize = DFSUtilClient.getSmallBufferSize(datanode.getConf());
 remoteAddress = peer.getRemoteAddressString();
 final int colonIdx = remoteAddress.indexOf(':');
 remoteAddressWithoutPort =
   (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
 localAddress = peer.getLocalAddressString();
 LOG.debug("Number of active connections is: {}",
   datanode.getXceiverCount());
}

Code example source: org.apache.hadoop/hadoop-hdfs

BlockRecoveryWorker(DataNode datanode) {
 this.datanode = datanode;
 conf = datanode.getConf();
 dnConf = datanode.getDnConf();
}

Code example source: org.apache.hadoop/hadoop-hdfs

@Override // ClientDatanodeProtocol
public void refreshNamenodes() throws IOException {
 checkSuperuserPrivilege();
 setConf(new Configuration());
 refreshNamenodes(getConf());
}

Code example source: org.apache.hadoop/hadoop-hdfs

IOStreamPair connectToDN(DatanodeInfo datanodeID, int timeout,
             ExtendedBlock block,
             Token<BlockTokenIdentifier> blockToken)
  throws IOException {
 return DFSUtilClient.connectToDN(datanodeID, timeout, getConf(),
   saslClient, NetUtils.getDefaultSocketFactory(getConf()), false,
   getDataEncryptionKeyFactoryForBlock(block), blockToken);
}

Code example source: ch.cern.hadoop/hadoop-hdfs

/**
 * Stores the information related to a namenode in the cluster
 */
public static class NameNodeInfo {
 final NameNode nameNode;
 final Configuration conf;
 final String nameserviceId;
 final String nnId;
 StartupOption startOpt;
 NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
   StartupOption startOpt, Configuration conf) {
  this.nameNode = nn;
  this.nameserviceId = nameserviceId;
  this.nnId = nnId;
  this.startOpt = startOpt;
  this.conf = conf;
 }
 
 public void setStartOpt(StartupOption startOpt) {
  this.startOpt = startOpt;
 }
}

Code example source: org.apache.hadoop/hadoop-hdfs

/** Instantiate & Start a single datanode daemon and wait for it to finish.
 *  If this thread is specifically interrupted, it will stop waiting.
 */
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
  SecureResources resources) throws IOException {
 DataNode dn = instantiateDataNode(args, conf, resources);
 if (dn != null) {
  dn.runDatanodeDaemon();
 }
 return dn;
}

Code example source: ch.cern.hadoop/hadoop-hdfs

// Excerpt from block recovery: contact each replica's DataNode, either the local
// instance or an inter-datanode protocol proxy, and initialize replica recovery.
DatanodeRegistration bpReg = bpos.bpRegistration;
InterDatanodeProtocol datanode = bpReg.equals(id)
    ? this
    : DataNode.createInterDataNodeProtocolProxy(id, getConf(),
        dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
if (info != null &&
    info.getGenerationStamp() >= block.getGenerationStamp()) {
  // ... (the rest of the condition and the handling of the recoverable replica
  //      are truncated in the original excerpt)
}
// Once all replicas have been examined, the collected list is synchronized:
syncBlock(rBlock, syncList);

Code example source: org.jvnet.hudson.hadoop/hadoop-core

// Excerpt from an older recoverBlock(): query each replica's DataNode for block
// metadata, then synchronize the surviving replicas.
try {
  InterDatanodeProtocol datanode = dnRegistration.equals(id)
      ? this
      : DataNode.createInterDataNodeProtocolProxy(id, getConf());
  BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
  if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
    // ... (collection of recoverable replicas truncated in the original excerpt)
  }
  block.setNumBytes(minlength);
  return syncBlock(block, syncList, closeFile);
} finally {
  synchronized (ongoingRecovery) {
    // ... (removal of the block from the ongoing-recovery bookkeeping truncated)
  }
}

Code example source: org.apache.hadoop/hadoop-hdfs-test

/**
 * Check whether the datanode can be started.
 */
private boolean canStartDataNode(Configuration conf) throws IOException {
 DataNode dn = null;
 try {
  dn = DataNode.createDataNode(new String[]{}, conf);
 } catch(IOException e) {
  if (e instanceof java.net.BindException)
   return false;
  throw e;
 } finally {
  if(dn != null) dn.shutdown();
 }
 return true;
}

Code example source: org.apache.hadoop/hadoop-hdfs

// Excerpt from DataXceiver.run(): negotiate SASL on the incoming connection,
// process operations until the peer closes, and clean up afterwards.
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
try {
  IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
      socketIn, datanode.getXferAddress().getPort(),
      datanode.getDatanodeId());
  input = new BufferedInputStream(saslStreams.in,
      smallBufferSize);
  // ... (the operation-dispatch loop is truncated in the original excerpt; it keeps
  //      reading ops while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0))
} catch (Throwable t) {
  String s = datanode.getDisplayName() + ":DataXceiver error processing "
      + ((op == null) ? "unknown" : op.name()) + " operation "
      + " src: " + remoteAddress + " dst: " + localAddress;
  // ... (error logging of `s` truncated in the original excerpt)
} finally {
  collectThreadLocalStates();
  LOG.debug("{}:Number of active connections is: {}",
      datanode.getDisplayName(), datanode.getXceiverCount());
  updateCurrentThreadName("Cleaning up");
  if (peer != null) {
    // ... (peer and stream cleanup truncated in the original excerpt)
  }
}

Code example source: com.facebook.hadoop/hadoop-core

// Excerpt from Facebook's Hadoop recoverBlock(): for each replica, create an
// inter-datanode protocol proxy (remote replicas only), start block recovery,
// and finally synchronize the recoverable replicas to a common length.
try {
  InterDatanodeProtocol datanode;
  if (getDNRegistrationForNS(namespaceId).equals(id)) {
    LOG.info("Skipping IDNPP creation for local id " + id
        + " when recovering " + block);
    // ... (the local DataNode is used directly; truncated in the original excerpt)
  } else {
    LOG.info("Creating IDNPP for non-local id " + id + " (dnReg="
        + getDNRegistrationForNS(namespaceId) + ") when recovering "
        + block);
    datanode = DataNode.createInterDataNodeProtocolProxy(
        id, getConf(), socketTimeout);
    datanodeProxies.add(datanode);
  }
  throwIfAfterTime(deadline);
  BlockRecoveryInfo info = datanode.startBlockRecovery(namespaceId, block);
  if (info == null) {
    // ... (handling of a replica without recovery info truncated in the original excerpt)
  }
  // After the per-replica loop (truncated here), recovery fails outright when no
  // replica could be recovered:
  //   stopAllProxies(datanodeProxies);
  //   throw new IOException("All datanodes failed: block=" + block
  //       + ", datanodeids=" + Arrays.asList(datanodeids));
  block.setNumBytes(minlength);
  return syncBlock(namespaceId, block, syncList, closeFile,
      datanodeProxies, deadline);
} finally {
  // ... (proxy cleanup truncated in the original excerpt)
}

Code example source: org.apache.hadoop/hadoop-hdfs

private void sendLifeline() throws IOException {
  StorageReport[] reports =
    dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
  if (LOG.isDebugEnabled()) {
   LOG.debug("Sending lifeline with " + reports.length + " storage " +
        " reports from service actor: " + BPServiceActor.this);
  }
  VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
    .getVolumeFailureSummary();
  int numFailedVolumes = volumeFailureSummary != null ?
    volumeFailureSummary.getFailedStorageLocations().length : 0;
  lifelineNamenode.sendLifeline(bpRegistration,
                 reports,
                 dn.getFSDataset().getCacheCapacity(),
                 dn.getFSDataset().getCacheUsed(),
                 dn.getXmitsInProgress(),
                 dn.getXceiverCount(),
                 numFailedVolumes,
                 volumeFailureSummary);
 }
}

Code example source: io.fabric8/fabric-hadoop

// Excerpt from an older DataNode.startDataNode(): resolve the streaming (data
// transfer) and HTTP info addresses, handshake with the NameNode, and set up
// either simulated or on-disk storage.
// ... (lookup of CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_DEFAULT is
//      truncated in the original excerpt)
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
int tmpPort = socAddr.getPort();
storage = new DataStorage();
// ... (datanode registration setup truncated in the original excerpt)
NamespaceInfo nsInfo = handshake();
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset =
    conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {
  setNewStorageID(dnRegistration);
  dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
  dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
  // ... (simulated-dataset initialization truncated in the original excerpt)
}
this.registerMXBean(conf); // register the MXBean for DataNode
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();

Code example source: org.apache.hadoop/hadoop-hdfs

// Excerpt from the DataNode constructor (signature reconstructed from the
// parameters used in the body): set up tracing, resolve the hostname, and start
// the DataNode, shutting down cleanly if startup fails.
DataNode(final Configuration conf, final List<StorageLocation> dataDirs,
    final StorageLocationChecker storageLocationChecker,
    final SecureResources resources) throws IOException {
  super(conf);
  this.tracer = createTracer(conf);
  this.tracerConfigurationManager =
      new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
  // ... (further field initialization truncated in the original excerpt)
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  // ... (remaining initialization truncated in the original excerpt)
  initOOBTimeout();
  this.storageLocationChecker = storageLocationChecker;
}

Code example source: org.apache.hadoop/hadoop-hdfs

/**
 * Allows submission of a disk balancer Job.
 * @param planID  - Hash value of the plan.
 * @param planVersion - Plan version, reserved for future use. We have only
 *                    version 1 now.
 * @param planFile - Plan file name
 * @param planData - Actual plan data in json format
 * @throws IOException
 */
@Override
public void submitDiskBalancerPlan(String planID, long planVersion,
  String planFile, String planData, boolean skipDateCheck)
  throws IOException {
 checkSuperuserPrivilege();
 if (getStartupOption(getConf()) != StartupOption.REGULAR) {
  throw new DiskBalancerException(
    "Datanode is in special state, e.g. Upgrade/Rollback etc."
      + " Disk balancing not permitted.",
    DiskBalancerException.Result.DATANODE_STATUS_NOT_REGULAR);
 }
 getDiskBalancer().submitPlan(planID, planVersion, planFile, planData,
     skipDateCheck);
}

Code example source: org.apache.hadoop/hadoop-hdfs

@Override
public void onCompleteLazyPersist(String bpId, long blockId,
  long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
 try (AutoCloseableLock lock = datasetLock.acquire()) {
  ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
  targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
    + savedFiles[1].length());
  // Update metrics (ignore the metadata file size)
  datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
  datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length());
  datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
    Time.monotonicNow() - creationTime);
  if (LOG.isDebugEnabled()) {
   LOG.debug("LazyWriter: Finish persisting RamDisk block: "
     + " block pool Id: " + bpId + " block id: " + blockId
     + " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
     + " on target volume " + targetVolume);
  }
 }
}

Code example source: org.apache.hadoop/hadoop-hdfs

public BlockScanner(DataNode datanode) {
 this(datanode, datanode.getConf());
}

Code example source: org.apache.hadoop/hadoop-hdfs

public static void secureMain(String args[], SecureResources resources) {
 int errorCode = 0;
 try {
  StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
  DataNode datanode = createDataNode(args, null, resources);
  if (datanode != null) {
   datanode.join();
  } else {
   errorCode = 1;
  }
 } catch (Throwable e) {
  LOG.error("Exception in secureMain", e);
  terminate(1, e);
 } finally {
  // We need to terminate the process here because either shutdown was called
  // or some disk related conditions like volumes tolerated or volumes required
  // condition was not met. Also, In secure mode, control will go to Jsvc
  // and Datanode process hangs if it does not exit.
  LOG.warn("Exiting Datanode");
  terminate(errorCode);
 }
}
