hadoop WindowsSecureContainerExecutor 源码

  • 2022-10-20
  • 浏览 (205)

haddop WindowsSecureContainerExecutor 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO.Windows;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;

/**
 * Windows secure container executor (WSCE).
 * This class offers a secure container executor on Windows, similar to the 
 * LinuxContainerExecutor. As the NM does not run on a high privileged context, 
 * this class delegates elevated operations to the helper hadoopwintuilsvc, 
 * implemented by the winutils.exe running as a service.
 * JNI and LRPC is used to communicate with the privileged service.
 */
public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
  
  private static final Logger LOG = LoggerFactory
      .getLogger(WindowsSecureContainerExecutor.class);
  
  public static final String LOCALIZER_PID_FORMAT = "STAR_LOCALIZER_%s";
  
  
  /**
   * This class is a container for the JNI Win32 native methods used by WSCE.
   */
  private static class Native {

    private static boolean nativeLoaded = false;

    static {
      if (NativeCodeLoader.isNativeCodeLoaded()) {
        try {
          initWsceNative();
          nativeLoaded = true;
        } catch (Throwable t) {
          LOG.info("Unable to initialize WSCE Native libraries", t);
        }
      }
    }

    /** Initialize the JNI method ID and class ID cache */
    private static native void initWsceNative();
    
    
    /**
     * This class contains methods used by the WindowsSecureContainerExecutor
     * file system operations.
     */
    public static class Elevated {
      private static final int MOVE_FILE = 1;
      private static final int COPY_FILE = 2;

      public static void mkdir(Path dirName) throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for mkdir");
        }
        elevatedMkDirImpl(dirName.toString());
      }
      
      private static native void elevatedMkDirImpl(String dirName) 
          throws IOException;
      
      public static void chown(Path fileName, String user, String group) 
          throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for chown");
        }
        elevatedChownImpl(fileName.toString(), user, group);
      }
      
      private static native void elevatedChownImpl(String fileName, String user, 
          String group) throws IOException;
      
      public static void move(Path src, Path dst, boolean replaceExisting) 
          throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for move");
        }
        elevatedCopyImpl(MOVE_FILE, src.toString(), dst.toString(), 
            replaceExisting);
      }
      
      public static void copy(Path src, Path dst, boolean replaceExisting) 
          throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for copy");
        }
        elevatedCopyImpl(COPY_FILE, src.toString(), dst.toString(), 
            replaceExisting);
      }
      
      private static native void elevatedCopyImpl(int operation, String src, 
          String dst, boolean replaceExisting) throws IOException;
      
      public static void chmod(Path fileName, int mode) throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for chmod");
        }
        elevatedChmodImpl(fileName.toString(), mode);
      }
      
      private static native void elevatedChmodImpl(String path, int mode) 
          throws IOException;
      
      public static void killTask(String containerName) throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for killTask");
        }
        elevatedKillTaskImpl(containerName);
      }
      
      private static native void elevatedKillTaskImpl(String containerName) 
          throws IOException;

      public static OutputStream create(Path f, boolean append)  
          throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for create");
        }
        
        long desiredAccess = Windows.GENERIC_WRITE;
        long shareMode = 0L;
        long creationDisposition = append ? 
            Windows.OPEN_ALWAYS : Windows.CREATE_ALWAYS;
        long flags = Windows.FILE_ATTRIBUTE_NORMAL;
        
        String fileName = f.toString();
        fileName = fileName.replace('/', '\\');
        
        long hFile = elevatedCreateImpl(
            fileName, desiredAccess, shareMode, creationDisposition, flags);
        return new FileOutputStream(
            WinutilsProcessStub.getFileDescriptorFromHandle(hFile));
      }
      
      private static native long elevatedCreateImpl(String path, 
          long desiredAccess, long shareMode,
          long creationDisposition, long flags) throws IOException;
      
      
      public static boolean deleteFile(Path path) throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for deleteFile");
        }
        
        return elevatedDeletePathImpl(path.toString(), false);
      }

      public static boolean deleteDirectory(Path path) throws IOException {
        if (!nativeLoaded) {
          throw new IOException("Native WSCE libraries are required for deleteDirectory");
        }
        
        return elevatedDeletePathImpl(path.toString(), true);
      }

      public native static boolean elevatedDeletePathImpl(String path, 
          boolean isDir) throws IOException;
    }

    /**
     * Wraps a process started by the winutils service helper.
     *
     */
    public static class WinutilsProcessStub extends Process {
      
      private final long hProcess;
      private final long hThread;
      private boolean disposed = false;
      
      private final InputStream stdErr;
      private final InputStream stdOut;
      private final OutputStream stdIn;
      
      public WinutilsProcessStub(long hProcess, long hThread, long hStdIn, 
          long hStdOut, long hStdErr) {
        this.hProcess = hProcess;
        this.hThread = hThread;
        
        this.stdIn = new FileOutputStream(getFileDescriptorFromHandle(hStdIn));
        this.stdOut = new FileInputStream(getFileDescriptorFromHandle(hStdOut));
        this.stdErr = new FileInputStream(getFileDescriptorFromHandle(hStdErr));
      }
      
      public static native FileDescriptor getFileDescriptorFromHandle(long handle);
      
      @Override
      public native void destroy();
      
      @Override
      public native int exitValue();
      
      @Override
      public InputStream getErrorStream() {
        return stdErr;
      }
      @Override
      public InputStream getInputStream() {
        return stdOut;
      }
      @Override
      public OutputStream getOutputStream() {
        return stdIn;
      }
      @Override
      public native int waitFor() throws InterruptedException;

      public synchronized native void dispose();

      public native void resume() throws NativeIOException;
    }
    
    public synchronized static WinutilsProcessStub createTaskAsUser(
        String cwd, String jobName, String user, String pidFile, String cmdLine)
      throws IOException {
      if (!nativeLoaded) {
        throw new IOException(
            "Native WSCE  libraries are required for createTaskAsUser");
      }
      synchronized(Shell.WindowsProcessLaunchLock) {
        return createTaskAsUser0(cwd, jobName, user, pidFile, cmdLine);
      }
    }

    private static native WinutilsProcessStub createTaskAsUser0(
      String cwd, String jobName, String user, String pidFile, String cmdLine)
      throws NativeIOException;
  }

  /**
   * A shell script wrapper builder for WSCE.  
   * Overwrites the default behavior to remove the creation of the PID file in 
   * the script wrapper. WSCE creates the pid file as part of launching the 
   * task in winutils.
   */
  private class WindowsSecureWrapperScriptBuilder 
    extends LocalWrapperScriptBuilder {

    public WindowsSecureWrapperScriptBuilder(Path containerWorkDir) {
      super(containerWorkDir);
    }

    @Override
    protected void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) {
      pout.format("@call \"%s\"", launchDst);
    }
  }

  /**
   * This is a skeleton file system used to elevate certain operations.
   * WSCE has to create container dirs under local/userchache/$user but
   * this dir itself is owned by $user, with chmod 750. As ther NM has no
   * write access, it must delegate the write operations to the privileged
   * hadoopwintuilsvc.
   */
  private static class ElevatedFileSystem extends DelegateToFileSystem {

    /**
     * This overwrites certain RawLocalSystem operations to be performed by a 
     * privileged process.
     * 
     */
    private static class ElevatedRawLocalFilesystem extends RawLocalFileSystem {
      
      @Override
      protected boolean mkOneDirWithMode(Path path, File p2f,
          FsPermission permission) throws IOException {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("EFS:mkOneDirWithMode: %s %s", path,
              permission));
        }
        boolean ret = false;

        // File.mkdir returns false, does not throw. Must mimic it.
        try {
          Native.Elevated.mkdir(path);
          setPermission(path, permission);
          ret = true;
        }
        catch(Throwable e) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("EFS:mkOneDirWithMode: %s",
                org.apache.hadoop.util.StringUtils.stringifyException(e)));
          }
        }
        return ret;
      }
      
      @Override
      public void setPermission(Path p, FsPermission permission) 
          throws IOException {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("EFS:setPermission: %s %s", p, permission));
        }
        Native.Elevated.chmod(p, permission.toShort());
      }
      
      @Override
      public void setOwner(Path p, String username, String groupname) 
          throws IOException {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("EFS:setOwner: %s %s %s", 
              p, username, groupname));
        }
        Native.Elevated.chown(p, username, groupname);
      }
      
      @Override
      protected OutputStream createOutputStreamWithMode(Path f, boolean append,
          FsPermission permission) throws IOException {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("EFS:createOutputStreamWithMode: %s %b %s", f,
              append, permission));
        }
        boolean success = false;
        OutputStream os = Native.Elevated.create(f, append);
        try {
          setPermission(f, permission);
          success = true;
          return os;
        } finally {
          if (!success) {
            IOUtils.cleanupWithLogger(LOG, os);
          }
        }
      }
      
      @Override
      public boolean delete(Path p, boolean recursive) throws IOException {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("EFS:delete: %s %b", p, recursive));
        }
        
        // The super delete uses the FileUtil.fullyDelete, 
        // but we cannot rely on that because we need to use the elevated 
        // operations to remove the files
        //
        File f = pathToFile(p);
        if (!f.exists()) {
          //no path, return false "nothing to delete"
          return false;
        }
        else if (f.isFile()) {
          return Native.Elevated.deleteFile(p);
        } 
        else if (f.isDirectory()) {
          
          // This is a best-effort attempt. There are race conditions in that
          // child files can be created/deleted after we snapped the list. 
          // No need to protect against that case.
          File[] files = FileUtil.listFiles(f);
          int childCount = files.length;
          
          if (recursive) {
            for(File child:files) {
              if (delete(new Path(child.getPath()), recursive)) {
                --childCount;
              }
            }
          }
          if (childCount == 0) {
            return Native.Elevated.deleteDirectory(p);
          } 
          else {
            throw new IOException("Directory " + f.toString() + " is not empty");
          }
        }
        else {
          // This can happen under race conditions if an external agent 
          // is messing with the file type between IFs
          throw new IOException("Path " + f.toString() + 
              " exists, but is neither a file nor a directory");
        }
      }
    }

    protected ElevatedFileSystem() throws IOException, URISyntaxException {
      super(FsConstants.LOCAL_FS_URI,
          new ElevatedRawLocalFilesystem(), 
          new Configuration(),
          FsConstants.LOCAL_FS_URI.getScheme(),
          false);
    }
  }
  
  private static class WintuilsProcessStubExecutor 
  implements Shell.CommandExecutor {
    private Native.WinutilsProcessStub processStub;
    private StringBuilder output = new StringBuilder();
    private int exitCode;
    
    private enum State {
      INIT,
      RUNNING,
      COMPLETE
    };
    
    private State state;
    
    private final String cwd;
    private final String jobName;
    private final String userName;
    private final String pidFile;
    private final String cmdLine;

    public WintuilsProcessStubExecutor(
        String cwd, 
        String jobName, 
        String userName, 
        String pidFile,
        String cmdLine) {
      this.cwd = cwd;
      this.jobName = jobName;
      this.userName = userName;
      this.pidFile = pidFile;
      this.cmdLine = cmdLine;
      this.state = State.INIT;
    }    
    
    private void assertComplete() throws IOException {
      if (state != State.COMPLETE) {
        throw new IOException("Process is not complete");
      }
    }
    
    public String getOutput () throws IOException {
      assertComplete();
      return output.toString();
    }
    
    public int getExitCode() throws IOException {
      assertComplete();
      return exitCode;
    }
    
    public void validateResult() throws IOException {
      assertComplete();
      if (0 != exitCode) {
        LOG.warn(output.toString());
        throw new IOException("Processs exit code is:" + exitCode);
      }
    }
    
    private Thread startStreamReader(final InputStream stream) 
        throws IOException {
      Thread streamReaderThread = new Thread() {
        
        @Override
        public void run() {
          try (BufferedReader lines = new BufferedReader(
                   new InputStreamReader(stream, Charset.forName("UTF-8")))) {
            char[] buf = new char[512];
            int nRead;
            while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
              output.append(buf, 0, nRead);
            }
          } catch (Throwable t) {
            LOG.error("Error occurred reading the process stdout", t);
          }
        }
      };
      streamReaderThread.start();
      return streamReaderThread;
    }

    public void execute() throws IOException {
      if (state != State.INIT) {
        throw new IOException("Process is already started");
      }
      processStub = Native.createTaskAsUser(cwd,
          jobName, userName, pidFile, cmdLine);
      state = State.RUNNING;

      Thread stdOutReader = startStreamReader(processStub.getInputStream());
      Thread stdErrReader = startStreamReader(processStub.getErrorStream());
      
      try {
        processStub.resume();
        processStub.waitFor();
        stdOutReader.join();
        stdErrReader.join();
      }
      catch(InterruptedException ie) {
        throw new IOException(ie);
      }
      
      exitCode = processStub.exitValue();
      state = State.COMPLETE;
    }

    @Override
    public void close() {
      if (processStub != null) {
        processStub.dispose();
      }
    }
  }

  private String nodeManagerGroup;
  
  /** 
   * Permissions for user WSCE dirs.
   */
  static final short DIR_PERM = (short)0750;  
  
  public WindowsSecureContainerExecutor() 
      throws IOException, URISyntaxException {
    super(FileContext.getFileContext(new ElevatedFileSystem(), 
        new Configuration()));
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    nodeManagerGroup = conf.get(
        YarnConfiguration.NM_WINDOWS_SECURE_CONTAINER_GROUP);
  }
  
  @Override
  protected String[] getRunCommand(String command, String groupId,
      String userName, Path pidFile, Configuration conf) {
    File f = new File(command);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("getRunCommand: %s exists:%b", 
          command, f.exists()));
    }
    return new String[] { Shell.getWinUtilsPath(), "task",
        "createAsUser", groupId,
        userName, pidFile.toString(), "cmd /c " + command };
  }
  
  @Override
  protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder(
      String containerIdStr, Path containerWorkDir) {
   return  new WindowsSecureWrapperScriptBuilder(containerWorkDir);
  }
  
  @Override
  protected void copyFile(Path src, Path dst, String owner) throws IOException {
    LOG.debug("copyFile: {} -> {} owner:{}", src, dst, owner);
    Native.Elevated.copy(src,  dst, true);
    Native.Elevated.chown(dst, owner, nodeManagerGroup);
  }

  @Override
  protected void createDir(Path dirPath, FsPermission perms,
      boolean createParent, String owner) throws IOException {
    
    // WSCE requires dirs to be 750, not 710 as DCE.
    // This is similar to how LCE creates dirs
    //
    perms = new FsPermission(DIR_PERM);
    LOG.debug("createDir: {} perm:{} owner:{}", dirPath, perms, owner);
    
    super.createDir(dirPath, perms, createParent, owner);
    lfs.setOwner(dirPath, owner, nodeManagerGroup);
  }

  @Override
  protected void setScriptExecutable(Path script, String owner) 
      throws IOException {
    LOG.debug("setScriptExecutable: {} owner:{}", script, owner);
    super.setScriptExecutable(script, owner);
    Native.Elevated.chown(script, owner, nodeManagerGroup);
  }

  @Override
  public Path localizeClasspathJar(Path jarPath, Path target, String owner) 
      throws IOException {
    LOG.debug("localizeClasspathJar: {} {} o:{}", jarPath, target, owner);
    createDir(target,  new FsPermission(DIR_PERM), true, owner);
    String fileName = jarPath.getName();
    Path dst = new Path(target, fileName);
    Native.Elevated.move(jarPath, dst, true);
    Native.Elevated.chown(dst, owner, nodeManagerGroup);
    return dst;
  }

  @Override
  public void startLocalizer(LocalizerStartContext ctx) throws IOException,
      InterruptedException {
    Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
    InetSocketAddress nmAddr = ctx.getNmAddr();
    String user = ctx.getUser();
    String appId = ctx.getAppId();
    String locId = ctx.getLocId();
    LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
    List<String> localDirs = dirsHandler.getLocalDirs();
    List<String> logDirs = dirsHandler.getLogDirs();

    Path classpathJarPrivateDir = dirsHandler.getLocalPathForWrite(
        ResourceLocalizationService.NM_PRIVATE_DIR);
    createUserLocalDirs(localDirs, user);
    createUserCacheDirs(localDirs, user);
    createAppDirs(localDirs, user, appId);
    createAppLogDirs(appId, logDirs, user);

    Path appStorageDir = getWorkingDir(localDirs, user, appId);

    String tokenFn = String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
        locId);
    Path tokenDst = new Path(appStorageDir, tokenFn);
    copyFile(nmPrivateContainerTokensPath, tokenDst, user);

    File cwdApp = new File(appStorageDir.toString());
    LOG.debug("cwdApp: {}", cwdApp);

    List<String> command ;

    command = new ArrayList<String>();

    //use same jvm as parent
    File jvm = new File(
        new File(System.getProperty("java.home"), "bin"), "java.exe");
    command.add(jvm.toString());

    Path cwdPath = new Path(cwdApp.getPath());

    // Build a temp classpath jar. See ContainerLaunch.sanitizeEnv().
    // Passing CLASSPATH explicitly is *way* too long for command line.
    String classPath = System.getProperty("java.class.path");
    Map<String, String> env = new HashMap<String, String>(System.getenv());
    String jarCp[] = FileUtil.createJarWithClassPath(classPath,
        classpathJarPrivateDir, cwdPath, env);
    String classPathJar = localizeClasspathJar(
        new Path(jarCp[0]), cwdPath, user).toString();
    command.add("-classpath");
    command.add(classPathJar + jarCp[1]);

    String javaLibPath = System.getProperty("java.library.path");
    if (javaLibPath != null) {
      command.add("-Djava.library.path=" + javaLibPath);
    }
    command.addAll(ContainerLocalizer.getJavaOpts(getConf()));

    ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
        tokenFn, localDirs, super.getConf());

    String cmdLine = StringUtils.join(command, " ");

    String localizerPid = String.format(LOCALIZER_PID_FORMAT, locId);

    WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor(
        cwdApp.getAbsolutePath(),
        localizerPid, user, "nul:", cmdLine);
    try {
      stubExecutor.execute();
      stubExecutor.validateResult();
    } finally {
      stubExecutor.close();
      try
      {
        killContainer(localizerPid, Signal.KILL);
      }
      catch(Throwable e) {
        LOG.warn(String.format(
            "An exception occurred during the cleanup of localizer job %s:%n%s",
            localizerPid,
            org.apache.hadoop.util.StringUtils.stringifyException(e)));
      }
    }
  }

  @Override
  protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
      String containerIdStr, String userName, Path pidFile, Resource resource,
      File wordDir, Map<String, String> environment, String[] numaCommands) {
     return new WintuilsProcessStubExecutor(
         wordDir.toString(),
         containerIdStr, userName, pidFile.toString(),
         "cmd /c " + wrapperScriptPath);
   }
   
   @Override
   protected void killContainer(String pid, Signal signal) throws IOException {
     Native.Elevated.killTask(pid);
   }
}

相关信息

hadoop 源码目录

相关文章

hadoop CMgrCompletedAppsEvent 源码

hadoop CMgrCompletedContainersEvent 源码

hadoop CMgrSignalContainersEvent 源码

hadoop CMgrUpdateContainersEvent 源码

hadoop ContainerExecutor 源码

hadoop ContainerManagerEvent 源码

hadoop ContainerManagerEventType 源码

hadoop ContainerStateTransitionListener 源码

hadoop Context 源码

hadoop DefaultContainerExecutor 源码

0  赞