hadoop ServiceManager 源码

  • 2022-10-20
  • 浏览 (177)

hadoop ServiceManager 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;

/**
 * Manages the state of Service.
 *
 * <p>Upgrade-related {@link ServiceEvent}s are driven through a state machine
 * with two states, {@link State#STABLE} and {@link State#UPGRADING}. Every
 * transition is applied by {@link #handle(ServiceEvent)} while holding the
 * write lock.
 */
public class ServiceManager implements EventHandler<ServiceEvent> {
  private static final Logger LOG = LoggerFactory.getLogger(
      ServiceManager.class);

  private final Service serviceSpec;
  private final ServiceContext context;
  private final ServiceScheduler scheduler;
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  private final StateMachine<State, ServiceEventType, ServiceEvent>
      stateMachine;
  private final UpgradeComponentsFinder componentsFinder;

  private final AsyncDispatcher dispatcher;
  private final SliderFileSystem fs;

  /** Version being upgraded to; reset to null when an upgrade is finalized. */
  private String upgradeVersion;
  /**
   * Components still to be upgraded. For express upgrade and cancellation the
   * head of the list is the component currently being upgraded.
   */
  private List<org.apache.hadoop.yarn.service.api.records
      .Component> componentsToUpgrade;
  /** Names of the components touched by the current upgrade/cancellation. */
  private final List<String> compsAffectedByUpgrade = new ArrayList<>();
  /** Version whose upgrade is being cancelled; null otherwise. */
  private String cancelledVersion;

  private static final StateMachineFactory<ServiceManager, State,
      ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
      new StateMachineFactory<ServiceManager, State,
          ServiceEventType, ServiceEvent>(State.STABLE)

          .addTransition(State.STABLE, EnumSet.of(State.STABLE,
              State.UPGRADING), ServiceEventType.UPGRADE,
              new StartUpgradeTransition())

          .addTransition(State.STABLE, EnumSet.of(State.STABLE),
              ServiceEventType.CHECK_STABLE, new CheckStableTransition())

          .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
              State.UPGRADING), ServiceEventType.START,
              new StartFromUpgradeTransition())

          .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
              State.UPGRADING), ServiceEventType.CHECK_STABLE,
              new CheckStableTransition())

          .addTransition(State.UPGRADING, State.UPGRADING,
              ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition())
          .installTopology();

  /**
   * Creates a manager bound to the service, scheduler and file system held by
   * {@code context}.
   *
   * @param context service context; must not be null
   */
  public ServiceManager(ServiceContext context) {
    Preconditions.checkNotNull(context);
    this.context = context;
    serviceSpec = context.service;
    scheduler = context.scheduler;
    stateMachine = STATE_MACHINE_FACTORY.make(this);
    dispatcher = scheduler.getDispatcher();

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    fs = context.fs;
    componentsFinder = new UpgradeComponentsFinder
        .DefaultUpgradeComponentsFinder();
  }

  /**
   * Applies {@code event} to the state machine under the write lock.
   * Invalid transitions are logged and otherwise ignored.
   *
   * @param event the service event to process
   */
  @Override
  public void handle(ServiceEvent event) {
    writeLock.lock();
    try {
      State oldState = getState();
      try {
        stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitionException e) {
        // MessageFormat argument indices are zero-based.
        LOG.error(MessageFormat.format(
            "[SERVICE]: Invalid event {0} at {1}.", event.getType(),
            oldState), e);
      }
      State newState = getState();
      if (oldState != newState) {
        LOG.info("[SERVICE] Transitioned from {} to {} on {} event.",
            oldState, newState, event.getType());
      }
    } finally {
      writeLock.unlock();
    }
  }

  /** Returns the current state-machine state, read under the read lock. */
  private State getState() {
    this.readLock.lock();
    try {
      return this.stateMachine.getCurrentState();
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * STABLE + UPGRADE: records the target version and components, dispatches
   * component upgrade events and moves the service spec into the matching
   * upgrading state. Any failure inside the try block keeps the manager
   * STABLE.
   */
  private static class StartUpgradeTransition implements
      MultipleArcTransition<ServiceManager, ServiceEvent, State> {

    @Override
    public State transition(ServiceManager serviceManager,
        ServiceEvent event) {
      serviceManager.upgradeVersion = event.getVersion();
      List<org.apache.hadoop.yarn.service.api.records.Component> comps =
          event.getCompsToUpgrade();
      // Treat a missing list as "nothing to upgrade" so later transitions
      // can call isEmpty()/get(0) on it safely.
      serviceManager.componentsToUpgrade =
          comps == null ? new ArrayList<>() : comps;
      // Discard names left behind by an earlier attempt that failed before
      // it could be finalized; they would otherwise drive directory cleanup.
      serviceManager.compsAffectedByUpgrade.clear();
      serviceManager.componentsToUpgrade.forEach(comp ->
          serviceManager.compsAffectedByUpgrade.add(comp.getName()));
      try {
        serviceManager.dispatchNeedUpgradeEvents(false);
        if (event.isExpressUpgrade()) {
          // Express upgrade proceeds one component at a time: start the
          // first here, CheckStableTransition starts the following ones.
          serviceManager.upgradeNextCompIfAny(false);
        }

        if (event.isExpressUpgrade()) {
          serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING);
        } else if (event.isAutoFinalize()) {
          serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE);
        } else {
          serviceManager.setServiceState(ServiceState.UPGRADING);
        }
        ServiceApiUtil.checkServiceDependencySatisified(serviceManager
            .getServiceSpec());
        return State.UPGRADING;
      } catch (Throwable e) {
        LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
            e);
        return State.STABLE;
      }
    }
  }

  /**
   * CHECK_STABLE: re-evaluates upgrade progress. For express upgrade and
   * cancellation, once the head component is no longer upgrading it is
   * dropped and the next one started. When nothing is left (or for
   * auto-finalize) and every component is STABLE, the upgrade is finalized.
   */
  private static class CheckStableTransition implements
      MultipleArcTransition<ServiceManager, ServiceEvent, State> {

    @Override
    public State transition(ServiceManager serviceManager,
        ServiceEvent event) {
      //trigger check of service state
      ServiceState currState = serviceManager.serviceSpec.getState();
      if (currState.equals(ServiceState.STABLE)) {
        return State.STABLE;
      }
      if (currState.equals(ServiceState.EXPRESS_UPGRADING) ||
          currState.equals(ServiceState.CANCEL_UPGRADING)) {

        if (!serviceManager.componentsToUpgrade.isEmpty()) {
          org.apache.hadoop.yarn.service.api.records.Component compSpec =
              serviceManager.componentsToUpgrade.get(0);
          Component component = serviceManager.scheduler.getAllComponents()
              .get(compSpec.getName());

          if (!component.isUpgrading()) {
            // Head component is done; move on to the next one.
            serviceManager.componentsToUpgrade.remove(0);
            serviceManager.upgradeNextCompIfAny(
                currState.equals(ServiceState.CANCEL_UPGRADING));
          }
        }
      }

      if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
          ((currState.equals(ServiceState.EXPRESS_UPGRADING) ||
              currState.equals(ServiceState.CANCEL_UPGRADING)) &&
              serviceManager.componentsToUpgrade.isEmpty())) {

        ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
        if (targetState.equals(ServiceState.STABLE)) {
          if (serviceManager.finalizeUpgrade(
              currState.equals(ServiceState.CANCEL_UPGRADING))) {
            return State.STABLE;
          }
        }
      }
      return State.UPGRADING;
    }
  }

  /**
   * UPGRADING + START: finalizes the upgrade if every component is STABLE,
   * otherwise stays UPGRADING.
   */
  private static class StartFromUpgradeTransition implements
      MultipleArcTransition<ServiceManager, ServiceEvent, State> {

    @Override
    public State transition(ServiceManager serviceManager, ServiceEvent event) {
      ServiceState currState = serviceManager.serviceSpec.getState();

      ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
      if (targetState.equals(ServiceState.STABLE)) {
        if (serviceManager.finalizeUpgrade(
            currState.equals(ServiceState.CANCEL_UPGRADING))) {
          return State.STABLE;
        }
      }
      return State.UPGRADING;
    }
  }

  /**
   * UPGRADING + CANCEL_UPGRADE: reverses the in-flight upgrade by upgrading
   * the affected components back to the current service version, one at a
   * time. Failures are logged and leave the manager UPGRADING.
   */
  private static class CancelUpgradeTransition implements
      SingleArcTransition<ServiceManager, ServiceEvent> {

    @Override
    public void transition(ServiceManager serviceManager,
        ServiceEvent event) {
      if (!serviceManager.getState().equals(State.UPGRADING)) {
        LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state",
            serviceManager.getState());
        return;
      }
      try {
        Service targetSpec = ServiceApiUtil.loadService(
            serviceManager.context.fs, serviceManager.getName());

        Service sourceSpec = ServiceApiUtil.loadServiceUpgrade(
            serviceManager.context.fs, serviceManager.getName(),
            serviceManager.upgradeVersion);
        serviceManager.cancelledVersion = serviceManager.upgradeVersion;
        LOG.info("[SERVICE] cancel version {}",
            serviceManager.cancelledVersion);
        // The "upgrade" target now becomes the version currently in the spec.
        serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion();
        serviceManager.componentsToUpgrade = serviceManager
            .resolveCompsToUpgrade(sourceSpec, targetSpec);

        serviceManager.compsAffectedByUpgrade.clear();
        serviceManager.componentsToUpgrade.forEach(comp ->
            serviceManager.compsAffectedByUpgrade.add(comp.getName()));

        serviceManager.dispatchNeedUpgradeEvents(true);
        serviceManager.upgradeNextCompIfAny(true);
        serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING);
      } catch (Throwable e) {
        LOG.error("[SERVICE]: Cancellation of upgrade failed", e);
      }
    }
  }

  /**
   * Dispatches an instance-level upgrade (or cancel-upgrade) event for every
   * container of the component at the head of {@link #componentsToUpgrade}.
   *
   * @param cancelUpgrade true to dispatch CANCEL_UPGRADE instead of UPGRADE
   */
  private void upgradeNextCompIfAny(boolean cancelUpgrade) {
    if (componentsToUpgrade != null && !componentsToUpgrade.isEmpty()) {
      org.apache.hadoop.yarn.service.api.records.Component component =
          componentsToUpgrade.get(0);

      serviceSpec.getComponent(component.getName()).getContainers().forEach(
          container -> {
            ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
                ContainerId.fromString(container.getId()),
                !cancelUpgrade ? ComponentInstanceEventType.UPGRADE :
                    ComponentInstanceEventType.CANCEL_UPGRADE);
            LOG.info("Upgrade container {} {}", container.getId(),
                cancelUpgrade);
            dispatcher.getEventHandler().handle(upgradeEvent);
          });
    }
  }

  /**
   * Dispatches a component-level upgrade (or cancel-upgrade) event, carrying
   * the target spec and {@link #upgradeVersion}, for every component in
   * {@link #componentsToUpgrade}.
   *
   * @param cancelUpgrade true to dispatch CANCEL_UPGRADE instead of UPGRADE
   */
  private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) {
    if (componentsToUpgrade != null) {
      componentsToUpgrade.forEach(component -> {
        ComponentEvent needUpgradeEvent = new ComponentEvent(
            component.getName(), !cancelUpgrade ? ComponentEventType.UPGRADE :
            ComponentEventType.CANCEL_UPGRADE)
            .setTargetSpec(component)
            .setUpgradeVersion(upgradeVersion);
        LOG.info("Upgrade component {} {}", component.getName(), cancelUpgrade);
        context.scheduler.getDispatcher().getEventHandler()
            .handle(needUpgradeEvent);
      });
    }
  }

  /**
   * Completes an upgrade or its cancellation.
   *
   * <p>For a regular upgrade the upgrade spec of {@link #upgradeVersion} is
   * persisted as the service definition, carrying over the service id and
   * each component's current state. Afterwards the staged upgrade directory
   * and the affected component directories are deleted on a best-effort
   * basis, the spec is marked STABLE and the upgrade bookkeeping is reset.
   *
   * @param cancelUpgrade true when finalizing a cancellation
   * @return whether finalization of upgrade was successful.
   */
  private boolean finalizeUpgrade(boolean cancelUpgrade) {
    if (!cancelUpgrade) {
      try {
        // Copy the application id and current component states into the
        // upgrade spec, then save it as the service definition.
        Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
            fs, getName(), upgradeVersion);
        targetSpec.setId(serviceSpec.getId());
        targetSpec.setState(ServiceState.STABLE);
        Map<String, Component> allComps = scheduler.getAllComponents();
        targetSpec.getComponents().forEach(compSpec -> {
          Component comp = allComps.get(compSpec.getName());
          compSpec.setState(comp.getComponentSpec().getState());
        });
        jsonSerDeser.save(fs.getFileSystem(),
            ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
      } catch (IOException e) {
        LOG.error("Upgrade did not complete because unable to re-write the" +
            " service definition", e);
        return false;
      }
    }

    // On cancel, everything staged for the cancelled version is removed.
    // Otherwise the staged upgrade dir of the new version and the component
    // dirs of the version still recorded in the spec (the old one, since
    // setVersion runs below) are removed.
    String upgradeVersionToDel = cancelUpgrade ? cancelledVersion :
        upgradeVersion;
    String compDirVersionToDel = cancelUpgrade ? cancelledVersion :
        serviceSpec.getVersion();
    try {
      LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel);
      fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel);

      for (String comp : compsAffectedByUpgrade) {
        LOG.info("[SERVICE]: delete {} dir version {}",  comp,
            compDirVersionToDel);
        fs.deleteComponentDir(compDirVersionToDel, comp);
      }

      fs.deleteComponentsVersionDirIfEmpty(compDirVersionToDel);
    } catch (IOException e) {
      // Cleanup is best-effort: the upgrade itself is complete, so log the
      // failure (with its cause) and carry on.
      LOG.warn("Unable to delete upgrade definition for service {} " +
          "version {}", getName(), upgradeVersionToDel, e);
    }
    setServiceState(ServiceState.STABLE);
    serviceSpec.setVersion(upgradeVersion);
    upgradeVersion = null;
    cancelledVersion = null;
    compsAffectedByUpgrade.clear();
    return true;
  }

  /**
   * Returns {@link ServiceState#STABLE} if every component of {@code service}
   * is STABLE, otherwise the service's current state.
   */
  private static ServiceState checkIfStable(Service service) {
    // if desired == running
    for (org.apache.hadoop.yarn.service.api.records.Component comp :
        service.getComponents()) {
      if (!comp.getState().equals(
          org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE)) {
        return service.getState();
      }
    }
    return ServiceState.STABLE;
  }

  /**
   * Service state gets directly modified by ServiceMaster and Component.
   * This is a problem for upgrade and flexing. For now, invoking
   * ServiceMaster.checkAndUpdateServiceState here to make it easy to fix
   * this in future. The check is skipped while an upgrade is in progress.
   */
  public void checkAndUpdateServiceState() {
    writeLock.lock();
    try {
      if (!getState().equals(State.UPGRADING)) {
        ServiceMaster.checkAndUpdateServiceState(this.scheduler);
      }
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Resolves which components need upgrading to {@code upgradeVersion} and
   * dispatches an UPGRADE service event for them. For express upgrade the
   * components are ordered by their dependencies in the target spec. If
   * auto-finalize is requested and there is nothing to upgrade, a
   * CHECK_STABLE event is dispatched as well.
   *
   * @param upgradeVersion version to upgrade to
   * @param autoFinalize whether to finalize automatically once stable
   * @param expressUpgrade whether to upgrade components one at a time
   * @throws IOException if the upgrade spec cannot be loaded
   */
  void processUpgradeRequest(String upgradeVersion,
      boolean autoFinalize, boolean expressUpgrade) throws IOException {
    Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
        context.fs, context.service.getName(), upgradeVersion);

    List<org.apache.hadoop.yarn.service.api.records.Component>
        compsNeedUpgradeList = resolveCompsToUpgrade(context.service,
        targetSpec);

    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
        .setVersion(upgradeVersion)
        .setAutoFinalize(autoFinalize)
        .setExpressUpgrade(expressUpgrade);

    if (expressUpgrade) {
      // In case of express upgrade  components need to be upgraded in order.
      // Once the service manager gets notified that a component finished
      // upgrading, it then issues event to upgrade the next component.
      Map<String, org.apache.hadoop.yarn.service.api.records.Component>
          compsNeedUpgradeByName = new HashMap<>();
      if (compsNeedUpgradeList != null) {
        compsNeedUpgradeList.forEach(component ->
            compsNeedUpgradeByName.put(component.getName(), component));
      }
      List<String> resolvedComps = ServiceApiUtil
          .resolveCompsDependency(targetSpec);

      List<org.apache.hadoop.yarn.service.api.records.Component>
          orderedCompUpgrade = new LinkedList<>();
      resolvedComps.forEach(compName -> {
        org.apache.hadoop.yarn.service.api.records.Component component =
            compsNeedUpgradeByName.get(compName);
        if (component != null) {
          orderedCompUpgrade.add(component);
        }
      });
      event.setCompsToUpgrade(orderedCompUpgrade);
    } else {
      event.setCompsToUpgrade(compsNeedUpgradeList);
    }
    context.scheduler.getDispatcher().getEventHandler().handle(
        event);

    if (autoFinalize && (compsNeedUpgradeList == null ||
        compsNeedUpgradeList.isEmpty())) {
      // nothing to upgrade and auto finalize is requested, trigger a
      // state check.
      context.scheduler.getDispatcher().getEventHandler().handle(
          new ServiceEvent(ServiceEventType.CHECK_STABLE));
    }
  }

  /**
   * Finds the components of {@code targetSpec} that differ from
   * {@code sourceSpec}, excluding those whose restart policy does not allow
   * upgrades.
   *
   * @return a mutable list, empty (never null) when nothing needs upgrading
   */
  private List<org.apache.hadoop.yarn.service.api.records.Component>
      resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) {

    List<org.apache.hadoop.yarn.service.api.records.Component>
        compsNeedUpgradeList = componentsFinder.
        findTargetComponentSpecs(sourceSpec, targetSpec);

    if (compsNeedUpgradeList == null) {
      // Callers iterate and index this list; never hand them null.
      return new ArrayList<>();
    }

    // Remove every component whose restart policy doesn't allow upgrades.
    compsNeedUpgradeList.removeIf(component -> {
      org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
          restartPolicy = component.getRestartPolicy();

      final ComponentRestartPolicy restartPolicyHandler =
          Component.getRestartPolicyHandler(restartPolicy);
      // Do not allow upgrades for components which have NEVER/ON_FAILURE
      // restart policy
      if (!restartPolicyHandler.allowUpgrades()) {
        LOG.info("The component {} has a restart policy that doesnt " +
                "allow upgrades {} ", component.getName(),
            component.getRestartPolicy().toString());
        return true;
      }

      return false;
    });

    return compsNeedUpgradeList;
  }

  /**
   * Sets the state of the service in the service spec, logging the change
   * when it differs from the current state.
   * @param state service state
   */
  private void setServiceState(
      org.apache.hadoop.yarn.service.api.records.ServiceState state) {
    org.apache.hadoop.yarn.service.api.records.ServiceState curState =
        serviceSpec.getState();
    if (!curState.equals(state)) {
      serviceSpec.setState(state);
      LOG.info("[SERVICE] spec state changed from {} -> {}", curState, state);
    }
  }

  /**
   * Returns the name of the service.
   */
  public String getName() {
    return serviceSpec.getName();
  }

  /**
   * State of {@link ServiceManager}.
   */
  public enum State {
    STABLE, UPGRADING
  }

  @VisibleForTesting
  Service getServiceSpec() {
    return serviceSpec;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ClientAMPolicyProvider 源码

hadoop ClientAMProtocol 源码

hadoop ClientAMSecurityInfo 源码

hadoop ClientAMService 源码

hadoop ContainerFailureTracker 源码

hadoop ServiceContext 源码

hadoop ServiceEvent 源码

hadoop ServiceEventType 源码

hadoop ServiceMaster 源码

hadoop ServiceMetrics 源码

0  赞