hadoop JobHistory source code

  • 2022-10-20

hadoop JobHistory code

File path: /hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.hs;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads and manages the Job history cache.
 */
public class JobHistory extends AbstractService implements HistoryContext {
  private static final Logger LOG = LoggerFactory.getLogger(JobHistory.class);

  public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("("
      + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
  public static final String OLD_SUFFIX = ".old";

  // Time interval for the move thread.
  private long moveThreadInterval;

  private Configuration conf;

  private ScheduledThreadPoolExecutor scheduledExecutor = null;

  private HistoryStorage storage = null;
  private HistoryFileManager hsManager = null;
  ScheduledFuture<?> futureHistoryCleaner = null;
  
  //History job cleaner interval
  private long cleanerInterval;
  
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    LOG.info("JobHistory Init");
    this.conf = conf;
    this.appID = ApplicationId.newInstance(0, 0);
    this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
        .newRecordInstance(ApplicationAttemptId.class);

    moveThreadInterval = conf.getLong(
        JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);

    hsManager = createHistoryFileManager();
    hsManager.init(conf);
    try {
      hsManager.initExisting();
    } catch (IOException e) {
      throw new YarnRuntimeException("Failed to initialize existing directories", e);
    }

    storage = createHistoryStorage();
    
    if (storage instanceof Service) {
      ((Service) storage).init(conf);
    }
    storage.setHistoryFileManager(hsManager);

    super.serviceInit(conf);
  }

  protected HistoryStorage createHistoryStorage() {
    return ReflectionUtils.newInstance(conf.getClass(
        JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
        HistoryStorage.class), conf);
  }
  
  protected HistoryFileManager createHistoryFileManager() {
    return new HistoryFileManager();
  }

  @Override
  protected void serviceStart() throws Exception {
    hsManager.start();
    if (storage instanceof Service) {
      ((Service) storage).start();
    }

    scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
        new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
            .build());

    scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
        moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);

    // Start historyCleaner
    scheduleHistoryCleaner();
    super.serviceStart();
  }

  protected int getInitDelaySecs() {
    return 30;
  }

  @Override
  protected void serviceStop() throws Exception {
    LOG.info("Stopping JobHistory");
    if (scheduledExecutor != null) {
      LOG.info("Stopping History Cleaner/Move To Done");
      scheduledExecutor.shutdown();
      int retryCnt = 50;
      try {
        while (!scheduledExecutor.awaitTermination(20,
            TimeUnit.MILLISECONDS)) {
          if (--retryCnt == 0) {
            scheduledExecutor.shutdownNow();
            break;
          }
        }
      } catch (InterruptedException iex) {
        LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
            "succeeded, Forcing a shutdown", iex);
        if (!scheduledExecutor.isShutdown()) {
          scheduledExecutor.shutdownNow();
        }
      }
      scheduledExecutor = null;
    }
    // Stop the other services.
    if (storage != null && storage instanceof Service) {
      ((Service) storage).stop();
    }
    if (hsManager != null) {
      hsManager.stop();
    }
    super.serviceStop();
  }

  public JobHistory() {
    super(JobHistory.class.getName());
  }

  @Override
  public String getApplicationName() {
    return "Job History Server";
  }

  private class MoveIntermediateToDoneRunnable implements Runnable {
    @Override
    public void run() {
      try {
        LOG.info("Starting scan to move intermediate done files");
        hsManager.scanIntermediateDirectory();
      } catch (IOException e) {
        LOG.error("Error while scanning intermediate done dir ", e);
      }
    }
  }
  
  private class HistoryCleaner implements Runnable {
    public void run() {
      LOG.info("History Cleaner started");
      try {
        hsManager.clean();
      } catch (IOException e) {
        LOG.warn("Error trying to clean up ", e);
      }
      LOG.info("History Cleaner complete");
    }
  }

  /**
   * Helper method for test cases.
   */
  HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException {
    return hsManager.getFileInfo(jobId);
  }

  @Override
  public Job getJob(JobId jobId) {
    return storage.getFullJob(jobId);
  }

  @Override
  public Map<JobId, Job> getAllJobs(ApplicationId appID) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Called getAllJobs(AppId): " + appID);
    }
    // currently there is 1 to 1 mapping between app and job id
    org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
    Map<JobId, Job> jobs = new HashMap<JobId, Job>();
    JobId jobID = TypeConverter.toYarn(oldJobID);
    jobs.put(jobID, getJob(jobID));
    return jobs;
  }

  @Override
  public Map<JobId, Job> getAllJobs() {
    return storage.getAllPartialJobs();
  }

  public void refreshLoadedJobCache() {
    if (getServiceState() == STATE.STARTED) {
      if (storage instanceof CachedHistoryStorage) {
        ((CachedHistoryStorage) storage).refreshLoadedJobCache();
      } else {
        throw new UnsupportedOperationException(storage.getClass().getName()
            + " is expected to be an instance of "
            + CachedHistoryStorage.class.getName());
      }
    } else {
      LOG.warn("Failed to execute refreshLoadedJobCache: JobHistory service is not started");
    }
  }

  @VisibleForTesting
  HistoryStorage getHistoryStorage() {
    return storage;
  }
  
  /**
   * Look for a set of partial jobs.
   * 
   * @param offset
   *          the offset into the list of jobs.
   * @param count
   *          the maximum number of jobs to return.
   * @param user
   *          only return jobs for the given user.
   * @param queue
   *          only return jobs for in the given queue.
   * @param sBegin
   *          only return Jobs that started on or after the given time.
   * @param sEnd
   *          only return Jobs that started on or before the given time.
   * @param fBegin
   *          only return Jobs that ended on or after the given time.
   * @param fEnd
   *          only return Jobs that ended on or before the given time.
   * @param jobState
   *          only return jobs that are in the give job state.
   * @return The list of filtered jobs.
   */
  @Override
  public JobsInfo getPartialJobs(Long offset, Long count, String user,
      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
      JobState jobState) {
    return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd,
        fBegin, fEnd, jobState);
  }

  public void refreshJobRetentionSettings() {
    if (getServiceState() == STATE.STARTED) {
      conf = createConf();
      long maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
      hsManager.setMaxHistoryAge(maxHistoryAge);
      if (futureHistoryCleaner != null) {
        futureHistoryCleaner.cancel(false);
      }
      futureHistoryCleaner = null;
      scheduleHistoryCleaner();
    } else {
      LOG.warn("Failed to execute refreshJobRetentionSettings : Job History service is not started");
    }
  }

  private void scheduleHistoryCleaner() {
    boolean startCleanerService = conf.getBoolean(
        JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
    if (startCleanerService) {
      cleanerInterval = conf.getLong(
          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
          JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);

      futureHistoryCleaner = scheduledExecutor.scheduleAtFixedRate(
          new HistoryCleaner(), getInitDelaySecs() * 1000l, cleanerInterval,
          TimeUnit.MILLISECONDS);
    }
  }

  protected Configuration createConf() {
    return new Configuration();
  }
  
  public long getCleanerInterval() {
    return cleanerInterval;
  }
  // TODO AppContext - Not Required
  private ApplicationAttemptId appAttemptID;

  @Override
  public ApplicationAttemptId getApplicationAttemptId() {
    // TODO fixme - bogus appAttemptID for now
    return appAttemptID;
  }

  // TODO AppContext - Not Required
  private ApplicationId appID;

  @Override
  public ApplicationId getApplicationID() {
    // TODO fixme - bogus appID for now
    return appID;
  }

  // TODO AppContext - Not Required
  @Override
  public EventHandler<Event> getEventHandler() {
    // TODO Auto-generated method stub
    return null;
  }

  // TODO AppContext - Not Required
  private String userName;

  @Override
  public CharSequence getUser() {
    if (userName != null) {
      userName = conf.get(MRJobConfig.USER_NAME, "history-user");
    }
    return userName;
  }

  // TODO AppContext - Not Required
  @Override
  public Clock getClock() {
    return null;
  }

  // TODO AppContext - Not Required
  @Override
  public ClusterInfo getClusterInfo() {
    return null;
  }

  // TODO AppContext - Not Required
  @Override
  public Set<String> getBlacklistedNodes() {
    // Not Implemented
    return null;
  }
  @Override
  public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
    // Not implemented.
    return null;
  }

  @Override
  public boolean isLastAMRetry() {
    // bogus - Not Required
    return false;
  }

  @Override
  public boolean hasSuccessfullyUnregistered() {
    // bogus - Not Required
    return true;
  }

  @Override
  public String getNMHostname() {
    // bogus - Not Required
    return null;
  }

  @Override
  public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
    return null;
  }

  @Override
  public String getHistoryUrl() {
    return null;
  }

  @Override
  public void setHistoryUrl(String historyUrl) {
    return;
  }
}
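
The class above follows the standard Hadoop Service lifecycle: serviceInit() wires up a HistoryFileManager and a HistoryStorage (CachedHistoryStorage by default), serviceStart() schedules the MoveIntermediateToDoneRunnable and the HistoryCleaner on a two-thread scheduled executor, and serviceStop() drains that executor before stopping the child services. A minimal sketch of driving it through that lifecycle is shown below; JobHistoryDemo and the interval values are illustrative only, actually running it requires the history directories expected by HistoryFileManager to exist, and in a real deployment JobHistory is composed by the history server rather than used standalone.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

public class JobHistoryDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Tunables read in serviceInit()/scheduleHistoryCleaner(); the values here are illustrative.
    conf.setLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, 3 * 60 * 1000L);
    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
    conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 24 * 60 * 60 * 1000L);

    JobHistory jobHistory = new JobHistory();
    jobHistory.init(conf);   // serviceInit: creates HistoryFileManager and HistoryStorage
    jobHistory.start();      // serviceStart: schedules the move thread and the history cleaner

    // Query the loaded cache through the HistoryContext API.
    System.out.println("cleaner interval = " + jobHistory.getCleanerInterval());
    System.out.println("cached jobs = " + jobHistory.getAllJobs().size());

    jobHistory.stop();       // serviceStop: shuts down the executor, then storage and file manager
  }
}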

Related information

hadoop source code directory

Related articles

hadoop CachedHistoryStorage source code

hadoop CompletedJob source code

hadoop CompletedTask source code

hadoop CompletedTaskAttempt source code

hadoop HSAuditLogger source code

hadoop HSProxies source code

hadoop HistoryClientService source code

hadoop HistoryContext source code

hadoop HistoryFileManager source code

hadoop HistoryServerFileSystemStateStoreService source code
