hadoop KeyValueBasedTimelineStore 源码

  • 2022-10-20
  • 浏览 (88)

haddop KeyValueBasedTimelineStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;

import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.TimelineStoreMapAdapter.CloseableIterator;

/**
 * Map based implementation of {@link TimelineStore}. A hash map
 * implementation should be connected to this implementation through a
 * {@link TimelineStoreMapAdapter}.
 *
 * The methods are synchronized to avoid concurrent modifications.
 *
 */
@Private
@Unstable
abstract class KeyValueBasedTimelineStore
    extends AbstractService implements TimelineStore {

  protected TimelineStoreMapAdapter<EntityIdentifier, TimelineEntity> entities;
  protected TimelineStoreMapAdapter<EntityIdentifier, Long> entityInsertTimes;
  protected TimelineStoreMapAdapter<String, TimelineDomain> domainById;
  protected TimelineStoreMapAdapter<String, Set<TimelineDomain>> domainsByOwner;

  private boolean serviceStopped = false;

  private static final Logger LOG
      = LoggerFactory.getLogger(KeyValueBasedTimelineStore.class);

  public KeyValueBasedTimelineStore() {
    super(KeyValueBasedTimelineStore.class.getName());
  }

  public KeyValueBasedTimelineStore(String name) {
    super(name);
  }

  public synchronized boolean getServiceStopped() {
    return serviceStopped;
  }

  @Override
  protected synchronized void serviceStop() throws Exception {
    serviceStopped = true;
    super.serviceStop();
  }

  @Override
  public synchronized TimelineEntities getEntities(String entityType, Long limit,
      Long windowStart, Long windowEnd, String fromId, Long fromTs,
      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
      EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      return null;
    }
    if (limit == null) {
      limit = DEFAULT_LIMIT;
    }
    if (windowStart == null) {
      windowStart = Long.MIN_VALUE;
    }
    if (windowEnd == null) {
      windowEnd = Long.MAX_VALUE;
    }
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    TimelineEntity firstEntity = null;
    if (fromId != null) {
      firstEntity = entities.get(new EntityIdentifier(fromId,
          entityType));
      if (firstEntity == null) {
        return new TimelineEntities();
      }
    }

    List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();

    try(CloseableIterator<TimelineEntity> entityIterator =
        firstEntity == null ? entities.valueSetIterator() :
            entities.valueSetIterator(firstEntity)) {
      while (entityIterator.hasNext()) {
        TimelineEntity entity = entityIterator.next();
        if (entitiesSelected.size() >= limit) {
          break;
        }
        if (!entity.getEntityType().equals(entityType)) {
          continue;
        }
        if (entity.getStartTime() <= windowStart) {
          continue;
        }
        if (entity.getStartTime() > windowEnd) {
          continue;
        }
        if (fromTs != null && entityInsertTimes.get(
            new EntityIdentifier(entity.getEntityId(), entity.getEntityType()))
            > fromTs) {
          continue;
        }
        if (primaryFilter != null && !KeyValueBasedTimelineStoreUtils
            .matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
          continue;
        }
        if (secondaryFilters != null) { // AND logic
          boolean flag = true;
          for (NameValuePair secondaryFilter : secondaryFilters) {
            if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
                .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
                && !KeyValueBasedTimelineStoreUtils
                .matchFilter(entity.getOtherInfo(), secondaryFilter)) {
              flag = false;
              break;
            }
          }
          if (!flag) {
            continue;
          }
        }
        if (entity.getDomainId() == null) {
          entity.setDomainId(DEFAULT_DOMAIN_ID);
        }
        if (checkAcl == null || checkAcl.check(entity)) {
          entitiesSelected.add(entity);
        }
      }
    }

    List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
    for (TimelineEntity entitySelected : entitiesSelected) {
      entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
          entitySelected, fields));
    }
    Collections.sort(entitiesToReturn);
    TimelineEntities entitiesWrapper = new TimelineEntities();
    entitiesWrapper.setEntities(entitiesToReturn);
    return entitiesWrapper;
  }

  @Override
  public synchronized TimelineEntity getEntity(String entityId, String entityType,
      EnumSet<Field> fieldsToRetrieve) {
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      return null;
    }
    if (fieldsToRetrieve == null) {
      fieldsToRetrieve = EnumSet.allOf(Field.class);
    }
    TimelineEntity
        entity = entities.get(new EntityIdentifier(entityId, entityType));
    if (entity == null) {
      return null;
    } else {
      return KeyValueBasedTimelineStoreUtils.maskFields(
          entity, fieldsToRetrieve);
    }
  }

  @Override
  public synchronized TimelineEvents getEntityTimelines(String entityType,
      SortedSet<String> entityIds, Long limit, Long windowStart,
      Long windowEnd,
      Set<String> eventTypes) {
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      return null;
    }
    TimelineEvents allEvents = new TimelineEvents();
    if (entityIds == null) {
      return allEvents;
    }
    if (limit == null) {
      limit = DEFAULT_LIMIT;
    }
    if (windowStart == null) {
      windowStart = Long.MIN_VALUE;
    }
    if (windowEnd == null) {
      windowEnd = Long.MAX_VALUE;
    }
    for (String entityId : entityIds) {
      EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
      TimelineEntity entity = entities.get(entityID);
      if (entity == null) {
        continue;
      }
      EventsOfOneEntity events = new EventsOfOneEntity();
      events.setEntityId(entityId);
      events.setEntityType(entityType);
      for (TimelineEvent event : entity.getEvents()) {
        if (events.getEvents().size() >= limit) {
          break;
        }
        if (event.getTimestamp() <= windowStart) {
          continue;
        }
        if (event.getTimestamp() > windowEnd) {
          continue;
        }
        if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
          continue;
        }
        events.addEvent(event);
      }
      allEvents.addEvent(events);
    }
    return allEvents;
  }

  @Override
  public TimelineDomain getDomain(String domainId)
      throws IOException {
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      return null;
    }
    TimelineDomain domain = domainById.get(domainId);
    if (domain == null) {
      return null;
    } else {
      return KeyValueBasedTimelineStoreUtils.createTimelineDomain(
          domain.getId(),
          domain.getDescription(),
          domain.getOwner(),
          domain.getReaders(),
          domain.getWriters(),
          domain.getCreatedTime(),
          domain.getModifiedTime());
    }
  }

  @Override
  public TimelineDomains getDomains(String owner)
      throws IOException {
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      return null;
    }
    List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
    Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
    if (domainsOfOneOwner == null) {
      return new TimelineDomains();
    }
    for (TimelineDomain domain : domainsByOwner.get(owner)) {
      TimelineDomain domainToReturn = KeyValueBasedTimelineStoreUtils
          .createTimelineDomain(
              domain.getId(),
              domain.getDescription(),
              domain.getOwner(),
              domain.getReaders(),
              domain.getWriters(),
              domain.getCreatedTime(),
              domain.getModifiedTime());
      domains.add(domainToReturn);
    }
    Collections.sort(domains, new Comparator<TimelineDomain>() {
      @Override
      public int compare(
          TimelineDomain domain1, TimelineDomain domain2) {
         int result = domain2.getCreatedTime().compareTo(
             domain1.getCreatedTime());
         if (result == 0) {
           return domain2.getModifiedTime().compareTo(
               domain1.getModifiedTime());
         } else {
           return result;
         }
      }
    });
    TimelineDomains domainsToReturn = new TimelineDomains();
    domainsToReturn.addDomains(domains);
    return domainsToReturn;
  }

  @Override
  public synchronized TimelinePutResponse put(TimelineEntities data) {
    TimelinePutResponse response = new TimelinePutResponse();
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      TimelinePutError error = new TimelinePutError();
      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
      response.addError(error);
      return response;
    }
    for (TimelineEntity entity : data.getEntities()) {
      EntityIdentifier entityId =
          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
      // store entity info in memory
      TimelineEntity existingEntity = entities.get(entityId);
      boolean needsPut = false;
      if (existingEntity == null) {
        existingEntity = new TimelineEntity();
        existingEntity.setEntityId(entity.getEntityId());
        existingEntity.setEntityType(entity.getEntityType());
        existingEntity.setStartTime(entity.getStartTime());
        if (entity.getDomainId() == null ||
            entity.getDomainId().length() == 0) {
          TimelinePutError error = new TimelinePutError();
          error.setEntityId(entityId.getId());
          error.setEntityType(entityId.getType());
          error.setErrorCode(TimelinePutError.NO_DOMAIN);
          response.addError(error);
          continue;
        }
        existingEntity.setDomainId(entity.getDomainId());
        // insert a new entity to the storage, update insert time map
        entityInsertTimes.put(entityId, System.currentTimeMillis());
        needsPut = true;
      }
      if (entity.getEvents() != null) {
        if (existingEntity.getEvents() == null) {
          existingEntity.setEvents(entity.getEvents());
        } else {
          existingEntity.addEvents(entity.getEvents());
        }
        Collections.sort(existingEntity.getEvents());
        needsPut = true;
      }
      // check startTime
      if (existingEntity.getStartTime() == null) {
        if (existingEntity.getEvents() == null
            || existingEntity.getEvents().isEmpty()) {
          TimelinePutError error = new TimelinePutError();
          error.setEntityId(entityId.getId());
          error.setEntityType(entityId.getType());
          error.setErrorCode(TimelinePutError.NO_START_TIME);
          response.addError(error);
          entities.remove(entityId);
          entityInsertTimes.remove(entityId);
          continue;
        } else {
          Long min = Long.MAX_VALUE;
          for (TimelineEvent e : entity.getEvents()) {
            if (min > e.getTimestamp()) {
              min = e.getTimestamp();
            }
          }
          existingEntity.setStartTime(min);
          needsPut = true;
        }
      }
      if (entity.getPrimaryFilters() != null) {
        if (existingEntity.getPrimaryFilters() == null) {
          existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
        }
        for (Entry<String, Set<Object>> pf :
            entity.getPrimaryFilters().entrySet()) {
          for (Object pfo : pf.getValue()) {
            existingEntity.addPrimaryFilter(pf.getKey(),
                KeyValueBasedTimelineStoreUtils.compactNumber(pfo));
            needsPut = true;
          }
        }
      }
      if (entity.getOtherInfo() != null) {
        if (existingEntity.getOtherInfo() == null) {
          existingEntity.setOtherInfo(new HashMap<String, Object>());
        }
        for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
          existingEntity.addOtherInfo(info.getKey(),
              KeyValueBasedTimelineStoreUtils.compactNumber(info.getValue()));
          needsPut = true;
        }
      }
      if (needsPut) {
        entities.put(entityId, existingEntity);
      }

      // relate it to other entities
      if (entity.getRelatedEntities() == null) {
        continue;
      }
      for (Entry<String, Set<String>> partRelatedEntities : entity
          .getRelatedEntities().entrySet()) {
        if (partRelatedEntities == null) {
          continue;
        }
        for (String idStr : partRelatedEntities.getValue()) {
          EntityIdentifier relatedEntityId =
              new EntityIdentifier(idStr, partRelatedEntities.getKey());
          TimelineEntity relatedEntity = entities.get(relatedEntityId);
          if (relatedEntity != null) {
            if (relatedEntity.getDomainId().equals(
                existingEntity.getDomainId())) {
              relatedEntity.addRelatedEntity(
                  existingEntity.getEntityType(), existingEntity.getEntityId());
              entities.put(relatedEntityId, relatedEntity);
            } else {
              // in this case the entity will be put, but the relation will be
              // ignored
              TimelinePutError error = new TimelinePutError();
              error.setEntityType(existingEntity.getEntityType());
              error.setEntityId(existingEntity.getEntityId());
              error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
              response.addError(error);
            }
          } else {
            relatedEntity = new TimelineEntity();
            relatedEntity.setEntityId(relatedEntityId.getId());
            relatedEntity.setEntityType(relatedEntityId.getType());
            relatedEntity.setStartTime(existingEntity.getStartTime());
            relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
                existingEntity.getEntityId());
            relatedEntity.setDomainId(existingEntity.getDomainId());
            entities.put(relatedEntityId, relatedEntity);
            entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
          }
        }
      }
    }
    return response;
  }

  public void put(TimelineDomain domain) throws IOException {
    if (getServiceStopped()) {
      LOG.info("Service stopped, return null for the storage");
      return;
    }
    TimelineDomain domainToReplace =
        domainById.get(domain.getId());
    Long currentTimestamp = System.currentTimeMillis();
    TimelineDomain domainToStore
        = KeyValueBasedTimelineStoreUtils.createTimelineDomain(
        domain.getId(), domain.getDescription(), domain.getOwner(),
        domain.getReaders(), domain.getWriters(),
        (domainToReplace == null ?
            currentTimestamp : domainToReplace.getCreatedTime()),
        currentTimestamp);
    domainById.put(domainToStore.getId(), domainToStore);
    Set<TimelineDomain> domainsByOneOwner =
        domainsByOwner.get(domainToStore.getOwner());
    if (domainsByOneOwner == null) {
      domainsByOneOwner = new HashSet<TimelineDomain>();
      domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
    }
    if (domainToReplace != null) {
      domainsByOneOwner.remove(domainToReplace);
    }
    domainsByOneOwner.add(domainToStore);
  }

  private static class KeyValueBasedTimelineStoreUtils {

    static TimelineDomain createTimelineDomain(
        String id, String description, String owner,
        String readers, String writers,
        Long createdTime, Long modifiedTime) {
      TimelineDomain domainToStore = new TimelineDomain();
      domainToStore.setId(id);
      domainToStore.setDescription(description);
      domainToStore.setOwner(owner);
      domainToStore.setReaders(readers);
      domainToStore.setWriters(writers);
      domainToStore.setCreatedTime(createdTime);
      domainToStore.setModifiedTime(modifiedTime);
      return domainToStore;
    }

    static TimelineEntity maskFields(
        TimelineEntity entity, EnumSet<Field> fields) {
      // Conceal the fields that are not going to be exposed
      TimelineEntity entityToReturn = new TimelineEntity();
      entityToReturn.setEntityId(entity.getEntityId());
      entityToReturn.setEntityType(entity.getEntityType());
      entityToReturn.setStartTime(entity.getStartTime());
      entityToReturn.setDomainId(entity.getDomainId());
      // Deep copy
      if (fields.contains(Field.EVENTS)) {
        entityToReturn.addEvents(entity.getEvents());
      } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
        entityToReturn.addEvent(entity.getEvents().get(0));
      } else {
        entityToReturn.setEvents(null);
      }
      if (fields.contains(Field.RELATED_ENTITIES)) {
        entityToReturn.addRelatedEntities(entity.getRelatedEntities());
      } else {
        entityToReturn.setRelatedEntities(null);
      }
      if (fields.contains(Field.PRIMARY_FILTERS)) {
        entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
      } else {
        entityToReturn.setPrimaryFilters(null);
      }
      if (fields.contains(Field.OTHER_INFO)) {
        entityToReturn.addOtherInfo(entity.getOtherInfo());
      } else {
        entityToReturn.setOtherInfo(null);
      }
      return entityToReturn;
    }

    static boolean matchFilter(Map<String, Object> tags,
        NameValuePair filter) {
      Object value = tags.get(filter.getName());
      if (value == null) { // doesn't have the filter
        return false;
      } else if (!value.equals(filter.getValue())) { // doesn't match the filter
        return false;
      }
      return true;
    }

    static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
        NameValuePair filter) {
      Set<Object> value = tags.get(filter.getName());
      if (value == null) { // doesn't have the filter
        return false;
      } else {
        return value.contains(filter.getValue());
      }
    }

    static Object compactNumber(Object o) {
      if (o instanceof Long) {
        Long l = (Long) o;
        if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
          return l.intValue();
        }
      }
      return o;
    }

  }

}

相关信息

hadoop 源码目录

相关文章

hadoop EntityIdentifier 源码

hadoop GenericObjectMapper 源码

hadoop LeveldbTimelineStore 源码

hadoop MemoryTimelineStore 源码

hadoop NameValuePair 源码

hadoop RollingLevelDB 源码

hadoop RollingLevelDBTimelineStore 源码

hadoop TimelineDataManager 源码

hadoop TimelineDataManagerMetrics 源码

hadoop TimelineReader 源码

0  赞