hadoop InMemoryAliasMapProtocolClientSideTranslatorPB 源码

  • 2022-10-20
  • 浏览 (233)

haddop InMemoryAliasMapProtocolClientSideTranslatorPB 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSUtil.addKeySuffixes;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import static org.apache.hadoop.hdfs.DFSUtilClient.getNameServiceIds;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;

/**
 * This class is the client side translator to translate requests made to the
 * {@link InMemoryAliasMapProtocol} interface to the RPC server implementing
 * {@link AliasMapProtocolPB}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InMemoryAliasMapProtocolClientSideTranslatorPB
    implements InMemoryAliasMapProtocol, Closeable {

  private static final Logger LOG =
      LoggerFactory
          .getLogger(InMemoryAliasMapProtocolClientSideTranslatorPB.class);

  private AliasMapProtocolPB rpcProxy;

  public InMemoryAliasMapProtocolClientSideTranslatorPB(
      AliasMapProtocolPB rpcProxy) {
    this.rpcProxy = rpcProxy;
  }

  public static Collection<InMemoryAliasMapProtocol> init(Configuration conf) {
    Collection<InMemoryAliasMapProtocol> aliasMaps = new ArrayList<>();
    // Try to connect to all configured nameservices as it is not known which
    // nameservice supports the AliasMap.
    for (String nsId : getNameServiceIds(conf)) {
      try {
        URI namenodeURI = null;
        Configuration newConf = new Configuration(conf);
        if (HAUtil.isHAEnabled(conf, nsId)) {
          // set the failover-proxy provider if HA is enabled.
          newConf.setClass(
              addKeySuffixes(PROXY_PROVIDER_KEY_PREFIX, nsId),
              InMemoryAliasMapFailoverProxyProvider.class,
              AbstractNNFailoverProxyProvider.class);
          namenodeURI = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);
        } else {
          String key =
              addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, nsId);
          String addr = conf.get(key);
          if (addr != null) {
            namenodeURI = createUri(HdfsConstants.HDFS_URI_SCHEME,
                NetUtils.createSocketAddr(addr));
          }
        }
        if (namenodeURI != null) {
          aliasMaps.add(NameNodeProxies
              .createProxy(newConf, namenodeURI, InMemoryAliasMapProtocol.class)
              .getProxy());
          LOG.info("Connected to InMemoryAliasMap at {}", namenodeURI);
        }
      } catch (IOException | URISyntaxException e) {
        LOG.warn("Exception in connecting to InMemoryAliasMap for nameservice "
            + "{}: {}", nsId, e);
      }
    }
    // if a separate AliasMap is configured using
    // DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, try to connect it.
    if (conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS) != null) {
      URI uri = createUri("hdfs", NetUtils.createSocketAddr(
          conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS)));
      try {
        aliasMaps.add(NameNodeProxies
            .createProxy(conf, uri, InMemoryAliasMapProtocol.class).getProxy());
        LOG.info("Connected to InMemoryAliasMap at {}", uri);
      } catch (IOException e) {
        LOG.warn("Exception in connecting to InMemoryAliasMap at {}: {}", uri,
            e);
      }
    }
    return aliasMaps;
  }

  @Override
  public InMemoryAliasMap.IterationResult list(Optional<Block> marker)
      throws IOException {
    ListRequestProto.Builder builder = ListRequestProto.newBuilder();
    if (marker.isPresent()) {
      builder.setMarker(PBHelperClient.convert(marker.get()));
    }
    ListRequestProto request = builder.build();
    try {
      ListResponseProto response = rpcProxy.list(null, request);
      List<KeyValueProto> fileRegionsList = response.getFileRegionsList();

      List<FileRegion> fileRegions = fileRegionsList
          .stream()
          .map(kv -> new FileRegion(
              PBHelperClient.convert(kv.getKey()),
              PBHelperClient.convert(kv.getValue())
          ))
          .collect(Collectors.toList());
      BlockProto nextMarker = response.getNextMarker();

      if (nextMarker.isInitialized()) {
        return new InMemoryAliasMap.IterationResult(fileRegions,
            Optional.of(PBHelperClient.convert(nextMarker)));
      } else {
        return new InMemoryAliasMap.IterationResult(fileRegions,
            Optional.empty());
      }

    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Nonnull
  @Override
  public Optional<ProvidedStorageLocation> read(@Nonnull Block block)
      throws IOException {

    if (block == null) {
      throw new IOException("Block cannot be null");
    }
    ReadRequestProto request =
        ReadRequestProto
            .newBuilder()
            .setKey(PBHelperClient.convert(block))
            .build();
    try {
      ReadResponseProto response = rpcProxy.read(null, request);

      ProvidedStorageLocationProto providedStorageLocation =
          response.getValue();
      if (providedStorageLocation.isInitialized()) {
        return Optional.of(PBHelperClient.convert(providedStorageLocation));
      }
      return Optional.empty();

    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public void write(@Nonnull Block block,
      @Nonnull ProvidedStorageLocation providedStorageLocation)
      throws IOException {
    if (block == null || providedStorageLocation == null) {
      throw new IOException("Provided block and location cannot be null");
    }
    WriteRequestProto request =
        WriteRequestProto
            .newBuilder()
            .setKeyValuePair(KeyValueProto.newBuilder()
                .setKey(PBHelperClient.convert(block))
                .setValue(PBHelperClient.convert(providedStorageLocation))
                .build())
            .build();

    try {
      rpcProxy.write(null, request);
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public String getBlockPoolId() throws IOException {
    try {
      BlockPoolResponseProto response = rpcProxy.getBlockPoolId(null,
          BlockPoolRequestProto.newBuilder().build());
      return response.getBlockPoolId();
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public void close() throws IOException {
    LOG.info("Stopping rpcProxy in" +
        "InMemoryAliasMapProtocolClientSideTranslatorPB");
    if (rpcProxy != null) {
      RPC.stopProxy(rpcProxy);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AliasMapProtocolPB 源码

hadoop AliasMapProtocolServerSideTranslatorPB 源码

hadoop ClientDatanodeProtocolServerSideTranslatorPB 源码

hadoop ClientNamenodeProtocolServerSideTranslatorPB 源码

hadoop DatanodeLifelineProtocolClientSideTranslatorPB 源码

hadoop DatanodeLifelineProtocolPB 源码

hadoop DatanodeLifelineProtocolServerSideTranslatorPB 源码

hadoop DatanodeProtocolClientSideTranslatorPB 源码

hadoop DatanodeProtocolPB 源码

hadoop DatanodeProtocolServerSideTranslatorPB 源码

0  赞