dubbo ServiceDiscoveryRegistryDirectory 源码

  • 2022-10-20
  • 浏览 (216)

dubbo ServiceDiscoveryRegistryDirectory 代码

文件路径:/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.client;

import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.Constants;
import org.apache.dubbo.registry.ProviderFirstParams;
import org.apache.dubbo.registry.integration.AbstractConfiguratorListener;
import org.apache.dubbo.registry.integration.DynamicDirectory;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcServiceContext;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.model.ModuleModel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_UNSUPPORTED;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_HASHMAP_LOAD_FACTOR;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
import static org.apache.dubbo.rpc.model.ScopeModelUtil.getModuleModel;

public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(ServiceDiscoveryRegistryDirectory.class);

    /**
     * instance address to invoker mapping.
     * The initial value is null and the midway may be assigned to null, please use the local variable reference
     */
    private volatile Map<ProtocolServiceKeyWithAddress, Invoker<T>> urlInvokerMap;
    private volatile ReferenceConfigurationListener referenceConfigurationListener;
    private volatile boolean enableConfigurationListen = true;
    private volatile List<URL> originalUrls = null;
    private volatile Map<String, String> overrideQueryMap;
    private final Set<String> providerFirstParams;
    private final ModuleModel moduleModel;
    private final ProtocolServiceKey consumerProtocolServiceKey;
    private final Map<ProtocolServiceKey, URL> customizedConsumerUrlMap = new ConcurrentHashMap<>();

    public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
        super(serviceType, url);
        moduleModel = getModuleModel(url.getScopeModel());

        Set<ProviderFirstParams> providerFirstParams = url.getOrDefaultApplicationModel().getExtensionLoader(ProviderFirstParams.class).getSupportedExtensionInstances();
        if (CollectionUtils.isEmpty(providerFirstParams)) {
            this.providerFirstParams = null;
        } else {
            if (providerFirstParams.size() == 1) {
                this.providerFirstParams = Collections.unmodifiableSet(providerFirstParams.iterator().next().params());
            } else {
                Set<String> params = new HashSet<>();
                for (ProviderFirstParams paramsFilter : providerFirstParams) {
                    if (paramsFilter.params() == null) {
                        break;
                    }
                    params.addAll(paramsFilter.params());
                }
                this.providerFirstParams = Collections.unmodifiableSet(params);
            }
        }

        String protocol = consumerUrl.getParameter(PROTOCOL_KEY, consumerUrl.getProtocol());
        consumerProtocolServiceKey = new ProtocolServiceKey(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(),
            !CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
    }

    @Override
    public void subscribe(URL url) {
        if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
            enableConfigurationListen = true;
            getConsumerConfigurationListener(moduleModel).addNotifyListener(this);
            referenceConfigurationListener = new ReferenceConfigurationListener(this.moduleModel, this, url);
        } else {
            enableConfigurationListen = false;
        }
        super.subscribe(url);
    }

    private ConsumerConfigurationListener getConsumerConfigurationListener(ModuleModel moduleModel) {
        return moduleModel.getBeanFactory().getOrRegisterBean(ConsumerConfigurationListener.class,
            type -> new ConsumerConfigurationListener(moduleModel));
    }

    @Override
    public void unSubscribe(URL url) {
        super.unSubscribe(url);
        this.originalUrls = null;
        if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
            getConsumerConfigurationListener(moduleModel).removeNotifyListener(this);
            referenceConfigurationListener.stop();
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
            getConsumerConfigurationListener(moduleModel).removeNotifyListener(this);
            referenceConfigurationListener.stop();
        }
    }

    @Override
    public void buildRouterChain(URL url) {
        this.setRouterChain(RouterChain.buildChain(getInterface(), url.addParameter(REGISTRY_TYPE_KEY, SERVICE_REGISTRY_TYPE)));
    }

    @Override
    public synchronized void notify(List<URL> instanceUrls) {
        if (isDestroyed()) {
            return;
        }
        // Set the context of the address notification thread.
        RpcServiceContext.getServiceContext().setConsumerUrl(getConsumerUrl());

        //  3.x added for extend URL address
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                instanceUrls = addressListener.notify(instanceUrls, getConsumerUrl(), this);
            }
        }

        refreshOverrideAndInvoker(instanceUrls);
    }

    // RefreshOverrideAndInvoker will be executed by registryCenter and configCenter, so it should be synchronized.
    private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) {
        // mock zookeeper://xxx?mock=return null
        refreshInvoker(instanceUrls);
    }

    private InstanceAddressURL overrideWithConfigurator(InstanceAddressURL providerUrl) {
        // override url with configurator from "app-name.configurators"
        providerUrl = overrideWithConfigurators(getConsumerConfigurationListener(moduleModel).getConfigurators(), providerUrl);

        // override url with configurator from configurators from "service-name.configurators"
        if (referenceConfigurationListener != null) {
            providerUrl = overrideWithConfigurators(referenceConfigurationListener.getConfigurators(), providerUrl);
        }

        return providerUrl;
    }

    private InstanceAddressURL overrideWithConfigurators(List<Configurator> configurators, InstanceAddressURL url) {
        if (CollectionUtils.isNotEmpty(configurators)) {
            // wrap url
            OverrideInstanceAddressURL overrideInstanceAddressURL = new OverrideInstanceAddressURL(url);
            if (overrideQueryMap != null) {
                // override app-level configs
                overrideInstanceAddressURL = (OverrideInstanceAddressURL) overrideInstanceAddressURL.addParameters(overrideQueryMap);
            }
            for (Configurator configurator : configurators) {
                overrideInstanceAddressURL = (OverrideInstanceAddressURL) configurator.configure(overrideInstanceAddressURL);
            }
            return overrideInstanceAddressURL;
        }
        return url;
    }

    @Override
    public boolean isServiceDiscovery() {
        return true;
    }

    /**
     * This implementation makes sure all application names related to serviceListener received address notification.
     * <p>
     * FIXME, make sure deprecated "interface-application" mapping item be cleared in time.
     */
    @Override
    public boolean isNotificationReceived() {
        return serviceListener == null || serviceListener.isDestroyed()
            || serviceListener.getAllInstances().size() == serviceListener.getServiceNames().size();
    }

    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null, use EMPTY url to clear current addresses.");
        this.originalUrls = invokerUrls;

        if (invokerUrls.size() == 1 && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            logger.warn("Received url with EMPTY protocol, will clear all available addresses.");
            this.forbidden = true; // Forbid to access
            routerChain.setInvokers(BitList.emptyList());
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow accessing
            if (CollectionUtils.isEmpty(invokerUrls)) {
                logger.warn("Received empty url list, will ignore for protection purpose.");
                return;
            }

            // use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
            Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
            // can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
            Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap = null;
            if (localUrlInvokerMap != null) {
                // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
                localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
            }
            Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
            logger.info("Refreshed invoker size " + newUrlInvokerMap.size());

            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
                return;
            }
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
            // pre-route and build cache
            routerChain.setInvokers(this.getInvokers());
            this.urlInvokerMap = newUrlInvokerMap;

            if (oldUrlInvokerMap != null) {
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }

        // notify invokers refreshed
        this.invokersChanged();
    }

    /**
     * Turn urls into invokers, and if url has been refer, will not re-reference.
     * the items that will be put into newUrlInvokeMap will be removed from oldUrlInvokerMap.
     *
     * @param oldUrlInvokerMap it might be modified during the process.
     * @param urls
     * @return invokers
     */
    private Map<ProtocolServiceKeyWithAddress, Invoker<T>> toInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
        Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }

        for (URL url : urls) {
            InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
            if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
                continue;
            }
            if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {

                // 4-1 - Unsupported protocol

                logger.error(PROTOCOL_UNSUPPORTED, "protocol extension does not installed", "", "Unsupported protocol.",
                    new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
                    " in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
                    " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                    getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));

                continue;
            }

            instanceAddressURL.setProviderFirstParams(providerFirstParams);

            // Override provider urls if needed
            if (enableConfigurationListen) {
                instanceAddressURL = overrideWithConfigurator(instanceAddressURL);
            }

            // filter all the service available (version wildcard, group wildcard, protocol wildcard)
            int port = instanceAddressURL.getPort();
            List<ProtocolServiceKey> matchedProtocolServiceKeys = instanceAddressURL.getMetadataInfo()
                .getMatchedServiceInfos(consumerProtocolServiceKey)
                .stream()
                .filter(serviceInfo -> serviceInfo.getPort() <= 0 || serviceInfo.getPort() == port)
                .map(MetadataInfo.ServiceInfo::getProtocolServiceKey)
                .collect(Collectors.toList());

            // see org.apache.dubbo.common.ProtocolServiceKey.isSameWith
            // check if needed to override the consumer url
            boolean shouldWrap = matchedProtocolServiceKeys.size() != 1 || !consumerProtocolServiceKey.isSameWith(matchedProtocolServiceKeys.get(0));

            for (ProtocolServiceKey matchedProtocolServiceKey : matchedProtocolServiceKeys) {
                ProtocolServiceKeyWithAddress protocolServiceKeyWithAddress = new ProtocolServiceKeyWithAddress(matchedProtocolServiceKey, instanceAddressURL.getAddress());
                Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.get(protocolServiceKeyWithAddress);
                if (invoker == null || urlChanged(invoker, instanceAddressURL, matchedProtocolServiceKey)) { // Not in the cache, refer again
                    try {
                        boolean enabled;
                        if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
                            enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
                        } else {
                            enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
                        }
                        if (enabled) {
                            if (shouldWrap) {
                                URL newConsumerUrl = customizedConsumerUrlMap.computeIfAbsent(matchedProtocolServiceKey,
                                    k -> consumerUrl.setProtocol(k.getProtocol())
                                        .addParameter(CommonConstants.GROUP_KEY, k.getGroup())
                                        .addParameter(CommonConstants.VERSION_KEY, k.getVersion()));
                                RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
                                invoker = new InstanceWrappedInvoker<>(protocol.refer(serviceType, instanceAddressURL), newConsumerUrl, matchedProtocolServiceKey);
                            } else {
                                invoker = protocol.refer(serviceType, instanceAddressURL);
                            }
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker);
                    oldUrlInvokerMap.remove(protocolServiceKeyWithAddress, invoker);
                }
            }
        }
        return newUrlInvokerMap;
    }

    private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL, ProtocolServiceKey protocolServiceKey) {
        InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();

        if (!newURL.getInstance().equals(oldURL.getInstance())) {
            return true;
        }

        if (oldURL instanceof OverrideInstanceAddressURL || newURL instanceof OverrideInstanceAddressURL) {
            if (!(oldURL instanceof OverrideInstanceAddressURL && newURL instanceof OverrideInstanceAddressURL)) {
                // sub-class changed
                return true;
            } else {
                if (!((OverrideInstanceAddressURL) oldURL).getOverrideParams().equals(((OverrideInstanceAddressURL) newURL).getOverrideParams())) {
                    return true;
                }
            }
        }

        MetadataInfo.ServiceInfo oldServiceInfo = oldURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString());
        if (null == oldServiceInfo) {
            return false;
        }

        return !oldServiceInfo.equals(newURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString()));
    }

    private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
        List<Invoker<T>> mergedInvokers = new ArrayList<>();
        Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
        for (Invoker<T> invoker : invokers) {
            String group = invoker.getUrl().getGroup("");
            groupMap.computeIfAbsent(group, k -> new ArrayList<>());
            groupMap.get(group).add(invoker);
        }

        if (groupMap.size() == 1) {
            mergedInvokers.addAll(groupMap.values().iterator().next());
        } else if (groupMap.size() > 1) {
            for (List<Invoker<T>> groupList : groupMap.values()) {
                StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
                staticDirectory.buildRouterChain();
                mergedInvokers.add(cluster.join(staticDirectory, false));
            }
        } else {
            mergedInvokers = invokers;
        }
        return mergedInvokers;
    }

    /**
     * Close all invokers
     */
    @Override
    protected void destroyAllInvokers() {
        Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        if (localUrlInvokerMap != null) {
            for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
                try {
                    invoker.destroyAll();
                } catch (Throwable t) {
                    logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t);
                }
            }
            localUrlInvokerMap.clear();
        }

        this.urlInvokerMap = null;
        this.destroyInvokers();
    }

    /**
     * Check whether the invoker in the cache needs to be destroyed
     * If set attribute of url: refer.autodestroy=false, the invokers will only increase without decreasing,there may be a refer leak
     *
     * @param oldUrlInvokerMap
     * @param newUrlInvokerMap
     */
    private void destroyUnusedInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap) {
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            destroyAllInvokers();
            return;
        }

        if (oldUrlInvokerMap == null || oldUrlInvokerMap.size() == 0) {
            return;
        }

        for (Map.Entry<ProtocolServiceKeyWithAddress, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
            Invoker<T> invoker = entry.getValue();
            if (invoker != null) {
                try {
                    invoker.destroyAll();
                    if (logger.isDebugEnabled()) {
                        logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
                    }
                } catch (Exception e) {
                    logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
                }
            }
        }
        logger.info(oldUrlInvokerMap.size() + " deprecated invokers deleted.");
    }

    private class ReferenceConfigurationListener extends AbstractConfiguratorListener {
        private final ServiceDiscoveryRegistryDirectory<?> directory;
        private final URL url;

        ReferenceConfigurationListener(ModuleModel moduleModel, ServiceDiscoveryRegistryDirectory<?> directory, URL url) {
            super(moduleModel);
            this.directory = directory;
            this.url = url;
            this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
        }

        void stop() {
            this.stopListen(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
        }

        @Override
        protected void notifyOverrides() {
            // to notify configurator/router changes
            if (directory.originalUrls != null) {
                URL backup = RpcContext.getServiceContext().getConsumerUrl();
                RpcContext.getServiceContext().setConsumerUrl(directory.getConsumerUrl());
                directory.refreshOverrideAndInvoker(directory.originalUrls);
                RpcContext.getServiceContext().setConsumerUrl(backup);
            }
        }
    }

    private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
        private final List<ServiceDiscoveryRegistryDirectory<?>> listeners = new ArrayList<>();

        ConsumerConfigurationListener(ModuleModel moduleModel) {
            super(moduleModel);
        }

        void addNotifyListener(ServiceDiscoveryRegistryDirectory<?> listener) {
            if (listeners.size() == 0) {
                this.initWith(moduleModel.getApplicationModel().getApplicationName() + CONFIGURATORS_SUFFIX);
            }
            this.listeners.add(listener);
        }

        void removeNotifyListener(ServiceDiscoveryRegistryDirectory<?> listener) {
            this.listeners.remove(listener);
            if (listeners.size() == 0) {
                this.stopListen(moduleModel.getApplicationModel().getApplicationName() + CONFIGURATORS_SUFFIX);
            }
        }

        @Override
        protected void notifyOverrides() {
            listeners.forEach(listener -> {
                if (listener.originalUrls != null) {
                    URL backup = RpcContext.getServiceContext().getConsumerUrl();
                    RpcContext.getServiceContext().setConsumerUrl(listener.getConsumerUrl());
                    listener.refreshOverrideAndInvoker(listener.originalUrls);
                    RpcContext.getServiceContext().setConsumerUrl(backup);
                }
            });
        }
    }

    public static final class ProtocolServiceKeyWithAddress extends ProtocolServiceKey {
        private final String address;

        public ProtocolServiceKeyWithAddress(ProtocolServiceKey protocolServiceKey, String address) {
            super(protocolServiceKey.getInterfaceName(), protocolServiceKey.getVersion(), protocolServiceKey.getGroup(), protocolServiceKey.getProtocol());
            this.address = address;
        }

        public String getAddress() {
            return address;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            if (!super.equals(o)) {
                return false;
            }
            ProtocolServiceKeyWithAddress that = (ProtocolServiceKeyWithAddress) o;
            return Objects.equals(address, that.address);
        }

        @Override
        public int hashCode() {
            return Objects.hash(super.hashCode(), address);
        }
    }

    public static final class InstanceWrappedInvoker<T> implements Invoker<T> {
        private final Invoker<T> originInvoker;
        private final URL newConsumerUrl;
        private final ProtocolServiceKey protocolServiceKey;

        public InstanceWrappedInvoker(Invoker<T> originInvoker, URL newConsumerUrl, ProtocolServiceKey protocolServiceKey) {
            this.originInvoker = originInvoker;
            this.newConsumerUrl = newConsumerUrl;
            this.protocolServiceKey = protocolServiceKey;
        }

        @Override
        public Class<T> getInterface() {
            return originInvoker.getInterface();
        }

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            // override consumer url with real protocol service key
            RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
            // recreate invocation due to the protocol service key changed
            RpcInvocation copiedInvocation = new RpcInvocation(invocation.getTargetServiceUniqueName(),
                invocation.getServiceModel(), invocation.getMethodName(), invocation.getServiceName(), protocolServiceKey.toString(),
                invocation.getParameterTypes(), invocation.getArguments(), invocation.getObjectAttachments(),
                invocation.getInvoker(), invocation.getAttributes(),
                invocation instanceof RpcInvocation ? ((RpcInvocation) invocation).getInvokeMode() : null);
            copiedInvocation.setObjectAttachment(CommonConstants.GROUP_KEY, protocolServiceKey.getGroup());
            copiedInvocation.setObjectAttachment(CommonConstants.VERSION_KEY, protocolServiceKey.getVersion());
            return originInvoker.invoke(copiedInvocation);
        }

        @Override
        public URL getUrl() {
            RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
            return originInvoker.getUrl();
        }

        @Override
        public boolean isAvailable() {
            RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
            return originInvoker.isAvailable();
        }

        @Override
        public void destroy() {
            RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
            originInvoker.destroy();
        }
    }

}

相关信息

dubbo 源码目录

相关文章

dubbo AbstractServiceDiscovery 源码

dubbo AbstractServiceDiscoveryFactory 源码

dubbo DefaultRegistryClusterIdentifier 源码

dubbo DefaultServiceDiscoveryFactory 源码

dubbo DefaultServiceInstance 源码

dubbo FileSystemServiceDiscovery 源码

dubbo InstanceAddressURL 源码

dubbo NopServiceDiscovery 源码

dubbo OverrideInstanceAddressURL 源码

dubbo ReflectionBasedServiceDiscovery 源码

0  赞