hadoop LocalResourcesTrackerImpl 源码

  • 2022-10-20
  • 浏览 (178)

hadoop LocalResourcesTrackerImpl 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;

import org.apache.hadoop.classification.VisibleForTesting;


/**
 * A collection of {@link LocalizedResource}s that all share the same
 * {@link LocalResourceVisibility}. One tracker exists for the PUBLIC cache,
 * one per user (PRIVATE) cache and one per APPLICATION cache.
 *
 * <p>The tracker maps each {@link LocalResourceRequest} to its
 * {@link LocalizedResource} and forwards {@link ResourceEvent}s to it. It also
 * chooses unique on-disk directories for new downloads and records
 * localization start/finish/removal in the {@link NMStateStoreService}.
 * Those records are the counterpart of the RECOVERED event handled in
 * {@link #handle(ResourceEvent)}.
 */
class LocalResourcesTrackerImpl implements LocalResourcesTracker {

  static final Logger LOG =
       LoggerFactory.getLogger(LocalResourcesTrackerImpl.class);
  /** Name of the per-download "unique number" directory (see getPathToDelete). */
  private static final String RANDOM_DIR_REGEX = "-?\\d+";
  private static final Pattern RANDOM_DIR_PATTERN = Pattern
      .compile(RANDOM_DIR_REGEX);

  private final String user;
  private final ApplicationId appId;
  private final Dispatcher dispatcher;
  @VisibleForTesting
  final ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc;
  private Configuration conf;
  private LocalDirsHandlerService dirsHandler;
  /*
   * This flag controls whether this resource tracker uses hierarchical
   * directories or not. For PRIVATE and PUBLIC resource trackers it
   * will be set whereas for APPLICATION resource tracker it would
   * be false. When false, directoryManagers and inProgressLocalResourcesMap
   * stay null.
   */
  private final boolean useLocalCacheDirectoryManager;
  /* Hierarchical directory manager per local cache root directory. */
  private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
  /*
   * Records the hierarchical directory reserved for a resource while it is
   * being downloaded, so that the directory's file count can be decremented
   * again if the localization fails or the resource is removed first.
   */
  private ConcurrentHashMap<LocalResourceRequest, Path>
    inProgressLocalResourcesMap;
  /*
   * Initialized to 9 so that the first incrementAndGet() yields 10. This
   * leaves the names 0-9 to the directories created by
   * LocalCacheDirectoryManager. There is one unique number generator per
   * APPLICATION, USER and PUBLIC cache.
   */
  private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
  private NMStateStoreService stateStore;

  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
      Configuration conf, NMStateStoreService stateStore,
      LocalDirsHandlerService dirHandler) {
    this(user, appId, dispatcher,
        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
        useLocalCacheDirectoryManager, conf, stateStore, dirHandler);
  }

  LocalResourcesTrackerImpl(String user, ApplicationId appId,
      Dispatcher dispatcher,
      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc,
      boolean useLocalCacheDirectoryManager, Configuration conf,
      NMStateStoreService stateStore, LocalDirsHandlerService dirHandler) {
    this.appId = appId;
    this.user = user;
    this.dispatcher = dispatcher;
    this.localrsrc = localrsrc;
    this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
    if (this.useLocalCacheDirectoryManager) {
      directoryManagers =
          new ConcurrentHashMap<>();
      inProgressLocalResourcesMap =
          new ConcurrentHashMap<>();
    }
    this.conf = conf;
    this.stateStore = stateStore;
    this.dirsHandler = dirHandler;
  }

  /*
   * Synchronizing this method for avoiding races due to multiple ResourceEvent's
   * coming to LocalResourcesTracker from Public/Private localizer and
   * Resource Localization Service.
   *
   * Tracker-level bookkeeping happens first: creating, recovering or
   * dropping the LocalizedResource. The event is then forwarded to the
   * resource itself. Finally, any follow-up cleanup or state-store
   * persistence runs.
   */
  @Override
  public synchronized void handle(ResourceEvent event) {
    LocalResourceRequest req = event.getLocalResourceRequest();
    LocalizedResource rsrc = localrsrc.get(req);
    switch (event.getType()) {
    case LOCALIZED:
      if (useLocalCacheDirectoryManager) {
        // Download finished: the reserved directory is no longer "in progress".
        inProgressLocalResourcesMap.remove(req);
      }
      break;
    case REQUEST:
      // A localized resource whose file is gone (or sits on a bad dir) is
      // dropped and re-created so that it gets localized again.
      if (rsrc != null && (!isResourcePresent(rsrc))) {
        LOG.info("Resource " + rsrc.getLocalPath()
            + " is missing, localizing it again");
        removeResource(req);
        rsrc = null;
      }
      if (null == rsrc) {
        rsrc = new LocalizedResource(req, dispatcher);
        localrsrc.put(req, rsrc);
      }
      break;
    case RELEASE:
      if (null == rsrc) {
        // The container sent a release event on a resource which
        // 1) Failed
        // 2) Removed for some reason (ex. disk is no longer accessible)
        ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
        LOG.info("Container " + relEvent.getContainer()
            + " sent RELEASE event on a resource request " + req
            + " not present in cache.");
        return;
      }
      break;
    case LOCALIZATION_FAILED:
      /*
       * If resource localization fails then Localized resource will be
       * removed from local cache.
       */
      removeResource(req);
      break;
    case RECOVERED:
      if (rsrc != null) {
        LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
        return;
      }
      rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
      localrsrc.put(req, rsrc);
      break;
    }

    if (rsrc == null) {
      LOG.warn("Received " + event.getType() + " event for request " + req
          + " but localized resource is missing");
      return;
    }
    rsrc.handle(event);

    // Remove the resource if its downloading and its reference count has
    // become 0 after RELEASE. This maybe because a container was killed while
    // localizing and no other container is referring to the resource.
    // NOTE: This should NOT be done for public resources since the
    //       download is not associated with a container-specific localizer.
    if (event.getType() == ResourceEventType.RELEASE) {
      if (rsrc.getState() == ResourceState.DOWNLOADING &&
          rsrc.getRefCount() <= 0 &&
          rsrc.getRequest().getVisibility() != LocalResourceVisibility.PUBLIC) {
        removeResource(req);
      }
    }

    // Persist a completed localization so it can be recovered later.
    if (event.getType() == ResourceEventType.LOCALIZED) {
      if (rsrc.getLocalPath() != null) {
        try {
          stateStore.finishResourceLocalization(user, appId,
              buildLocalizedResourceProto(rsrc));
        } catch (IOException ioe) {
          LOG.error("Error storing resource state for " + rsrc, ioe);
        }
      } else {
        LOG.warn("Resource " + rsrc + " localized without a location");
      }
    }
  }

  /**
   * Rebuilds tracker state for a resource found in the state store.
   *
   * <p>The parent of the recovered local path is the resource's unique
   * numeric directory. The ID generator is advanced to at least that number,
   * so that new downloads cannot reuse it. The file count of the enclosing
   * cache directory is also incremented.
   *
   * @param req the recovered request
   * @param event the recovery event carrying the resource's local path
   * @return a new {@link LocalizedResource} for {@code req}; the caller
   *         registers it and forwards the event to it
   * @throws NumberFormatException if the unique directory name is not a
   *         number (not handled here)
   */
  private LocalizedResource recoverResource(LocalResourceRequest req,
      ResourceRecoveredEvent event) {
    // unique number for a resource is the directory of the resource
    Path localDir = event.getLocalPath().getParent();
    long rsrcId = Long.parseLong(localDir.getName());

    // update ID generator to avoid conflicts with existing resources
    while (true) {
      long currentRsrcId = uniqueNumberGenerator.get();
      long nextRsrcId = Math.max(currentRsrcId, rsrcId);
      if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
        break;
      }
    }

    incrementFileCountForLocalCacheDirectory(localDir.getParent());

    return new LocalizedResource(req, dispatcher);
  }

  /** Builds the state-store record for a localized resource. */
  private LocalizedResourceProto buildLocalizedResourceProto(
      LocalizedResource rsrc) {
    return LocalizedResourceProto.newBuilder()
        .setResource(buildLocalResourceProto(rsrc.getRequest()))
        .setLocalPath(rsrc.getLocalPath().toString())
        .setSize(rsrc.getSize())
        .build();
  }

  /**
   * Converts {@code lr} to its protobuf form, first copying it into a
   * protobuf-backed record if it is not already one.
   */
  private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
    LocalResourcePBImpl lrpb;
    if (!(lr instanceof LocalResourcePBImpl)) {
      lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
          lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
          lr.getPattern());
    }
    lrpb = (LocalResourcePBImpl) lr;
    return lrpb.getProto();
  }

  /**
   * Returns the directory manager registered for {@code cacheRoot}, creating
   * and registering a new one if none exists yet. If a concurrent caller
   * registers one first, that instance is returned instead.
   */
  private LocalCacheDirectoryManager getOrCreateDirectoryManager(
      Path cacheRoot) {
    LocalCacheDirectoryManager mgr = directoryManagers.get(cacheRoot);
    if (mgr == null) {
      LocalCacheDirectoryManager created = new LocalCacheDirectoryManager(conf);
      mgr = directoryManagers.putIfAbsent(cacheRoot, created);
      if (mgr == null) {
        mgr = created;
      }
    }
    return mgr;
  }

  /**
   * Increments the file count of {@code cacheDir} in the directory manager of
   * its cache root. This is a no-op when hierarchical directories are not
   * used, or when no cache root can be determined for {@code cacheDir}.
   *
   * @param cacheDir directory that now holds one more localized resource
   */
  public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
    if (!useLocalCacheDirectoryManager) {
      return;
    }
    Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
        cacheDir);
    if (cacheRoot == null) {
      return;
    }
    LocalCacheDirectoryManager dir = getOrCreateDirectoryManager(cacheRoot);
    if (cacheDir.equals(cacheRoot)) {
      dir.incrementFileCountForPath("");
    } else {
      // The manager is keyed by the path relative to the cache root.
      String dirStr = cacheDir.toUri().getRawPath();
      String rootStr = cacheRoot.toUri().getRawPath();
      dir.incrementFileCountForPath(
          dirStr.substring(rootStr.length() + 1));
    }
  }

  /*
   * Update the file-count statistics for a local cache-directory.
   * This will retrieve the localized path for the resource from
   * 1) inProgressRsrcMap if the resource was under localization and it
   * failed (or was removed before localization completed).
   * 2) LocalizedResource if the resource is already localized.
   * From this path it will identify the local directory under which the
   * resource was localized. Then rest of the path will be used to decrement
   * file count for the HierarchicalSubDirectory pointing to this relative
   * path.
   */
  private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
      LocalizedResource rsrc) {
    if (!useLocalCacheDirectoryManager) {
      return;
    }
    Path rsrcPath = null;
    if (inProgressLocalResourcesMap.containsKey(req)) {
      // This happens when localization of a resource fails.
      rsrcPath = inProgressLocalResourcesMap.remove(req);
    } else if (rsrc != null && rsrc.getLocalPath() != null) {
      // localPath is <hierarchical dir>/<unique number>/<name>.
      rsrcPath = rsrc.getLocalPath().getParent().getParent();
    }
    if (rsrcPath == null) {
      return;
    }
    // Walk up until we reach a directory that has a registered manager.
    Path parentPath = new Path(rsrcPath.toUri().getRawPath());
    while (!directoryManagers.containsKey(parentPath)) {
      parentPath = parentPath.getParent();
      if (parentPath == null) {
        return;
      }
    }
    String parentDir = parentPath.toUri().getRawPath();
    String rsrcDir = rsrcPath.toUri().getRawPath();
    LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
    if (rsrcDir.equals(parentDir)) {
      dir.decrementFileCountForPath("");
    } else {
      dir.decrementFileCountForPath(
          rsrcDir.substring(parentDir.length() + 1));
    }
  }

  /**
   * Checks whether a resource that claims to be localized is still present.
   * Resources in any state other than LOCALIZED are reported as present. A
   * localized resource is present if its local file exists and, when a
   * {@link LocalDirsHandlerService} is configured, lives on a good local dir.
   *
   * @param rsrc resource to check
   * @return true/false based on resource is present or not
   */
  public boolean isResourcePresent(LocalizedResource rsrc) {
    if (rsrc.getState() != ResourceState.LOCALIZED) {
      return true;
    }
    File file = new File(rsrc.getLocalPath().toUri().getRawPath());
    if (!file.exists()) {
      return false;
    }
    return dirsHandler == null || checkLocalResource(rsrc);
  }

  /**
   * Check if the rsrc is Localized on a good dir.
   *
   * @param rsrc resource whose local path is checked
   * @return true if the resource's local path lies under one of the
   *         directories returned by {@code getLocalDirsForRead()}
   */
  @VisibleForTesting
  boolean checkLocalResource(LocalizedResource rsrc) {
    List<String> localDirs = dirsHandler.getLocalDirsForRead();
    for (String dir : localDirs) {
      if (isParent(rsrc.getLocalPath().toUri().getPath(), dir)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Checks whether {@code path} lies inside {@code parentdir}, matching on
   * whole path components. A prefix match only counts when it ends at a
   * separator, so "/data/10/x" is not considered to be under "/data/1".
   *
   * @param path path to test
   * @param parentdir candidate ancestor directory
   * @return true if parentdir is parent of path else false.
   */
  private boolean isParent(String path, String parentdir) {
    // Add separator if not present (endsWith also copes with "").
    String child = path.endsWith(File.separator)
        ? path : path + File.separator;
    if (!child.startsWith(parentdir)) {
      return false;
    }
    if (parentdir.isEmpty()
        || isSeparator(parentdir.charAt(parentdir.length() - 1))) {
      return true;
    }
    // child ends with a separator but parentdir does not, so child is
    // strictly longer and the character right after the prefix exists.
    return isSeparator(child.charAt(parentdir.length()));
  }

  /**
   * Accepts both '/' (used by Hadoop {@link Path} strings) and the platform
   * file separator.
   */
  private static boolean isSeparator(char c) {
    return c == '/' || c == File.separatorChar;
  }

  /**
   * Removes {@code rem} from this tracker if it is no longer in use. For a
   * LOCALIZED resource, the files on disk are scheduled for deletion.
   *
   * @param rem resource to remove
   * @param delService service used to delete the localized files
   * @return true if the resource was removed or was already absent; false if
   *         it is still referenced, still downloading, or is not the instance
   *         currently tracked for its request
   */
  @Override
  public boolean remove(LocalizedResource rem, DeletionService delService) {
    // current synchronization guaranteed by crude RLS event for cleanup
    LocalizedResource rsrc = localrsrc.get(rem.getRequest());
    if (null == rsrc) {
      LOG.error("Attempt to remove absent resource: " + rem.getRequest()
          + " from " + getUser());
      return true;
    }
    if (rsrc.getRefCount() > 0
        || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
      // internal error
      LOG.error("Attempt to remove resource: " + rsrc
          + " with non-zero refcount");
      return false;
    } else { // ResourceState is LOCALIZED or INIT
      if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
        FileDeletionTask deletionTask = new FileDeletionTask(delService,
            getUser(), getPathToDelete(rsrc.getLocalPath()), null);
        delService.delete(deletionTask);
      }
      removeResource(rem.getRequest());
      LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
      return true;
    }
  }

  /**
   * Drops the resource for {@code req} from the map. It also releases its
   * hierarchical-directory file count and deletes its state-store record (if
   * it had a local path).
   */
  private void removeResource(LocalResourceRequest req) {
    LocalizedResource rsrc = localrsrc.remove(req);
    decrementFileCountForLocalCacheDirectory(req, rsrc);
    if (rsrc != null) {
      Path localPath = rsrc.getLocalPath();
      if (localPath != null) {
        try {
          stateStore.removeLocalizedResource(user, appId, localPath);
        } catch (IOException e) {
          LOG.error("Unable to remove resource " + rsrc + " from state store",
              e);
        }
      }
    }
  }

  /**
   * Returns the path up to the random directory component. That is the
   * parent of {@code localPath} when its name is numeric; otherwise
   * {@code localPath} itself.
   */
  private Path getPathToDelete(Path localPath) {
    Path delPath = localPath.getParent();
    String name = delPath.getName();
    Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
    if (matcher.matches()) {
      return delPath;
    } else {
      LOG.warn("Random directory component did not match. " +
          "Deleting localized path only");
      return localPath;
    }
  }

  @Override
  public String getUser() {
    return user;
  }

  @Override
  public Iterator<LocalizedResource> iterator() {
    return localrsrc.values().iterator();
  }

  /**
   * Chooses a fresh directory to localize {@code req} into, and records the
   * resulting local path on the tracked resource and in the state store.
   *
   * <p>When hierarchical directories are enabled, a sub-directory of
   * {@code localDirPath} is reserved from its {@link LocalCacheDirectoryManager}.
   * A new unique numeric directory is then appended. Candidates that already
   * exist on disk are skipped, and are scheduled for deletion if
   * {@code delService} is non-null.
   *
   * @param req resource localization request to localize the resource
   * @param localDirPath local directory path
   * @param delService deletion service to delete existing paths found at a
   *        candidate location; may be {@code null}
   * @return the unique directory to localize into (local directory, optional
   *         hierarchical path and unique number), or {@code null} if the
   *         resource has been removed from this tracker. In that case any
   *         directory reservation made here is released.
   */
  @Override
  public Path getPathForLocalization(LocalResourceRequest req,
      Path localDirPath, DeletionService delService) {
    Path rPath = localDirPath;
    if (useLocalCacheDirectoryManager && localDirPath != null) {
      LocalCacheDirectoryManager dir =
          getOrCreateDirectoryManager(localDirPath);
      String hierarchicalPath = dir.getRelativePathForLocalization();
      // For most of the scenarios we will get root path only which
      // is an empty string
      if (!hierarchicalPath.isEmpty()) {
        rPath = new Path(localDirPath, hierarchicalPath);
      }
      // Remember the reserved directory so that its file count can be
      // released if localization fails or the resource goes away.
      inProgressLocalResourcesMap.put(req, rPath);
    }

    while (true) {
      Path uniquePath = new Path(rPath,
          Long.toString(uniqueNumberGenerator.incrementAndGet()));
      File file = new File(uniquePath.toUri().getRawPath());
      if (!file.exists()) {
        rPath = uniquePath;
        break;
      }
      // If the directory already exists, delete it and move to next one.
      LOG.warn("Directory " + uniquePath + " already exists, " +
          "try next one.");
      if (delService != null) {
        FileDeletionTask deletionTask = new FileDeletionTask(delService,
            getUser(), uniquePath, null);
        delService.delete(deletionTask);
      }
    }

    LocalizedResource rsrc = localrsrc.get(req);
    if (rsrc == null) {
      LOG.warn("Resource " + req + " has been removed"
          + " and will no longer be localized");
      // Undo the directory reservation made above. Otherwise the in-progress
      // entry and the directory's file count would leak, because nothing
      // else will ever release them for this request.
      decrementFileCountForLocalCacheDirectory(req, null);
      return null;
    }
    Path localPath = new Path(rPath, req.getPath().getName());
    rsrc.setLocalPath(localPath);
    LocalResource lr = LocalResource.newInstance(req.getResource(),
        req.getType(), req.getVisibility(), req.getSize(),
        req.getTimestamp());
    try {
      stateStore.startResourceLocalization(user, appId,
          ((LocalResourcePBImpl) lr).getProto(), localPath);
    } catch (IOException e) {
      LOG.error("Unable to record localization start for " + rsrc, e);
    }
    return rPath;
  }

  @Override
  public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
    return localrsrc.get(request);
  }

  /**
   * Returns the directory manager registered for {@code localDirPath}, or
   * {@code null} if there is none or hierarchical directories are disabled.
   */
  @VisibleForTesting
  LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
    LocalCacheDirectoryManager mgr = null;
    if (useLocalCacheDirectoryManager) {
      mgr = directoryManagers.get(localDirPath);
    }
    return mgr;
  }

  @VisibleForTesting
  LocalDirsHandlerService getDirsHandler() {
    return dirsHandler;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ContainerLocalizer 源码

hadoop LocalCacheCleaner 源码

hadoop LocalCacheDirectoryManager 源码

hadoop LocalResourceRequest 源码

hadoop LocalResourcesTracker 源码

hadoop LocalizedResource 源码

hadoop LocalizerContext 源码

hadoop ResourceLocalizationService 源码

hadoop ResourceSet 源码

hadoop ResourceState 源码

0  赞