hadoop StoragePolicySatisfyManager 源码

  • 2022-10-20
  • 浏览 (107)

haddop StoragePolicySatisfyManager 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.sps;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * This manages satisfy storage policy invoked path ids and expose methods to
 * process these path ids. It maintains sps mode(EXTERNAL/NONE)
 * configured by the administrator.
 *
 * <p>
 * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then
 * it won't do anything, just maintains the sps invoked path ids. Administrator
 * requires to start external sps service explicitly, to fetch the sps invoked
 * path ids from namenode, then do necessary computations and block movement in
 * order to satisfy the storage policy. Please refer
 * {@link ExternalStoragePolicySatisfier} class to understand more about the
 * external sps service functionality.
 *
 * <p>
 * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then it
 * will disable the sps feature completely by clearing all queued up sps path's
 * hint.
 *
 * This class is instantiated by the BlockManager.
 */
public class StoragePolicySatisfyManager {
  private static final Logger LOG = LoggerFactory
      .getLogger(StoragePolicySatisfyManager.class);
  private final StoragePolicySatisfier spsService;
  private final boolean storagePolicyEnabled;
  private volatile StoragePolicySatisfierMode mode;
  private final Queue<Long> pathsToBeTraversed;
  private final int outstandingPathsLimit;
  private final Namesystem namesystem;

  public StoragePolicySatisfyManager(Configuration conf,
      Namesystem namesystem) {
    // StoragePolicySatisfier(SPS) configs
    storagePolicyEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
    String modeVal = conf.get(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
    outstandingPathsLimit = conf.getInt(
        DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
        DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
    mode = StoragePolicySatisfierMode.fromString(modeVal);
    pathsToBeTraversed = new LinkedList<Long>();
    this.namesystem = namesystem;
    // instantiate SPS service by just keeps config reference and not starting
    // any supporting threads.
    spsService = new StoragePolicySatisfier(conf);
  }

  /**
   * This function will do following logic based on the configured sps mode:
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then
   * it won't do anything. Administrator requires to start external sps service
   * explicitly.
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the
   * service is disabled and won't do any action.
   */
  public void start() {
    if (!storagePolicyEnabled) {
      LOG.info("Disabling StoragePolicySatisfier service as {} set to {}.",
          DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
      return;
    }

    switch (mode) {
    case EXTERNAL:
      LOG.info("Storage policy satisfier is configured as external, "
          + "please start external sps service explicitly to satisfy policy");
      break;
    case NONE:
      LOG.info("Storage policy satisfier is disabled");
      break;
    default:
      LOG.info("Given mode: {} is invalid", mode);
      break;
    }
  }

  /**
   * This function will do following logic based on the configured sps mode:
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then
   * it won't do anything. Administrator requires to stop external sps service
   * explicitly, if needed.
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the
   * service is disabled and won't do any action.
   */
  public void stop() {
    if (!storagePolicyEnabled) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Storage policy is not enabled, ignoring");
      }
      return;
    }

    switch (mode) {
    case EXTERNAL:
      removeAllPathIds();
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Storage policy satisfier service is running outside namenode"
            + ", ignoring");
      }
      break;
    case NONE:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Storage policy satisfier is not enabled, ignoring");
      }
      break;
    default:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Invalid mode:{}, ignoring", mode);
      }
      break;
    }
  }

  /**
   * Sets new sps mode. If the new mode is none, then it will disable the sps
   * feature completely by clearing all queued up sps path's hint.
   */
  public void changeModeEvent(StoragePolicySatisfierMode newMode) {
    if (!storagePolicyEnabled) {
      LOG.info("Failed to change storage policy satisfier as {} set to {}.",
          DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
      return;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
          mode, newMode);
    }

    switch (newMode) {
    case EXTERNAL:
      if (mode == newMode) {
        LOG.info("Storage policy satisfier is already in mode:{},"
            + " so ignoring change mode event.", newMode);
        return;
      }
      spsService.stopGracefully();
      break;
    case NONE:
      if (mode == newMode) {
        LOG.info("Storage policy satisfier is already disabled, mode:{}"
            + " so ignoring change mode event.", newMode);
        return;
      }
      LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode);
      spsService.stop(true);
      clearPathIds();
      break;
    default:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Given mode: {} is invalid", newMode);
      }
      break;
    }

    // update sps mode
    mode = newMode;
  }

  /**
   * @return true if the internal storage policy satisfier daemon is running,
   *         false otherwise.
   */
  @VisibleForTesting
  public boolean isSatisfierRunning() {
    return spsService.isRunning();
  }

  /**
   * @return the next SPS path id, on which path users has invoked to satisfy
   *         storages.
   */
  public Long getNextPathId() {
    synchronized (pathsToBeTraversed) {
      return pathsToBeTraversed.poll();
    }
  }

  /**
   * Verify that satisfier queue limit exceeds allowed outstanding limit.
   * @throws IOException
   */
  public void verifyOutstandingPathQLimit() throws IOException {
    long size = pathsToBeTraversed.size();
    // Checking that the SPS call Q exceeds the allowed limit.
    if (outstandingPathsLimit - size <= 0) {
      LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
          outstandingPathsLimit, size);
      throw new IOException("Outstanding satisfier queue limit: "
          + outstandingPathsLimit + " exceeded, try later!");
    }
  }

  /**
   * Removes the SPS path id from the list of sps paths.
   *
   * @throws IOException
   */
  private void clearPathIds(){
    synchronized (pathsToBeTraversed) {
      Iterator<Long> iterator = pathsToBeTraversed.iterator();
      while (iterator.hasNext()) {
        Long trackId = iterator.next();
        try {
          namesystem.removeXattr(trackId,
              HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
        } catch (IOException e) {
          LOG.debug("Failed to remove sps xattr!", e);
        }
        iterator.remove();
      }
    }
  }

  /**
   * Clean up all sps path ids.
   */
  public void removeAllPathIds() {
    synchronized (pathsToBeTraversed) {
      pathsToBeTraversed.clear();
    }
  }

  /**
   * Adds the sps path to SPSPathIds list.
   * @param id
   */
  public void addPathId(long id) {
    synchronized (pathsToBeTraversed) {
      pathsToBeTraversed.add(id);
    }
  }

  /**
   * @return true if sps is configured as an external
   *         service, false otherwise.
   */
  public boolean isEnabled() {
    return mode == StoragePolicySatisfierMode.EXTERNAL;
  }

  /**
   * @return sps service mode.
   */
  public StoragePolicySatisfierMode getMode() {
    return mode;
  }

  /**
   * @return the number of paths to be processed by storage policy satisfier.
   */
  public int getPendingSPSPaths() {
    return pathsToBeTraversed.size();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BlockMoveTaskHandler 源码

hadoop BlockMovementListener 源码

hadoop BlockStorageMovementAttemptedItems 源码

hadoop BlockStorageMovementNeeded 源码

hadoop Context 源码

hadoop DatanodeCacheManager 源码

hadoop FileCollector 源码

hadoop ItemInfo 源码

hadoop SPSService 源码

hadoop StoragePolicySatisfier 源码

0  赞