hadoop ResourceLocalizationService 源码

  • 2022-10-20
  • 浏览 (193)

hadoop ResourceLocalizationService 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;

import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskValidator;
import org.apache.hadoop.util.DiskValidatorFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ResourceLocalizationService extends CompositeService
    implements EventHandler<LocalizationEvent>, LocalizationProtocol {

  // Logger shared by this service and its nested helper classes.
  private static final Logger LOG =
       LoggerFactory.getLogger(ResourceLocalizationService.class);
  // Name of the NodeManager-private sub-directory located under each local dir.
  public static final String NM_PRIVATE_DIR = "nmPrivate";
  // Owner-only (0700) permission; presumably applied to NM_PRIVATE_DIR --
  // the code that uses it is outside this section.
  public static final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
  // 0755 permission; presumably applied to the public file-cache folders --
  // the code that uses it is outside this section.
  private static final FsPermission PUBLIC_FILECACHE_FOLDER_PERMS =
      new FsPermission((short) 0755);

  // RPC server for the LocalizationProtocol; created and started in
  // serviceStart(), stopped in serviceStop().
  private Server server;
  // Read from the configuration in serviceInit() and refreshed from the
  // server's listener address in serviceStart().
  private InetSocketAddress localizationServerAddress;
  // Target size of the local cache in bytes (the configured MB value << 20).
  @VisibleForTesting
  long cacheTargetSize;
  // Delay, in milliseconds, between two runs of the cache cleanup task.
  private long cacheCleanupPeriod;

  private final ContainerExecutor exec;
  protected final Dispatcher dispatcher;
  private final DeletionService delService;
  // Sub-service created in serviceInit() and registered with the dispatcher
  // for LocalizerEventType events.
  private LocalizerTracker localizerTracker;
  private RecordFactory recordFactory;
  // Single-threaded scheduler that runs the periodic cache cleanup task.
  private final ScheduledExecutorService cacheCleanup;
  // Only instantiated (in createServer()) when security is enabled.
  private LocalizerTokenSecretManager secretManager;
  private NMStateStoreService stateStore;
  @VisibleForTesting
  final NodeManagerMetrics metrics;

  // Tracker for PUBLIC visibility resources; created in serviceInit().
  @VisibleForTesting
  LocalResourcesTracker publicRsrc;

  private LocalDirsHandlerService dirsHandler;
  // Listeners created in serviceInit(), registered with dirsHandler in
  // serviceStart() and deregistered in serviceStop().
  private DirsChangeListener localDirsChangeListener;
  private DirsChangeListener logDirsChangeListener;
  private Context nmContext;
  private DiskValidator diskValidator;

  /**
   * Map of LocalResourceTrackers keyed by username, for private
   * resources.
   */
  @VisibleForTesting
  final ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
    new ConcurrentHashMap<String,LocalResourcesTracker>();

  /**
   * Map of LocalResourceTrackers keyed by appid, for application
   * resources.
   */
  private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
    new ConcurrentHashMap<String,LocalResourcesTracker>();
  
  // Local file system context; assigned in serviceInit().
  FileContext lfs;

  public ResourceLocalizationService(Dispatcher dispatcher,
      ContainerExecutor exec, DeletionService delService,
      LocalDirsHandlerService dirsHandler, Context context,
      NodeManagerMetrics metrics) {

    super(ResourceLocalizationService.class.getName());
    this.exec = exec;
    this.dispatcher = dispatcher;
    this.delService = delService;
    this.dirsHandler = dirsHandler;

    this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1,
        new ThreadFactoryBuilder()
          .setNameFormat("ResourceLocalizationService Cache Cleanup")
          .build());
    this.stateStore = context.getNMStateStore();
    this.nmContext = context;
    this.metrics = metrics;
  }

  /**
   * Obtains the {@link FileContext} for the local file system.
   *
   * @param conf configuration used to create the file context
   * @return the local file system context
   * @throws YarnRuntimeException if the local file system cannot be accessed;
   *     the underlying {@link IOException} is preserved as the cause
   */
  FileContext getLocalFileContext(Configuration conf) {
    try {
      return FileContext.getLocalFSFileContext(conf);
    } catch (IOException e) {
      // Keep the original exception as the cause so the root failure is not
      // lost when the service fails to initialize.
      throw new YarnRuntimeException("Failed to access local fs", e);
    }
  }

  /**
   * Checks that the per-directory file limit of the local cache is usable.
   *
   * @param conf configuration to validate
   * @throws YarnRuntimeException if the configured limit is 36 or lower
   */
  private void validateConf(Configuration conf) {
    final String limitKey =
        YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY;
    final int perDirFileLimit = conf.getInt(limitKey,
        YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);

    // Happy path first: anything above 36 is accepted.
    if (perDirFileLimit > 36) {
      LOG.info("per directory file limit = " + perDirFileLimit);
      return;
    }

    LOG.error(limitKey + " parameter is configured with very low value.");
    throw new YarnRuntimeException(
        limitKey + " parameter is configured with a value less than 37.");
  }

  /**
   * Initializes the service from the given configuration.
   * <p>
   * Validates the configuration, creates the public resource tracker and the
   * record factory, and obtains the local file context. When the state store
   * cannot recover or was newly created, the local and log directories are
   * cleaned up and re-initialized. The cache target size, cache cleanup
   * period and localizer address are then read from the configuration, the
   * localizer tracker is created and registered, and the directory change
   * listeners are prepared (they are registered with the dirs handler in
   * {@link #serviceStart()}).
   *
   * @param conf configuration to initialize from
   * @throws Exception if initialization fails; failures while preparing the
   *     local file system and directories are rethrown as
   *     {@link YarnRuntimeException}
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    this.validateConf(conf);
    this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
        true, conf, stateStore, dirsHandler);
    this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);

    try {
      lfs = getLocalFileContext(conf);
      lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));

      // Only wipe and recreate the directories when there is no previous
      // state to recover from.
      if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) {
        cleanUpLocalDirs(lfs, delService);
        cleanupLogDirs(lfs, delService);
        initializeLocalDirs(lfs);
        initializeLogDirs(lfs);
      }
    } catch (Exception e) {
      throw new YarnRuntimeException(
        "Failed to initialize LocalizationService", e);
    }

    diskValidator = DiskValidatorFactory.getInstance(
        YarnConfiguration.DEFAULT_DISK_VALIDATOR);
    // The configured value is in MB; shifting by 20 converts it to bytes.
    cacheTargetSize =
      conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
    cacheCleanupPeriod =
      conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
    localizationServerAddress = conf.getSocketAddr(
        YarnConfiguration.NM_BIND_HOST,
        YarnConfiguration.NM_LOCALIZER_ADDRESS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);

    // The tracker is a child service and also the handler for localizer
    // events.
    localizerTracker = createLocalizerTracker(conf);
    addService(localizerTracker);
    dispatcher.register(LocalizerEventType.class, localizerTracker);
    localDirsChangeListener = new DirsChangeListener() {
      @Override
      public void onDirsChanged() {
        checkAndInitializeLocalDirs();
      }
    };
    logDirsChangeListener = new DirsChangeListener() {
      @Override
      public void onDirsChanged() {
        initializeLogDirs(lfs);
      }
    };
    super.serviceInit(conf);
  }

  /**
   * Recovers localized resources after an NM restart.
   * <p>
   * The public tracker is recovered first. Then, for every recovered user,
   * the user's private tracker and each of the user's application trackers
   * are created if not already present and fed with the recovered state.
   *
   * @param state recovered localization state
   * @throws URISyntaxException propagated from resource recovery
   * @throws IOException propagated from resource recovery or from the
   *     recovery iterator
   */
  public void recoverLocalizedResources(RecoveredLocalizationState state)
      throws URISyntaxException, IOException {
    recoverTrackerResources(publicRsrc, state.getPublicTrackerState());

    try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> users
             = state.getIterator()) {
      while (users.hasNext()) {
        final Map.Entry<String, RecoveredUserResources> recovered =
            users.next();
        final String user = recovered.getKey();
        final RecoveredUserResources userResources = recovered.getValue();

        // Private (per-user) resources.
        final LocalResourceTrackerState privateState =
            userResources.getPrivateTrackerState();
        final LocalResourcesTracker freshPrivate =
            new LocalResourcesTrackerImpl(user, null, dispatcher, true,
                super.getConfig(), stateStore, dirsHandler);
        final LocalResourcesTracker knownPrivate =
            privateRsrc.putIfAbsent(user, freshPrivate);
        // Prefer a tracker that was already registered for this user.
        recoverTrackerResources(
            knownPrivate == null ? freshPrivate : knownPrivate,
            privateState);

        // Application-scoped resources of this user.
        for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
            userResources.getAppTrackerStates().entrySet()) {
          final LocalResourceTrackerState appState = appEntry.getValue();
          final ApplicationId appId = appEntry.getKey();
          final String appIdStr = appId.toString();
          final LocalResourcesTracker freshApp =
              new LocalResourcesTrackerImpl(user, appId, dispatcher, false,
                  super.getConfig(), stateStore, dirsHandler);
          final LocalResourcesTracker knownApp =
              appRsrc.putIfAbsent(appIdStr, freshApp);
          recoverTrackerResources(
              knownApp == null ? freshApp : knownApp, appState);
        }
      }
    }
  }

  /**
   * Replays the recovered state of a single tracker.
   * <p>
   * Completed resources are reported to the tracker as recovered with their
   * stored local path and size. Resources whose localization had only been
   * started are reported as recovered with size 0 and then removed through
   * the deletion service, so that containers request them again.
   *
   * @param tracker tracker receiving the recovered resources
   * @param state recovered state for that tracker
   * @throws URISyntaxException if a recovered resource cannot be converted
   *     into a request
   * @throws IOException propagated from the recovery iterators
   */
  private void recoverTrackerResources(LocalResourcesTracker tracker,
      LocalResourceTrackerState state) throws URISyntaxException, IOException {
    try (RecoveryIterator<LocalizedResourceProto> completed =
             state.getCompletedResourcesIterator()) {
      // A null iterator simply means there is nothing to replay.
      if (completed != null) {
        while (completed.hasNext()) {
          final LocalizedResourceProto proto = completed.next();
          final LocalResourceRequest req = new LocalResourceRequest(
              new LocalResourcePBImpl(proto.getResource()));
          LOG.debug("Recovering localized resource {} at {}",
              req, proto.getLocalPath());
          final Path localPath = new Path(proto.getLocalPath());
          tracker.handle(
              new ResourceRecoveredEvent(req, localPath, proto.getSize()));
        }
      }
    }

    try (RecoveryIterator<Map.Entry<LocalResourceProto, Path>> started =
             state.getStartedResourcesIterator()) {
      if (started != null) {
        while (started.hasNext()) {
          final Map.Entry<LocalResourceProto, Path> inProgress =
              started.next();
          final LocalResourceRequest req = new LocalResourceRequest(
              new LocalResourcePBImpl(inProgress.getKey()));
          final Path localPath = inProgress.getValue();
          tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));

          // delete any in-progress localizations, containers will request
          // again
          LOG.info("Deleting in-progress localization for " + req + " at "
              + localPath);
          tracker.remove(tracker.getLocalizedResource(req), delService);
        }
      }
    }

    // TODO: remove untracked directories in local filesystem
  }

  /**
   * Handles a localizer heartbeat by delegating to the localizer tracker.
   *
   * @param status status reported by the localizer
   * @return the response produced by the localizer tracker
   */
  @Override
  public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
    final LocalizerHeartbeatResponse response =
        localizerTracker.processHeartbeat(status);
    return response;
  }

  /**
   * Starts the service: schedules the periodic cache cleanup, creates and
   * starts the localization RPC server, records the address it is listening
   * on, starts the child services and finally registers the directory change
   * listeners with the dirs handler.
   *
   * @throws Exception if the service cannot be started
   */
  @Override
  public void serviceStart() throws Exception {
    final Runnable cleanupTask = new CacheCleanup(dispatcher);
    cacheCleanup.scheduleWithFixedDelay(
        cleanupTask, cacheCleanupPeriod, cacheCleanupPeriod,
        TimeUnit.MILLISECONDS);

    server = createServer();
    server.start();

    // Publish the address the server actually bound to.
    final Configuration conf = getConfig();
    localizationServerAddress = conf.updateConnectAddr(
        YarnConfiguration.NM_BIND_HOST,
        YarnConfiguration.NM_LOCALIZER_ADDRESS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
        server.getListenerAddress());
    LOG.info("Localizer started on port " + server.getPort());

    super.serviceStart();

    dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
    dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
  }

  /**
   * Factory method for the localizer tracker used by this service.
   *
   * @param conf configuration passed to the tracker
   * @return a new {@link LocalizerTracker}
   */
  LocalizerTracker createLocalizerTracker(Configuration conf) {
    final LocalizerTracker tracker = new LocalizerTracker(conf);
    return tracker;
  }

  /**
   * Builds the RPC server that serves the {@link LocalizationProtocol}.
   * <p>
   * A token secret manager is created only when security is enabled, and the
   * service ACLs are refreshed only when service authorization is turned on
   * in the configuration.
   *
   * @return the configured, not yet started, server
   */
  Server createServer() {
    final Configuration conf = getConfig();
    final YarnRPC rpc = YarnRPC.create(conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      secretManager = new LocalizerTokenSecretManager();
    }

    final int clientThreads = conf.getInt(
        YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT);
    final Server rpcServer = rpc.getServer(LocalizationProtocol.class, this,
        localizationServerAddress, conf, secretManager, clientThreads);

    // Enable service authorization?
    final boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      rpcServer.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
    }

    return rpcServer;
  }

  /**
   * Stops the service: deregisters the directory change listeners, stops the
   * RPC server if one was created, shuts down the cache cleanup scheduler and
   * stops the child services.
   *
   * @throws Exception if the service cannot be stopped
   */
  @Override
  public void serviceStop() throws Exception {
    dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener);
    dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener);

    // The server is absent when the service never got as far as starting it.
    final Server rpcServer = server;
    if (null != rpcServer) {
      rpcServer.stop();
    }

    cacheCleanup.shutdown();
    super.serviceStop();
  }

  /**
   * Dispatches a localization event to the handler for its type.
   *
   * @param event the localization event to process
   * @throws YarnRuntimeException if the event type is not recognized
   */
  @Override
  public void handle(LocalizationEvent event) {
    // TODO: create log dir as $logdir/$user/$appId
    switch (event.getType()) {
    case INIT_APPLICATION_RESOURCES: {
      final ApplicationLocalizationEvent initEvent =
          (ApplicationLocalizationEvent) event;
      handleInitApplicationResources(initEvent.getApplication());
      break;
    }
    case LOCALIZE_CONTAINER_RESOURCES: {
      final ContainerLocalizationRequestEvent requestEvent =
          (ContainerLocalizationRequestEvent) event;
      handleInitContainerResources(requestEvent);
      break;
    }
    case CONTAINER_RESOURCES_LOCALIZED: {
      final ContainerLocalizationEvent localizedEvent =
          (ContainerLocalizationEvent) event;
      handleContainerResourcesLocalized(localizedEvent);
      break;
    }
    case CACHE_CLEANUP: {
      handleCacheCleanup();
      break;
    }
    case CLEANUP_CONTAINER_RESOURCES: {
      final ContainerLocalizationCleanupEvent cleanupEvent =
          (ContainerLocalizationCleanupEvent) event;
      handleCleanupContainerResources(cleanupEvent);
      break;
    }
    case DESTROY_APPLICATION_RESOURCES: {
      final ApplicationLocalizationEvent destroyEvent =
          (ApplicationLocalizationEvent) event;
      handleDestroyApplicationResources(destroyEvent.getApplication());
      break;
    }
    default:
      throw new YarnRuntimeException("Unknown localization event: " + event);
    }
  }
  
  /**
   * Handles the event received the first time any container of a given
   * application is scheduled: makes sure a private tracker exists for the
   * application's user and an application tracker exists for the
   * application, then signals that the application is initialized.
   *
   * @param app the application being initialized
   */
  @SuppressWarnings("unchecked")
  private void handleInitApplicationResources(Application app) {
    // 0) Create application tracking structs
    final String userName = app.getUser();
    final LocalResourcesTracker userTracker = new LocalResourcesTrackerImpl(
        userName, null, dispatcher, true, super.getConfig(), stateStore,
        dirsHandler);
    privateRsrc.putIfAbsent(userName, userTracker);

    final String appIdStr = app.getAppId().toString();
    final LocalResourcesTracker appTracker = new LocalResourcesTrackerImpl(
        app.getUser(), app.getAppId(), dispatcher, false, super.getConfig(),
        stateStore, dirsHandler);
    appRsrc.putIfAbsent(appIdStr, appTracker);

    // 1) Signal container init
    //
    // This is handled by the ApplicationImpl state machine and allows
    // containers to proceed with launching.
    final ApplicationInitedEvent inited =
        new ApplicationInitedEvent(app.getAppId());
    dispatcher.getEventHandler().handle(inited);
  }
  
  /**
   * Forwards every resource requested for a container to the
   * {@link LocalResourcesTracker} matching its visibility, wrapped in a
   * {@link LocalResourceRequest} based request event.
   * <p>
   * Nothing is localized unless the container is in the LOCALIZING, RUNNING
   * or REINITIALIZING state.
   *
   * @param rsrcReqs the localization request for a container
   */
  private void handleInitContainerResources(
      ContainerLocalizationRequestEvent rsrcReqs) {
    final Container container = rsrcReqs.getContainer();

    final EnumSet<ContainerState> localizableStates = EnumSet.of(
        ContainerState.LOCALIZING,
        ContainerState.RUNNING,
        ContainerState.REINITIALIZING);
    if (!localizableStates.contains(container.getContainerState())) {
      LOG.warn(container.getContainerId() + " is at "
          + container.getContainerState()
          + " state, do not localize resources.");
      return;
    }

    // create a loading cache for the file statuses
    final LoadingCache<Path, Future<FileStatus>> statCache =
        CacheBuilder.newBuilder()
            .build(FSDownload.createStatusCacheLoader(getConfig()));
    final LocalizerContext ctxt = new LocalizerContext(container.getUser(),
        container.getContainerId(), container.getCredentials(), statCache);

    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        requested = rsrcReqs.getRequestedResources();
    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
        byVisibility : requested.entrySet()) {
      final LocalResourcesTracker tracker = getLocalResourcesTracker(
          byVisibility.getKey(), container.getUser(),
          container.getContainerId().getApplicationAttemptId()
              .getApplicationId());
      for (LocalResourceRequest req : byVisibility.getValue()) {
        tracker.handle(
            new ResourceRequestEvent(req, byVisibility.getKey(), ctxt));
        LOG.debug("Localizing {} for container {}",
            req.getPath(), container.getContainerId());
      }
    }
  }

  /**
   * Once a container's resources are localized, ends the localization of the
   * corresponding {@link ContainerLocalizer}, which is identified by the
   * container id.
   *
   * @param event the event carrying the localized container
   */
  private void handleContainerResourcesLocalized(
      ContainerLocalizationEvent event) {
    final String locId = event.getContainer().getContainerId().toString();
    localizerTracker.endContainerLocalization(locId);
  }

  /**
   * Runs one cache cleanup pass over the public tracker and all per-user
   * private trackers, logs the outcome and publishes it to the metrics.
   *
   * @return statistics describing what the cleanup pass did
   */
  @VisibleForTesting
  LocalCacheCleanerStats handleCacheCleanup() {
    final LocalCacheCleaner cacheCleaner =
        new LocalCacheCleaner(delService, cacheTargetSize);
    cacheCleaner.addResources(publicRsrc);
    final Iterator<LocalResourcesTracker> perUser =
        privateRsrc.values().iterator();
    while (perUser.hasNext()) {
      cacheCleaner.addResources(perUser.next());
    }

    final LocalCacheCleanerStats cleanupStats = cacheCleaner.cleanCache();
    // Detailed output only when debug logging is on.
    if (LOG.isDebugEnabled()) {
      LOG.debug(cleanupStats.toStringDetailed());
    } else if (LOG.isInfoEnabled()) {
      LOG.info(cleanupStats.toString());
    }

    // Update metrics
    metrics.setCacheSizeBeforeClean(cleanupStats.getCacheSizeBeforeClean());
    metrics.setTotalBytesDeleted(cleanupStats.getTotalDelSize());
    metrics.setPrivateBytesDeleted(cleanupStats.getPrivateDelSize());
    metrics.setPublicBytesDeleted(cleanupStats.getPublicDelSize());
    return cleanupStats;
  }


  /**
   * Releases the resources held by a container and removes its directories.
   * <p>
   * A release event is sent to the matching tracker for every resource of
   * the container, the container's private localizer is cleaned up, the
   * container directories under every local dir eligible for cleanup are
   * submitted for deletion, and finally the container is notified that its
   * resources have been cleaned up.
   *
   * @param rsrcCleanup the cleanup event for a container
   */
  @SuppressWarnings("unchecked")
  private void handleCleanupContainerResources(
      ContainerLocalizationCleanupEvent rsrcCleanup) {
    final Container container = rsrcCleanup.getContainer();
    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        held = rsrcCleanup.getResources();
    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
        byVisibility : held.entrySet()) {
      final LocalResourcesTracker tracker = getLocalResourcesTracker(
          byVisibility.getKey(), container.getUser(),
          container.getContainerId().getApplicationAttemptId()
              .getApplicationId());
      for (LocalResourceRequest req : byVisibility.getValue()) {
        tracker.handle(
            new ResourceReleaseEvent(req, container.getContainerId()));
      }
    }
    localizerTracker.cleanupPrivLocalizers(
        container.getContainerId().toString());

    // Delete the container directories
    final String userName = container.getUser();
    final String containerIDStr = container.toString();
    final String appIDStr = container.getContainerId()
        .getApplicationAttemptId().getApplicationId().toString();

    // Try deleting from good local dirs and full local dirs because a dir might
    // have gone bad while the app was running(disk full). In addition
    // a dir might have become good while the app was running.
    // Check if the container dir exists and if it does, try to delete it
    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
      // Delete the user-owned container-dir:
      // <localDir>/usercache/<user>/appcache/<app>/<container>
      final Path userAppCache = new Path(
          new Path(new Path(localDir, ContainerLocalizer.USERCACHE), userName),
          ContainerLocalizer.APPCACHE);
      final Path containerDir =
          new Path(new Path(userAppCache, appIDStr), containerIDStr);
      submitDirForDeletion(userName, containerDir);

      // Delete the nmPrivate container-dir:
      // <localDir>/nmPrivate/<app>/<container>
      final Path privateAppDir =
          new Path(new Path(localDir, NM_PRIVATE_DIR), appIDStr);
      submitDirForDeletion(null, new Path(privateAppDir, containerIDStr));
    }

    final ContainerEvent cleanedUp = new ContainerEvent(
        container.getContainerId(),
        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP);
    dispatcher.getEventHandler().handle(cleanedUp);
  }
  
  /**
   * Submits a directory to the deletion service if it exists on the local
   * file system.
   * <p>
   * This is best-effort: a directory that does not exist is silently
   * skipped, and any other failure while probing the directory is logged and
   * the directory is left alone.
   *
   * @param userName user the deletion is performed as; callers pass
   *     {@code null} for NodeManager-private directories
   * @param dir directory to delete
   */
  private void submitDirForDeletion(String userName, Path dir) {
    try {
      // Used purely as an existence probe; the status itself is not needed.
      lfs.getFileStatus(dir);
      FileDeletionTask deletionTask = new FileDeletionTask(delService, userName,
          dir, null);
      delService.delete(deletionTask);
    } catch (FileNotFoundException ignored) {
      // Expected when the directory was never created on this local dir or
      // has already been removed: nothing to delete.
    } catch (UnsupportedFileSystemException ue) {
      LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
    } catch (IOException ie) {
      // Do not hide unexpected I/O problems (e.g. permission or disk
      // errors); report them but keep the cleanup best-effort.
      LOG.warn("Unable to check local dir " + dir
          + " before deletion, skipping it", ie);
    }
  }


  /**
   * Releases everything kept for a finished application.
   * <p>
   * The application's tracker is removed and each of its resources that has
   * a local path is removed from the state store. The application
   * directories under every local dir eligible for cleanup are then
   * submitted for deletion, and the application is notified that its
   * resources have been cleaned up.
   *
   * @param application the application whose resources are destroyed
   */
  @SuppressWarnings({"unchecked"})
  private void handleDestroyApplicationResources(Application application) {
    final String userName = application.getUser();
    final ApplicationId appId = application.getAppId();
    // NOTE(review): the directory name below comes from
    // Application#toString(), while the tracker map is keyed by
    // appId.toString(); presumably both yield the application id -- confirm.
    final String appIDStr = application.toString();

    final LocalResourcesTracker appLocalRsrcsTracker =
        appRsrc.remove(appId.toString());
    if (appLocalRsrcsTracker != null) {
      for (LocalizedResource rsrc : appLocalRsrcsTracker) {
        Path localPath = rsrc.getLocalPath();
        if (localPath != null) {
          try {
            stateStore.removeLocalizedResource(userName, appId, localPath);
          } catch (IOException e) {
            // Keep going: one failed removal must not block the cleanup of
            // the remaining resources.
            LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
                + " from state store", e);
          }
        }
      }
    } else {
      LOG.warn("Removing uninitialized application " + application);
    }

    // Delete the application directories
    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {

      // Delete the user-owned app-dir
      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, userName);
      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
      Path appDir = new Path(allAppsdir, appIDStr);
      submitDirForDeletion(userName, appDir);

      // Delete the nmPrivate app-dir
      Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
      Path appSysDir = new Path(sysDir, appIDStr);
      submitDirForDeletion(null, appSysDir);
    }

    // TODO: decrement reference counts of all resources associated with this
    // app

    dispatcher.getEventHandler().handle(new ApplicationEvent(
          appId,
          ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
  }


  /**
   * Selects the tracker responsible for resources of the given visibility.
   *
   * @param visibility visibility of the resource
   * @param user user name, used to look up the private tracker
   * @param appId application id, used to look up the application tracker
   * @return the application tracker for APPLICATION, the user's tracker for
   *     PRIVATE, and the public tracker otherwise; the map lookups may
   *     return {@code null} when no tracker is registered
   */
  LocalResourcesTracker getLocalResourcesTracker(
      LocalResourceVisibility visibility, String user, ApplicationId appId) {
    switch (visibility) {
      case APPLICATION:
        return appRsrc.get(appId.toString());
      case PRIVATE:
        return privateRsrc.get(user);
      case PUBLIC:
      default:
        // Anything that is not private or application scoped is public.
        return publicRsrc;
    }
  }

  /**
   * Builds the relative path of a user's file cache directory by joining
   * ".", the user cache directory name, the user name and the file cache
   * directory name with the path separator.
   *
   * @param user user name
   * @return the joined relative path
   */
  private String getUserFileCachePath(String user) {
    final List<String> segments = Arrays.asList(
        ".",
        ContainerLocalizer.USERCACHE,
        user,
        ContainerLocalizer.FILECACHE);
    return StringUtils.join(Path.SEPARATOR, segments);
  }

  /**
   * Builds the relative path of an application's file cache directory by
   * joining ".", the user cache directory name, the user name, the app cache
   * directory name, the application id and the file cache directory name
   * with the path separator.
   *
   * @param user user name
   * @param appId application id as a string
   * @return the joined relative path
   */
  private String getAppFileCachePath(String user, String appId) {
    final List<String> segments = Arrays.asList(
        ".",
        ContainerLocalizer.USERCACHE,
        user,
        ContainerLocalizer.APPCACHE,
        appId,
        ContainerLocalizer.FILECACHE);
    return StringUtils.join(Path.SEPARATOR, segments);
  }
  
  /**
   * Exposes the public localizer owned by the localizer tracker.
   *
   * @return the tracker's public localizer
   */
  @VisibleForTesting
  @Private
  public PublicLocalizer getPublicLocalizer() {
    final LocalizerTracker tracker = localizerTracker;
    return tracker.publicLocalizer;
  }

  /**
   * Looks up the private localizer registered under the given id.
   *
   * @param locId localizer id
   * @return the matching runner, or {@code null} if the map has no entry
   *     for that id
   */
  @VisibleForTesting
  @Private
  public LocalizerRunner getLocalizerRunner(String locId) {
    final Map<String, LocalizerRunner> runners =
        localizerTracker.privLocalizers;
    return runners.get(locId);
  }
  
  /**
   * Exposes the localizer tracker's map of private localizers.
   *
   * @return the live map of private localizers keyed by localizer id (not a
   *     copy)
   */
  @VisibleForTesting
  @Private
  public Map<String, LocalizerRunner> getPrivateLocalizers() {
    final Map<String, LocalizerRunner> runners =
        localizerTracker.privLocalizers;
    return runners;
  }
  
  /**
   * Sub-component handling the spawning of {@link ContainerLocalizer}s
   */
  class LocalizerTracker extends AbstractService implements EventHandler<LocalizerEvent>  {

    // Localizer that receives PUBLIC resource requests; started with this
    // service and interrupted when it stops.
    private final PublicLocalizer publicLocalizer;
    // Private localizers keyed by localizer id. Accesses in processHeartbeat()
    // and handle() are made while holding this map's monitor.
    private final Map<String,LocalizerRunner> privLocalizers;
    // Presumably remembers ids of localizers that were cleaned up recently;
    // its use is outside this section -- confirm.
    private final Map<String, String> recentlyCleanedLocalizers;
    // Size argument passed to the recentlyCleanedLocalizers cache.
    private final int maxRecentlyCleaned = 128;

    /**
     * Creates a tracker that starts with an empty map of private localizers.
     *
     * @param conf configuration passed on to the public localizer
     */
    LocalizerTracker(Configuration conf) {
      this(conf, new HashMap<String, LocalizerRunner>());
    }

    LocalizerTracker(Configuration conf,
        Map<String,LocalizerRunner> privLocalizers) {
      super(LocalizerTracker.class.getName());
      this.publicLocalizer = new PublicLocalizer(conf);
      this.privLocalizers = privLocalizers;
      this.recentlyCleanedLocalizers =
          new LRUCacheHashMap<String, String>(maxRecentlyCleaned, false);
    }
    
    /**
     * Starts the public localizer and then the service itself.
     *
     * @throws Exception if the service cannot be started
     */
    @Override
    public synchronized void serviceStart() throws Exception {
      this.publicLocalizer.start();
      super.serviceStart();
    }

    /**
     * Routes a heartbeat to the private localizer it belongs to.
     * <p>
     * A heartbeat from a localizer id that is not registered is answered
     * with a {@code DIE} action.
     *
     * @param status status reported by the localizer
     * @return the registered localizer's response, or a {@code DIE} response
     *     for an unknown localizer
     */
    public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
      final String locId = status.getLocalizerId();
      synchronized (privLocalizers) {
        final LocalizerRunner runner = privLocalizers.get(locId);
        if (runner != null) {
          return runner.processHeartbeat(status.getResources());
        }

        // TODO process resources anyway
        LOG.info("Unknown localizer with localizerId " + locId
            + " is sending heartbeat. Ordering it to DIE");
        final LocalizerHeartbeatResponse dieResponse =
            recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
        dieResponse.setLocalizerAction(LocalizerAction.DIE);
        return dieResponse;
      }
    }
    
    @Override
    public void serviceStop() throws Exception {
      for (LocalizerRunner localizer : privLocalizers.values()) {
        localizer.interrupt();
      }
      publicLocalizer.interrupt();
      super.serviceStop();
    }

    @Override
    public void handle(LocalizerEvent event) {
      String locId = event.getLocalizerId();
      switch (event.getType()) {
      case REQUEST_RESOURCE_LOCALIZATION:
        // 0) find running localizer or start new thread
        LocalizerResourceRequestEvent req =
          (LocalizerResourceRequestEvent)event;
        switch (req.getVisibility()) {
        case PUBLIC:
          publicLocalizer.addResource(req);
          break;
        case PRIVATE:
        case APPLICATION:
          synchronized (privLocalizers) {
            LocalizerRunner localizer = privLocalizers.get(locId);
            if (localizer != null && localizer.killContainerLocalizer.get()) {
              // Old localizer thread has been stopped, remove it and create
              // a new localizer thread.
              LOG.info("New " + event.getType() + " localize request for "
                  + locId + ", remove old private localizer.");
              privLocalizers.remove(locId);
              localizer.interrupt();
              localizer = null;
            }
            if (null == localizer) {
              // Don't create a new localizer if this one has been recently
              // cleaned up - this can happen if localization requests come
              // in after cleanupPrivLocalizers has been called.
              if (recentlyCleanedLocalizers.containsKey(locId)) {
                LOG.info(
                    "Skipping localization request for recently cleaned " +
                    "localizer " + locId + " resource:" + req.getResource());
                break;
              }
              LOG.info("Created localizer for " + locId);
              localizer = new LocalizerRunner(req.getContext(), locId);
              privLocalizers.put(locId, localizer);
              localizer.start();
            }
            // 1) propagate event
            localizer.addResource(req);
          }
          break;
        }
        break;
      }
    }

    public void cleanupPrivLocalizers(String locId) {
      synchronized (privLocalizers) {
        LocalizerRunner localizer = privLocalizers.get(locId);
        recentlyCleanedLocalizers.put(locId, locId);
        if (null == localizer) {
          return; // ignore; already gone
        }
        privLocalizers.remove(locId);
        localizer.interrupt();
      }
    }

    public void endContainerLocalization(String locId) {
      LocalizerRunner localizer;
      synchronized (privLocalizers) {
        localizer = privLocalizers.get(locId);
        if (null == localizer) {
          return; // ignore
        }
      }
      localizer.endContainerLocalization();
    }
  }
  

  /**
   * Creates the fixed-size thread pool used to fetch public resources.
   * The pool size is read from
   * {@link YarnConfiguration#NM_LOCALIZER_FETCH_THREAD_COUNT}, and threads are
   * named "PublicLocalizer #N".
   *
   * @param conf configuration to read the thread count from
   * @return the new executor
   */
  private static ExecutorService createLocalizerExecutor(Configuration conf) {
    final int fetchThreads = conf.getInt(
        YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
    final ThreadFactoryBuilder namedThreads =
        new ThreadFactoryBuilder().setNameFormat("PublicLocalizer #%d");
    return HadoopExecutors.newFixedThreadPool(fetchThreads,
        namedThreads.build());
  }


  /**
   * Thread that localizes PUBLIC resources. Download tasks are submitted to
   * a thread pool through a {@link CompletionService}; {@link #run()} takes
   * completed downloads and reports success or failure to the resource
   * trackers.
   */
  class PublicLocalizer extends Thread {

    final FileContext lfs;
    final Configuration conf;
    final ExecutorService threadPool;
    final CompletionService<Path> queue;
    // Its shared between public localizer and dispatcher thread.
    final Map<Future<Path>,LocalizerResourceRequestEvent> pending;

    PublicLocalizer(Configuration conf) {
      super("Public Localizer");
      this.lfs = getLocalFileContext(conf);
      this.conf = conf;
      this.pending = Collections.synchronizedMap(
          new HashMap<Future<Path>, LocalizerResourceRequestEvent>());
      this.threadPool = createLocalizerExecutor(conf);
      this.queue = new ExecutorCompletionService<Path>(threadPool);
    }

    /**
     * Submits a download for the requested public resource, provided the
     * resource lock can be taken without blocking and the resource is still
     * in state DOWNLOADING. On every path that does not hand the resource to
     * the download queue, the lock taken here is released again.
     *
     * @param request the localization request to serve
     */
    public void addResource(LocalizerResourceRequestEvent request) {
      // TODO handle failures, cancellation, requests by other containers
      LocalizedResource rsrc = request.getResource();
      LocalResourceRequest key = rsrc.getRequest();
      LOG.info("Downloading public resource: " + key);
      /*
       * Here multiple containers may request the same resource. So we need
       * to start downloading only when
       * 1) ResourceState == DOWNLOADING
       * 2) We are able to acquire non blocking semaphore lock.
       * If not we will skip this resource as either it is getting downloaded
       * or it FAILED / LOCALIZED.
       */

      if (rsrc.tryAcquire()) {
        if (rsrc.getState() == ResourceState.DOWNLOADING) {
          LocalResource resource = request.getResource().getRequest();
          try {
            Path publicRootPath =
                dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
                    + ContainerLocalizer.FILECACHE,
                  ContainerLocalizer.getEstimatedSize(resource), true);
            Path publicDirDestPath =
                publicRsrc.getPathForLocalization(key, publicRootPath,
                    delService);
            if (publicDirDestPath == null) {
              // No destination was handed out, so nothing will be downloaded.
              // Release the lock taken by tryAcquire() above; this mirrors
              // LocalizerRunner.findNextResource, which also unlocks when it
              // gets no path for a resource.
              rsrc.unlock();
              return;
            }
            if (!publicDirDestPath.getParent().equals(publicRootPath)) {
              createParentDirs(publicDirDestPath, publicRootPath);
              if (diskValidator != null) {
                diskValidator.checkStatus(
                    new File(publicDirDestPath.toUri().getPath()));
              } else {
                throw new DiskChecker.DiskErrorException(
                    "Disk Validator is null!");
              }
            }

            // explicitly synchronize pending here to avoid future task
            // completing and being dequeued before pending updated
            synchronized (pending) {
              pending.put(queue.submit(new FSDownload(lfs, null, conf,
                  publicDirDestPath, resource, request.getContext().getStatCache())),
                  request);
            }
          } catch (IOException e) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
              .getResource().getRequest(), e.getMessage()));
            LOG.error("Local path for public localization is not found. "
                + " May be disks failed.", e);
          } catch (IllegalArgumentException ie) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
                .getResource().getRequest(), ie.getMessage()));
            LOG.error("Local path for public localization is not found. "
                + " Incorrect path. " + request.getResource().getRequest()
                .getPath(), ie);
          } catch (RejectedExecutionException re) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
              .getResource().getRequest(), re.getMessage()));
            LOG.error("Failed to submit rsrc " + rsrc + " for download."
                + " Either queue is full or threadpool is shutdown.", re);
          }
        } else {
          LOG.debug("Skip downloading resource: {} since it's in"
                + " state: {}", key, rsrc.getState());
          rsrc.unlock();
        }
      } else {
        LOG.debug("Skip downloading resource: {} since it is locked"
              + " by other threads", key);
      }
    }

    /**
     * Recursively creates every directory from just below
     * {@code destDirRoot} down to {@code destDirPath}, outermost first.
     *
     * @param destDirPath deepest directory to create; may be null
     * @param destDirRoot directory at which the recursion stops
     * @throws IOException if a directory cannot be created
     */
    private void createParentDirs(Path destDirPath, Path destDirRoot)
        throws IOException {
      if (destDirPath == null || destDirPath.equals(destDirRoot)) {
        return;
      }
      createParentDirs(destDirPath.getParent(), destDirRoot);
      createDir(destDirPath, PUBLIC_FILECACHE_FOLDER_PERMS);
    }

    /**
     * Creates a single directory and sets its permissions explicitly when
     * applying the file context's umask would alter the requested ones.
     *
     * @param dirPath directory to create
     * @param perms permissions the directory should end up with
     * @throws IOException if creation or the permission change fails
     */
    private void createDir(Path dirPath, FsPermission perms)
        throws IOException {
      lfs.mkdir(dirPath, perms, false);
      if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
        lfs.setPermission(dirPath, perms);
      }
    }

    /**
     * Takes completed downloads off the queue until interrupted. A
     * successful download is reported to the public tracker as localized; a
     * failed one is reported as failed to the public tracker and, when it
     * exists, to the requesting application's tracker. The thread pool is
     * shut down when the loop exits.
     */
    @Override
    public void run() {
      try {
        // TODO shutdown, better error handling esp. DU
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Future<Path> completed = queue.take();
            LocalizerResourceRequestEvent assoc = pending.remove(completed);
            try {
              if (null == assoc) {
                LOG.error("Localized unknown resource to " + completed);
                // TODO delete
                // NOTE(review): this return ends the whole public localizer
                // thread (and shuts the pool down in finally), not just this
                // iteration - confirm that is intended.
                return;
              }
              Path local = completed.get();
              LocalResourceRequest key = assoc.getResource().getRequest();
              publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
                .getDU(new File(local.toUri()))));
              assoc.getResource().unlock();
            } catch (ExecutionException e) {
              // assoc is non-null here: completed.get() is only reached after
              // the null check above.
              String user = assoc.getContext().getUser();
              ApplicationId applicationId = assoc.getContext().getContainerId().getApplicationAttemptId().getApplicationId();
              LocalResourcesTracker tracker =
                getLocalResourcesTracker(LocalResourceVisibility.APPLICATION, user, applicationId);
              final String diagnostics = "Failed to download resource " +
                  assoc.getResource() + " " + e.getCause();
              if(tracker != null) {
                tracker.handle(new ResourceFailedLocalizationEvent(
                    assoc.getResource().getRequest(), diagnostics));
              }
              publicRsrc.handle(new ResourceFailedLocalizationEvent(
                  assoc.getResource().getRequest(), diagnostics));
              LOG.error(diagnostics);
              assoc.getResource().unlock();
            } catch (CancellationException e) {
              // ignore; shutting down
            }
          } catch (InterruptedException e) {
            return;
          }
        }
      } catch(Throwable t) {
        LOG.error("Error: Shutting down", t);
      } finally {
        LOG.info("Public cache exiting");
        threadPool.shutdownNow();
      }
    }

  }

  /**
   * Runs the {@link ContainerLocalizer} itself in a separate process with
   * access to user's credentials. One {@link LocalizerRunner} per localizerId.
   * 
   */
  class LocalizerRunner extends Thread {

    final LocalizerContext context;
    final String localizerId;
    final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
    // Its a shared list between Private Localizer and dispatcher thread.
    final List<LocalizerResourceRequestEvent> pending;
    private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);

    // TODO: threadsafe, use outer?
    private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(getConfig());

    private final String tokenFileName;

    LocalizerRunner(LocalizerContext context, String localizerId) {
      super("LocalizerRunner for " + localizerId);
      this.context = context;
      this.localizerId = localizerId;
      this.pending =
          Collections
            .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
      this.scheduled = new HashMap<>();
      tokenFileName =  String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
         localizerId + Long.toHexString(System.currentTimeMillis()));
    }

    public void addResource(LocalizerResourceRequestEvent request) {
      pending.add(request);
    }

    public void endContainerLocalization() {
      killContainerLocalizer.set(true);
    }

    /**
     * Find next resource to be given to a spawned localizer.
     * 
     * @return the next resource to be localized
     */
    private ResourceLocalizationSpec findNextResource(
        String user, ApplicationId applicationId) {
      synchronized (pending) {
        for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
            i.hasNext();) {
          LocalizerResourceRequestEvent evt = i.next();
          LocalizedResource nRsrc = evt.getResource();
          // Resource download should take place ONLY if resource is in
          // Downloading state
          if (nRsrc.getState() != ResourceState.DOWNLOADING) {
            i.remove();
            continue;
          }
          /*
           * Multiple containers will try to download the same resource. So the
           * resource download should start only if
           * 1) We can acquire a non blocking semaphore lock on resource
           * 2) Resource is still in DOWNLOADING state
           */
          if (nRsrc.tryAcquire()) {
            if (nRsrc.getState() == ResourceState.DOWNLOADING) {
              LocalResourceRequest nextRsrc = nRsrc.getRequest();
              LocalResource next =
                  recordFactory.newRecordInstance(LocalResource.class);
              next.setResource(URL.fromPath(nextRsrc.getPath()));
              next.setTimestamp(nextRsrc.getTimestamp());
              next.setType(nextRsrc.getType());
              next.setVisibility(evt.getVisibility());
              next.setPattern(evt.getPattern());
              ResourceLocalizationSpec nextSpec = null;
              try {
                LocalResourcesTracker tracker = getLocalResourcesTracker(
                    next.getVisibility(), user, applicationId);
                if (tracker != null) {
                  Path localPath = getPathForLocalization(next, tracker);
                  if (localPath != null) {
                    nextSpec = NodeManagerBuilderUtils.
                        newResourceLocalizationSpec(next, localPath);
                  }
                }
              } catch (IOException e) {
                LOG.error("local path for PRIVATE localization could not be " +
                    "found. Disks might have failed.", e);
              } catch (IllegalArgumentException e) {
                LOG.error("Incorrect path for PRIVATE localization."
                    + next.getResource().getFile(), e);
              } catch (URISyntaxException e) {
                LOG.error(
                    "Got exception in parsing URL of LocalResource:"
                        + next.getResource(), e);
              }
              if (nextSpec != null) {
                scheduled.put(nextRsrc, evt);
                return nextSpec;
              } else {
                // We failed to get a path for this, don't try to localize this
                // resource again.
                nRsrc.unlock();
                i.remove();
                continue;
              }
            } else {
              // Need to release acquired lock
              nRsrc.unlock();
            }
          }
        }
        return null;
      }
    }

    LocalizerHeartbeatResponse processHeartbeat(
        List<LocalResourceStatus> remoteResourceStatuses) {
      LocalizerHeartbeatResponse response =
        recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
      String user = context.getUser();
      ApplicationId applicationId =
          context.getContainerId().getApplicationAttemptId().getApplicationId();

      boolean fetchFailed = false;
      // Update resource statuses.
      for (LocalResourceStatus stat : remoteResourceStatuses) {
        LocalResource rsrc = stat.getResource();
        LocalResourceRequest req = null;
        try {
          req = new LocalResourceRequest(rsrc);
        } catch (URISyntaxException e) {
          LOG.error(
              "Got exception in parsing URL of LocalResource:"
                  + rsrc.getResource(), e);
          continue;
        }
        LocalizerResourceRequestEvent assoc = scheduled.get(req);
        if (assoc == null) {
          // internal error
          LOG.error("Unknown resource reported: " + req);
          continue;
        }
        LocalResourcesTracker tracker =
            getLocalResourcesTracker(req.getVisibility(), user, applicationId);
        if (tracker == null) {
          // This is likely due to a race between heartbeat and
          // app cleaning up.
          continue;
        }
        switch (stat.getStatus()) {
          case FETCH_SUCCESS:
            // notify resource
            try {
              tracker.handle(new ResourceLocalizedEvent(req,
                  stat.getLocalPath().toPath(), stat.getLocalSize()));
            } catch (URISyntaxException e) { }

            // unlocking the resource and removing it from scheduled resource
            // list
            assoc.getResource().unlock();
            scheduled.remove(req);
            break;
          case FETCH_PENDING:
            break;
          case FETCH_FAILURE:
            final String diagnostics = stat.getException().toString();
            LOG.warn("{} failed for {} : {}", req, localizerId, diagnostics);
            fetchFailed = true;
            tracker.handle(new ResourceFailedLocalizationEvent(req,
                diagnostics));

            // unlocking the resource and removing it from scheduled resource
            // list
            assoc.getResource().unlock();
            scheduled.remove(req);
            break;
          default:
            LOG.info("Unknown status: " + stat.getStatus());
            fetchFailed = true;
            tracker.handle(new ResourceFailedLocalizationEvent(req,
                stat.getException().getMessage()));
            break;
        }
      }
      if (fetchFailed || killContainerLocalizer.get()) {
        response.setLocalizerAction(LocalizerAction.DIE);
        return response;
      }

      // Give the localizer resources for remote-fetching.
      List<ResourceLocalizationSpec> rsrcs =
          new ArrayList<ResourceLocalizationSpec>();

      /*
       * TODO : It doesn't support multiple downloads per ContainerLocalizer
       * at the same time. We need to think whether we should support this.
       */
      ResourceLocalizationSpec next = findNextResource(user, applicationId);
      if (next != null) {
        rsrcs.add(next);
      }

      response.setLocalizerAction(LocalizerAction.LIVE);
      response.setResourceSpecs(rsrcs);
      return response;
    }

    private Path getPathForLocalization(LocalResource rsrc,
        LocalResourcesTracker tracker) throws IOException, URISyntaxException {
      String user = context.getUser();
      ApplicationId appId =
          context.getContainerId().getApplicationAttemptId().getApplicationId();
      LocalResourceVisibility vis = rsrc.getVisibility();
      String cacheDirectory = null;
      if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
        cacheDirectory = getUserFileCachePath(user);
      } else {// APPLICATION ONLY
        cacheDirectory = getAppFileCachePath(user, appId.toString());
      }
      Path dirPath =
          dirsHandler.getLocalPathForWrite(cacheDirectory,
            ContainerLocalizer.getEstimatedSize(rsrc), false);
      return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
          dirPath, delService);
    }

    @Override
    @SuppressWarnings("unchecked") // dispatcher not typed
    public void run() {
      Path nmPrivateCTokensPath = null;
      Throwable exception = null;
      try {
        // Get nmPrivateDir
        nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(
                NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);

        // 0) init queue, etc.
        // 1) write credentials to private dir
        writeCredentials(nmPrivateCTokensPath);
        // 2) exec initApplication and wait
        if (dirsHandler.areDisksHealthy()) {
          exec.startLocalizer(new LocalizerStartContext.Builder()
              .setNmPrivateContainerTokens(nmPrivateCTokensPath)
              .setNmAddr(localizationServerAddress)
              .setUser(context.getUser())
              .setAppId(context.getContainerId()
                  .getApplicationAttemptId().getApplicationId().toString())
              .setLocId(localizerId)
              .setDirsHandler(dirsHandler)
              .build());
        } else {
          throw new IOException("All disks failed. "
              + dirsHandler.getDisksHealthReport(false));
        }
      // TODO handle ExitCodeException separately?
      } catch (FSError fe) {
        exception = fe;
      } catch (Exception e) {
        exception = e;
      } finally {
        if (exception != null) {
          LOG.info("Localizer failed for "+localizerId, exception);
          // On error, report failure to Container and signal ABORT
          // Notify resource of failed localization
          ContainerId cId = context.getContainerId();
          dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent(
              cId, null, exception.getMessage()));
        }
        List<Path> paths = new ArrayList<Path>();
        for (LocalizerResourceRequestEvent event : scheduled.values()) {
          // This means some resources were in downloading state. Schedule
          // deletion task for localization dir and tmp dir used for downloading
          Path locRsrcPath = event.getResource().getLocalPath();
          if (locRsrcPath != null) {
            Path locRsrcDirPath = locRsrcPath.getParent();
            paths.add(locRsrcDirPath);
            paths.add(new Path(locRsrcDirPath + "_tmp"));
          }
          event.getResource().unlock();
        }
        if (!paths.isEmpty()) {
          FileDeletionTask deletionTask = new FileDeletionTask(delService,
              context.getUser(), null, paths);
          delService.delete(deletionTask);
        }
        FileDeletionTask deletionTask = new FileDeletionTask(delService, null,
            nmPrivateCTokensPath, null);
        delService.delete(deletionTask);
      }
    }

    private Credentials getSystemCredentialsSentFromRM(
        LocalizerContext localizerContext) throws IOException {
      ApplicationId appId =
          localizerContext.getContainerId().getApplicationAttemptId()
            .getApplicationId();
      Credentials systemCredentials =
          nmContext.getSystemCredentialsForApps().get(appId);
      if (systemCredentials == null) {
        return null;
      }

      LOG.debug("Adding new framework-token for {} for localization: {}",
          appId, systemCredentials.getAllTokens());

      return systemCredentials;
    }
    
    private void writeCredentials(Path nmPrivateCTokensPath)
        throws IOException {
      DataOutputStream tokenOut = null;
      try {
        Credentials credentials = context.getCredentials();
        if (UserGroupInformation.isSecurityEnabled()) {
          Credentials systemCredentials =
              getSystemCredentialsSentFromRM(context);
          if (systemCredentials != null) {
            credentials = systemCredentials;
          }
        }

        FileContext lfs = getLocalFileContext(getConfig());
        tokenOut =
            lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
        LOG.info("Writing credentials to the nmPrivate file "
            + nmPrivateCTokensPath.toString());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Credentials list in {}: " + nmPrivateCTokensPath);
          for (Token<? extends TokenIdentifier> tk : credentials
              .getAllTokens()) {
            LOG.debug("{} : {}", tk, buildTokenFingerprint(tk));
          }
        }
        if (UserGroupInformation.isSecurityEnabled()) {
          credentials = new Credentials(credentials);
          LocalizerTokenIdentifier id = secretManager.createIdentifier();
          Token<LocalizerTokenIdentifier> localizerToken =
              new Token<LocalizerTokenIdentifier>(id, secretManager);
          credentials.addToken(id.getKind(), localizerToken);
        }
        credentials.writeTokenStorageToStream(tokenOut);
      } finally {
        if (tokenOut != null) {
          tokenOut.close();
        }
      }
    }

  }

  /**
   * Computes a loggable fingerprint for a token. The fingerprint consists of
   * the first 10 bytes of the SHA-256 hash of the token's URL-string
   * encoding, rendered as two hex characters per byte with a single space
   * between bytes. It is safe to log because the secret cannot be derived
   * from it.
   *
   * @param tk token
   * @return token fingerprint
   * @throws IOException if there is an I/O error
   */
  @VisibleForTesting
  static String buildTokenFingerprint(Token<? extends TokenIdentifier> tk)
      throws IOException {
    final String hexDigest = DigestUtils.sha256Hex(tk.encodeToUrlString());
    // 10 bytes, two hex characters each.
    final int hexCharsToKeep = 2 * 10;
    StringBuilder out = new StringBuilder();
    for (int pos = 0; pos < hexCharsToKeep; pos += 2) {
      if (pos != 0) {
        out.append(' ');
      }
      out.append(hexDigest, pos, pos + 2);
    }
    return out.toString();
  }

  /**
   * Thread whose {@link #run()} posts a single
   * {@link LocalizationEventType#CACHE_CLEANUP} event to the dispatcher.
   */
  static class CacheCleanup extends Thread {

    private final Dispatcher dispatcher;

    /**
     * @param dispatcher dispatcher whose event handler receives the event
     */
    public CacheCleanup(Dispatcher dispatcher) {
      super("CacheCleanup");
      this.dispatcher = dispatcher;
    }

    @Override
    @SuppressWarnings("unchecked") // dispatcher not typed
    public void run() {
      final LocalizationEvent cleanupRequest =
          new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP);
      this.dispatcher.getEventHandler().handle(cleanupRequest);
    }

  }

  /**
   * Initializes every local directory reported by the dirs handler.
   *
   * @param lfs file context used to create and inspect the directories
   */
  private void initializeLocalDirs(FileContext lfs) {
    for (String dir : dirsHandler.getLocalDirs()) {
      initializeLocalDir(lfs, dir);
    }
  }

  /**
   * Ensures that each path returned by
   * {@code getLocalDirsPathPermissionsMap(localDir)} exists and carries the
   * permissions mapped to it: missing paths are created (with parents) and
   * mismatching permissions are reset.
   *
   * @param lfs file context used for all file system operations
   * @param localDir local directory to initialize
   * @throws YarnRuntimeException if a path cannot be inspected, created or
   *         have its permissions set
   */
  private void initializeLocalDir(FileContext lfs, String localDir) {
    final Map<Path, FsPermission> expected =
        getLocalDirsPathPermissionsMap(localDir);
    for (Map.Entry<Path, FsPermission> wanted : expected.entrySet()) {
      final Path dir = wanted.getKey();
      final FsPermission dirPerms = wanted.getValue();

      FileStatus current = null;
      try {
        current = lfs.getFileStatus(dir);
      } catch (FileNotFoundException missing) {
        // Not there yet; it is created below.
      } catch (IOException ie) {
        String msg = "Could not get file status for local dir " + dir;
        LOG.warn(msg, ie);
        throw new YarnRuntimeException(msg, ie);
      }

      if (current == null) {
        try {
          lfs.mkdir(dir, dirPerms, true);
          current = lfs.getFileStatus(dir);
        } catch (IOException e) {
          String msg = "Could not initialize local dir " + dir;
          LOG.warn(msg, e);
          throw new YarnRuntimeException(msg, e);
        }
      }

      if (current.getPermission().equals(dirPerms)) {
        continue;
      }
      try {
        lfs.setPermission(dir, dirPerms);
      } catch (IOException ie) {
        String msg = "Could not set permissions for local dir " + dir;
        LOG.warn(msg, ie);
        throw new YarnRuntimeException(msg, ie);
      }
    }
  }

  /**
   * Initializes every log directory reported by the dirs handler.
   *
   * @param lfs file context used to create the directories
   */
  private void initializeLogDirs(FileContext lfs) {
    for (String dir : dirsHandler.getLogDirs()) {
      initializeLogDir(lfs, dir);
    }
  }

  /**
   * Creates a log directory, including parents. A directory that already
   * exists is left as it is.
   *
   * @param fs file context used to create the directory
   * @param logDir log directory to create
   * @throws YarnRuntimeException if the directory cannot be created
   */
  private void initializeLogDir(FileContext fs, String logDir) {
    final Path logDirPath = new Path(logDir);
    try {
      fs.mkdir(logDirPath, null, true);
    } catch (FileAlreadyExistsException alreadyThere) {
      // do nothing
    } catch (IOException e) {
      final String msg = "Could not initialize log dir " + logDir;
      LOG.warn(msg, e);
      throw new YarnRuntimeException(msg, e);
    }
  }

  /**
   * Cleans up each log directory the dirs handler lists for cleanup. A
   * failure on one directory is logged and does not stop the others.
   *
   * @param fs file context used for file system operations
   * @param del deletion service that receives the deletion tasks
   */
  private void cleanupLogDirs(FileContext fs, DeletionService del) {
    for (final String dir : dirsHandler.getLogDirsForCleanup()) {
      try {
        cleanupLogDir(fs, del, dir);
      } catch (IOException ioe) {
        LOG.warn("failed to cleanup app log dir " + dir, ioe);
      }
    }
  }

  /**
   * If the log directory exists, renames the application log directories in
   * it and then schedules the renamed directories for deletion.
   *
   * @param fs file context used for the existence check and listing
   * @param del deletion service that receives the deletion tasks
   * @param logDir log directory to clean up
   * @throws IOException if a file system operation fails
   */
  private void cleanupLogDir(FileContext fs, DeletionService del,
      String logDir) throws IOException {
    final boolean present = fs.util().exists(new Path(logDir));
    if (present) {
      renameAppLogDir(logDir);
      deleteAppLogDir(fs, del, logDir);
    }
  }

  /**
   * Renames every entry of the log directory whose name has the form
   * {@code application_<digits>_<digits>} by appending {@code _DEL_} and the
   * current time in milliseconds. Uses the service's own {@code lfs} file
   * context.
   *
   * @param logDir log directory whose entries are renamed
   * @throws IOException if listing or renaming fails
   */
  private void renameAppLogDir(String logDir) throws IOException {
    final long now = System.currentTimeMillis();
    final RemoteIterator<FileStatus> entries =
        lfs.listStatus(new Path(logDir));
    if (entries == null) {
      return;
    }
    while (entries.hasNext()) {
      final String name = entries.next().getPath().getName();
      if (!name.matches("^application_\\d+_\\d+$")) {
        continue;
      }
      final Path from = new Path(logDir, name);
      final Path to = new Path(logDir, name + "_DEL_" + now);
      lfs.rename(from, to);
    }
  }

  /**
   * Submits a deletion task for every entry of the log directory whose name
   * has the form {@code application_<digits>_<digits>_DEL_<digits>}.
   *
   * @param fs file context used to list the directory
   * @param del deletion service that receives the deletion tasks
   * @param logDir log directory to scan
   * @throws IOException if listing fails
   */
  private void deleteAppLogDir(FileContext fs, DeletionService del,
      String logDir) throws IOException {
    final RemoteIterator<FileStatus> entries =
        fs.listStatus(new Path(logDir));
    if (entries == null) {
      return;
    }
    while (entries.hasNext()) {
      final FileStatus entry = entries.next();
      final String name = entry.getPath().getName();
      if (!name.matches("^application_\\d+_\\d+_DEL_\\d+$")) {
        continue;
      }
      LOG.info("delete app log dir," + name);
      del.delete(new FileDeletionTask(del, null, entry.getPath(), null));
    }
  }

  /**
   * Cleans up each local directory the dirs handler lists for cleanup.
   *
   * @param lfs file context used for file system operations
   * @param del deletion service that receives the deletion tasks
   */
  private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
    for (final String dir : dirsHandler.getLocalDirsForCleanup()) {
      cleanUpLocalDir(lfs, del, dir);
    }
  }

  /**
   * Renames the user cache, file cache and NM private sub-directories of a
   * local directory (all with the same timestamp suffix) and then schedules
   * the renamed directories for deletion. Failures are logged, not thrown.
   *
   * @param lfs file context used for file system operations
   * @param del deletion service that receives the deletion tasks
   * @param localDir local directory to clean up
   */
  private void cleanUpLocalDir(FileContext lfs, DeletionService del,
      String localDir) {
    long currentTimeStamp = System.currentTimeMillis();
    renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
      currentTimeStamp);
    renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
      currentTimeStamp);
    renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
      currentTimeStamp);
    try {
      deleteLocalDir(lfs, del, localDir);
    } catch (IOException e) {
      // Best effort: keep going, but log the cause so the failure can be
      // diagnosed.
      LOG.warn("Failed to delete localDir: " + localDir, e);
    }
  }

  /**
   * Renames {@code localDir/localSubDir} by appending {@code _DEL_} and the
   * given timestamp. A missing sub-directory is ignored; any other failure
   * is logged and not propagated.
   *
   * @param lfs file context used for the rename
   * @param localDir parent local directory
   * @param localSubDir name of the sub-directory to rename
   * @param currentTimeStamp timestamp used in the new name
   */
  private void renameLocalDir(FileContext lfs, String localDir,
      String localSubDir, long currentTimeStamp) {
    try {
      lfs.rename(new Path(localDir, localSubDir), new Path(
          localDir, localSubDir + "_DEL_" + currentTimeStamp));
    } catch (FileNotFoundException ex) {
      // No need to handle this exception
      // localSubDir may not be exist
    } catch (Exception ex) {
      // Best effort: keep going, but log the cause so the failure can be
      // diagnosed.
      LOG.warn("Failed to rename the local file under " +
          localDir + "/" + localSubDir, ex);
    }
  }

  /**
   * Scans a local directory for entries that were renamed for deletion
   * (name contains the sub-directory name followed by {@code _DEL_}).
   * Renamed user cache directories are handed to
   * {@code cleanUpFilesPerUserDir}; renamed NM private and file cache
   * directories are submitted to the deletion service directly. A failure on
   * one entry is logged and does not stop the scan.
   *
   * @param lfs file context used to list the directory
   * @param del deletion service that receives the deletion tasks
   * @param localDir local directory to scan
   * @throws IOException if the directory cannot be listed
   */
  private void deleteLocalDir(FileContext lfs, DeletionService del,
      String localDir) throws IOException {
    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
    if (fileStatus == null) {
      return;
    }
    while (fileStatus.hasNext()) {
      FileStatus status = fileStatus.next();
      String name = status.getPath().getName();
      try {
        if (name.matches(".*" + ContainerLocalizer.USERCACHE + "_DEL_.*")) {
          LOG.info("usercache path : " + status.getPath().toString());
          cleanUpFilesPerUserDir(lfs, del, status.getPath());
        } else if (name.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
            || name.matches(
                ".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
          FileDeletionTask deletionTask = new FileDeletionTask(del, null,
              status.getPath(), null);
          del.delete(deletionTask);
        }
      } catch (IOException ex) {
        // Best effort: keep going, but log the cause so the failure can be
        // diagnosed.
        LOG.warn("Failed to delete this local Directory: " + name, ex);
      }
    }
  }

  /**
   * Schedules deletion of a renamed usercache directory and its children.
   *
   * <p>If the directory has no entries, a single task for the directory
   * itself is submitted. Otherwise one task is created per child, using the
   * child's owner as reported by its {@code FileStatus}, and each child task
   * registers the directory-level task through
   * {@code addDeletionTaskDependency}. All child tasks are built before any
   * of them is submitted.
   * NOTE(review): presumably the dependency makes the directory-level task
   * run after its children finish; confirm against FileDeletionTask.
   *
   * @param lfs file context used to list {@code userDirPath}
   * @param del deletion service that receives the tasks
   * @param userDirPath the renamed usercache directory
   * @throws IOException if listing or iterating the directory fails
   */
  private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
      Path userDirPath) throws IOException {
    RemoteIterator<FileStatus> children = lfs.listStatus(userDirPath);
    FileDeletionTask parentTask = new FileDeletionTask(del, null,
        userDirPath, new ArrayList<Path>());

    // Empty (or unlistable) directory: only the directory itself is removed.
    if (children == null || !children.hasNext()) {
      del.delete(parentTask);
      return;
    }

    // Phase 1: build every child task and wire it to the parent task.
    List<FileDeletionTask> childTasks = new ArrayList<>();
    while (children.hasNext()) {
      FileStatus child = children.next();
      List<Path> childPaths = new ArrayList<>();
      childPaths.add(child.getPath());
      FileDeletionTask childTask =
          new FileDeletionTask(del, child.getOwner(), null, childPaths);
      childTask.addDeletionTaskDependency(parentTask);
      childTasks.add(childTask);
    }

    // Phase 2: submit only after all dependencies have been registered.
    for (FileDeletionTask childTask : childTasks) {
      del.delete(childTask);
    }
  }
  
  /**
   * Verifies every directory returned by {@code dirsHandler.getLocalDirs()}
   * and attempts to repair the ones that fail verification.
   *
   * <p>All directories are checked first. Each directory that failed is then
   * re-initialized with {@code initializeLocalDir} and checked a second time.
   *
   * @throws YarnRuntimeException if a directory still fails the check after
   *         it has been re-initialized
   */
  @VisibleForTesting
  void checkAndInitializeLocalDirs() {
    // Pass 1: collect the directories that do not pass the check.
    List<String> needsInit = new ArrayList<>();
    for (String candidate : dirsHandler.getLocalDirs()) {
      try {
        checkLocalDir(candidate);
      } catch (YarnRuntimeException ignored) {
        // checkLocalDir logs a warning before throwing; just remember the dir.
        needsInit.add(candidate);
      }
    }

    // Pass 2: re-initialize each failed directory and verify it again.
    for (String failedDir : needsInit) {
      LOG.info("Attempting to initialize " + failedDir);
      initializeLocalDir(lfs, failedDir);
      try {
        checkLocalDir(failedDir);
      } catch (YarnRuntimeException e) {
        String msg = "Failed to setup local dir " + failedDir
            + ", which was marked as good.";
        LOG.warn(msg, e);
        throw new YarnRuntimeException(msg, e);
      }
    }
  }

  /**
   * Checks that each sub-directory listed by
   * {@code getLocalDirsPathPermissionsMap(localDir)} can be stat'ed and has
   * exactly the expected permission.
   *
   * @param localDir the local directory to verify
   * @return {@code true} when every check passes; failures are reported by
   *         exception, never by returning {@code false}
   * @throws YarnRuntimeException if a sub-directory's status cannot be read
   *         or its permission differs from the expected one
   */
  private boolean checkLocalDir(String localDir) {
    for (Map.Entry<Path, FsPermission> expected
        : getLocalDirsPathPermissionsMap(localDir).entrySet()) {
      Path dirPath = expected.getKey();
      FsPermission wanted = expected.getValue();

      FileStatus actual;
      try {
        actual = lfs.getFileStatus(dirPath);
      } catch (Exception e) {
        String msg = "Could not carry out resource dir checks for " + localDir
            + ", which was marked as good";
        LOG.warn(msg, e);
        throw new YarnRuntimeException(msg, e);
      }

      FsPermission found = actual.getPermission();
      if (found.equals(wanted)) {
        continue;
      }
      String msg = "Permissions incorrectly set for dir " + dirPath
          + ", should be " + wanted + ", actual value = " + found;
      LOG.warn(msg);
      throw new YarnRuntimeException(msg);
    }
    return true;
  }

  /**
   * Builds the expected permission for each managed sub-directory of a local
   * directory.
   *
   * <p>The usercache and filecache sub-directories map to the default
   * directory permission with the file context's umask applied; the
   * NM-private sub-directory maps to {@code NM_PRIVATE_PERM} with the same
   * umask applied.
   *
   * @param localDir the local directory whose sub-directories are described
   * @return a new map from sub-directory path to its expected permission
   */
  private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir) {
    FsPermission dirDefaultPerm =
        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
    FsPermission privatePerm = NM_PRIVATE_PERM.applyUMask(lfs.getUMask());

    Map<Path, FsPermission> expectedPerms = new HashMap<>();
    expectedPerms.put(
        new Path(localDir, ContainerLocalizer.USERCACHE), dirDefaultPerm);
    expectedPerms.put(
        new Path(localDir, ContainerLocalizer.FILECACHE), dirDefaultPerm);
    expectedPerms.put(new Path(localDir, NM_PRIVATE_DIR), privatePerm);
    return expectedPerms;
  }

  /**
   * Looks up the localized resource for a request in the tracker selected by
   * the request's visibility, the user and the application.
   *
   * @param req the resource request to look up
   * @param user user passed to the tracker lookup
   * @param appId application passed to the tracker lookup
   * @return the tracker's result for {@code req}, or {@code null} when no
   *         tracker is found
   */
  public LocalizedResource getLocalizedResource(LocalResourceRequest req,
      String user, ApplicationId appId) {
    LocalResourcesTracker resourcesTracker =
        getLocalResourcesTracker(req.getVisibility(), user, appId);
    return resourcesTracker == null
        ? null
        : resourcesTracker.getLocalizedResource(req);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ContainerLocalizer 源码

hadoop LocalCacheCleaner 源码

hadoop LocalCacheDirectoryManager 源码

hadoop LocalResourceRequest 源码

hadoop LocalResourcesTracker 源码

hadoop LocalResourcesTrackerImpl 源码

hadoop LocalizedResource 源码

hadoop LocalizerContext 源码

hadoop ResourceSet 源码

hadoop ResourceState 源码

0  赞