hadoop ServiceMaster source code

  • 2022-10-20

hadoop ServiceMaster code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.monitor.ServiceMonitor;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.Map;

import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION;

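/**
 * Application master of the YARN service framework. On initialization it
 * loads the service definition (the "yarnfile") from HDFS, performs the
 * secure login and token handling required in Kerberos deployments, and then
 * composes the client RPC service, the {@link ServiceScheduler} and the
 * {@link ServiceMonitor} as child services.
 */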
public class ServiceMaster extends CompositeService {

  private static final Logger LOG =
      LoggerFactory.getLogger(ServiceMaster.class);

  public static final String YARNFILE_OPTION = "yarnfile";
  public static final String SERVICE_NAME_OPTION = "service_name";
  public static final String KEYTAB_OPTION = "keytab";
  public static final String PRINCIPAL_NAME_OPTION = "principal_name";

  private String serviceDefPath;
  private String serviceName;
  private String serviceKeytab;
  private String servicePrincipalName;
  protected ServiceContext context;

  public ServiceMaster(String name) {
    super(name);
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    printSystemEnv();
    context = new ServiceContext();
    Path appDir = getAppDir();
    context.serviceHdfsDir = appDir.toString();
    context.tokens = recordTokensForContainers();
    Credentials credentials = null;
    if (UserGroupInformation.isSecurityEnabled()) {
      credentials = UserGroupInformation.getCurrentUser().getCredentials();
      doSecureLogin();
    }
    SliderFileSystem fs = new SliderFileSystem(conf);
    fs.setAppDir(appDir);
    context.fs = fs;
    loadApplicationJson(context, fs);
    if (UserGroupInformation.isSecurityEnabled()) {
      // add back the credentials
      if (credentials != null) {
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
      }
      removeHdfsDelegationToken(UserGroupInformation.getLoginUser());
    }

    // Take the yarn config from the yarnfile and merge it into YarnConfiguration
    for (Map.Entry<String, String> entry : context.service
        .getConfiguration().getProperties().entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }

    ContainerId amContainerId = getAMContainerId();

    ApplicationAttemptId attemptId = amContainerId.getApplicationAttemptId();
    LOG.info("Service AppAttemptId: " + attemptId);
    context.attemptId = attemptId;

    // configure AM to wait forever for RM
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, -1);
    conf.unset(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS);

    DefaultMetricsSystem.initialize("ServiceAppMaster");

    context.secretManager = new ClientToAMTokenSecretManager(attemptId, null);
    ClientAMService clientAMService = createClientAMService();
    context.clientAMService = clientAMService;
    addService(clientAMService);

    ServiceScheduler scheduler = createServiceScheduler(context);
    addService(scheduler);
    context.scheduler = scheduler;

    ServiceMonitor monitor = new ServiceMonitor("Service Monitor", context);
    addService(monitor);

    super.serviceInit(conf);
  }

  @VisibleForTesting
  protected ClientAMService createClientAMService() {
    return new ClientAMService(context);
  }

  // Record the tokens and use them for launching containers,
  // e.g. localization requires the HDFS delegation tokens.
  @VisibleForTesting
  protected ByteBuffer recordTokensForContainers() throws IOException {
    Credentials copy = new Credentials(UserGroupInformation.getCurrentUser()
        .getCredentials());
    // Now remove the AM->RM token so that task containers cannot access it.
    Iterator<Token<?>> iter = copy.getAllTokens().iterator();
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      LOG.info(token.toString());
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    try {
      copy.writeTokenStorageToStream(dob);
    } finally {
      dob.close();
    }
    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }

  // 1. First try to use the user-specified keytab.
  // 2. If none is specified, try a pre-installed keytab on the local host.
  // 3. Strip off HDFS delegation tokens to ensure the keytab is used to talk
  //    to HDFS.
  private void doSecureLogin()
      throws IOException, URISyntaxException {
    // read the localized keytab specified by user
    File keytab = new File(String.format(KEYTAB_LOCATION, getServiceName()));
    if (!keytab.exists()) {
      LOG.info("No keytab localized at " + keytab);
      // Check if there exists a pre-installed keytab at host
      String preInstalledKeytab = context.service == null ? this.serviceKeytab
          : context.service.getKerberosPrincipal().getKeytab();
      if (!StringUtils.isEmpty(preInstalledKeytab)) {
        URI uri = new URI(preInstalledKeytab);
        if (uri.getScheme().equals("file")) {
          keytab = new File(uri);
          LOG.info("Using pre-installed keytab from localhost: " +
              preInstalledKeytab);
        }
      }
    }
    if (!keytab.exists()) {
      LOG.info("No keytab exists: " + keytab);
      return;
    }
    String principal = context.service == null ? this.servicePrincipalName
        : context.service.getKerberosPrincipal().getPrincipalName();
    if (StringUtils.isEmpty(principal)) {
      principal = UserGroupInformation.getLoginUser().getShortUserName();
      LOG.info("No principal name specified.  Will use AM " +
          "login identity {} to attempt keytab-based login", principal);
    }

    LOG.info("User before logged in is: " + UserGroupInformation
        .getCurrentUser());
    String principalName = SecurityUtil.getServerPrincipal(principal,
        ServiceUtils.getLocalHostName(getConfig()));
    UserGroupInformation.loginUserFromKeytab(principalName,
        keytab.getAbsolutePath());
    LOG.info("User after logged in is: " + UserGroupInformation
        .getCurrentUser());
    context.principal = principalName;
    context.keytab = keytab.getAbsolutePath();
  }

  // Remove the HDFS delegation token from the login user so that the AM uses
  // the keytab to talk to HDFS
  private static void removeHdfsDelegationToken(UserGroupInformation user) {
    if (!user.isFromKeytab()) {
      LOG.error("AM is not holding on a keytab in a secure deployment:" +
          " service will fail when tokens expire");
    }
    Credentials credentials = user.getCredentials();
    Iterator<Token<? extends TokenIdentifier>> iter =
        credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      Token<? extends TokenIdentifier> token = iter.next();
      if (token.getKind().equals(
          DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
        LOG.info("Remove HDFS delegation token {}.", token);
        iter.remove();
      }
    }
  }

  protected ContainerId getAMContainerId() throws BadClusterStateException {
    return ContainerId.fromString(ServiceUtils.mandatoryEnvVariable(
        ApplicationConstants.Environment.CONTAINER_ID.name()));
  }

  protected Path getAppDir() {
    return new Path(serviceDefPath).getParent();
  }

  protected String getServiceName() {
    return serviceName;
  }

  protected ServiceScheduler createServiceScheduler(ServiceContext context)
      throws IOException, YarnException {
    return new ServiceScheduler(context);
  }

  protected void loadApplicationJson(ServiceContext context,
      SliderFileSystem fs) throws IOException {
    context.service = ServiceApiUtil
        .loadServiceFrom(fs, new Path(serviceDefPath));
    context.service.setState(ServiceState.ACCEPTED);
    LOG.info(context.service.toString());
  }

  @Override
  protected void serviceStart() throws Exception {
    LOG.info("Starting service as user " + UserGroupInformation
        .getCurrentUser());
    UserGroupInformation.getLoginUser().doAs(
        (PrivilegedExceptionAction<Void>) () -> {
          super.serviceStart();
          return null;
        }
    );
  }

  @Override
  protected void serviceStop() throws Exception {
    LOG.info("Stopping app master");
    super.serviceStop();
  }

  // This method should be called whenever a component of the service gains
  // or loses a READY instance
  public static synchronized void checkAndUpdateServiceState(
      ServiceScheduler scheduler) {
    ServiceState curState = scheduler.getApp().getState();
    // Check the state of all components
    boolean isStable = true;
    for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler
        .getApp().getComponents()) {
      if (comp.getState() !=
          org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) {
        isStable = false;
        break;
      }
    }
    if (isStable) {
      scheduler.getApp().setState(ServiceState.STABLE);
    } else {
      // mark new state as started only if current state is stable, otherwise
      // leave it as is
      if (curState == ServiceState.STABLE) {
        scheduler.getApp().setState(ServiceState.STARTED);
      }
    }
    if (curState != scheduler.getApp().getState()) {
      LOG.info("Service state changed from {} -> {}", curState,
          scheduler.getApp().getState());
    }
    populateYarnSysFS(scheduler);
  }

  private static void populateYarnSysFS(ServiceScheduler scheduler) {
    Service service = scheduler.getApp();
    scheduler.syncSysFs(service);
  }

  private void printSystemEnv() {
    for (Map.Entry<String, String> envs : System.getenv().entrySet()) {
      LOG.info("{} = {}", envs.getKey(), envs.getValue());
    }
  }

  public static void main(String[] args) throws Exception {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    org.apache.hadoop.util.StringUtils
        .startupShutdownMessage(ServiceMaster.class, args, LOG);
    try {
      ServiceMaster serviceMaster = new ServiceMaster("Service Master");
      ShutdownHookManager.get()
          .addShutdownHook(new CompositeServiceShutdownHook(serviceMaster), 30);
      YarnConfiguration conf = new YarnConfiguration();
      Options opts = new Options();
      opts.addOption(YARNFILE_OPTION, true, "HDFS path to JSON service " +
          "specification");
      opts.getOption(YARNFILE_OPTION).setRequired(true);
      opts.addOption(SERVICE_NAME_OPTION, true, "Service name");
      opts.getOption(SERVICE_NAME_OPTION).setRequired(true);
      opts.addOption(KEYTAB_OPTION, true, "Service AM keytab");
      opts.addOption(PRINCIPAL_NAME_OPTION, true,
          "Service AM keytab principal");
      GenericOptionsParser parser = new GenericOptionsParser(conf, opts, args);
      CommandLine cmdLine = parser.getCommandLine();
      serviceMaster.serviceDefPath = cmdLine.getOptionValue(YARNFILE_OPTION);
      serviceMaster.serviceName = cmdLine.getOptionValue(SERVICE_NAME_OPTION);
      serviceMaster.serviceKeytab = cmdLine.getOptionValue(KEYTAB_OPTION);
      serviceMaster.servicePrincipalName = cmdLine
          .getOptionValue(PRINCIPAL_NAME_OPTION);
      serviceMaster.init(conf);
      serviceMaster.start();
    } catch (Throwable t) {
      LOG.error("Error starting service master", t);
      ExitUtil.terminate(1, "Error starting service master");
    }
  }
}
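For context, below is a minimal sketch of how this AM's entry point is invoked. The HDFS path, service name, keytab, and principal are all hypothetical; in a real deployment the YARN service client assembles this command line (and the container environment) automatically when it submits the application.

// Illustrative only: every path and name below is hypothetical.
// ServiceMaster.main() parses exactly the four options registered above
// (-keytab and -principal_name matter only on Kerberos clusters), and
// getAMContainerId() additionally requires the CONTAINER_ID environment
// variable that the NodeManager sets for every launched container.
public class ServiceMasterLaunchSketch {
  public static void main(String[] args) throws Exception {
    org.apache.hadoop.yarn.service.ServiceMaster.main(new String[] {
        "-yarnfile", "hdfs:///user/alice/.yarn/services/sleeper/sleeper.json",
        "-service_name", "sleeper",
        "-keytab", "file:///etc/security/keytabs/sleeper.keytab",
        "-principal_name", "sleeper/_HOST@EXAMPLE.COM"
    });
  }
}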
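The ByteBuffer produced by recordTokensForContainers() is stored in context.tokens so it can be attached to each task container's launch context. A minimal sketch of that consuming side, assuming the local resources, environment, and commands are assembled elsewhere (this helper is not part of ServiceMaster itself):

import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

// Sketch: the recorded tokens let container localization authenticate to
// HDFS, while the AM->RM token stripped out above never reaches the
// containers.
public class TokenBufferSketch {
  static ContainerLaunchContext newLaunchContext(ByteBuffer recordedTokens) {
    return ContainerLaunchContext.newInstance(
        Collections.emptyMap(),       // local resources, filled in elsewhere
        Collections.emptyMap(),       // environment
        Collections.emptyList(),      // commands
        null,                         // service data
        recordedTokens.duplicate(),   // tokens from recordTokensForContainers()
        null);                        // application ACLs
  }
}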

Related information

hadoop source code directory

Related articles

hadoop ClientAMPolicyProvider source code

hadoop ClientAMProtocol source code

hadoop ClientAMSecurityInfo source code

hadoop ClientAMService source code

hadoop ContainerFailureTracker source code

hadoop ServiceContext source code

hadoop ServiceEvent source code

hadoop ServiceEventType source code

hadoop ServiceManager source code

hadoop ServiceMetrics source code
