hadoop DistCh source code

  • 2022-10-20

hadoop DistCh code

File path: /hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCh.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

/**
 * A Map-reduce program to recursively change files properties
 * such as owner, group and permission.
 */
public class DistCh extends DistTool {
  static final String NAME = "distch";
  static final String JOB_DIR_LABEL = NAME + ".job.dir";
  static final String OP_LIST_LABEL = NAME + ".op.list";
  static final String OP_COUNT_LABEL = NAME + ".op.count";

  static final String USAGE = "java " + DistCh.class.getName() 
      + " [OPTIONS] <path:owner:group:permission>+ "

      + "\n\nThe values of owner, group and permission can be empty."
      + "\nPermission is a octal number."

      + "\n\nOPTIONS:"
      + "\n-f <urilist_uri>       Use list at <urilist_uri> as src list"
      + "\n-i                     Ignore failures"
      + "\n-log <logdir>          Write logs to <logdir>"
      ;

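  /**
   * Tuning constants: each map task handles roughly OP_PER_MAP operations,
   * the total number of maps is capped at MAX_MAPS_PER_NODE per cluster node,
   * and a sync marker is written to the operation list after every
   * SYNC_FILE_MAX appended records so the SequenceFile can be split.
   */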
  private static final long OP_PER_MAP =  1000;
  private static final int MAX_MAPS_PER_NODE = 20;
  private static final int SYNC_FILE_MAX = 10;

  static enum Counter { SUCCEED, FAIL }

  static enum Option {
    IGNORE_FAILURES("-i", NAME + ".ignore.failures");

    final String cmd, propertyname;

    private Option(String cmd, String propertyname) {
      this.cmd = cmd;
      this.propertyname = propertyname;
    }
  }

  DistCh(Configuration conf) {
    super(createJobConf(conf));
  }

  private static JobConf createJobConf(Configuration conf) {
    JobConf jobconf = new JobConf(conf, DistCh.class);
    jobconf.setJobName(NAME);
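    // Disable speculative maps so each file operation is executed only once.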
    jobconf.setMapSpeculativeExecution(false);

    jobconf.setInputFormat(ChangeInputFormat.class);
    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(Text.class);

    jobconf.setMapperClass(ChangeFilesMapper.class);
    jobconf.setNumReduceTasks(0);
    return jobconf;
  }

  /** File operations. */
  static class FileOperation implements Writable {
    private Path src;
    private String owner;
    private String group;
    private FsPermission permission;

    FileOperation() {}

    FileOperation(Path src, FileOperation that) {
      this.src = src;
      this.owner = that.owner;
      this.group = that.group;
      this.permission = that.permission;
      checkState();
    }

    /**
     * path:owner:group:permission
     * e.g.
     * /user/foo:foo:bar:700 
     */
    FileOperation(String line) {
      try {
        String[] t = line.split(":", 4);
        for(int i = 0; i < t.length; i++) {
          if ("".equals(t[i])) {
            t[i] = null;
          }
        }

        src = new Path(t[0]);
        owner = t[1];
        group = t[2];
        permission = t[3] == null? null:
          new FsPermission(Short.parseShort(t[3], 8));

        checkState();
      }
      catch(Exception e) {
        throw (IllegalArgumentException)new IllegalArgumentException(
            "line=" + line).initCause(e);
      }
    }

    private void checkState() throws IllegalStateException {
      if (owner == null && group == null && permission == null) {
        throw new IllegalStateException(
            "owner == null && group == null && permission == null");
      }
    }

    static final FsPermission FILE_UMASK
        = FsPermission.createImmutable((short)0111);

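    /**
     * Does the file status differ from what this operation requests?
     * For regular files, execute bits are masked off the requested
     * permission (via FILE_UMASK) before comparing; directories are
     * compared exactly.
     */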
    private boolean isDifferent(FileStatus original) {
      if (owner != null && !owner.equals(original.getOwner())) {
        return true;
      }
      if (group != null && !group.equals(original.getGroup())) {
        return true;
      }
      if (permission != null) {
        FsPermission orig = original.getPermission();
        return original.isDirectory()? !permission.equals(orig):
          !permission.applyUMask(FILE_UMASK).equals(orig);
      }
      return false;
    }

    void run(Configuration conf) throws IOException {
      FileSystem fs = src.getFileSystem(conf);
      if (permission != null) {
        fs.setPermission(src, permission);
      }
      if (owner != null || group != null) {
        fs.setOwner(src, owner, group);
      }
    }

    /** {@inheritDoc} */
    public void readFields(DataInput in) throws IOException {
      this.src = new Path(Text.readString(in));
      owner = DistTool.readString(in);
      group = DistTool.readString(in);
      permission = in.readBoolean()? FsPermission.read(in): null;
    }

    /** {@inheritDoc} */
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, src.toString());
      DistTool.writeString(out, owner);
      DistTool.writeString(out, group);

      boolean b = permission != null;
      out.writeBoolean(b);
      if (b) {permission.write(out);}
    }

    /** {@inheritDoc} */
    public String toString() {
      return src + ":" + owner + ":" + group + ":" + permission; 
    }
  }

  /** Responsible for generating splits of the src file list. */
  static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
    /** Do nothing. */
    public void validateInput(JobConf job) {}

    /**
     * Produce splits such that each is no greater than the quotient of the
     * total size and the number of splits requested.
     * @param job The handle to the JobConf object
     * @param numSplits Number of splits requested
     */
    public InputSplit[] getSplits(JobConf job, int numSplits
        ) throws IOException {
      final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
      final int targetcount = srcCount / numSplits;
      String srclist = job.get(OP_LIST_LABEL, "");
      if (srcCount < 0 || "".equals(srclist)) {
        throw new RuntimeException("Invalid metadata: #files(" + srcCount +
                                   ") listuri(" + srclist + ")");
      }
      Path srcs = new Path(srclist);
      FileSystem fs = srcs.getFileSystem(job);

      List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

      Text key = new Text();
      FileOperation value = new FileOperation();
      long prev = 0L;
      int count = 0; //count src
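      // Scan the operation list, starting a new split whenever the record
      // count exceeds targetcount; getPosition() falls on record boundaries,
      // so every FileSplit covers a whole number of operations.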
      try (SequenceFile.Reader in = new SequenceFile.Reader(fs, srcs, job)) {
        for ( ; in.next(key, value); ) {
          long curr = in.getPosition();
          long delta = curr - prev;
          if (++count > targetcount) {
            count = 0;
            splits.add(new FileSplit(srcs, prev, delta, (String[])null));
            prev = curr;
          }
        }
      }
      long remaining = fs.getFileStatus(srcs).getLen() - prev;
      if (remaining != 0) {
        splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
      }
      LOG.info("numSplits="  + numSplits + ", splits.size()=" + splits.size());
      return splits.toArray(new FileSplit[splits.size()]);
    }

    /** {@inheritDoc} */
    public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
        JobConf job, Reporter reporter) throws IOException {
      return new SequenceFileRecordReader<Text, FileOperation>(job,
          (FileSplit)split);
    }
  }

  /** The mapper for changing files. */
  static class ChangeFilesMapper 
      implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
    private JobConf jobconf;
    private boolean ignoreFailures;

    private int failcount = 0;
    private int succeedcount = 0;

    private String getCountString() {
      return "Succeeded: " + succeedcount + " Failed: " + failcount;
    }

    /** {@inheritDoc} */
    public void configure(JobConf job) {
      this.jobconf = job;
      ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false);
    }

    /** Run a FileOperation */
    public void map(Text key, FileOperation value,
        OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
        ) throws IOException {
      try {
        value.run(jobconf);
        ++succeedcount;
        reporter.incrCounter(Counter.SUCCEED, 1);
      } catch (IOException e) {
        ++failcount;
        reporter.incrCounter(Counter.FAIL, 1);

        String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
        out.collect(null, new Text(s));
        LOG.info(s);
      } finally {
        reporter.setStatus(getCountString());
      }
    }

    /** {@inheritDoc} */
    public void close() throws IOException {
      if (failcount == 0 || ignoreFailures) {
        return;
      }
      throw new IOException(getCountString());
    }
  }

  private static void check(Configuration conf, List<FileOperation> ops
      ) throws InvalidInputException {
    List<Path> srcs = new ArrayList<Path>();
    for(FileOperation op : ops) {
      srcs.add(op.src);
    }
    DistTool.checkSource(conf, srcs);
  }

  private static List<FileOperation> fetchList(Configuration conf, Path inputfile
      ) throws IOException {
    List<FileOperation> result = new ArrayList<FileOperation>();
    for(String line : readFile(conf, inputfile)) {
      result.add(new FileOperation(line));
    }
    return result;
  }

  /** This is the main driver for recursively changing files properties. */
  public int run(String[] args) throws Exception {
    List<FileOperation> ops = new ArrayList<FileOperation>();
    Path logpath = null;
    boolean isIgnoreFailures = false;

    try {
      for (int idx = 0; idx < args.length; idx++) {
        if ("-f".equals(args[idx])) {
          if (++idx ==  args.length) {
            System.out.println("urilist_uri not specified");
            System.out.println(USAGE);
            return -1;
          }
          ops.addAll(fetchList(jobconf, new Path(args[idx])));
        } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
          isIgnoreFailures = true;
        } else if ("-log".equals(args[idx])) {
          if (++idx ==  args.length) {
            System.out.println("logdir not specified");
            System.out.println(USAGE);
            return -1;
          }
          logpath = new Path(args[idx]);
        } else if ('-' == args[idx].codePointAt(0)) {
          System.out.println("Invalid switch " + args[idx]);
          System.out.println(USAGE);
          ToolRunner.printGenericCommandUsage(System.out);
          return -1;
        } else {
          ops.add(new FileOperation(args[idx]));
        }
      }
      // mandatory command-line parameters
      if (ops.isEmpty()) {
        throw new IllegalStateException("Operation is empty");
      }
      LOG.info("ops=" + ops);
      LOG.info("isIgnoreFailures=" + isIgnoreFailures);
      jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
      check(jobconf, ops);

      try {
        if (setup(ops, logpath)) {
          JobClient.runJob(jobconf);
        }
      } finally {
        try {
          if (logpath == null) {
            //delete log directory
            final Path logdir = FileOutputFormat.getOutputPath(jobconf);
            if (logdir != null) {
              logdir.getFileSystem(jobconf).delete(logdir, true);
            }
          }
        }
        finally {
          //delete job directory
          final String jobdir = jobconf.get(JOB_DIR_LABEL);
          if (jobdir != null) {
            final Path jobpath = new Path(jobdir);
            jobpath.getFileSystem(jobconf).delete(jobpath, true);
          }
        }
      }
    } catch(DuplicationException e) {
      LOG.error("Input error:", e);
      return DuplicationException.ERROR_CODE;
    } catch(Exception e) {
      LOG.error(NAME + " failed: ", e);
      System.out.println(USAGE);
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }
    return 0;
  }

  /** Calculate how many maps to run. */
  private static int getMapCount(int srcCount, int numNodes) {
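    // One map for every OP_PER_MAP operations, capped at MAX_MAPS_PER_NODE
    // maps per node, and never fewer than one map.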
    int numMaps = (int)(srcCount / OP_PER_MAP);
    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
    return Math.max(numMaps, 1);
  }

  private boolean setup(List<FileOperation> ops, Path log) 
  throws IOException {
    final String randomId = getRandomId();
    JobClient jClient = new JobClient(jobconf);
    Path stagingArea;
    try {
      stagingArea = JobSubmissionFiles.getStagingDir(
                       jClient.getClusterHandle(), jobconf);
    } catch (InterruptedException ie){
      throw new IOException(ie);
    }
    Path jobdir = new Path(stagingArea + NAME + "_" + randomId);
    FsPermission mapredSysPerms = 
      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
    LOG.info(JOB_DIR_LABEL + "=" + jobdir);

    if (log == null) {
      log = new Path(jobdir, "_logs");
    }
    FileOutputFormat.setOutputPath(jobconf, log);
    LOG.info("log=" + log);

    //create operation list
    FileSystem fs = jobdir.getFileSystem(jobconf);
    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
    jobconf.set(OP_LIST_LABEL, opList.toString());
    int opCount = 0, synCount = 0;
    try (SequenceFile.Writer opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
            FileOperation.class, SequenceFile.CompressionType.NONE)) {
      for(FileOperation op : ops) {
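        // listStatus(dir) returns a directory's children, not the directory
        // itself, so the root path of each operation is checked explicitly.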
        FileStatus srcstat = fs.getFileStatus(op.src); 
        if (srcstat.isDirectory() && op.isDifferent(srcstat)) {
          ++opCount;
          opWriter.append(new Text(op.src.toString()), op);
        }

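        // Iterative depth-first walk: directories are pushed and expanded
        // until the whole subtree under op.src has been examined.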
        Stack<Path> pathstack = new Stack<Path>();
        for(pathstack.push(op.src); !pathstack.empty(); ) {
          for(FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              pathstack.push(stat.getPath());
            }

            if (op.isDifferent(stat)) {              
              ++opCount;
              if (++synCount > SYNC_FILE_MAX) {
                opWriter.sync();
                synCount = 0;
              }
              Path f = stat.getPath();
              opWriter.append(new Text(f.toString()), new FileOperation(f, op));
            }
          }
        }
      }
    }

    checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
    jobconf.setInt(OP_COUNT_LABEL, opCount);
    LOG.info(OP_COUNT_LABEL + "=" + opCount);
    jobconf.setNumMapTasks(getMapCount(opCount,
        new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
    return opCount != 0;    
  }

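  /**
   * Sort the operation list by source path, then scan it once; any two
   * adjacent equal keys mean the same path was specified more than once.
   */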
  private static void checkDuplication(FileSystem fs, Path file, Path sorted,
    Configuration conf) throws IOException {
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
        new Text.Comparator(), Text.class, FileOperation.class, conf);
    sorter.sort(file, sorted);
    try (SequenceFile.Reader in = new SequenceFile.Reader(fs, sorted, conf)) {
      FileOperation curop = new FileOperation();
      Text prevsrc = null, cursrc = new Text(); 
      for(; in.next(cursrc, curop); ) {
        if (prevsrc != null && cursrc.equals(prevsrc)) {
          throw new DuplicationException(
            "Invalid input, there are duplicated files in the sources: "
            + prevsrc + ", " + cursrc);
        }
        prevsrc = cursrc;
        cursrc = new Text();
        curop = new FileOperation();
      }
    }
  } 

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
  }
}
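
For reference, a minimal invocation sketch. The ToolRunner call mirrors DistCh.main above, while the driver class name and the path/owner/group/permission values are hypothetical. Note that the DistCh constructor is package-private, so a programmatic driver must live in org.apache.hadoop.tools; otherwise invoke DistCh via its main method as shown in the USAGE string.

package org.apache.hadoop.tools;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

/** Hypothetical driver: recursively set owner/group/permission under a path. */
public class DistChExample {
  public static void main(String[] args) throws Exception {
    // Operation format is path:owner:group:permission; empty fields are
    // allowed, e.g. "/user/alice::staff:" changes only the group.
    // -i ignores individual failures instead of failing the whole job.
    int exit = ToolRunner.run(new DistCh(new Configuration()),
        new String[] { "-i", "/user/alice:alice:staff:750" });
    System.exit(exit);
  }
}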
