hadoop DiffListBySkipList 源码

  • 2022-10-20
  • 浏览 (37)

haddop DiffListBySkipList 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DiffListBySkipList.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.snapshot;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.snapshot.
    DirectoryWithSnapshotFeature.DirectoryDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.
    DirectoryWithSnapshotFeature.ChildrenDiff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;

/**
 * SkipList is an implementation of a data structure for storing a sorted list
 * of Directory Diff elements, using a hierarchy of linked lists that connect
 * increasingly sparse subsequences(defined by skip interval here) of the diffs.
 * The elements contained in the tree must be mutually comparable.
 * <p>
 * Consider  a case where we have 10 snapshots for a directory starting from s0
 * to s9 each associated with certain change records in terms of inodes deleted
 * and created after a particular snapshot and before the next snapshot. The
 * sequence will look like this:
 * <p>
 * {@literal s0->s1->s2->s3->s4->s5->s6->s7->s8->s9}.
 * <p>
 * Assuming a skip interval of 3, which means a new diff will be added at a
 * level higher than the current level after we have  ore than 3 snapshots.
 * Next level promotion happens after 9 snapshots and so on.
 * <p>
 * level 2:   {@literal s08------------------------------->s9}
 * level 1:   {@literal S02------->s35-------->s68-------->s9}
 * level 0:  {@literal s0->s1->s2->s3->s4->s5->s6->s7->s8->s9}
 * <p>
 * s02 will be created by combining diffs for s0, s1, s2 once s3 gets created.
 * Similarly, s08 will be created by combining s02, s35 and s68 once s9 gets
 * created.So, for constructing the children list fot s0, we have  to combine
 * s08, s9 and reverse apply to the live fs.
 * <p>
 * Similarly, for constructing the children list for s2, s2, s35, s68 and s9
 * need to get combined(or added) and reverse applied to current fs.
 * <p>
 * This approach will improve the snapshot deletion and snapshot diff
 * calculation.
 * <p>
 * Once a snapshot gets deleted, the list needs to be balanced.
 */
public class DiffListBySkipList implements DiffList<DirectoryDiff> {
  public static final Logger LOG =
      LoggerFactory.getLogger(DiffListBySkipList.class);

  static String childrenDiff2String(ChildrenDiff diff) {
    if (diff == null) {
      return "null";
    }
    return "@" + Integer.toHexString(System.identityHashCode(diff));
  }

  static String skip2String(SkipListNode skipTo, ChildrenDiff diff) {
    return "->" + skipTo + ":diff=" + childrenDiff2String(diff);
  }

  private static class SkipDiff {
    static final SkipDiff[] EMPTY_ARRAY = {};

    /**
     * The references to the subsequent nodes.
     */
    private SkipListNode skipTo;
    /**
     * combined diff over a skip Interval.
     */
    private ChildrenDiff diff;

    SkipDiff(ChildrenDiff diff) {
      this.diff = diff;
    }

    public ChildrenDiff getDiff() {
      return diff;
    }

    public SkipListNode getSkipTo() {
      return skipTo;
    }

    public void setSkipTo(SkipListNode node) {
      skipTo = node;
    }

    public void setDiff(ChildrenDiff diff) {
      this.diff = diff;
    }

    @Override
    public String toString() {
      return skip2String(skipTo, diff);
    }
  }

  /**
   * SkipListNode is an implementation of a DirectoryDiff List node,
   * which stores a Directory Diff and references to subsequent nodes.
   */
  final static class SkipListNode implements Comparable<Integer> {

    /**
     * The data element stored in this node.
     */
    private final DirectoryDiff diff;

    /** Next node. */
    private SkipListNode next;
    /**
     * Array containing combined children diffs over a skip interval.
     */
    private SkipDiff[] skips;

    /**
     * Constructs a new instance of SkipListNode with the specified data element
     * and level.
     *
     * @param diff The element to be stored in the node.
     * @param level
     */
    SkipListNode(DirectoryDiff diff, int level) {
      this.diff = diff;

      this.skips = level > 0? new SkipDiff[level]: SkipDiff.EMPTY_ARRAY;
      for(int i = 0; i < skips.length; i++) {
        skips[i] = new SkipDiff(null);
      }
    }

    /**
     * Returns the level of this SkipListNode.
     */
    public int level() {
      return skips.length;
    }

    void trim() {
      int n = skips.length - 1;
      for (; n >= 0 && skips[n] == null; n--) {
        continue;
      }
      n++;
      if (n < skips.length) {
        skips = n > 0 ? Arrays.copyOf(skips, n) : SkipDiff.EMPTY_ARRAY;
      }
    }

    public DirectoryDiff getDiff() {
      return diff;
    }

    /**
     * Compare diffs with snapshot ID.
     */
    @Override
    public int compareTo(Integer that) {
      return diff.compareTo(that);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      SkipListNode that = (SkipListNode) o;
      return Objects.equals(diff, that.diff);
    }

    @Override
    public int hashCode() {
      return Objects.hash(diff);
    }

    public void setSkipDiff(ChildrenDiff cDiff, int level) {
      Preconditions.checkArgument(level > 0);
      resize(level);
      skips[level - 1].setDiff(cDiff);
    }

    void setSkipDiff4Target(
        SkipListNode target, int startLevel, ChildrenDiff childrenDiff) {
      for(int i = startLevel; i <= level(); i++) {
        if (getSkipNode(i) != target) {
          return;
        }
        setSkipDiff(childrenDiff, i);
      }
    }

    private void resize(int newLevel) {
      int i = skips.length;
      if (i < newLevel) {
        skips = Arrays.copyOf(skips, newLevel);
        for (; i < newLevel; i++) {
          skips[i] = new SkipDiff(null);
        }
      }
    }

    public void setSkipTo(SkipListNode node, int level) {
      if (level == 0) {
        next = node;
      } else {
        resize(level);
        skips[level - 1].setSkipTo(node);
      }
    }

    public ChildrenDiff getChildrenDiff(int level) {
      if (level == 0) {
        return diff != null? diff.getChildrenDiff(): null;
      } else {
        return skips[level - 1].getDiff();
      }
    }

    SkipListNode getSkipNode(int level) {
      return level == 0? next
          : level <= skips.length? skips[level - 1].getSkipTo()
          : null;
    }

    @Override
    public String toString() {
      return diff != null ? "" + diff.getSnapshotId() : "?";
    }

    StringBuilder appendTo(StringBuilder b) {
      b.append(this).append(": ").append(skip2String(next, getChildrenDiff(0)));
      for(int i = 0; i < skips.length; i++) {
        b.append(", ").append(skips[i]);
      }
      return b;
    }
  }

  /**
   * The reference to the first node of the list.
   * The list will grow linearly once a new Directory diff gets added.
   * All the list inteface defined methods provide a linear view of the list.
   */
  private final List<SkipListNode> skipNodeList;

  /**
   * The head node to the list.
   */
  private SkipListNode head;

  /**
   * Constructs a new, empty instance of SkipList.
   */
  public DiffListBySkipList(int capacity) {
    skipNodeList = new ArrayList<>(capacity);
    head = new SkipListNode(null, 0);
  }

  /**
   * Adds the specified data element to the beginning of the SkipList,
   * if the element is not already present.
   * @param diff the element to be inserted
   */
  @Override
  public void addFirst(DirectoryDiff diff) {
    final int nodeLevel = DirectoryDiffListFactory.randomLevel();
    final SkipListNode[] nodePath = new SkipListNode[nodeLevel + 1];
    Arrays.fill(nodePath, head);

    final SkipListNode newNode = new SkipListNode(diff, nodeLevel);
    for (int level = 0; level <= nodeLevel; level++) {
      if (level > 0) {
        // Case : S0 is added at the beginning and it has 3 levels
        //  suppose the list is like:
        //  level 1: head ------------------->s5------------->NULL
        //  level 0:head->    s1->s2->s3->s4->s5->s6->s7->s8->s9
        //  in this case:
        //  level 2: head -> s0 -------------------------------->NULL
        //  level 1: head -> s0'---------------->s5------------->NULL
        //  level 0:head->   s0->s1->s2->s3->s4->s5->s6->s7->s8->s9
        //  At level 1, we need to combine s0, s1, s2, s3, s4 and s5 and store
        //  as s0'. At level 2, s0 of next is pointing to null;
        //  Note: in this case, the diff of element being added is included
        //  while combining the diffs.
        final SkipListNode nextNode = head.getSkipNode(level);
        if (nextNode != null) {
          ChildrenDiff combined = combineDiff(newNode, nextNode, level);
          if (combined != null) {
            newNode.setSkipDiff(combined, level);
          }
        }
      }
      //insert to the linked list
      newNode.setSkipTo(nodePath[level].getSkipNode(level), level);
      nodePath[level].setSkipTo(newNode, level);
    }
    skipNodeList.add(0, newNode);
  }

  private SkipListNode[] findPreviousNodes(SkipListNode node, int nodeLevel) {
    final SkipListNode[] nodePath = new SkipListNode[nodeLevel + 1];
    SkipListNode cur = head;
    final int headLevel = head.level();
    for (int level = headLevel < nodeLevel ? headLevel : nodeLevel;
         level >= 0; level--) {
      while (cur.getSkipNode(level) != node) {
        cur = cur.getSkipNode(level);
      }
      nodePath[level] = cur;
    }
    for (int level = headLevel + 1; level <= nodeLevel; level++) {
      nodePath[level] = head;
    }
    return nodePath;
  }

  /**
   * Adds the specified data element to the end of the SkipList,
   * if the element is not already present.
   * @param diff the element to be inserted
   */
  @Override
  public boolean addLast(DirectoryDiff diff) {
    final int nodeLevel = DirectoryDiffListFactory.randomLevel();
    final SkipListNode[] nodePath = findPreviousNodes(null, nodeLevel);

    final SkipListNode newNode = new SkipListNode(diff, nodeLevel);
    for (int level = 0; level <= nodeLevel; level++) {
      if (level > 0 && nodePath[level] != head) {
        //  suppose the list is like:
        //  level 2: head ->  s1----------------------------->NULL
        //  level 1: head ->  s1---->s3'------>s5------------->NULL
        //  level 0:head->    s1->s2->s3->s4->s5->s6->s7->s8->s9

        // case : s10 is added at the end the let the level for this node = 4
        //  in this case,
        //  level 2: head ->  s1''------------------------------------>s10
        //  level 1: head ->  s1'---->s3'------>s5'-------------------->s10
        //  level 0:head->    s1->s2->s3->s4->s5->s6->s7->s8->s9---->s10
        //  At level 1, we combine s5, s6, s7, s8, s9 and store as s5'
        //  At level 2, we combine s1', s3', s5' and form s1'' and store at s1.
        // Note : the last element(element being added) diff is not added while
        // combining the diffs.
        ChildrenDiff combined = combineDiff(nodePath[level], newNode, level);
        if (combined != null) {
          nodePath[level].setSkipDiff(combined, level);
        }
      }
      nodePath[level].setSkipTo(newNode, level);
      newNode.setSkipTo(null, level);
    }
    return skipNodeList.add(newNode);
  }

  private static ChildrenDiff combineDiff(SkipListNode from, SkipListNode to,
      int level) {
    ChildrenDiff combined = null;
    ChildrenDiff first = null;

    SkipListNode cur = from;
    for (int i = level - 1; i >= 0; i--) {
      while (cur != to) {
        final SkipListNode next = cur.getSkipNode(i);
        if (next == null) {
          break;
        }

        if (first == null) {
          first = cur.getChildrenDiff(i);
        } else {
          if (combined == null) {
            combined = new ChildrenDiff();
            combined.combinePosterior(first, null);
          }
          combined.combinePosterior(cur.getChildrenDiff(i), null);
        }
        cur = next;
      }
    }
    return combined != null? combined: first;
  }

  /**
   * Returns the data element at the specified index in this SkipList.
   *
   * @param index The index of the element to be returned.
   * @return The element at the specified index in this SkipList.
   */
  @Override
  public DirectoryDiff get(int index) {
    return skipNodeList.get(index).getDiff();
  }

  SkipListNode getSkipListNode(int i) {
    return skipNodeList.get(i);
  }

  /**
   * Removes the element at the specified position in this list.
   *
   * @param index the index of the element to be removed
   * @return the removed DirectoryDiff
   */
  @Override
  public DirectoryDiff remove(int index) {
    final SkipListNode node = getNode(index);

    int headLevel = head.level();
    int nodeLevel = node.level();
    final SkipListNode[] nodePath = findPreviousNodes(node, nodeLevel);

    for (int level = 0; level <= nodeLevel; level++) {
      final SkipListNode previous = nodePath[level];
      final SkipListNode next = node.getSkipNode(level);
      if (level == 0) {
        if (next != null) {
          previous.setSkipDiff4Target(next, 1, previous.getChildrenDiff(0));
        }
      } else if (previous != head) {
        // if the last snapshot is deleted, for all the skip level nodes
        // pointing to the last one, the combined children diff at each level
        // > 0 should be made null and skip pointers will be updated to null.
        // if the snapshot being deleted is not the last one, we have to merge
        // the diff of deleted node at each level to the previous skip level
        // node at that level and the skip pointers will be updated to point to
        // the skip nodes of the deleted node.
        if (next == null) {
          previous.setSkipDiff(null, level);
        } else {
          /* Ideally at level 0, the deleted diff will be combined with
           * the previous diff , and deleted inodes will be cleaned up
           * by passing a deleted processor here while combining the diffs.
           * Level 0 merge with previous diff will be handled inside the
           * {@link AbstractINodeDiffList#deleteSnapshotDiff} function.
           */
          if (node.getChildrenDiff(level) != null) {
            final ChildrenDiff combined;
            if (previous == nodePath[level - 1]
                && next == node.getSkipNode(level - 1)) {
              combined = nodePath[level - 1].getChildrenDiff(level - 1);
              previous.setSkipDiff4Target(next, level + 1, combined);
            } else if (next == previous.getSkipNode(level + 1)) {
              combined = previous.getChildrenDiff(level + 1);
            } else {
              combined = new ChildrenDiff();
              combined.combinePosterior(previous.getChildrenDiff(level), null);
              combined.combinePosterior(node.getChildrenDiff(level), null);
            }
            previous.setSkipDiff(combined, level);
          }
        }
      }
      previous.setSkipTo(next, level);
    }
    if (nodeLevel == headLevel) {
      head.trim();
    }
    return skipNodeList.remove(index).getDiff();
  }

  /**
   * Returns true if this SkipList contains no data elements. In other words,
   * returns true if the size of this SkipList is zero.
   *
   * @return True if this SkipList contains no elements.
   */
  @Override
  public boolean isEmpty() {
    return skipNodeList.isEmpty();
  }

  /**
   * Returns the number of data elements in this SkipList.
   *
   * @return The number of elements in this SkipList.
   */
  @Override
  public int size() {
    return skipNodeList.size();
  }

  /**
   * Iterator is an iterator over the SkipList. This should
   * always provide a linear view of the list.
   */
  @Override
  public Iterator<DirectoryDiff> iterator() {
    final Iterator<SkipListNode> i = skipNodeList.iterator();
    return new Iterator<DirectoryDiff>() {

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public DirectoryDiff next() {
        return i.next().getDiff();
      }
    };
  }

  @Override
  public int binarySearch(int key) {
    return Collections.binarySearch(skipNodeList, key);
  }

  private SkipListNode getNode(int index) {
    return skipNodeList.get(index);
  }


  /**
   * This function returns the minimal set of diffs required to combine in
   * order to generate all the changes occurred between fromIndex and
   * toIndex.
   *
   * @param fromIndex index from where the summation has to start(inclusive)
   * @param toIndex   index till where the summation has to end(exclusive)
   * @return list of Directory Diff
   */
  @Override
  public List<DirectoryDiff> getMinListForRange(int fromIndex, int toIndex,
      INodeDirectory dir) {
    final List<DirectoryDiff> subList = new ArrayList<>();
    final int toSnapshotId = get(toIndex - 1).getSnapshotId();
    for (SkipListNode current = getNode(fromIndex); current != null;) {
      SkipListNode next = null;
      ChildrenDiff childrenDiff = null;
      for (int level = current.level(); level >= 0; level--) {
        next = current.getSkipNode(level);
        if (next != null && next.getDiff().compareTo(toSnapshotId) <= 0) {
          childrenDiff = current.getChildrenDiff(level);
          break;
        }
      }
      final DirectoryDiff curDiff = current.getDiff();
      subList.add(childrenDiff == null ? curDiff :
          new DirectoryDiff(curDiff.getSnapshotId(), dir, childrenDiff));

      if (current.getDiff().compareTo(toSnapshotId) == 0) {
        break;
      }
      current = next;
    }
    return subList;
  }

  @Override
  public String toString() {
    final StringBuilder b = new StringBuilder().append(" head: ");
    head.appendTo(b);
    for (SkipListNode n : skipNodeList) {
      n.appendTo(b.append("\n  "));
    }
    return b.toString();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractINodeDiff 源码

hadoop AbstractINodeDiffList 源码

hadoop DiffList 源码

hadoop DiffListByArrayList 源码

hadoop DirectoryDiffListFactory 源码

hadoop DirectorySnapshottableFeature 源码

hadoop DirectoryWithSnapshotFeature 源码

hadoop FSImageFormatPBSnapshot 源码

hadoop FileDiff 源码

hadoop FileDiffList 源码

0  赞