hadoop FpgaResourceAllocator 源码

  • 2022-10-20
  • 浏览 (180)

hadoop FpgaResourceAllocator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;


import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDevice;


import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;

/**
 * Tracks which FPGA devices on this node are allowed, which are currently
 * available per resource type, and which are assigned to which container.
 *
 * <p>The allocator is meant to be shared by different FPGA vendor plugins; a
 * "type" parameter selects the pool of devices an allocation is served from.
 *
 * <p>All public methods that mutate the bookkeeping are {@code synchronized}
 * on this instance. The {@code @VisibleForTesting} accessors are not
 * synchronized and return the live internal collections.
 */
public class FpgaResourceAllocator {

  static final Logger LOG = LoggerFactory.
      getLogger(FpgaResourceAllocator.class);

  // every device that was registered through addFpgaDevices
  private List<FpgaDevice> allowedFpgas = new LinkedList<>();

  //key is resource type of FPGA, vendor plugin supported ID
  private final Map<String, List<FpgaDevice>> availableFpgas = new HashMap<>();

  //key is the container ID
  private final Map<String, List<FpgaDevice>> containerToFpgaMapping =
      new HashMap<>();

  private final Context nmContext;

  /**
   * @return the live map of available devices, keyed by FPGA resource type
   */
  @VisibleForTesting
  Map<String, List<FpgaDevice>> getAvailableFpga() {
    return availableFpgas;
  }

  /**
   * @return the list of devices registered through {@link #addFpgaDevices}
   */
  @VisibleForTesting
  List<FpgaDevice> getAllowedFpga() {
    return allowedFpgas;
  }

  /**
   * Creates an allocator with no devices registered.
   *
   * @param ctx NodeManager context; used to reach the state store and the
   *            container map
   */
  public FpgaResourceAllocator(Context ctx) {
    this.nmContext = ctx;
  }

  /**
   * @return the number of available devices summed over all types
   */
  @VisibleForTesting
  int getAvailableFpgaCount() {
    return availableFpgas.values()
        .stream()
        .mapToInt(List::size)
        .sum();
  }

  /**
   * @return the live map of assigned devices, keyed by container ID string
   */
  @VisibleForTesting
  Map<String, List<FpgaDevice>> getUsedFpga() {
    return containerToFpgaMapping;
  }

  /**
   * @return the number of assigned devices summed over all containers
   */
  @VisibleForTesting
  int getUsedFpgaCount() {
    return containerToFpgaMapping.values()
        .stream()
        .mapToInt(List::size)
        .sum();
  }

  /**
   * Immutable result of an allocation request: the devices granted to the
   * requestor and the devices it must not access.
   */
  public static class FpgaAllocation {

    private final List<FpgaDevice> allowed;

    private final List<FpgaDevice> denied;

    /**
     * Takes snapshot copies of both lists; a {@code null} argument is treated
     * as an empty list.
     *
     * @param allowed devices granted to the requestor, may be {@code null}
     * @param denied devices the requestor may not use, may be {@code null}
     */
    FpgaAllocation(List<FpgaDevice> allowed, List<FpgaDevice> denied) {
      this.allowed = allowed == null
          ? Collections.<FpgaDevice>emptyList()
          : ImmutableList.copyOf(allowed);
      this.denied = denied == null
          ? Collections.<FpgaDevice>emptyList()
          : ImmutableList.copyOf(denied);
    }

    /**
     * @return devices granted by this allocation, never {@code null}
     */
    public List<FpgaDevice> getAllowed() {
      return allowed;
    }

    /**
     * @return devices denied by this allocation, never {@code null}
     */
    public List<FpgaDevice> getDenied() {
      return denied;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("\nFpgaAllocation\n\tAllowed:\n");
      for (FpgaDevice device : allowed) {
        sb.append("\t\t").append(device).append("\n");
      }
      sb.append("\tDenied\n");
      for (FpgaDevice device : denied) {
        sb.append("\t\t").append(device).append("\n");
      }
      return sb.toString();
    }
  }

  /**
   * Registers devices as allowed and makes them available under the given
   * type. Intended to be called once during initialization, but repeated
   * calls accumulate: devices registered earlier stay in the allowed list.
   * A device equal to one that is already allowed (from an earlier call or
   * earlier in the same list) is logged and ignored.
   *
   * @param type resource type key the devices become available under
   * @param list devices to register
   */
  public synchronized void addFpgaDevices(String type, List<FpgaDevice> list) {
    List<FpgaDevice> typeBucket =
        availableFpgas.computeIfAbsent(type, k -> new LinkedList<>());
    // Start from what is already allowed so an additional call neither
    // forgets earlier devices nor lets a duplicate slip through.
    List<FpgaDevice> fpgaDevices = new LinkedList<>(allowedFpgas);

    for (FpgaDevice device : list) {
      if (!fpgaDevices.contains(device)) {
        fpgaDevices.add(device);
        typeBucket.add(device);
      } else {
        LOG.warn("Duplicate device found: {}. Ignored", device);
      }
    }

    allowedFpgas = ImmutableList.copyOf(fpgaDevices);
    LOG.info("Added a list of FPGA Devices: {}", allowedFpgas);
  }

  /**
   * Records a new IP ID and aocx hash on the given device.
   *
   * @param requestor identifier of the caller; not used by this method
   * @param device device to update
   * @param newIPID IP ID to store on the device
   * @param newHash aocx hash to store on the device
   */
  public synchronized void updateFpga(String requestor,
      FpgaDevice device, String newIPID, String newHash) {
    device.setIPID(newIPID);
    device.setAocxHash(newHash);
    LOG.info("Update IPID to {} for this allocated device: {}",
        newIPID, device);
    LOG.info("Update IP hash to {}", newHash);
  }

  /**
   * Assign {@link FpgaAllocation} with preferred IPID, if no, with random FPGAs.
   * Devices whose aocx hash matches {@code ipidHash} (case-insensitively) are
   * picked first; any shortfall is filled from the head of the available
   * list. Exactly {@code count} devices are assigned.
   *
   * @param type vendor plugin supported FPGA device type
   * @param count requested FPGA slot count
   * @param container container the devices are assigned to
   * @param ipidHash hash of the localized aocx file
   * @return Instance consists two List of allowed and denied {@link FpgaDevice}
   * @throws ResourceHandlerException When failed to allocate or write state store
   * */
  public synchronized FpgaAllocation assignFpga(String type, long count,
      Container container, String ipidHash) throws ResourceHandlerException {
    List<FpgaDevice> currentAvailableFpga = availableFpgas.get(type);

    String requestor = container.getContainerId().toString();
    if (null == currentAvailableFpga) {
      throw new ResourceHandlerException("No such type of FPGA resource available: " + type);
    }
    if (count < 0 || count > currentAvailableFpga.size()) {
      // Report the size of the pool the request was checked against.
      throw new ResourceHandlerException("Invalid FPGA request count or not enough, requested:" +
          count + ", available:" + currentAvailableFpga.size());
    }
    if (count == 0) {
      // Nothing requested: nothing is allowed, every known device is denied.
      return new FpgaAllocation(null, allowedFpgas);
    }

    // Safe narrowing: count was just checked to be within the list size.
    int wanted = (int) count;
    List<FpgaDevice> assignedFpgas = new LinkedList<>();

    // Allocate devices with matching IP first. Removing through the iterator
    // avoids skipping the element after a removed one, and the size bound
    // keeps us from handing out more devices than were requested.
    Iterator<FpgaDevice> candidates = currentAvailableFpga.iterator();
    while (assignedFpgas.size() < wanted && candidates.hasNext()) {
      FpgaDevice candidate = candidates.next();
      String deviceIPIDhash = candidate.getAocxHash();
      if (deviceIPIDhash != null &&
          deviceIPIDhash.equalsIgnoreCase(ipidHash)) {
        assignedFpgas.add(candidate);
        candidates.remove();
      }
    }

    // Then any device is ok
    while (assignedFpgas.size() < wanted) {
      assignedFpgas.add(currentAvailableFpga.remove(0));
    }

    // Record in state store before touching the used-device bookkeeping
    try {
      nmContext.getNMStateStore().storeAssignedResources(container,
          FPGA_URI, new LinkedList<>(assignedFpgas));
    } catch (IOException e) {
      // failed, give the allocation back
      currentAvailableFpga.addAll(assignedFpgas);
      throw new ResourceHandlerException(e);
    }

    // update state store success, update internal used FPGAs
    containerToFpgaMapping
        .computeIfAbsent(requestor, k -> new LinkedList<>())
        .addAll(assignedFpgas);

    return new FpgaAllocation(assignedFpgas, currentAvailableFpga);
  }

  /**
   * Rebuilds the in-memory assignment of a container from the FPGA resources
   * listed in its resource mappings: each device is marked as used by the
   * container and removed from the available pool of its type.
   *
   * @param containerId id of the container whose assignment is recovered
   * @throws ResourceHandlerException if the container is unknown, a recorded
   *     resource is not an {@link FpgaDevice}, the device is not in the
   *     allowed list, or the device is already assigned
   */
  public synchronized void recoverAssignedFpgas(ContainerId containerId) throws ResourceHandlerException {
    Container c = nmContext.getContainers().get(containerId);
    if (null == c) {
      throw new ResourceHandlerException(
          "This shouldn't happen, cannot find container with id="
              + containerId);
    }

    for (Serializable fpgaDevice :
        c.getResourceMappings().getAssignedResources(FPGA_URI)) {
      if (!(fpgaDevice instanceof FpgaDevice)) {
        throw new ResourceHandlerException(
            "Trying to recover allocated FPGA devices, however it"
                + " is not FpgaDevice type, this shouldn't happen");
      }
      FpgaDevice device = (FpgaDevice) fpgaDevice;

      // Make sure it is in allowed FPGA device.
      if (!allowedFpgas.contains(device)) {
        throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
            + " however it is not in allowed device list:" + StringUtils
            .join(";", allowedFpgas));
      }

      // Make sure it is not occupied by anybody else
      for (List<FpgaDevice> occupied : containerToFpgaMapping.values()) {
        if (occupied.contains(device)) {
          throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
              + " however it is already assigned to others");
        }
      }

      containerToFpgaMapping
          .computeIfAbsent(containerId.toString(), k -> new LinkedList<>())
          .add(device);

      // remove them from available list; the pool is keyed by the type given
      // to addFpgaDevices, so tolerate a missing entry for device.getType()
      List<FpgaDevice> available = availableFpgas.get(device.getType());
      if (available != null) {
        available.remove(device);
      }
    }
  }

  /**
   * Releases every device assigned to the requestor back to the available
   * pool of the device's type and forgets the assignment. Does nothing if
   * the requestor has no recorded assignment.
   *
   * @param requestor container ID string the devices were assigned to
   */
  public synchronized void cleanupAssignFpgas(String requestor) {
    List<FpgaDevice> usedFpgas = containerToFpgaMapping.remove(requestor);
    if (usedFpgas == null) {
      return;
    }
    for (FpgaDevice device : usedFpgas) {
      // Add back to availableFpga, creating the pool if it does not exist
      // rather than failing on a missing entry
      availableFpgas
          .computeIfAbsent(device.getType(), k -> new LinkedList<>())
          .add(device);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop FpgaResourceHandlerImpl 源码

0  赞