Hadoop NameNodeRpcServer Source Code

  • 2022-10-20

The full Hadoop NameNodeRpcServer source is reproduced below.

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
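
Before diving into the listing, a quick orientation: every public method in this class is the server-side endpoint of one of the RPC protocols the NameNode speaks (ClientProtocol, DatanodeProtocol, NamenodeProtocol, the HA and refresh protocols, and so on). As a minimal client-side sketch of how a request reaches this file (a hypothetical example, not part of the listing; it assumes a reachable HDFS cluster configured via fs.defaultFS):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MkdirsExample {
  public static void main(String[] args) throws Exception {
    // Picks up core-site.xml / hdfs-site.xml from the classpath.
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      // On HDFS this call is serialized over ClientNamenodeProtocolPB and
      // handled by NameNodeRpcServer#mkdirs further down in this file.
      fs.mkdirs(new Path("/tmp/rpc-demo"));
    }
  }
}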

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.XAttrNotFoundException;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshAuthorizationPolicyProtocolService;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.slf4j.Logger;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;

import javax.annotation.Nonnull;

/**
 * This class is responsible for handling all of the RPC calls to the NameNode.
 * It is created, started, and stopped by {@link NameNode}.
 */
@InterfaceAudience.Private
@VisibleForTesting
public class NameNodeRpcServer implements NamenodeProtocols {
  
  private static final Logger LOG = NameNode.LOG;
  private static final Logger stateChangeLog = NameNode.stateChangeLog;
  private static final Logger blockStateChangeLog = NameNode
      .blockStateChangeLog;
  
  // Dependencies from other parts of NN.
  protected final FSNamesystem namesystem;
  protected final NameNode nn;
  private final NameNodeMetrics metrics;

  private final RetryCache retryCache;

  private final boolean serviceAuthEnabled;

  /** The RPC server that listens to requests from DataNodes */
  private final RPC.Server serviceRpcServer;
  private final InetSocketAddress serviceRPCAddress;

  /** The RPC server that listens to lifeline requests */
  private final RPC.Server lifelineRpcServer;
  private final InetSocketAddress lifelineRPCAddress;
  
  /** The RPC server that listens to requests from clients */
  protected final RPC.Server clientRpcServer;
  protected final InetSocketAddress clientRpcAddress;
  
  private final String minimumDataNodeVersion;

  private final String defaultECPolicyName;

  // Users who can override the client info
  private final String[] ipProxyUsers;

  public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    this.nn = nn;
    this.namesystem = nn.getNamesystem();
    this.retryCache = namesystem.getRetryCache();
    this.metrics = NameNode.getNameNodeMetrics();

    int handlerCount = 
      conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, 
                  DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
    ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
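
    // Each *ServerSideTranslatorPB constructed below wraps this server and
    // decodes protobuf requests into calls on the NamenodeProtocols methods
    // implemented later in this class; newReflectiveBlockingService() then
    // exposes each translator as a protobuf BlockingService that can be
    // registered on one or more of the RPC servers built further down.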

    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
        ProtobufRpcEngine2.class);

    ClientNamenodeProtocolServerSideTranslatorPB
        clientProtocolServerTranslator =
        new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
        newReflectiveBlockingService(clientProtocolServerTranslator);

    int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
        IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
    DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = 
        new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
    BlockingService dnProtoPbService = DatanodeProtocolService
        .newReflectiveBlockingService(dnProtoPbTranslator);

    DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
        new DatanodeLifelineProtocolServerSideTranslatorPB(this);
    BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
        .newReflectiveBlockingService(lifelineProtoPbTranslator);

    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = 
        new NamenodeProtocolServerSideTranslatorPB(this);
    BlockingService NNPbService = NamenodeProtocolService
          .newReflectiveBlockingService(namenodeProtocolXlator);
    
    RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator = 
        new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
    BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
        .newReflectiveBlockingService(refreshAuthPolicyXlator);

    RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = 
        new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
        .newReflectiveBlockingService(refreshUserMappingXlator);

    RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator = 
        new RefreshCallQueueProtocolServerSideTranslatorPB(this);
    BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
        .newReflectiveBlockingService(refreshCallQueueXlator);

    GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
        new GenericRefreshProtocolServerSideTranslatorPB(this);
    BlockingService genericRefreshService = GenericRefreshProtocolService
        .newReflectiveBlockingService(genericRefreshXlator);

    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
        new GetUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService getUserMappingService = GetUserMappingsProtocolService
        .newReflectiveBlockingService(getUserMappingXlator);
    
    HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = 
        new HAServiceProtocolServerSideTranslatorPB(this);
    BlockingService haPbService = HAServiceProtocolService
        .newReflectiveBlockingService(haServiceProtocolXlator);

    ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
        = new ReconfigurationProtocolServerSideTranslatorPB(this);
    BlockingService reconfigurationPbService = ReconfigurationProtocolService
        .newReflectiveBlockingService(reconfigurationProtocolXlator);

    InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
    if (serviceRpcAddr != null) {
      String bindHost = nn.getServiceRpcServerBindHost(conf);
      if (bindHost == null) {
        bindHost = serviceRpcAddr.getHostName();
      }
      LOG.info("Service RPC server is binding to " + bindHost + ":" +
          serviceRpcAddr.getPort());

      int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      serviceRpcServer = new RPC.Builder(conf)
          .setProtocol(
              org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
          .setInstance(clientNNPbService)
          .setBindAddress(bindHost)
          .setPort(serviceRpcAddr.getPort())
          .setNumHandlers(serviceHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();

      // Add all the RPC protocols that the namenode implements
      DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
          reconfigurationPbService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
          refreshAuthService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, 
          refreshUserMappingService, serviceRpcServer);
      // We support refreshing the call queue here in case the client RPC queue is full
      DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
          refreshCallQueueService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
          genericRefreshService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
          getUserMappingService, serviceRpcServer);

      // Update the address with the correct port
      InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
      serviceRPCAddress = new InetSocketAddress(
            serviceRpcAddr.getHostName(), listenAddr.getPort());
      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
    } else {
      serviceRpcServer = null;
      serviceRPCAddress = null;
    }
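    // The service RPC server is optional: when no service RPC address is
    // configured, DataNodes and other internal services fall back to the
    // client RPC server created below.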

    InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
    if (lifelineRpcAddr != null) {
      RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
          ProtobufRpcEngine2.class);
      String bindHost = nn.getLifelineRpcServerBindHost(conf);
      if (bindHost == null) {
        bindHost = lifelineRpcAddr.getHostName();
      }
      LOG.info("Lifeline RPC server is binding to {}:{}", bindHost,
          lifelineRpcAddr.getPort());

      int lifelineHandlerCount = conf.getInt(
          DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
      if (lifelineHandlerCount <= 0) {
        float lifelineHandlerRatio = conf.getFloat(
            DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
            DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
        lifelineHandlerCount = Math.max(
            (int)(handlerCount * lifelineHandlerRatio), 1);
      }
      lifelineRpcServer = new RPC.Builder(conf)
          .setProtocol(HAServiceProtocolPB.class)
          .setInstance(haPbService)
          .setBindAddress(bindHost)
          .setPort(lifelineRpcAddr.getPort())
          .setNumHandlers(lifelineHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();

      DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
          lifelineProtoPbService, lifelineRpcServer);

      // Update the address with the correct port
      InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
      lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
          listenAddr.getPort());
      nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
    } else {
      lifelineRpcServer = null;
      lifelineRPCAddress = null;
    }

    InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
    String bindHost = nn.getRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = rpcAddr.getHostName();
    }
    LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

    boolean enableStateContext = conf.getBoolean(
        DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY,
        DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT);
    LOG.info("Enable NameNode state context:" + enableStateContext);

    GlobalStateIdContext stateIdContext = null;
    if (enableStateContext) {
      stateIdContext = new GlobalStateIdContext(namesystem);
    }
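    // When enabled, this alignment context attaches the last-seen namespace
    // state ID to RPC responses, which is what allows clients to read
    // consistently from Observer NameNodes.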

    clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(rpcAddr.getPort())
        .setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .setAlignmentContext(stateIdContext)
        .build();

    // Add all the RPC protocols that the namenode implements
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
        reconfigurationPbService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class, 
        refreshAuthService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, 
        refreshUserMappingService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
        refreshCallQueueService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
        genericRefreshService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
        getUserMappingService, clientRpcServer);

    // set service-level authorization security policy
    if (serviceAuthEnabled =
          conf.getBoolean(
            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
      clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      if (serviceRpcServer != null) {
        serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
      if (lifelineRpcServer != null) {
        lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
    }

    // The rpc-server port can be ephemeral... ensure we have the correct info
    InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
    clientRpcAddress = new InetSocketAddress(
        rpcAddr.getHostName(), listenAddr.getPort());
    nn.setRpcServerAddress(conf, clientRpcAddress);
    
    minimumDataNodeVersion = conf.get(
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);

    defaultECPolicyName = conf.getTrimmed(
        DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
        DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);

    // Set terse exception whose stack trace won't be logged
    clientRpcServer.addTerseExceptions(SafeModeException.class,
        FileNotFoundException.class,
        HadoopIllegalArgumentException.class,
        FileAlreadyExistsException.class,
        InvalidPathException.class,
        ParentNotDirectoryException.class,
        UnresolvedLinkException.class,
        AlreadyBeingCreatedException.class,
        QuotaExceededException.class,
        RecoveryInProgressException.class,
        AccessControlException.class,
        InvalidToken.class,
        LeaseExpiredException.class,
        NSQuotaExceededException.class,
        DSQuotaExceededException.class,
        QuotaByStorageTypeExceededException.class,
        AclException.class,
        FSLimitException.PathComponentTooLongException.class,
        FSLimitException.MaxDirectoryItemsExceededException.class,
        DisallowedDatanodeException.class,
        XAttrNotFoundException.class);

    clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class,
        UnresolvedPathException.class);

    clientRpcServer.setTracer(nn.tracer);
    if (serviceRpcServer != null) {
      serviceRpcServer.setTracer(nn.tracer);
    }
    if (lifelineRpcServer != null) {
      lifelineRpcServer.setTracer(nn.tracer);
    }
    int[] auxiliaryPorts =
        conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
    if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
      for (int auxiliaryPort : auxiliaryPorts) {
        this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
      }
    }
  }

  /** Allow access to the lifeline RPC server for testing */
  @VisibleForTesting
  RPC.Server getLifelineRpcServer() {
    return lifelineRpcServer;
  }

  /** Allow access to the client RPC server for testing */
  @VisibleForTesting
  public RPC.Server getClientRpcServer() {
    return clientRpcServer;
  }
  
  /** Allow access to the service RPC server for testing */
  @VisibleForTesting
  RPC.Server getServiceRpcServer() {
    return serviceRpcServer;
  }
  
  /**
   * Start client and service RPC servers.
   */
  void start() {
    clientRpcServer.start();
    if (serviceRpcServer != null) {
      serviceRpcServer.start();      
    }
    if (lifelineRpcServer != null) {
      lifelineRpcServer.start();
    }
  }
  
  /**
   * Wait until the RPC servers have shutdown.
   */
  void join() throws InterruptedException {
    clientRpcServer.join();
    if (serviceRpcServer != null) {
      serviceRpcServer.join();      
    }
    if (lifelineRpcServer != null) {
      lifelineRpcServer.join();
    }
  }

  /**
   * Stop client and service RPC servers.
   */
  void stop() {
    if (clientRpcServer != null) {
      clientRpcServer.stop();
    }
    if (serviceRpcServer != null) {
      serviceRpcServer.stop();
    }
    if (lifelineRpcServer != null) {
      lifelineRpcServer.stop();
    }
  }

  InetSocketAddress getLifelineRpcAddress() {
    return lifelineRPCAddress;
  }

  InetSocketAddress getServiceRpcAddress() {
    return serviceRPCAddress;
  }

  @VisibleForTesting
  public InetSocketAddress getRpcAddress() {
    return clientRpcAddress;
  }

  @VisibleForTesting
  public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
    return clientRpcServer.getAuxiliaryListenerAddresses();
  }

  private static UserGroupInformation getRemoteUser() throws IOException {
    return NameNode.getRemoteUser();
  }


  /////////////////////////////////////////////////////
  // NamenodeProtocol
  /////////////////////////////////////////////////////
  @Override // NamenodeProtocol
  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
      minBlockSize, long timeInterval)
      throws IOException {
    String operationName = "getBlocks";
    if (size <= 0) {
      throw new IllegalArgumentException(
          "Unexpected non-positive size: " + size);
    }
    if (minBlockSize < 0) {
      throw new IllegalArgumentException(
          "Unexpected negative minBlockSize: " + minBlockSize);
    }
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
  }

  @Override // NamenodeProtocol
  public ExportedBlockKeys getBlockKeys() throws IOException {
    String operationName = "getBlockKeys";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    return namesystem.getBlockManager().getBlockKeys();
  }

  @Override // NamenodeProtocol
  public void errorReport(NamenodeRegistration registration,
                          int errorCode, 
                          String msg) throws IOException {
    String operationName = "errorReport";
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.UNCHECKED);
    namesystem.checkSuperuserPrivilege(operationName);
    verifyRequest(registration);
    LOG.info("Error report from " + registration + ": " + msg);
    if (errorCode == FATAL) {
      namesystem.releaseBackupNode(registration);
    }
  }

  @Override // NamenodeProtocol
  public NamenodeRegistration registerSubordinateNamenode(
      NamenodeRegistration registration) throws IOException {
    String operationName = "registerSubordinateNamenode";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    verifyLayoutVersion(registration.getVersion());
    NamenodeRegistration myRegistration = nn.setRegistration();
    namesystem.registerBackupNode(registration, myRegistration);
    return myRegistration;
  }

  @Override // NamenodeProtocol
  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
      throws IOException {
    String operationName = "startCheckpoint";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    verifyRequest(registration);
    if (!nn.isRole(NamenodeRole.NAMENODE)) {
      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
    }

    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (NamenodeCommand) cacheEntry.getPayload();
    }
    NamenodeCommand ret = null;
    try {
      ret = namesystem.startCheckpoint(registration, nn.setRegistration());
    } finally {
      RetryCache.setState(cacheEntry, ret != null, ret);
    }
    return ret;
  }

  /**
   * Return the current CacheEntry.
   */
  private CacheEntry getCacheEntry() {
    Pair<byte[], Integer> clientInfo =
        NameNode.getClientIdAndCallId(this.ipProxyUsers);
    return RetryCache.waitForCompletion(
        retryCache, clientInfo.getLeft(), clientInfo.getRight());
  }

  /**
   * Return the current CacheEntryWithPayload.
   */
  private CacheEntryWithPayload getCacheEntryWithPayload(Object payload) {
    Pair<byte[], Integer> clientInfo =
        NameNode.getClientIdAndCallId(this.ipProxyUsers);
    return RetryCache.waitForCompletion(retryCache, payload,
        clientInfo.getLeft(), clientInfo.getRight());
  }
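
  // Retry-cache pattern used by the non-idempotent operations in this class:
  // look up the (clientId, callId) entry first; if a prior attempt already
  // succeeded, replay its recorded result instead of re-executing; otherwise
  // run the operation and record the outcome via RetryCache.setState().
  // This yields at-most-once semantics across client retries and failovers.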

  @Override // NamenodeProtocol
  public void endCheckpoint(NamenodeRegistration registration,
                            CheckpointSignature sig) throws IOException {
    String operationName = "endCheckpoint";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.endCheckpoint(registration, sig);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
      throws IOException {
    checkNNStartup();
    return namesystem.getDelegationToken(renewer);
  }

  @Override // ClientProtocol
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException {
    checkNNStartup();
    return namesystem.renewDelegationToken(token);
  }

  @Override // ClientProtocol
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException {
    checkNNStartup();
    namesystem.cancelDelegationToken(token);
  }
  
  @Override // ClientProtocol
  public LocatedBlocks getBlockLocations(String src, 
                                          long offset, 
                                          long length) 
      throws IOException {
    checkNNStartup();
    metrics.incrGetBlockLocations();
    LocatedBlocks locatedBlocks =
        namesystem.getBlockLocations(getClientMachine(), src, offset, length);
    return locatedBlocks;
  }
  
  @Override // ClientProtocol
  public FsServerDefaults getServerDefaults() throws IOException {
    checkNNStartup();
    return namesystem.getServerDefaults();
  }

  @Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
      String storagePolicy)
      throws IOException {
    checkNNStartup();
    String clientMachine = getClientMachine();
    stateChangeLog.debug("*DIR* NameNode.create: file {} for {} at {}.",
        src, clientName, clientMachine);
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (HdfsFileStatus) cacheEntry.getPayload();
    }

    HdfsFileStatus status = null;
    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          ecPolicyName, storagePolicy, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
  }

  @Override // ClientProtocol
  public LastBlockWithStatus append(String src, String clientName,
      EnumSetWritable<CreateFlag> flag) throws IOException {
    checkNNStartup();
    String clientMachine = getClientMachine();
    stateChangeLog.debug("*DIR* NameNode.append: file {} for {} at {}.",
        src, clientName, clientMachine);
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (LastBlockWithStatus) cacheEntry.getPayload();
    }

    LastBlockWithStatus info = null;
    boolean success = false;
    try {
      info = namesystem.appendFile(src, clientName, clientMachine, flag.get(),
          cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success, info);
    }
    metrics.incrFilesAppended();
    return info;
  }

  @Override // ClientProtocol
  public boolean recoverLease(String src, String clientName) throws IOException {
    checkNNStartup();
    String clientMachine = getClientMachine();
    return namesystem.recoverLease(src, clientName, clientMachine);
  }

  @Override // ClientProtocol
  public boolean setReplication(String src, short replication) 
    throws IOException {
    checkNNStartup();
    return namesystem.setReplication(src, replication);
  }

  @Override
  public void unsetStoragePolicy(String src)
      throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.unsetStoragePolicy for path: {}", src);
    namesystem.unsetStoragePolicy(src);
  }

  @Override
  public void setStoragePolicy(String src, String policyName)
      throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.setStoragePolicy for path: {}, policyName: {}",
        src, policyName);
    namesystem.setStoragePolicy(src, policyName);
  }

  @Override
  public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.getStoragePolicy for path: {}", path);
    return namesystem.getStoragePolicy(path);
  }

  @Override
  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
    checkNNStartup();
    return namesystem.getStoragePolicies();
  }

  @Override // ClientProtocol
  public void setPermission(String src, FsPermission permissions)
      throws IOException {
    checkNNStartup();
    namesystem.setPermission(src, permissions);
  }

  @Override // ClientProtocol
  public void setOwner(String src, String username, String groupname)
      throws IOException {
    checkNNStartup();
    namesystem.setOwner(src, username, groupname);
  }
  
  @Override
  public LocatedBlock addBlock(String src, String clientName,
      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
      String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
      throws IOException {
    checkNNStartup();
    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
        clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
    if (locatedBlock != null) {
      metrics.incrAddBlockOps();
    }
    return locatedBlock;
  }

  @Override // ClientProtocol
  public LocatedBlock getAdditionalDatanode(final String src,
      final long fileId, final ExtendedBlock blk,
      final DatanodeInfo[] existings, final String[] existingStorageIDs,
      final DatanodeInfo[] excludes,
      final int numAdditionalNodes, final String clientName
      ) throws IOException {
    checkNNStartup();
    if (LOG.isDebugEnabled()) {
      LOG.debug("getAdditionalDatanode: src={}, fileId={}, blk={}, existings={}, excludes={}"
          + ", numAdditionalNodes={}, clientName={}", src, fileId, blk, Arrays.asList(existings),
          Arrays.asList(excludes), numAdditionalNodes, clientName);
    }

    metrics.incrGetAdditionalDatanodeOps();

    Set<Node> excludeSet = null;
    if (excludes != null) {
      excludeSet = new HashSet<Node>(excludes.length);
      for (Node node : excludes) {
        excludeSet.add(node);
      }
    }
    LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
        blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
        clientName);
    return locatedBlock;
  }
  /**
   * The client needs to give up on the block.
   */
  @Override // ClientProtocol
  public void abandonBlock(ExtendedBlock b, long fileId, String src,
        String holder) throws IOException {
    checkNNStartup();
    namesystem.abandonBlock(b, fileId, src, holder);
  }

  @Override // ClientProtocol
  public boolean complete(String src, String clientName,
                          ExtendedBlock last,  long fileId)
      throws IOException {
    checkNNStartup();
    return namesystem.completeFile(src, clientName, last, fileId);
  }

  /**
   * The client has detected an error on the specified located blocks 
   * and is reporting them to the server.  For now, the namenode will 
   * mark the block as corrupt.  In the future we might 
   * check the blocks are actually corrupt. 
   */
  @Override // ClientProtocol, DatanodeProtocol
  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
    checkNNStartup();
    namesystem.reportBadBlocks(blocks);
  }

  @Override // ClientProtocol
  public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
      throws IOException {
    checkNNStartup();
    return namesystem.bumpBlockGenerationStamp(block, clientName);
  }


  @Override // ClientProtocol
  public void updatePipeline(String clientName, ExtendedBlock oldBlock,
      ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
      throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }

    boolean success = false;
    try {
      namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes,
          newStorageIDs, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }
  
  @Override // DatanodeProtocol
  public void commitBlockSynchronization(ExtendedBlock block,
      long newgenerationstamp, long newlength,
      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
      String[] newtargetstorages)
      throws IOException {
    checkNNStartup();
    namesystem.commitBlockSynchronization(block, newgenerationstamp,
        newlength, closeFile, deleteblock, newtargets, newtargetstorages);
  }
  
  @Override // ClientProtocol
  public long getPreferredBlockSize(String filename) 
      throws IOException {
    checkNNStartup();
    return namesystem.getPreferredBlockSize(filename);
  }
    
  @Deprecated
  @Override // ClientProtocol
  public boolean rename(String src, String dst) throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.rename: {} to {}.", src, dst);
    if (!checkPathLength(dst)) {
      throw new IOException("rename: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return true; // Return previous response
    }

    boolean ret = false;
    try {
      ret = namesystem.renameTo(src, dst, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, ret);
    }
    if (ret) {
      metrics.incrFilesRenamed();
    }
    return ret;
  }
  
  @Override // ClientProtocol
  public void concat(String trg, String[] src) throws IOException {
    checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.concat: src path {} to target path {}",
          Arrays.toString(src), trg);
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;

    try {
      namesystem.concat(trg, src, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }
  
  @Override // ClientProtocol
  public void rename2(String src, String dst, Options.Rename... options)
      throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.rename: {} to {}.", src, dst);
    if (!checkPathLength(dst)) {
      throw new IOException("rename: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.renameTo(src, dst, cacheEntry != null, options);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
    metrics.incrFilesRenamed();
  }

  @Override // ClientProtocol
  public boolean truncate(String src, long newLength, String clientName)
      throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.truncate: {} to {}", src, newLength);
    String clientMachine = getClientMachine();
    try {
      return namesystem.truncate(
          src, newLength, clientName, clientMachine, now());
    } finally {
      metrics.incrFilesTruncated();
    }
  }

  @Override // ClientProtocol
  public boolean delete(String src, boolean recursive) throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* Namenode.delete: src={}, recursive={}.", src, recursive);
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return true; // Return previous response
    }

    boolean ret = false;
    try {
      ret = namesystem.delete(src, recursive, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, ret);
    }
    if (ret) {
      metrics.incrDeleteFileOps();
    }
    return ret;
  }

  /**
   * Check path length does not exceed maximum.  Returns true if
   * length and depth are okay.  Returns false if length is too long 
   * or depth is too great.
   */
  private boolean checkPathLength(String src) {
    Path srcPath = new Path(src);
    return (src.length() <= MAX_PATH_LENGTH &&
            srcPath.depth() <= MAX_PATH_DEPTH);
  }
    
  @Override // ClientProtocol
  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
      throws IOException {
    checkNNStartup();
    stateChangeLog.debug("*DIR* NameNode.mkdirs: {}.", src);
    if (!checkPathLength(src)) {
      throw new IOException("mkdirs: Pathname too long.  Limit " 
                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    return namesystem.mkdirs(src,
        new PermissionStatus(getRemoteUser().getShortUserName(),
            null, masked), createParent);
  }

  @Override // ClientProtocol
  public void renewLease(String clientName, List<String> namespaces)
      throws IOException {
    if (namespaces != null && namespaces.size() > 0) {
      LOG.warn("namespaces({}) should be null or empty "
          + "on NameNode side, please check it.", namespaces);
      throw new IOException("namespaces(" + namespaces
          + ") should be null or empty");
    }
    checkNNStartup();
    namesystem.renewLease(clientName);        
  }

  @Override // ClientProtocol
  public DirectoryListing getListing(String src, byte[] startAfter,
      boolean needLocation) throws IOException {
    checkNNStartup();
    DirectoryListing files = namesystem.getListing(
        src, startAfter, needLocation);
    if (files != null) {
      metrics.incrGetListingOps();
      metrics.incrFilesInGetListingOps(files.getPartialListing().length);
    }
    return files;
  }

  @Override // ClientProtocol
  public BatchedDirectoryListing getBatchedListing(
      String[] srcs,
      byte[] startAfter,
      boolean needLocation) throws IOException {
    checkNNStartup();
    BatchedDirectoryListing batchedListing =
        namesystem.getBatchedListing(srcs, startAfter, needLocation);
    if (batchedListing != null) {
      metrics.incrGetListingOps();
      int numEntries = 0;
      for (HdfsPartialListing partial : batchedListing.getListings()) {
        if (partial.getPartialListing() != null) {
          numEntries += partial.getPartialListing().size();
        }
      }
      metrics.incrFilesInGetListingOps(numEntries);
    }
    return batchedListing;
  }

  @Override // ClientProtocol
  public HdfsFileStatus getFileInfo(String src) throws IOException {
    checkNNStartup();
    metrics.incrFileInfoOps();
    return namesystem.getFileInfo(src, true, false, false);
  }

  @Override // ClientProtocol
  public HdfsLocatedFileStatus getLocatedFileInfo(String src,
      boolean needBlockToken) throws IOException {
    checkNNStartup();
    if (needBlockToken) {
      metrics.incrGetBlockLocations();
    } else {
      metrics.incrFileInfoOps();
    }
    return (HdfsLocatedFileStatus)
        namesystem.getFileInfo(src, true, true, needBlockToken);
  }

  @Override // ClientProtocol
  public boolean isFileClosed(String src) throws IOException{
    checkNNStartup();
    return namesystem.isFileClosed(src);
  }
  
  @Override // ClientProtocol
  public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
    checkNNStartup();
    metrics.incrFileInfoOps();
    return namesystem.getFileInfo(src, false, false, false);
  }
  
  @Override // ClientProtocol
  public long[] getStats() throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ);
    return namesystem.getStats();
  }

  @Override // ClientProtocol
  public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ);
    return namesystem.getReplicatedBlockStats();
  }

  @Override // ClientProtocol
  public ECBlockGroupStats getECBlockGroupStats() throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ);
    return namesystem.getECBlockGroupStats();
  }

  @Override // ClientProtocol
  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
  throws IOException {
    checkNNStartup();
    DatanodeInfo[] results = namesystem.datanodeReport(type);
    return results;
  }

  @Override // ClientProtocol
  public DatanodeStorageReport[] getDatanodeStorageReport(
      DatanodeReportType type) throws IOException {
    checkNNStartup();
    final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
    return reports;
  }

  @Override // ClientProtocol
  public boolean setSafeMode(SafeModeAction action, boolean isChecked)
      throws IOException {
    checkNNStartup();
    OperationCategory opCategory = OperationCategory.UNCHECKED;
    if (isChecked) {
      if (action == SafeModeAction.SAFEMODE_GET) {
        opCategory = OperationCategory.READ;
      } else {
        opCategory = OperationCategory.WRITE;
      }
    }
    namesystem.checkOperation(opCategory);
    return namesystem.setSafeMode(action);
  }

  @Override // ClientProtocol
  public boolean restoreFailedStorage(String arg) throws IOException { 
    checkNNStartup();
    return namesystem.restoreFailedStorage(arg);
  }

  @Override // ClientProtocol
  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
    checkNNStartup();
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return true; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.saveNamespace(timeWindow, txGap);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
    return true;
  }
  
  @Override // ClientProtocol
  public long rollEdits() throws AccessControlException, IOException {
    checkNNStartup();
    CheckpointSignature sig = namesystem.rollEditLog();
    return sig.getCurSegmentTxId();
  }

  @Override // ClientProtocol
  public void refreshNodes() throws IOException {
    checkNNStartup();
    namesystem.refreshNodes();
  }

  @Override // NamenodeProtocol
  public long getTransactionID() throws IOException {
    String operationName = "getTransactionID";
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.UNCHECKED);
    namesystem.checkSuperuserPrivilege(operationName);
    return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
  }
  
  @Override // NamenodeProtocol
  public long getMostRecentCheckpointTxId() throws IOException {
    String operationName = "getMostRecentCheckpointTxId";
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.UNCHECKED);
    namesystem.checkSuperuserPrivilege(operationName);
    return namesystem.getFSImage().getMostRecentCheckpointTxId();
  }
  
  @Override // NamenodeProtocol
  public CheckpointSignature rollEditLog() throws IOException {
    checkNNStartup();
    return namesystem.rollEditLog();
  }
  
  @Override // NamenodeProtocol
  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
      throws IOException {
    String operationName = "getEditLogManifest";
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ);
    namesystem.checkSuperuserPrivilege(operationName);
    return namesystem.getEditLog().getEditLogManifest(sinceTxId);
  }

  @Override // NamenodeProtocol
  public boolean isUpgradeFinalized() throws IOException {
    String operationName = "isUpgradeFinalized";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    return namesystem.isUpgradeFinalized();
  }

  @Override // NamenodeProtocol
  public boolean isRollingUpgrade() throws IOException {
    String operationName = "isRollingUpgrade";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    return namesystem.isRollingUpgrade();
  }
    
  @Override // ClientProtocol
  public void finalizeUpgrade() throws IOException {
    checkNNStartup();
    namesystem.finalizeUpgrade();
  }

  @Override // ClientProtocol
  public boolean upgradeStatus() throws IOException {
    checkNNStartup();
    return namesystem.isUpgradeFinalized();
  }

  @Override // ClientProtocol
  public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException {
    checkNNStartup();
    LOG.info("rollingUpgrade " + action);
    switch(action) {
    case QUERY:
      return namesystem.queryRollingUpgrade();
    case PREPARE:
      return namesystem.startRollingUpgrade();
    case FINALIZE:
      return namesystem.finalizeRollingUpgrade();
    default:
      throw new UnsupportedActionException(action + " is not yet supported.");
    }
  }

  @Override // ClientProtocol
  public void metaSave(String filename) throws IOException {
    checkNNStartup();
    namesystem.metaSave(filename);
  }

  @Deprecated
  @Override // ClientProtocol
  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
      throws IOException {
    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
        OpenFilesIterator.FILTER_PATH_DEFAULT);
  }

  @Override // ClientProtocol
  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
      EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
    checkNNStartup();
    return namesystem.listOpenFiles(prevId, openFilesTypes, path);
  }
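
  /*
   * msync() deliberately has an empty payload: the WRITE-category check in
   * its body can only pass on the active NameNode, and the useful work
   * happens in the RPC layer, which hands the active's latest state id back
   * to the client so that subsequent reads served by an Observer NameNode
   * are at least that fresh.
   */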

  @Override // ClientProtocol
  public void msync() throws IOException {
    // Check for write access to ensure that msync only happens on active
    namesystem.checkOperation(OperationCategory.WRITE);
  }

  @Override // ClientProtocol
  public HAServiceState getHAServiceState() throws IOException {
    checkNNStartup();
    return nn.getServiceStatus().getState();
  }

  @Override // ClientProtocol
  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
      throws IOException {
    checkNNStartup();
    String[] cookieTab = new String[] { cookie };
    Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
      namesystem.listCorruptFileBlocks(path, cookieTab);

    String[] files = new String[fbs.size()];
    int i = 0;
    for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
      files[i++] = fb.path;
    }
    return new CorruptFileBlocks(files, cookieTab[0]);
  }

  /**
   * Tell all datanodes to use a new, non-persistent bandwidth value for
   * dfs.datanode.balance.bandwidthPerSec.
   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
   * @throws IOException
   */
  @Override // ClientProtocol
  public void setBalancerBandwidth(long bandwidth) throws IOException {
    if (bandwidth > HdfsServerConstants.MAX_BANDWIDTH_PER_DATANODE) {
      throw new IllegalArgumentException(
          "Bandwidth should not exceed maximum limit "
              + HdfsServerConstants.MAX_BANDWIDTH_PER_DATANODE
              + " bytes per second");
    }
    checkNNStartup();
    namesystem.setBalancerBandwidth(bandwidth);
  }
  
  @Override // ClientProtocol
  public ContentSummary getContentSummary(String path) throws IOException {
    checkNNStartup();
    return namesystem.getContentSummary(path);
  }

  @Override // ClientProtocol
  public QuotaUsage getQuotaUsage(String path) throws IOException {
    checkNNStartup();
    return namesystem.getQuotaUsage(path);
  }

  @Override // ClientProtocol
  public void satisfyStoragePolicy(String src) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.satisfyStoragePolicy(src, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override
  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
    checkNNStartup();
    return namesystem.slowDataNodesReport();
  }

  @Override // ClientProtocol
  public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
                       StorageType type)
      throws IOException {
    checkNNStartup();
    namesystem.setQuota(path, namespaceQuota, storagespaceQuota, type);
  }
  
  @Override // ClientProtocol
  public void fsync(String src, long fileId, String clientName,
                    long lastBlockLength)
      throws IOException {
    checkNNStartup();
    namesystem.fsync(src, fileId, clientName, lastBlockLength);
  }

  @Override // ClientProtocol
  public void setTimes(String src, long mtime, long atime) 
      throws IOException {
    checkNNStartup();
    namesystem.setTimes(src, mtime, atime);
  }

  @Override // ClientProtocol
  public void createSymlink(String target, String link, FsPermission dirPerms,
      boolean createParent) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }

    /* We enforce the MAX_PATH_LENGTH limit even though a symlink target
     * URI may refer to a non-HDFS file system. 
     */
    if (!checkPathLength(link)) {
      throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
                            " character limit");
                            
    }

    final UserGroupInformation ugi = getRemoteUser();

    boolean success = false;
    try {
      PermissionStatus perm = new PermissionStatus(ugi.getShortUserName(),
          null, dirPerms);
      namesystem.createSymlink(target, link, perm, createParent,
          cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public String getLinkTarget(String path) throws IOException {
    checkNNStartup();
    metrics.incrGetLinkTargetOps();
    HdfsFileStatus stat = null;
    try {
      stat = namesystem.getFileInfo(path, false, false, false);
    } catch (UnresolvedPathException e) {
      return e.getResolvedPath().toString();
    } catch (UnresolvedLinkException e) {
      // The NameNode should only throw an UnresolvedPathException
      throw new AssertionError("UnresolvedLinkException thrown");
    }
    if (stat == null) {
      throw new FileNotFoundException("File does not exist: " + path);
    } else if (!stat.isSymlink()) {
      throw new IOException("Path " + path + " is not a symbolic link");
    }
    return DFSUtilClient.bytes2String(stat.getSymlinkInBytes());
  }
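
  /*
   * DatanodeProtocol handlers start here. Registration checks the reported
   * software version against the configured minimum, and every later call
   * from a datanode is validated against the registration ID issued by the
   * namesystem (see verifyRequest below).
   */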


  @Override // DatanodeProtocol
  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
      throws IOException {
    checkNNStartup();
    verifySoftwareVersion(nodeReg);
    namesystem.registerDatanode(nodeReg);
    return nodeReg;
  }

  @Override // DatanodeProtocol
  public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
      int xmitsInProgress, int xceiverCount,
      int failedVolumes, VolumeFailureSummary volumeFailureSummary,
      boolean requestFullBlockReportLease,
      @Nonnull SlowPeerReports slowPeers,
      @Nonnull SlowDiskReports slowDisks)
          throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    return namesystem.handleHeartbeat(nodeReg, report,
        dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
        failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
        slowPeers, slowDisks);
  }
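
  /*
   * Full block reports are gated by a lease: a datanode asks for one via the
   * requestFullBlockReportLease flag in its heartbeat, and
   * checkBlockReportLease() below rejects reports the NameNode did not ask
   * for, which throttles report processing (e.g. after a restart).
   */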

  @Override // DatanodeProtocol
  public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
        String poolId, final StorageBlockReport[] reports,
        final BlockReportContext context) throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: from {}, reports.length={}.",
        nodeReg, reports.length);
    final BlockManager bm = namesystem.getBlockManager(); 
    boolean noStaleStorages = false;
    try {
      if (bm.checkBlockReportLease(context, nodeReg)) {
        for (int r = 0; r < reports.length; r++) {
          final BlockListAsLongs blocks = reports[r].getBlocks();
          //
          // BlockManager.processReport accumulates information from prior
          // calls for the same node and storage, so the value returned by the
          // last iteration of this loop is the final updated value for
          // noStaleStorages.
          //
          final int index = r;
          noStaleStorages = bm.runBlockOp(() ->
            bm.processReport(nodeReg, reports[index].getStorage(),
                blocks, context));
        }
      }
    } catch (UnregisteredNodeException une) {
      LOG.warn("Datanode {} is attempting to report but not register yet.",
          nodeReg);
      return RegisterCommand.REGISTER;
    }
    bm.removeBRLeaseIfNeeded(nodeReg, context);

    BlockManagerFaultInjector.getInstance().
        incomingBlockReportRpc(nodeReg, context);

    if (nn.getFSImage().isUpgradeFinalized() &&
        !namesystem.isRollingUpgrade() &&
        nn.isActiveState() &&
        noStaleStorages) {
      return new FinalizeCommand(poolId);
    }

    return null;
  }

  @Override
  public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
      String poolId, List<Long> blockIds) throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: from {} {} blocks",
        nodeReg, blockIds.size());
    namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
    return null;
  }

  @Override // DatanodeProtocol
  public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
      String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
          throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    metrics.incrBlockReceivedAndDeletedOps();
    blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: from {} {} blocks.",
        nodeReg, receivedAndDeletedBlocks.length);
    final BlockManager bm = namesystem.getBlockManager();
    for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
      bm.enqueueBlockOp(new Runnable() {
        @Override
        public void run() {
          try {
            namesystem.processIncrementalBlockReport(nodeReg, r);
          } catch (Exception ex) {
            // Usually because the node is unregistered or dead; the next
            // heartbeat will correct the problem.
            blockStateChangeLog.error(
                "*BLOCK* NameNode.blockReceivedAndDeleted: "
                    + "failed from " + nodeReg + ": " + ex.getMessage());
          }
        }
      });
    }
  }

  @Override // DatanodeProtocol
  public void errorReport(DatanodeRegistration nodeReg,
                          int errorCode, String msg) throws IOException { 
    checkNNStartup();
    String dnName = 
       (nodeReg == null) ? "Unknown DataNode" : nodeReg.toString();

    if (errorCode == DatanodeProtocol.NOTIFY) {
      LOG.info("Error report from " + dnName + ": " + msg);
      return;
    }
    verifyRequest(nodeReg);

    if (errorCode == DatanodeProtocol.DISK_ERROR) {
      LOG.warn("Disk error on " + dnName + ": " + msg);
    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);            
    } else {
      LOG.info("Error report from " + dnName + ": " + msg);
    }
  }
    
  @Override // DatanodeProtocol, NamenodeProtocol
  public NamespaceInfo versionRequest() throws IOException {
    checkNNStartup();
    return namesystem.getNamespaceInfo();
  }
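
  /*
   * sendLifeline() is a trimmed-down heartbeat carried on the separate
   * DatanodeLifelineProtocol: it refreshes liveness without returning any
   * commands, so a datanode whose regular heartbeats are stuck in a busy
   * RPC queue is not wrongly declared dead.
   */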

  @Override // DatanodeLifelineProtocol
  public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report,
      long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
      int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed,
        xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary);
  }

  /** 
   * Verifies the given registration.
   * 
   * @param nodeReg node registration
   * @throws UnregisteredNodeException if the registration is invalid
   */
  private void verifyRequest(NodeRegistration nodeReg) throws IOException {
    // verify registration ID
    final String id = nodeReg.getRegistrationID();
    final String expectedID = namesystem.getRegistrationID();
    if (!expectedID.equals(id)) {
      LOG.warn("Registration IDs mismatched: the "
          + nodeReg.getClass().getSimpleName() + " ID is " + id
          + " but the expected ID is " + expectedID);
       throw new UnregisteredNodeException(nodeReg);
    }
  }

  @Override // RefreshAuthorizationPolicyProtocol
  public void refreshServiceAcl() throws IOException {
    checkNNStartup();
    if (!serviceAuthEnabled) {
      throw new AuthorizationException("Service Level Authorization not enabled!");
    }

    this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
    if (this.serviceRpcServer != null) {
      this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
    }
    namesystem.logAuditEvent(true, "refreshServiceAcl", null);
  }

  @Override // RefreshAuthorizationPolicyProtocol
  public void refreshUserToGroupsMappings() throws IOException {
    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
        getRemoteUser().getShortUserName());
    Groups.getUserToGroupsMappingService().refresh();
    namesystem.logAuditEvent(true, "refreshUserToGroupsMappings", null);
  }

  @Override // RefreshAuthorizationPolicyProtocol
  public void refreshSuperUserGroupsConfiguration() throws IOException {
    LOG.info("Refreshing SuperUser proxy group mapping list ");

    ProxyUsers.refreshSuperUserGroupsConfiguration();
    namesystem.logAuditEvent(true, "refreshSuperUserGroupsConfiguration", null);
  }

  @Override // RefreshCallQueueProtocol
  public void refreshCallQueue() throws IOException {
    LOG.info("Refreshing call queue.");

    Configuration conf = new Configuration();
    clientRpcServer.refreshCallQueue(conf);
    if (this.serviceRpcServer != null) {
      serviceRpcServer.refreshCallQueue(conf);
    }
    namesystem.logAuditEvent(true, "refreshCallQueue", null);
  }

  @Override // GenericRefreshProtocol
  public Collection<RefreshResponse> refresh(String identifier, String[] args) {
    // Let the registry handle as needed
    return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
  }
  
  @Override // GetUserMappingsProtocol
  public String[] getGroupsForUser(String user) throws IOException {
    LOG.debug("Getting groups for user {}", user);
    return UserGroupInformation.createRemoteUser(user).getGroupNames();
  }
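
  /*
   * The HAServiceProtocol handlers below are all synchronized, so health
   * checks and HA state transitions driven by an administrator or the ZKFC
   * are serialized against one another.
   */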

  @Override // HAServiceProtocol
  public synchronized void monitorHealth() throws HealthCheckFailedException,
      AccessControlException, IOException {
    checkNNStartup();
    nn.monitorHealth();
  }
  
  @Override // HAServiceProtocol
  public synchronized void transitionToActive(StateChangeRequestInfo req) 
      throws ServiceFailedException, AccessControlException, IOException {
    checkNNStartup();
    nn.checkHaStateChange(req);
    nn.transitionToActive();
  }

  @Override // HAServiceProtocol
  public synchronized void transitionToStandby(StateChangeRequestInfo req)
      throws ServiceFailedException, AccessControlException, IOException {
    checkNNStartup();
    // Guard against a race between a manual transition of this namenode to
    // Observer and ZKFC automatic failover election: the namenode may have
    // joined the ZKFC election while still a Standby, before being
    // transitioned to the Observer state, so transition requests from the
    // ZKFC are rejected once it is an Observer. See HDFS-14961 for details.
    if (nn.getState().equals(NameNode.OBSERVER_STATE.toString())
        && req.getSource() == RequestSource.REQUEST_BY_ZKFC) {
      throw new AccessControlException(
          "Request from ZK failover controller at " + Server.getRemoteAddress()
              + " denied since the namenode is in Observer state.");
    }
    nn.checkHaStateChange(req);
    nn.transitionToStandby();
  }

  @Override // HAServiceProtocol
  public synchronized void transitionToObserver(StateChangeRequestInfo req)
      throws ServiceFailedException, AccessControlException, IOException {
    checkNNStartup();
    nn.checkHaStateChange(req);
    nn.transitionToObserver();
  }

  @Override // HAServiceProtocol
  public synchronized HAServiceStatus getServiceStatus() 
      throws AccessControlException, ServiceFailedException, IOException {
    checkNNStartup();
    return nn.getServiceStatus();
  }

  /**
   * Verify version.
   * @param version layout version
   * @throws IOException on layout version mismatch
   */
  void verifyLayoutVersion(int version) throws IOException {
    if (version != HdfsServerConstants.NAMENODE_LAYOUT_VERSION)
      throw new IncorrectVersionException(
          HdfsServerConstants.NAMENODE_LAYOUT_VERSION, version, "data node");
  }
  
  private void verifySoftwareVersion(DatanodeRegistration dnReg)
      throws IncorrectVersionException {
    String dnVersion = dnReg.getSoftwareVersion();
    if (VersionUtil.compareVersions(dnVersion, minimumDataNodeVersion) < 0) {
      IncorrectVersionException ive = new IncorrectVersionException(
          minimumDataNodeVersion, dnVersion, "DataNode", "NameNode");
      LOG.warn(ive.getMessage() + " DN: " + dnReg);
      throw ive;
    }
    String nnVersion = VersionInfo.getVersion();
    if (!dnVersion.equals(nnVersion)) {
      String messagePrefix = "Reported DataNode version '" + dnVersion +
          "' of DN " + dnReg + " does not match NameNode version '" +
          nnVersion + "'";
      long nnCTime = nn.getFSImage().getStorage().getCTime();
      long dnCTime = dnReg.getStorageInfo().getCTime();
      if (nnCTime != dnCTime) {
        IncorrectVersionException ive = new IncorrectVersionException(
            messagePrefix + " and CTime of DN ('" + dnCTime +
            "') does not match CTime of NN ('" + nnCTime + "')");
        LOG.warn(ive.toString(), ive);
        throw ive;
      } else {
        LOG.info(messagePrefix +
            ". Note: This is normal during a rolling upgrade.");
      }
    }
  }

  /**
   * Get the actual client's machine.
   */
  private String getClientMachine() {
    return NameNode.getClientMachine(this.ipProxyUsers);
  }

  @Override
  public DataEncryptionKey getDataEncryptionKey() throws IOException {
    checkNNStartup();
    return namesystem.getBlockManager().generateDataEncryptionKey();
  }
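
  /*
   * createSnapshot() uses a CacheEntryWithPayload: the snapshot path
   * computed by the first attempt is stored with the retry-cache entry, so
   * a retried call returns the same path instead of creating a second
   * snapshot.
   */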

  @Override
  public String createSnapshot(String snapshotRoot, String snapshotName)
      throws IOException {
    checkNNStartup();
    LOG.debug("*DIR* NameNode.createSnapshot: Path {} and SnapshotName {}",
        snapshotRoot, snapshotName);
    if (!checkPathLength(snapshotRoot)) {
      throw new IOException("createSnapshot: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (String) cacheEntry.getPayload();
    }

    metrics.incrCreateSnapshotOps();
    String ret = null;
    try {
      ret = namesystem.createSnapshot(snapshotRoot, snapshotName,
          cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, ret != null, ret);
    }
    return ret;
  }
  
  @Override
  public void deleteSnapshot(String snapshotRoot, String snapshotName)
      throws IOException {
    checkNNStartup();
    LOG.debug("*DIR* NameNode.deleteSnapshot: Path {} and SnapshotName {}",
        snapshotRoot, snapshotName);
    if (snapshotName == null || snapshotName.isEmpty()) {
      throw new IOException("The snapshot name is null or empty.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    metrics.incrDeleteSnapshotOps();
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.deleteSnapshot(snapshotRoot, snapshotName, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void allowSnapshot(String snapshotRoot) throws IOException {
    checkNNStartup();
    metrics.incrAllowSnapshotOps();
    namesystem.allowSnapshot(snapshotRoot);
  }

  @Override // ClientProtocol
  public void disallowSnapshot(String snapshot) throws IOException {
    checkNNStartup();
    metrics.incrDisAllowSnapshotOps();
    namesystem.disallowSnapshot(snapshot);
  }

  @Override // ClientProtocol
  public void renameSnapshot(String snapshotRoot, String snapshotOldName,
      String snapshotNewName) throws IOException {
    checkNNStartup();
    LOG.debug(
        "*DIR* NameNode.renameSnapshot: Snapshot Path {},snapshotOldName {}, snapshotNewName {}",
        snapshotRoot, snapshotOldName, snapshotNewName);
    if (snapshotNewName == null || snapshotNewName.isEmpty()) {
      throw new IOException("The new snapshot name is null or empty.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    metrics.incrRenameSnapshotOps();
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.renameSnapshot(snapshotRoot, snapshotOldName,
          snapshotNewName, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
      throws IOException {
    checkNNStartup();
    SnapshottableDirectoryStatus[] status = namesystem
        .getSnapshottableDirListing();
    metrics.incrListSnapshottableDirOps();
    return status;
  }

  @Override // ClientProtocol
  public SnapshotStatus[] getSnapshotListing(String snapshotRoot)
      throws IOException {
    checkNNStartup();
    SnapshotStatus[] status = namesystem
        .getSnapshotListing(snapshotRoot);
    metrics.incrListSnapshotsOps();
    return status;
  }

  @Override // ClientProtocol
  public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
      String earlierSnapshotName, String laterSnapshotName) throws IOException {
    checkNNStartup();
    SnapshotDiffReport report = namesystem.getSnapshotDiffReport(snapshotRoot,
        earlierSnapshotName, laterSnapshotName);
    metrics.incrSnapshotDiffReportOps();
    return report;
  }

  @Override // ClientProtocol
  public SnapshotDiffReportListing getSnapshotDiffReportListing(
      String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
      byte[] startPath, int index) throws IOException {
    checkNNStartup();
    SnapshotDiffReportListing report = namesystem
        .getSnapshotDiffReportListing(snapshotRoot, earlierSnapshotName,
            laterSnapshotName, startPath, index);
    metrics.incrSnapshotDiffReportOps();
    return report;
  }
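
  /*
   * addCacheDirective() likewise stashes the id allocated by the first
   * attempt in the retry-cache payload, so a retried call does not allocate
   * a duplicate directive.
   */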

  @Override // ClientProtocol
  public long addCacheDirective(
      CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (Long) cacheEntry.getPayload();
    }

    boolean success = false;
    long ret = 0;
    try {
      ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success, ret);
    }
    return ret;
  }

  @Override // ClientProtocol
  public void modifyCacheDirective(
      CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }

    boolean success = false;
    try {
      namesystem.modifyCacheDirective(directive, flags, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void removeCacheDirective(long id) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      namesystem.removeCacheDirective(id, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
      CacheDirectiveInfo filter) throws IOException {
    checkNNStartup();
    if (filter == null) {
      filter = new CacheDirectiveInfo.Builder().build();
    }
    return namesystem.listCacheDirectives(prevId, filter);
  }

  @Override // ClientProtocol
  public void addCachePool(CachePoolInfo info) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.addCachePool(info, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void modifyCachePool(CachePoolInfo info) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.modifyCachePool(info, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void removeCachePool(String cachePoolName) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      namesystem.removeCachePool(cachePoolName, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
      throws IOException {
    checkNNStartup();
    return namesystem.listCachePools(prevKey != null ? prevKey : "");
  }

  @Override // ClientProtocol
  public void modifyAclEntries(String src, List<AclEntry> aclSpec)
      throws IOException {
    checkNNStartup();
    namesystem.modifyAclEntries(src, aclSpec);
  }

  @Override // ClientProtocol
  public void removeAclEntries(String src, List<AclEntry> aclSpec)
      throws IOException {
    checkNNStartup();
    namesystem.removeAclEntries(src, aclSpec);
  }

  @Override // ClientProtocol
  public void removeDefaultAcl(String src) throws IOException {
    checkNNStartup();
    namesystem.removeDefaultAcl(src);
  }

  @Override // ClientProtocol
  public void removeAcl(String src) throws IOException {
    checkNNStartup();
    namesystem.removeAcl(src);
  }

  @Override // ClientProtocol
  public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
    checkNNStartup();
    namesystem.setAcl(src, aclSpec);
  }

  @Override // ClientProtocol
  public AclStatus getAclStatus(String src) throws IOException {
    checkNNStartup();
    return namesystem.getAclStatus(src);
  }
  
  @Override // ClientProtocol
  public void createEncryptionZone(String src, String keyName)
    throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      namesystem.createEncryptionZone(src, keyName, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public EncryptionZone getEZForPath(String src)
    throws IOException {
    checkNNStartup();
    return namesystem.getEZForPath(src);
  }

  @Override // ClientProtocol
  public BatchedEntries<EncryptionZone> listEncryptionZones(
      long prevId) throws IOException {
    checkNNStartup();
    return namesystem.listEncryptionZones(prevId);
  }

  @Override // ClientProtocol
  public void reencryptEncryptionZone(final String zone,
      final ReencryptAction action) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      namesystem.reencryptEncryptionZone(zone, action, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
      final long prevId) throws IOException {
    checkNNStartup();
    return namesystem.listReencryptionStatus(prevId);
  }

  @Override // ClientProtocol
  public void setErasureCodingPolicy(String src, String ecPolicyName)
      throws IOException {
    checkNNStartup();
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      if (ecPolicyName == null) {
        ecPolicyName = defaultECPolicyName;
        LOG.debug("No policy name is specified, set the default policy name instead");
      }
      LOG.debug("Set erasure coding policy {} on {}", ecPolicyName, src);
      namesystem.setErasureCodingPolicy(src, ecPolicyName, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
      throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.setXAttr(src, xAttr, flag, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }
  
  @Override // ClientProtocol
  public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) 
      throws IOException {
    checkNNStartup();
    return namesystem.getXAttrs(src, xAttrs);
  }

  @Override // ClientProtocol
  public List<XAttr> listXAttrs(String src) throws IOException {
    checkNNStartup();
    return namesystem.listXAttrs(src);
  }
  
  @Override // ClientProtocol
  public void removeXAttr(String src, XAttr xAttr) throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
    try {
      namesystem.removeXAttr(src, xAttr, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }
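
  /*
   * Every handler calls checkNNStartup() first: until startup completes,
   * callers receive a RetriableException, so client retry policies keep
   * waiting rather than failing hard.
   */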

  private void checkNNStartup() throws IOException {
    if (!this.nn.isStarted()) {
      String message = NameNode.composeNotStartedMessage(this.nn.getRole());
      throw new RetriableException(message);
    }
  }

  @Override // ClientProtocol
  public void checkAccess(String path, FsAction mode) throws IOException {
    checkNNStartup();
    namesystem.checkAccess(path, mode);
  }

  @Override // ClientProtocol
  public long getCurrentEditLogTxid() throws IOException {
    String operationName = "getCurrentEditLogTxid";
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ); // only active
    namesystem.checkSuperuserPrivilege(operationName);
    // if it's not yet open for write, we may be in the process of transitioning
    // from standby to active and may not yet know what the latest committed
    // txid is
    return namesystem.getEditLog().isOpenForWrite() ?
        namesystem.getEditLog().getLastWrittenTxId() : -1;
  }
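
  /*
   * readOp() and getEditsFromTxid() below back the HDFS inotify API: edit
   * log segments are replayed from the requested transaction and each op is
   * translated into client-visible events, tolerating segments that are
   * deleted or finalized mid-read.
   */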

  private static FSEditLogOp readOp(EditLogInputStream elis)
      throws IOException {
    try {
      return elis.readOp();
      // We can get the two exceptions below if a segment is deleted (because
      // we have accumulated too many edits) or (for the local journal /
      // no-QJM case only) if an in-progress segment is finalized under us;
      // no need to throw an exception back to the client in that case.
    } catch (FileNotFoundException e) {
      LOG.debug("Tried to read from deleted or moved edit log segment", e);
      return null;
    } catch (HttpGetFailedException e) {
      LOG.debug("Tried to read from deleted edit log segment", e);
      return null;
    }
  }

  @Override // ClientProtocol
  public EventBatchList getEditsFromTxid(long txid) throws IOException {
    String operationName = "getEditsFromTxid";
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ); // only active
    namesystem.checkSuperuserPrivilege(operationName);
    int maxEventsPerRPC = nn.getConf().getInt(
        DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
        DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
    FSEditLog log = namesystem.getFSImage().getEditLog();
    long syncTxid = log.getSyncTxId();
    // If we haven't synced anything yet, we can only read finalized
    // segments since we can't reliably determine which txns in in-progress
    // segments have actually been committed (e.g. written to a quorum of JNs).
    // If we have synced txns, we can definitely read up to syncTxid since
    // syncTxid is only updated after a transaction is committed to all
    // journals. (In-progress segments written by old writers are already
    // discarded for us, so if we read any in-progress segments they are
    // guaranteed to have been written by this NameNode.)
    boolean readInProgress = syncTxid > 0;

    // doas the NN login user for the actual operations to get edits.
    // Notably this is necessary when polling from the remote edits via https.
    // We have validated the client is a superuser from the NN RPC, so this
    // running as the login user here is safe.
    EventBatchList ret = SecurityUtil.doAsLoginUser(
        new PrivilegedExceptionAction<EventBatchList>() {
          @Override
          public EventBatchList run() throws IOException {
            return getEventBatchList(syncTxid, txid, log, readInProgress,
                maxEventsPerRPC);
          }
        });
    return ret;
  }

  private EventBatchList getEventBatchList(long syncTxid, long txid,
      FSEditLog log, boolean readInProgress, int maxEventsPerRPC)
      throws IOException {
    List<EventBatch> batches = Lists.newArrayList();
    int totalEvents = 0;
    long maxSeenTxid = -1;
    long firstSeenTxid = -1;

    if (syncTxid > 0 && txid > syncTxid) {
      // we can't read past syncTxid, so there's no point in going any further
      return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
    }

    Collection<EditLogInputStream> streams = null;
    try {
      streams = log.selectInputStreams(txid, 0, null, readInProgress);
    } catch (IllegalStateException e) { // can happen if we have
      // transitioned out of active and haven't yet transitioned to standby
      // and are using QJM -- the edit log will be closed and this exception
      // will result
      LOG.info("NN is transitioning from active to standby and FSEditLog " +
          "is closed -- could not read edits");
      return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
    }

    boolean breakOuter = false;
    for (EditLogInputStream elis : streams) {
      // our assumption in this code is the EditLogInputStreams are ordered by
      // starting txid
      try {
        FSEditLogOp op = null;
        while ((op = readOp(elis)) != null) {
          // break out of here in the unlikely event that syncTxid is so
          // out of date that its segment has already been deleted, so the first
          // txid we get is greater than syncTxid
          if (syncTxid > 0 && op.getTransactionId() > syncTxid) {
            breakOuter = true;
            break;
          }

          EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
          if (eventBatch != null) {
            batches.add(eventBatch);
            totalEvents += eventBatch.getEvents().length;
          }
          if (op.getTransactionId() > maxSeenTxid) {
            maxSeenTxid = op.getTransactionId();
          }
          if (firstSeenTxid == -1) {
            firstSeenTxid = op.getTransactionId();
          }
          if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
              op.getTransactionId() == syncTxid)) {
            // we're done
            breakOuter = true;
            break;
          }
        }
      } finally {
        elis.close();
      }
      if (breakOuter) {
        break;
      }
    }

    return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
  }

  @Override // ClientProtocol
  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
      throws IOException {
    checkNNStartup();
    return namesystem.getErasureCodingPolicies();
  }

  @Override // ClientProtocol
  public Map<String, String> getErasureCodingCodecs() throws IOException {
    checkNNStartup();
    return namesystem.getErasureCodingCodecs();
  }

  @Override // ClientProtocol
  public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException {
    checkNNStartup();
    return namesystem.getErasureCodingPolicy(src);
  }

  @Override // ClientProtocol
  public void unsetErasureCodingPolicy(String src) throws IOException {
    checkNNStartup();
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      LOG.debug("Unset erasure coding policy on {}", src);
      namesystem.unsetErasureCodingPolicy(src, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override
  public ECTopologyVerifierResult getECTopologyResultForPolicies(
      String... policyNames) throws IOException {
    return namesystem.getECTopologyResultForPolicies(policyNames);
  }

  @Override
  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
      ErasureCodingPolicy[] policies) throws IOException {
    String operationName = "addErasureCodingPolicies";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    final CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (AddErasureCodingPolicyResponse[]) cacheEntry.getPayload();
    }
    boolean success = false;
    AddErasureCodingPolicyResponse[] responses =
        new AddErasureCodingPolicyResponse[0];
    try {
      responses =
          namesystem.addErasureCodingPolicies(policies, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success, responses);
    }
    return responses;
  }

  @Override
  public void removeErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    String operationName = "removeErasureCodingPolicy";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      namesystem.removeErasureCodingPolicy(ecPolicyName, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void enableErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    String operationName = "enableErasureCodingPolicy";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      success = namesystem.enableErasureCodingPolicy(ecPolicyName,
          cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }

  @Override // ClientProtocol
  public void disableErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    String operationName = "disableErasureCodingPolicy";
    checkNNStartup();
    namesystem.checkSuperuserPrivilege(operationName);
    final CacheEntry cacheEntry = getCacheEntry();
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;
    }
    boolean success = false;
    try {
      success = namesystem.disableErasureCodingPolicy(ecPolicyName,
          cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }
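
  /*
   * The ReconfigurationProtocol handlers below let an administrator apply a
   * whitelisted subset of configuration changes without restarting the
   * NameNode; all three require superuser privilege and are audit-logged.
   */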

  @Override // ReconfigurationProtocol
  public void startReconfiguration() throws IOException {
    checkNNStartup();
    String operationName = "startNamenodeReconfiguration";
    namesystem.checkSuperuserPrivilege(operationName);
    nn.startReconfigurationTask();
    namesystem.logAuditEvent(true, operationName, null);
  }

  @Override // ReconfigurationProtocol
  public ReconfigurationTaskStatus getReconfigurationStatus()
      throws IOException {
    checkNNStartup();
    String operationName = "getNamenodeReconfigurationStatus";
    namesystem.checkSuperuserPrivilege(operationName);
    ReconfigurationTaskStatus status = nn.getReconfigurationTaskStatus();
    namesystem.logAuditEvent(true, operationName, null);
    return status;
  }

  @Override // ReconfigurationProtocol
  public List<String> listReconfigurableProperties() throws IOException {
    checkNNStartup();
    String operationName = "listNamenodeReconfigurableProperties";
    namesystem.checkSuperuserPrivilege(operationName);
    List<String> result = Lists.newArrayList(nn.getReconfigurableProperties());
    namesystem.logAuditEvent(true, operationName, null);
    return result;
  }

  @Override
  public Long getNextSPSPath() throws IOException {
    checkNNStartup();
    String operationName = "getNextSPSPath";
    namesystem.checkSuperuserPrivilege(operationName);
    if (nn.isStandbyState()) {
      throw new StandbyException("Not supported by Standby Namenode.");
    }
    // Check that SPS is enabled externally
    StoragePolicySatisfyManager spsMgr =
        namesystem.getBlockManager().getSPSManager();
    StoragePolicySatisfierMode spsMode = (spsMgr != null ? spsMgr.getMode()
        : StoragePolicySatisfierMode.NONE);
    if (spsMode != StoragePolicySatisfierMode.EXTERNAL) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("SPS service mode is {}, so external SPS service is "
            + "not allowed to fetch the path Ids", spsMode);
      }
      throw new IOException("SPS service mode is " + spsMode + ", so "
          + "external SPS service is not allowed to fetch the path Ids");
    }
    return namesystem.getBlockManager().getSPSManager().getNextPathId();
  }
}
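
The ClientProtocol handlers above are normally reached through the DistributedFileSystem client API rather than invoked directly. The sketch below is illustrative only, not part of the NameNode source: it assumes a reachable HDFS cluster configured on the classpath, superuser privileges for allowSnapshot, and a Hadoop 3.3+ client for msync(); the class name and paths are made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Each call below lands in the corresponding NameNodeRpcServer handler
// via a ClientProtocol RPC.
public class NameNodeRpcClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // reads core-site.xml/hdfs-site.xml
    try (FileSystem fs = FileSystem.get(conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;

      Path dir = new Path("/data/reports");      // hypothetical path
      dfs.allowSnapshot(dir);                    // -> allowSnapshot(...)
      Path snap = dfs.createSnapshot(dir, "s1"); // -> createSnapshot(...), retry-cached
      System.out.println("created " + snap);
      dfs.renameSnapshot(dir, "s1", "s1-final"); // -> renameSnapshot(...)

      // Bring the client's state id up to the active NameNode before reading
      // through an Observer; server side this is a no-op beyond the write check.
      dfs.msync();                               // -> msync()
    }
  }
}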

Related information

hadoop source code directory

Related articles

hadoop AclEntryStatusFormat source code

hadoop AclFeature source code

hadoop AclStorage source code

hadoop AclTransformation source code

hadoop AuditLogger source code

hadoop BackupImage source code

hadoop BackupJournalManager source code

hadoop BackupNode source code

hadoop BackupState source code

hadoop CacheManager source code
