hadoop AuxServices 源码

  • 2022-10-20
  • 浏览 (180)

hadoop AuxServices 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.regex.Pattern;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceFile;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.util.FSDownload;
import org.apache.hadoop.util.Preconditions;

public class AuxServices extends AbstractService
    implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {

  // Name of the sub-directory (below an NM local dir) that receives localized
  // aux service files.
  public static final String NM_AUX_SERVICE_DIR = "nmAuxService";
  // Permissions used when that directory is created: owner-only (0700).
  public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
      new FsPermission((short) 0700);

  // Property keys stored in an AuxServiceConfiguration: the implementing
  // class and the list of system classes handed to the custom classloader.
  public static final String CLASS_NAME = "class.name";
  public static final String SYSTEM_CLASSES = "system.classes";

  // Child of the NM recovery dir under which per-service recovery paths are
  // created.
  static final String STATE_STORE_ROOT_NAME = "nm-aux-services";

  private static final Logger LOG =
       LoggerFactory.getLogger(AuxServices.class);
  // Suffix appended to an outdated download directory when it is renamed
  // prior to being handed to the deletion service.
  private static final String DEL_SUFFIX = "_DEL_";

  // Registered services, their records, and the metadata each started
  // service published; all three are keyed by service name and wrapped in
  // synchronized maps by the constructor.
  private final Map<String, AuxiliaryService> serviceMap;
  private final Map<String, AuxServiceRecord> serviceRecordMap;
  private final Map<String, ByteBuffer> serviceMetaData;
  private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
  private final LocalDirsHandlerService dirsHandler;
  private final DeletionService delService;
  // Compared against the owner of remote files before they are downloaded.
  private final UserGroupInformation userUGI;

  // Recovery state: the root and file system stay null unless NM recovery is
  // enabled in serviceInit.
  private final FsPermission storeDirPerms = new FsPermission((short)0700);
  private Path stateStoreRoot = null;
  private FileSystem stateStoreFs = null;

  // Manifest (dynamic loading) state, populated in serviceInit.
  private volatile boolean manifestEnabled = false;
  private volatile Path manifest;
  private volatile FileSystem manifestFS;
  private Timer manifestReloadTimer;
  private TimerTask manifestReloadTask;
  private long manifestReloadInterval;
  // Modification time of the manifest when it was last read; -1 before any
  // read has happened.
  private long manifestModifyTS = -1;

  // JSON mapper, configured in the constructor to ignore unknown properties.
  private final ObjectMapper mapper;

  // Matches names made of letters, digits and underscores that do not start
  // with a digit. NOTE(review): presumably consumed by
  // validateAuxServiceName, which is outside this view -- confirm.
  private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");

  AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
      Context nmContext, DeletionService deletionService) {
    super(AuxServices.class.getName());
    serviceMap =
      Collections.synchronizedMap(new HashMap<String, AuxiliaryService>());
    serviceRecordMap =
        Collections.synchronizedMap(new HashMap<String, AuxServiceRecord>());
    serviceMetaData =
      Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
    this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
    this.dirsHandler = nmContext.getLocalDirsHandler();
    this.delService = deletionService;
    this.userUGI = getRemoteUgi();
    this.mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // Obtain services from configuration in init()
  }

  /**
   * Reports whether the aux services manifest (dynamic loading) feature is
   * switched on.
   *
   * @return true if manifest / dynamic loading is enabled
   */
  public boolean isManifestEnabled() {
    return this.manifestEnabled;
  }

  /**
   * Registers a service and its record under the given name.
   *
   * @param name aux service name used as the key in both maps
   * @param service aux service instance
   * @param serviceRecord aux service record describing the instance
   */
  protected final synchronized void addService(String name,
      AuxiliaryService service, AuxServiceRecord serviceRecord) {
    final String announcement = "Adding auxiliary service "
        + serviceRecord.getName() + " version " + serviceRecord.getVersion();
    LOG.info(announcement);

    this.serviceMap.put(name, service);
    this.serviceRecordMap.put(name, serviceRecord);
  }

  /**
   * Gets the currently registered aux services.
   *
   * @return a read-only view of the registered services
   */
  Collection<AuxiliaryService> getServices() {
    final Collection<AuxiliaryService> registered = serviceMap.values();
    return Collections.unmodifiableCollection(registered);
  }

  /**
   * Gets the records of the currently registered aux services.
   *
   * @return a read-only view of the service records
   */
  public Collection<AuxServiceRecord> getServiceRecords() {
    final Collection<AuxServiceRecord> records = serviceRecordMap.values();
    return Collections.unmodifiableCollection(records);
  }

  /**
   * Takes a snapshot of the metadata published by the started services.
   * Services that have not been started contribute no entry. Keys are the
   * service names as defined in the configuration, and every value is a
   * duplicate of the stored buffer.
   *
   * @return a new map from service name to a duplicate of its metadata
   */
  public Map<String, ByteBuffer> getMetaData() {
    final Map<String, ByteBuffer> snapshot =
        new HashMap<>(serviceMetaData.size());
    // Iterating over a synchronized map requires holding its monitor.
    synchronized (serviceMetaData) {
      for (Entry<String, ByteBuffer> published : serviceMetaData.entrySet()) {
        final ByteBuffer independentView = published.getValue().duplicate();
        snapshot.put(published.getKey(), independentView);
      }
    }
    return snapshot;
  }

  /**
   * Creates an auxiliary service from a specification using the Configuration
   * classloader.
   *
   * @param service aux service record; its class name property must be set
   * @return auxiliary service
   * @throws YarnRuntimeException if no class could be resolved for the record
   */
  private AuxiliaryService createAuxServiceFromConfiguration(AuxServiceRecord
      service) {
    // A throwaway Configuration is used only for its class lookup facility.
    Configuration c = new Configuration(false);
    c.set(CLASS_NAME, getClassName(service));
    Class<? extends AuxiliaryService> sClass = c.getClass(CLASS_NAME,
        null, AuxiliaryService.class);

    if (sClass == null) {
      // Fixed: the service name used to be glued onto the word "service".
      throw new YarnRuntimeException("No class defined for auxiliary " +
          "service " + service.getName());
    }
    return ReflectionUtils.newInstance(sClass, null);
  }

  /**
   * Creates an auxiliary service from a specification using a custom local
   * classpath.
   *
   * @param service aux service record
   * @param appLocalClassPath local class path; must be non-null and non-empty
   * @param conf configuration
   * @return auxiliary service
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws YarnRuntimeException if the record also lists files to localize
   */
  private AuxiliaryService createAuxServiceFromLocalClasspath(
      AuxServiceRecord service, String appLocalClassPath, Configuration conf)
      throws IOException, ClassNotFoundException {
    Preconditions.checkArgument(appLocalClassPath != null &&
        !appLocalClassPath.isEmpty(),
        "local classpath was null in createAuxServiceFromLocalClasspath");
    final String sName = service.getName();
    final String className = getClassName(service);

    // A local classpath and localized files are mutually exclusive. The file
    // list is null-checked here, as createAuxService does for the same value.
    final AuxServiceConfiguration serviceConf = service.getConfiguration();
    final List<AuxServiceFile> files =
        serviceConf == null ? null : serviceConf.getFiles();
    if (files != null && !files.isEmpty()) {
      throw new YarnRuntimeException("The aux service:" + sName
          + " has configured local classpath:" + appLocalClassPath
          + " and config files:" + files
          + ". Only one of them should be configured.");
    }

    return AuxiliaryServiceWithCustomClassLoader.getInstance(conf, className,
        appLocalClassPath, getSystemClasses(service));
  }

  /**
   * Instantiates the auxiliary service described by a record, picking the
   * classloading strategy from the record and the configuration.
   *
   * @param service aux service record
   * @param conf configuration
   * @param fromConfiguration true if from configuration, false if from manifest
   * @return auxiliary service
   * @throws IOException
   * @throws ClassNotFoundException
   */
  private AuxiliaryService createAuxService(AuxServiceRecord service,
      Configuration conf, boolean fromConfiguration) throws IOException,
      ClassNotFoundException {
    final String serviceName = service.getName();
    final String serviceClass = getClassName(service);
    if (serviceClass == null || serviceClass.isEmpty()) {
      throw new YarnRuntimeException("Class name not provided for auxiliary " +
          "service " + serviceName);
    }

    if (fromConfiguration) {
      // Only configuration-defined services may name a local classpath that
      // is used as-is, without localization.
      final String classpathKey = String.format(
          YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, serviceName);
      final String localClassPath = conf.get(classpathKey);
      if (localClassPath != null && !localClassPath.isEmpty()) {
        return createAuxServiceFromLocalClasspath(service, localClassPath,
            conf);
      }
    }

    // Localize (if needed) every file the record lists; the resulting paths
    // become the classpath of the custom classloader.
    final List<Path> localizedFiles = new ArrayList<>();
    final AuxServiceConfiguration recordConf = service.getConfiguration();
    final List<AuxServiceFile> declaredFiles =
        recordConf == null ? null : recordConf.getFiles();
    if (declaredFiles != null) {
      for (AuxServiceFile declared : declaredFiles) {
        final Path localized = maybeDownloadJars(serviceName, serviceClass,
            declared.getSrcFile(), declared.getType(), conf);
        localizedFiles.add(localized);
      }
    }

    if (localizedFiles.isEmpty()) {
      // Nothing to localize: load through the Configuration classloader.
      return createAuxServiceFromConfiguration(service);
    }

    LOG.info("The aux service:" + serviceName
        + " is using the custom classloader with classpath " + localizedFiles);
    final String joinedClassPath =
        StringUtils.join(File.pathSeparatorChar, localizedFiles);
    return AuxiliaryServiceWithCustomClassLoader.getInstance(conf,
        serviceClass, joinedClassPath, getSystemClasses(service));
  }

  /**
   * Copies the specified remote file to local NM aux service directory. If the
   * same file already exists (as determined by modification time), the file
   * will not be copied again.
   *
   * @param sName service name
   * @param className service class name
   * @param remoteFile location of the file to download
   * @param type type of file (STATIC for a jar or ARCHIVE for a tarball)
   * @param conf configuration
   * @return path of the downloaded file
   * @throws IOException
   * @throws YarnRuntimeException if the local directory cannot be created,
   * the remote file has the wrong owner or is writable by group/others, the
   * file type is unsupported, or the download fails
   */
  @VisibleForTesting
  protected Path maybeDownloadJars(String sName, String className, String
      remoteFile, AuxServiceFile.TypeEnum type, Configuration conf)
      throws IOException {
    // load AuxiliaryService from remote classpath
    FileContext localLFS = getLocalFileContext(conf);
    // create NM aux-service dir in NM localdir if it does not exist.
    Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
        + Path.SEPARATOR + NM_AUX_SERVICE_DIR);
    if (!localLFS.util().exists(nmAuxDir)) {
      try {
        localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
      } catch (IOException ex) {
        throw new YarnRuntimeException("Fail to create dir:"
            + nmAuxDir.toString(), ex);
      }
    }
    Path src = new Path(remoteFile);
    FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
    FileStatus scFileStatus = remoteLFS.getFileStatus(src);
    // The remote file must be owned by the user held in userUGI ...
    if (!scFileStatus.getOwner().equals(
        this.userUGI.getShortUserName())) {
      throw new YarnRuntimeException("The remote jarfile owner:"
          + scFileStatus.getOwner() + " is not the same as the NM user:"
          + this.userUGI.getShortUserName() + ".");
    }
    // ... and must not be writable by group or others.
    if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
      throw new YarnRuntimeException("The remote jarfile should not "
          + "be writable by group or others. "
          + "The current Permission is "
          + scFileStatus.getPermission().toShort());
    }
    // The download directory name embeds the remote modification time, so a
    // changed remote file maps to a different directory.
    Path downloadDest = new Path(nmAuxDir,
        className + "_" + scFileStatus.getModificationTime());
    // check whether we need to re-download the jar
    // from remote directory
    Path targetDirPath = new Path(downloadDest,
        scFileStatus.getPath().getName());
    FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
    for (FileStatus sub : allSubDirs) {
      if (sub.getPath().getName().equals(downloadDest.getName())) {
        // An up-to-date copy is already present.
        return targetDirPath;
      } else {
        // Outdated directory of the same class: rename it out of the way and
        // hand it to the deletion service.
        if (sub.getPath().getName().contains(className) &&
            !sub.getPath().getName().endsWith(DEL_SUFFIX)) {
          Path delPath = new Path(sub.getPath().getParent(),
              sub.getPath().getName() + DEL_SUFFIX);
          localLFS.rename(sub.getPath(), delPath);
          LOG.info("delete old aux service jar dir:"
              + delPath.toString());
          FileDeletionTask deletionTask = new FileDeletionTask(
              this.delService, null, delPath, null);
          this.delService.delete(deletionTask);
        }
      }
    }
    LocalResourceType srcType;
    if (type == AuxServiceFile.TypeEnum.STATIC) {
      srcType = LocalResourceType.FILE;
    } else if (type == AuxServiceFile.TypeEnum.ARCHIVE) {
      srcType = LocalResourceType.ARCHIVE;
    } else {
      // Fixed: the message used to omit the service name and the separating
      // space in front of "for".
      throw new YarnRuntimeException(
          "Cannot unpack file of type " + type + " from remote-file-path:" +
              src + " for aux-service:" + sName + ".\n");
    }
    LocalResource scRsrc = LocalResource.newInstance(
        URL.fromURI(src.toUri()),
        srcType, LocalResourceVisibility.PRIVATE,
        scFileStatus.getLen(), scFileStatus.getModificationTime());
    FSDownload download = new FSDownload(localLFS, null, conf,
        downloadDest, scRsrc, null);
    try {
      // don't need to convert downloaded path into a dir
      // since it's already a jar path.
      return download.call();
    } catch (Exception ex) {
      // Fixed: keep the original exception as the cause so its stack trace
      // is not lost.
      throw new YarnRuntimeException(
          "Exception happend while downloading files "
              + "for aux-service:" + sName + " and remote-file-path:"
              + src + ".\n" + ex.getMessage(), ex);
    }
  }

  /**
   * Creates a recovery directory for the named service below the state store
   * root and sets it on the service. Does nothing when no state store root
   * has been configured (recovery disabled).
   *
   * @param sName name of the service
   * @param s auxiliary service
   * @throws IOException
   */
  private void setStateStoreDir(String sName, AuxiliaryService s) throws
      IOException {
    if (stateStoreRoot == null) {
      return;
    }
    final Path recoveryDir = new Path(stateStoreRoot, sName);
    stateStoreFs.mkdirs(recoveryDir, storeDirPerms);
    s.setRecoveryPath(recoveryDir);
  }

  /**
   * Drops every trace of the named service from the service, record and
   * metadata maps, and stops the service if one was registered.
   *
   * @param sName name of the service
   */
  private synchronized void maybeRemoveAuxService(String sName) {
    final AuxiliaryService removed = serviceMap.remove(sName);
    serviceRecordMap.remove(sName);
    serviceMetaData.remove(sName);
    if (removed == null) {
      return;
    }
    LOG.info("Removing aux service " + sName);
    stopAuxService(removed);
  }

  /**
   * Builds an AuxiliaryService from a service specification, then wires in
   * the local path handler and recovery directory and initializes it with
   * the configuration overlaid by the record's own properties.
   *
   * @param service aux service record
   * @param conf configuration
   * @param fromConfiguration true if from configuration, false if from manifest
   * @return aux service
   * @throws IOException
   */
  private AuxiliaryService initAuxService(AuxServiceRecord service,
      Configuration conf, boolean fromConfiguration) throws IOException {
    final String serviceName = service.getName();
    try {
      final boolean nameIsValid = validateAuxServiceName(serviceName);
      final String invalidNameMessage =
          "The auxiliary service name: " + serviceName + " is invalid. " +
              "The valid service name should only contain a-zA-Z0-9_ " +
              "and cannot start with numbers.";
      Preconditions.checkArgument(nameIsValid, invalidNameMessage);

      final AuxiliaryService created =
          createAuxService(service, conf, fromConfiguration);
      if (created == null) {
        throw new YarnRuntimeException("No auxiliary service class loaded for" +
            " " + serviceName);
      }

      // TODO better use s.getName()?
      final boolean namesAgree = serviceName.equals(created.getName());
      if (!namesAgree) {
        LOG.warn("The Auxiliary Service named '" + serviceName + "' in the "
            + "configuration is for " + created.getClass() + " which has "
            + "a name of '" + created.getName() + "'. Because these are "
            + "not the same tools trying to send ServiceData and read "
            + "Service Meta Data may have issues unless the refer to "
            + "the name in the config.");
      }

      created.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
      setStateStoreDir(serviceName, created);

      // Start from a copy of the NM configuration and overlay the properties
      // carried by the service record.
      final Configuration overlaid = new Configuration(conf);
      final AuxServiceConfiguration recordConf = service.getConfiguration();
      if (recordConf != null) {
        for (Entry<String, String> property
            : recordConf.getProperties().entrySet()) {
          overlaid.set(property.getKey(), property.getValue());
        }
      }
      created.init(overlaid);

      LOG.info("Initialized auxiliary service " + serviceName);
      return created;
    } catch (RuntimeException e) {
      LOG.error("Failed to initialize " + serviceName, e);
      throw e;
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException(e);
    }
  }

  /**
   * Re-reads the auxiliary services manifest and starts any service it
   * loads. Must be called after service init.
   *
   * @throws IOException if manifest can't be loaded
   */
  @VisibleForTesting
  protected void reloadManifest() throws IOException {
    final Configuration currentConf = getConfig();
    loadManifest(currentConf, true);
  }

  /**
   * Replaces the running auxiliary services with the supplied list, starting
   * any that are loaded. Must be called after service init.
   *
   * @param services a list of auxiliary services
   * @throws IOException if aux services have not been started yet or dynamic
   * reloading is not enabled
   */
  public synchronized void reload(AuxServiceRecords services) throws
      IOException {
    final boolean dynamicLoading = manifestEnabled;
    if (!dynamicLoading) {
      throw new IOException("Dynamic reloading is not enabled via " +
          YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED);
    }

    final boolean started = getServiceState() == Service.STATE.STARTED;
    if (!started) {
      throw new IOException("Auxiliary services have not been started yet, " +
          "please retry later");
    }

    final String servicesAsJson = mapper.writeValueAsString(services);
    LOG.info("Received list of auxiliary services: " + servicesAsJson);
    loadServices(services, getConfig(), true);
  }

  /**
   * Verifies that neither the given path nor any of its ancestors is
   * writable by group or others, walking up until a path without a parent
   * is reached.
   *
   * @param status status of the path to start from
   * @return true if every path on the way up passes, false at the first
   * violation (which is logged)
   * @throws IOException if an ancestor's status cannot be obtained
   */
  private boolean checkManifestPermissions(FileStatus status) throws
      IOException {
    FileStatus current = status;
    while (true) {
      final short mode = current.getPermission().toShort();
      if ((mode & 0022) != 0) {
        LOG.error("Manifest file and parents must not be writable by group or " +
            "others. The current Permission of " + current.getPath() + " is " +
            current.getPermission());
        return false;
      }
      final Path parent = current.getPath().getParent();
      if (parent == null) {
        return true;
      }
      current = manifestFS.getFileStatus(parent);
    }
  }

  /**
   * Verifies that the manifest owner is allowed by the YARN admin ACL and
   * that the manifest and its ancestors pass the permission check.
   *
   * @param status status of the manifest file
   * @return true if both the owner and the permissions are acceptable
   * @throws IOException if an ancestor's status cannot be obtained
   */
  private boolean checkManifestOwnerAndPermissions(FileStatus status) throws
      IOException {
    final String adminAclSpec = getConfig().get(
        YarnConfiguration.YARN_ADMIN_ACL,
        YarnConfiguration.DEFAULT_YARN_ADMIN_ACL);
    final AccessControlList adminAcl = new AccessControlList(adminAclSpec);
    final UserGroupInformation owner =
        UserGroupInformation.createRemoteUser(status.getOwner());

    if (adminAcl.isUserAllowed(owner)) {
      return checkManifestPermissions(status);
    }
    LOG.error("Manifest must be owned by YARN admin: " + manifest);
    return false;
  }

  /**
   * Reads the manifest file if it is configured, exists, is a regular file,
   * passes the owner and permission checks, and has been modified since the
   * last read.
   *
   * @return aux service records, or null if the manifest was not read for any
   * of the reasons above
   * @throws IOException if the manifest cannot be inspected or parsed
   */
  private synchronized AuxServiceRecords maybeReadManifestFile() throws
      IOException {
    if (manifest == null) {
      return null;
    }
    if (!manifestFS.exists(manifest)) {
      LOG.warn("Manifest file " + manifest + " doesn't exist");
      return null;
    }
    FileStatus status;
    try {
      status = manifestFS.getFileStatus(manifest);
    } catch (FileNotFoundException e) {
      // The file vanished between the exists() check and this call.
      LOG.warn("Manifest file " + manifest + " doesn't exist");
      return null;
    }
    if (!status.isFile()) {
      // Fixed: this branch used to only warn and then fall through to open
      // a path that is not a regular file.
      LOG.warn("Manifest file " + manifest + " is not a file");
      return null;
    }
    if (!checkManifestOwnerAndPermissions(status)) {
      return null;
    }
    if (status.getModificationTime() == manifestModifyTS) {
      // Unchanged since the last read.
      return null;
    }
    manifestModifyTS = status.getModificationTime();
    LOG.info("Reading auxiliary services manifest " + manifest);
    try (FSDataInputStream in = manifestFS.open(manifest)) {
      return mapper.readValue((InputStream) in, AuxServiceRecords.class);
    }
  }

  /**
   * Updates current aux services based on changes found in the manifest. If
   * the manifest file no longer exists, all registered services are removed.
   *
   * @param conf configuration
   * @param startServices if true starts services, otherwise only inits services
   * @throws IOException if dynamic reloading is not enabled or the manifest
   * cannot be read
   */
  @VisibleForTesting
  protected synchronized void loadManifest(Configuration conf, boolean
      startServices) throws IOException {
    if (!manifestEnabled) {
      throw new IOException("Dynamic reloading is not enabled via " +
          YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED);
    }
    if (manifest == null) {
      return;
    }
    if (!manifestFS.exists(manifest)) {
      // Forget the timestamp of the vanished manifest. Otherwise a manifest
      // restored with the same modification time would be treated as
      // unchanged and never be loaded again.
      manifestModifyTS = -1;
      if (serviceMap.isEmpty()) {
        return;
      }
      LOG.info("Manifest file " + manifest + " doesn't exist, stopping " +
          "auxiliary services");
      // Iterate over a copy: removal mutates serviceMap.
      Set<String> servicesToRemove = new HashSet<>(serviceMap.keySet());
      for (String sName : servicesToRemove) {
        maybeRemoveAuxService(sName);
      }
      return;
    }
    AuxServiceRecords services = maybeReadManifestFile();
    loadServices(services, conf, startServices);
  }

  /**
   * Updates current aux services based on changes found in the service list.
   * Services whose record is unchanged are left alone, new or changed ones
   * are (re)created, and registered services missing from the list are
   * removed.
   *
   * @param services list of auxiliary services; null means nothing to do
   * @param conf configuration
   * @param startServices if true starts services, otherwise only inits services
   * @throws IOException
   */
  private synchronized void loadServices(AuxServiceRecords services,
      Configuration conf, boolean startServices) throws IOException {
    if (services == null) {
      // read did not occur or no changes detected
      return;
    }
    Set<String> loadedAuxServices = new HashSet<>();
    boolean foundChanges = false;
    if (services.getServices() != null) {
      for (AuxServiceRecord service : services.getServices()) {
        AuxServiceRecord existingService = serviceRecordMap.get(service
            .getName());
        loadedAuxServices.add(service.getName());
        if (existingService != null && existingService.equals(service)) {
          LOG.debug("Auxiliary service already loaded: {}", service.getName());
          continue;
        }
        foundChanges = true;
        try {
          // stop aux service
          maybeRemoveAuxService(service.getName());
          // init aux service
          AuxiliaryService s = initAuxService(service, conf, false);
          if (startServices) {
            // start aux service
            startAuxService(service.getName(), s, service);
          }
          // add aux service to serviceMap
          addService(service.getName(), s, service);
        } catch (IOException e) {
          // Keep going with the remaining services, but log the cause so the
          // failure can be diagnosed.
          LOG.error("Failed to load auxiliary service " + service.getName(),
              e);
        }
      }
    }

    // remove aux services that do not appear in the new list
    Set<String> servicesToRemove = new HashSet<>(serviceMap.keySet());
    servicesToRemove.removeAll(loadedAuxServices);
    for (String sName : servicesToRemove) {
      foundChanges = true;
      maybeRemoveAuxService(sName);
    }

    if (!foundChanges) {
      LOG.info("No auxiliary services changes detected");
    }
  }

  /**
   * Looks up the implementing class name stored in a service record.
   *
   * @param service aux service record
   * @return the value of the class name property, or null if the record has
   * no configuration
   */
  private static String getClassName(AuxServiceRecord service) {
    final AuxServiceConfiguration recordConf = service.getConfiguration();
    return recordConf == null ? null : recordConf.getProperty(CLASS_NAME);
  }

  /**
   * Looks up the system classes stored in a service record.
   *
   * @param service aux service record
   * @return the split value of the system classes property, or an empty
   * array if the record has no configuration or the property is unset
   */
  private static String[] getSystemClasses(AuxServiceRecord service) {
    final AuxServiceConfiguration recordConf = service.getConfiguration();
    if (recordConf != null && recordConf.getProperty(SYSTEM_CLASSES) != null) {
      return StringUtils.split(recordConf.getProperty(SYSTEM_CLASSES));
    }
    return new String[]{};
  }

  /**
   * Translates an aux service specified in the Configuration to an aux
   * service record.
   *
   * @param sName aux service name
   * @param conf configuration
   * @return a record named sName whose configuration carries the class name,
   * the system classes and, if a remote classpath is configured, the file to
   * localize
   * @throws YarnRuntimeException if the remote classpath does not end in a
   * recognized jar or archive extension
   */
  private static AuxServiceRecord createServiceRecordFromConfiguration(String
      sName, Configuration conf) {
    String className = conf.get(String.format(
        YarnConfiguration.NM_AUX_SERVICE_FMT, sName));
    String remoteClassPath = conf.get(String.format(
        YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
    String[] systemClasses = conf.getTrimmedStrings(String.format(
        YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, sName));

    AuxServiceConfiguration serviceConf = new AuxServiceConfiguration();
    if (className != null) {
      serviceConf.setProperty(CLASS_NAME, className);
    }
    if (systemClasses != null) {
      serviceConf.setProperty(SYSTEM_CLASSES, StringUtils.join(",",
          systemClasses));
    }
    if (remoteClassPath != null) {
      // The file type is derived from the (case-insensitive) extension.
      AuxServiceFile.TypeEnum type;
      String lcClassPath = StringUtils.toLowerCase(remoteClassPath);
      if (lcClassPath.endsWith(".jar")) {
        type = AuxServiceFile.TypeEnum.STATIC;
      } else if (lcClassPath.endsWith(".zip") ||
          lcClassPath.endsWith(".tar.gz") || lcClassPath.endsWith(".tgz") ||
          lcClassPath.endsWith(".tar")) {
        type = AuxServiceFile.TypeEnum.ARCHIVE;
      } else {
        // Fixed: a space was missing between the path and "for".
        throw new YarnRuntimeException("Cannot unpack file from " +
            "remote-file-path:" + remoteClassPath + " for aux-service:" +
            sName + ".\n");
      }
      AuxServiceFile file = new AuxServiceFile().srcFile(remoteClassPath)
          .type(type);
      serviceConf.getFiles().add(file);
    }
    return new AuxServiceRecord().name(sName).configuration(serviceConf);
  }

  /**
   * Prepares recovery state if NM recovery is enabled, then initializes the
   * auxiliary services either from the manifest (when the manifest feature
   * is enabled) or from the service names listed in the configuration.
   *
   * @param conf configuration
   * @throws Exception if a service or the manifest cannot be initialized
   */
  @Override
  public synchronized void serviceInit(Configuration conf) throws Exception {
    final boolean recoveryOn = conf.getBoolean(
        YarnConfiguration.NM_RECOVERY_ENABLED,
        YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
    if (recoveryOn) {
      final String recoveryDir = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
      stateStoreRoot = new Path(recoveryDir, STATE_STORE_ROOT_NAME);
      stateStoreFs = FileSystem.getLocal(conf);
    }

    final boolean useManifest = conf.getBoolean(
        YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED,
        YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_ENABLED);
    manifestEnabled = useManifest;

    if (useManifest) {
      final String manifestLocation =
          conf.get(YarnConfiguration.NM_AUX_SERVICES_MANIFEST);
      if (manifestLocation == null) {
        LOG.info("Auxiliary services manifest is enabled, but no manifest " +
            "file is specified in the configuration.");
      } else {
        manifest = new Path(manifestLocation);
        manifestFS = FileSystem.get(new URI(manifestLocation), conf);
        // Init only; services are started later in serviceStart.
        loadManifest(conf, false);
        manifestReloadInterval = conf.getLong(
            YarnConfiguration.NM_AUX_SERVICES_MANIFEST_RELOAD_MS,
            YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_RELOAD_MS);
        manifestReloadTask = new ManifestReloadTask();
      }
    } else {
      final Collection<String> configuredNames = conf.getStringCollection(
          YarnConfiguration.NM_AUX_SERVICES);
      for (final String configuredName : configuredNames) {
        final AuxServiceRecord record =
            createServiceRecordFromConfiguration(configuredName, conf);
        maybeRemoveAuxService(configuredName);
        final AuxiliaryService instance = initAuxService(record, conf, true);
        addService(configuredName, instance, record);
      }
    }

    super.serviceInit(conf);
  }

  /**
   * Starts a service, subscribes this object to its state changes, stores
   * the metadata it publishes (if any) and stamps the launch time on its
   * record.
   *
   * @param name aux service name, used as the metadata key
   * @param service aux service to start
   * @param serviceRecord record receiving the launch time
   */
  private void startAuxService(String name, AuxiliaryService service,
      AuxServiceRecord serviceRecord) {
    service.start();
    service.registerServiceListener(this);

    final ByteBuffer publishedMeta = service.getMetaData();
    if (publishedMeta != null) {
      serviceMetaData.put(name, publishedMeta);
    }

    final Date launchedAt = new Date();
    serviceRecord.setLaunchTime(launchedAt);
  }

  /**
   * Unsubscribes from and stops the given service, but only if it is
   * currently in the STARTED state.
   *
   * @param service service to stop
   */
  private void stopAuxService(Service service) {
    if (service.getServiceState() != Service.STATE.STARTED) {
      return;
    }
    service.unregisterServiceListener(this);
    service.stop();
  }

  /**
   * Starts every registered auxiliary service and, when a manifest is in
   * use with a positive reload interval, schedules the periodic manifest
   * reload.
   *
   * @throws Exception if the parent service fails to start
   */
  @Override
  public synchronized void serviceStart() throws Exception {
    // TODO fork(?) services running as configured user
    //      monitor for health, shutdown/restart(?) if any should die
    for (Map.Entry<String, AuxiliaryService> registered
        : serviceMap.entrySet()) {
      final String serviceName = registered.getKey();
      final AuxiliaryService instance = registered.getValue();
      startAuxService(serviceName, instance,
          serviceRecordMap.get(serviceName));
    }

    final boolean scheduleReload =
        manifestEnabled && manifest != null && manifestReloadInterval > 0;
    if (scheduleReload) {
      LOG.info("Scheduling reloading auxiliary services manifest file at " +
          "interval " + manifestReloadInterval + " ms");
      // Daemon timer: the second constructor argument is isDaemon.
      manifestReloadTimer = new Timer("AuxServicesManifestReload-Timer",
          true);
      manifestReloadTimer.schedule(manifestReloadTask,
          manifestReloadInterval, manifestReloadInterval);
    }

    super.serviceStart();
  }

  /**
   * Cancels the manifest reload timer, stops every started auxiliary
   * service, clears the service tables and closes the manifest file system.
   * The parent's stop logic always runs, even if one of these steps throws.
   *
   * @throws Exception if stopping a service or closing the manifest file
   * system fails
   */
  @Override
  public synchronized void serviceStop() throws Exception {
    try {
      // Cancel the reload timer first. It used to be cancelled only after
      // manifestFS.close(), so a failure while stopping a service or closing
      // the file system left the timer scheduled.
      if (manifestReloadTimer != null) {
        manifestReloadTimer.cancel();
      }
      for (Service service : serviceMap.values()) {
        stopAuxService(service);
      }
      serviceMap.clear();
      serviceRecordMap.clear();
      serviceMetaData.clear();
      if (manifestFS != null) {
        manifestFS.close();
      }
    } finally {
      super.serviceStop();
    }
  }

  /**
   * Listener callback that logs the name and the current state of a service
   * whose state has changed.
   *
   * @param service the service reporting the state change
   */
  @Override
  public void stateChanged(Service service) {
    // services changing state is expected on reload
    final String message = "Service " + service.getName()
        + " changed state: " + service.getServiceState();
    LOG.info(message);
  }

  /**
   * Dispatches an auxiliary-services event to the registered services.
   *
   * <ul>
   *   <li>{@code APPLICATION_INIT} is delivered only to the service looked
   *       up by the event's service id. If no such service is registered,
   *       the resulting failure is caught and logged like any other.</li>
   *   <li>{@code APPLICATION_STOP}, {@code CONTAINER_INIT} and
   *       {@code CONTAINER_STOP} are delivered to every service in
   *       {@code serviceMap}.</li>
   * </ul>
   *
   * <p>Anything thrown while notifying a service is caught and logged as a
   * warning, so one failing service does not stop the others from being
   * notified.
   *
   * @param event the event to dispatch
   * @throws RuntimeException if the event type is not one of the above
   */
  @Override
  public void handle(AuxServicesEvent event) {
    final AuxServicesEventType eventType = event.getType();
    LOG.info("Got event " + eventType + " for appId "
        + event.getApplicationID());
    switch (eventType) {
      case APPLICATION_INIT: {
        LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
        // Declared outside the try so the catch block can report it.
        AuxiliaryService targetService = null;
        try {
          targetService = serviceMap.get(event.getServiceID());
          ApplicationInitializationContext initContext =
              new ApplicationInitializationContext(event.getUser(),
                  event.getApplicationID(), event.getServiceData());
          targetService.initializeApplication(initContext);
        } catch (Throwable failure) {
          logWarningWhenAuxServiceThrowExceptions(targetService,
              AuxServicesEventType.APPLICATION_INIT, failure);
        }
        break;
      }
      case APPLICATION_STOP: {
        for (AuxiliaryService auxService : serviceMap.values()) {
          try {
            ApplicationTerminationContext stopContext =
                new ApplicationTerminationContext(event.getApplicationID());
            auxService.stopApplication(stopContext);
          } catch (Throwable failure) {
            logWarningWhenAuxServiceThrowExceptions(auxService,
                AuxServicesEventType.APPLICATION_STOP, failure);
          }
        }
        break;
      }
      case CONTAINER_INIT: {
        for (AuxiliaryService auxService : serviceMap.values()) {
          try {
            ContainerInitializationContext initContext =
                new ContainerInitializationContext(
                    event.getContainer().getUser(),
                    event.getContainer().getContainerId(),
                    event.getContainer().getResource(),
                    event.getContainer().getContainerTokenIdentifier()
                        .getContainerType());
            auxService.initializeContainer(initContext);
          } catch (Throwable failure) {
            logWarningWhenAuxServiceThrowExceptions(auxService,
                AuxServicesEventType.CONTAINER_INIT, failure);
          }
        }
        break;
      }
      case CONTAINER_STOP: {
        for (AuxiliaryService auxService : serviceMap.values()) {
          try {
            ContainerTerminationContext stopContext =
                new ContainerTerminationContext(
                    event.getUser(),
                    event.getContainer().getContainerId(),
                    event.getContainer().getResource(),
                    event.getContainer().getContainerTokenIdentifier()
                        .getContainerType());
            auxService.stopContainer(stopContext);
          } catch (Throwable failure) {
            logWarningWhenAuxServiceThrowExceptions(auxService,
                AuxServicesEventType.CONTAINER_STOP, failure);
          }
        }
        break;
      }
      default:
        throw new RuntimeException("Unknown type: " + eventType);
    }
  }

  /**
   * Checks whether an auxiliary service name is acceptable.
   *
   * @param name the candidate service name
   * @return {@code true} only when {@code name} is non-null, is not empty
   *     after trimming, and is matched in full by the pattern {@code p}
   */
  private boolean validateAuxServiceName(String name) {
    return name != null
        && !name.trim().isEmpty()
        && p.matcher(name).matches();
  }

  /**
   * Logs a warning describing a failure raised while an auxiliary service
   * was handling an event.
   *
   * @param service the service that failed; may be {@code null}, in which
   *     case the message says so instead of naming the service
   * @param eventType the type of event being handled when the error occurred
   * @param th the error that was raised
   */
  private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service,
      AuxServicesEventType eventType, Throwable th) {
    final String subject;
    if (service == null) {
      subject = "The auxService is null";
    } else {
      subject = "The auxService name is " + service.getName();
    }
    LOG.warn(subject + " and it got an error at event: " + eventType, th);
  }

  /**
   * Obtains a {@link FileContext} for the local file system.
   *
   * @param conf configuration passed to the file context lookup
   * @return the local file system context
   * @throws YarnRuntimeException if the lookup fails with an
   *     {@link IOException}; the original exception is attached as the cause
   */
  FileContext getLocalFileContext(Configuration conf) {
    try {
      return FileContext.getLocalFSFileContext(conf);
    } catch (IOException e) {
      // Chain the cause so the underlying I/O failure is not lost.
      throw new YarnRuntimeException("Failed to access local fs", e);
    }
  }

  /**
   * Obtains a {@link FileContext} for the file system identified by the
   * given URI.
   *
   * @param path URI passed to the file context lookup
   * @param conf configuration passed to the file context lookup
   * @return the file context for {@code path}
   * @throws YarnRuntimeException if the lookup fails with an
   *     {@link IOException}; the original exception is attached as the cause
   */
  FileContext getRemoteFileContext(final URI path, Configuration conf) {
    try {
      return FileContext.getFileContext(path, conf);
    } catch (IOException e) {
      // Chain the cause so the underlying I/O failure is not lost.
      throw new YarnRuntimeException("Failed to access remote fs", e);
    }
  }

  /**
   * Returns the {@link UserGroupInformation} of the current user.
   *
   * @return the current user's UGI
   * @throws YarnRuntimeException if the current user cannot be obtained; the
   *     message, which is also logged as a warning, contains the stringified
   *     underlying exception
   */
  private UserGroupInformation getRemoteUgi() {
    try {
      return UserGroupInformation.getCurrentUser();
    } catch (IOException e) {
      final String msg = "Cannot obtain the user-name. Got exception: "
          + StringUtils.stringifyException(e);
      LOG.warn(msg);
      throw new YarnRuntimeException(msg);
    }
  }

  /**
   * Builds a service record with the given name whose configuration carries
   * the given class name under the {@code CLASS_NAME} property.
   *
   * @param name name to set on the record
   * @param className value stored under the {@code CLASS_NAME} property
   * @return the newly built record
   */
  protected static AuxServiceRecord newAuxService(String name, String
      className) {
    final AuxServiceConfiguration configuration =
        new AuxServiceConfiguration();
    configuration.setProperty(CLASS_NAME, className);
    final AuxServiceRecord named = new AuxServiceRecord().name(name);
    return named.configuration(configuration);
  }

  /**
   * Adds a file entry of type {@code STATIC} with the given source path to
   * the file list of the service's configuration.
   *
   * @param service record whose configuration receives the file entry
   * @param classpath source file value for the new entry
   */
  protected static void setClasspath(AuxServiceRecord service, String
      classpath) {
    final AuxServiceFile classpathFile = new AuxServiceFile()
        .srcFile(classpath)
        .type(AuxServiceFile.TypeEnum.STATIC);
    service.getConfiguration().getFiles().add(classpathFile);
  }

  /**
   * Stores the given value under the {@code SYSTEM_CLASSES} property of the
   * service's configuration.
   *
   * @param service record whose configuration is updated
   * @param systemClasses value to store
   */
  protected static void setSystemClasses(AuxServiceRecord service, String
      systemClasses) {
    final AuxServiceConfiguration configuration = service.getConfiguration();
    configuration.setProperty(SYSTEM_CLASSES, systemClasses);
  }

  /**
   * Class which is used by the {@link Timer} class to periodically execute the
   * manifest reload.
   */
  private final class ManifestReloadTask extends TimerTask {
    @Override
    public void run() {
      try {
        reloadManifest();
      } catch (Throwable t) {
        // Prevent uncaught exceptions from killing this thread
        LOG.warn("Error while reloading manifest: ", t);
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AuxServicesEvent 源码

hadoop AuxServicesEventType 源码

hadoop AuxiliaryServiceWithCustomClassLoader 源码

hadoop ContainerManager 源码

hadoop ContainerManagerImpl 源码

0  赞