hadoop ServiceMonitor Source Code

  • 2022-10-20

The hadoop ServiceMonitor code is listed below.

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service.monitor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.REINITIALIZED;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.STARTED;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_NOT_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.READY;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_WINDOW;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.DEFAULT_CONTAINER_FAILURE_WINDOW;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.DEFAULT_READINESS_CHECK_INTERVAL;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.READINESS_CHECK_INTERVAL;

public class ServiceMonitor extends AbstractService {

  private static final Logger LOG =
      LoggerFactory.getLogger(ServiceMonitor.class);

  public ScheduledExecutorService executorService;
  private Map<ContainerId, ComponentInstance> liveInstances = null;
  private ServiceContext context;
  private Configuration conf;

  public ServiceMonitor(String name, ServiceContext context) {
    super(name);
    liveInstances = context.scheduler.getLiveInstances();
    this.context = context;
  }

  @Override
  public void serviceInit(Configuration conf) throws Exception {
    executorService = Executors.newScheduledThreadPool(1);
    this.conf = conf;
    super.serviceInit(conf);
  }

  @Override
  public void serviceStart() throws Exception {
    long readinessCheckInterval = YarnServiceConf
        .getLong(READINESS_CHECK_INTERVAL, DEFAULT_READINESS_CHECK_INTERVAL,
            context.service.getConfiguration(), conf);

    executorService
        .scheduleAtFixedRate(new ReadinessChecker(), readinessCheckInterval,
            readinessCheckInterval, TimeUnit.SECONDS);

    // Default 6 hours.
    long failureResetInterval = YarnServiceConf
        .getLong(CONTAINER_FAILURE_WINDOW, DEFAULT_CONTAINER_FAILURE_WINDOW,
            context.service.getConfiguration(), conf);

    executorService
        .scheduleAtFixedRate(new ContainerFailureReset(), failureResetInterval,
            failureResetInterval, TimeUnit.SECONDS);
  }

  @Override
  public void serviceStop() throws Exception {
    if (executorService != null) {
      executorService.shutdownNow();
    }
  }

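  /**
   * Periodic task that pings every live component instance, driving
   * BECOME_READY / BECOME_NOT_READY transitions from the probe result, and
   * flexes up components whose dependencies have become ready.
   */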
  private class ReadinessChecker implements Runnable {

    @Override
    public void run() {

      // check if the comp instances are ready
      for (Map.Entry<ContainerId, ComponentInstance> entry : liveInstances
          .entrySet()) {
        ComponentInstance instance = entry.getValue();

        ProbeStatus status = instance.ping();
        ComponentInstanceState instanceState = instance.getState();
        if (status.isSuccess()) {
          if (instanceState == STARTED || instanceState == REINITIALIZED) {
            LOG.info("Readiness check succeeded for {}: {}", instance
                .getCompInstanceName(), status);
            // synchronously update the state.
            instance.handle(
                new ComponentInstanceEvent(entry.getKey(), BECOME_READY));
          }
        } else {
          LOG.info("Readiness check failed for {}: {}", instance
              .getCompInstanceName(), status);
          if (instance.getState() == READY) {
            instance.handle(
                new ComponentInstanceEvent(entry.getKey(), BECOME_NOT_READY));
          }
        }
      }

      for (Component component : context.scheduler.getAllComponents()
          .values()) {
        // If comp hasn't started yet and its dependencies are satisfied
        if (component.getState() == ComponentState.INIT && component
            .areDependenciesReady()) {
          LOG.info("[COMPONENT {}]: Dependencies satisfied, ramping up.",
              component.getName());
          ComponentEvent event = new ComponentEvent(component.getName(), FLEX)
              .setDesired(component.getComponentSpec().getNumberOfContainers());
          component.handle(event);
        }
      }
    }
  }

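  /**
   * Periodic task that clears each component's container failure count, so
   * that failures only accumulate within the configured failure window.
   */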
  private class ContainerFailureReset implements Runnable {
    @Override
    public void run() {
      for (Component component : context.scheduler.getAllComponents().values()) {
        component.resetCompFailureCount();
      }
    }
  }
}
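
For context, both background tasks above reduce to one pattern: a single-threaded ScheduledExecutorService runs a Runnable at a fixed interval (the readiness probe, and the failure-count reset), and serviceStop() cancels everything with shutdownNow(). The following is a minimal, self-contained sketch of that same pattern using only the JDK; InstanceProbe, PeriodicReadinessChecker, and the printed messages are illustrative stand-ins, not part of the Hadoop YARN services API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative stand-in for the per-instance probe that ServiceMonitor's
 * ReadinessChecker calls via instance.ping(); not a Hadoop interface.
 */
interface InstanceProbe {
  boolean isReady(String instanceName);
}

public class PeriodicReadinessChecker {

  // Single-threaded scheduler, as in ServiceMonitor.serviceInit().
  private final ScheduledExecutorService executor =
      Executors.newScheduledThreadPool(1);
  private final Map<String, Boolean> lastKnownReadiness =
      new ConcurrentHashMap<>();
  private final InstanceProbe probe;

  public PeriodicReadinessChecker(InstanceProbe probe) {
    this.probe = probe;
  }

  /** Mirrors serviceStart(): schedule the check at a fixed interval (seconds). */
  public void start(long intervalSeconds, Iterable<String> instanceNames) {
    executor.scheduleAtFixedRate(() -> {
      for (String name : instanceNames) {
        boolean ready = probe.isReady(name);
        Boolean previous = lastKnownReadiness.put(name, ready);
        // Only act on transitions, analogous to the BECOME_READY /
        // BECOME_NOT_READY events ServiceMonitor dispatches.
        if (previous == null || previous != ready) {
          System.out.println(name + (ready ? " became READY" : " became NOT READY"));
        }
      }
    }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
  }

  /** Mirrors serviceStop(): cancel the scheduled checks. */
  public void stop() {
    executor.shutdownNow();
  }
}

Like ServiceMonitor.serviceStart(), the sketch passes the configured interval as both the initial delay and the period of scheduleAtFixedRate, so the first check only runs after one full interval has elapsed.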
