dubbo ServiceDiscoveryRegistry 源码

  • 2022-10-20
  • 浏览 (290)

dubbo ServiceDiscoveryRegistry 代码

文件路径:/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.client;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.metadata.AbstractServiceNameMapping;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
import static org.apache.dubbo.common.function.ThrowableAction.execute;
import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
import static org.apache.dubbo.metadata.ServiceNameMapping.toStringKeys;
import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension;

/**
 * TODO, this bridge implementation is not necessary now, protocol can interact with service discovery directly.
 * <p>
 * ServiceDiscoveryRegistry is a very special Registry implementation, which is used to bridge the old interface-level service discovery model
 * with the new service discovery model introduced in 3.0 in a compatible manner.
 * <p>
 * It fully complies with the extension specification of the Registry SPI, but is different from the specific implementation of zookeeper and Nacos,
 * because it does not interact with any real third-party registry, but only with the relevant components of ServiceDiscovery in the process.
 * In short, it bridges the old interface model and the new service discovery model:
 * <p>
 * - register() aggregates interface level data into MetadataInfo by mainly interacting with MetadataService.
 * - subscribe() triggers the whole subscribe process of the application level service discovery model.
 * - Maps interface to applications depending on ServiceNameMapping.
 * - Starts the new service discovery listener (InstanceListener) and makes the NotifyListeners part of the InstanceListener.
 */
public class ServiceDiscoveryRegistry extends FailbackRegistry {

    // Logger bound to the runtime class (getClass()), so subclasses log under their own name.
    protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

    // The service discovery that register/unregister/subscribe/unsubscribe calls are delegated to.
    private final ServiceDiscovery serviceDiscovery;

    // Used to look up (and listen for changes of) the applications mapped to a subscribed interface.
    private final AbstractServiceNameMapping serviceNameMapping;

    /* Instance listeners, keyed by the joined application-name list they watch (apps -> listener). */
    private final Map<String, ServiceInstancesChangedListener> serviceListeners = new ConcurrentHashMap<>();
    /* Mapping listeners, keyed by the protocol service key of the subscribed URL. */
    private final Map<String, MappingListener> mappingListeners = new ConcurrentHashMap<>();
    /*
     * One lock per application-list key, with the same scope and lifecycle as the corresponding instance listener.
     * It makes sure that only one interface mapped to the same app list subscribes or unsubscribes at a time.
     * The lock is removed when its corresponding instance listener is destroyed.
     */
    private final ConcurrentMap<String, Lock> appSubscriptionLocks = new ConcurrentHashMap<>();

    /**
     * Creates the registry for the given registry {@link URL}.
     * <p>
     * The {@link ServiceDiscovery} is built by {@link #createServiceDiscovery(URL)} and the service name
     * mapping is the default {@link ServiceNameMapping} extension of the URL's scope model.
     *
     * @param registryURL      the registry {@link URL}
     * @param applicationModel the application model assigned to the inherited {@code applicationModel} field
     */
    public ServiceDiscoveryRegistry(URL registryURL, ApplicationModel applicationModel) {
        super(registryURL);
        // NOTE(review): createServiceDiscovery is overridable and runs before applicationModel is assigned below.
        this.serviceDiscovery = createServiceDiscovery(registryURL);
        // The default extension must be an AbstractServiceNameMapping; otherwise this cast fails.
        this.serviceNameMapping = (AbstractServiceNameMapping) ServiceNameMapping.getDefaultExtension(registryURL.getScopeModel());
        super.applicationModel = applicationModel;
    }

    /**
     * Creates the registry from pre-built collaborators. Currently, for test purpose.
     * <p>
     * Unlike the public constructor, this one does not assign {@code applicationModel}.
     *
     * @param registryURL        the registry {@link URL}
     * @param serviceDiscovery   the {@link ServiceDiscovery} to delegate to
     * @param serviceNameMapping the mapping to use; must be an {@link AbstractServiceNameMapping}, otherwise the cast fails
     */
    protected ServiceDiscoveryRegistry(URL registryURL, ServiceDiscovery serviceDiscovery, ServiceNameMapping serviceNameMapping) {
        super(registryURL);
        this.serviceDiscovery = serviceDiscovery;
        this.serviceNameMapping = (AbstractServiceNameMapping) serviceNameMapping;
    }

    /**
     * Returns the {@link ServiceDiscovery} this registry delegates to.
     *
     * @return the service discovery held by this registry
     */
    public ServiceDiscovery getServiceDiscovery() {
        return this.serviceDiscovery;
    }

    /**
     * Builds the {@link ServiceDiscovery} for this registry from the registry {@link URL}.
     * <p>
     * Before the lookup, the URL's interface parameter is set to the {@link ServiceDiscovery}
     * class name and its registry-type parameter is removed.
     *
     * @param registryURL the {@link URL} to connect the registry
     * @return the {@link ServiceDiscovery} resolved for the adjusted URL (expected to be non-null)
     */
    protected ServiceDiscovery createServiceDiscovery(URL registryURL) {
        String discoveryInterface = ServiceDiscovery.class.getName();
        URL discoveryURL = registryURL.addParameter(INTERFACE_KEY, discoveryInterface)
            .removeParameter(REGISTRY_TYPE_KEY);
        return getServiceDiscovery(discoveryURL);
    }

    /**
     * Resolves the {@link ServiceDiscoveryFactory} SPI extension for the registry {@link URL}
     * and asks it for the {@link ServiceDiscovery} instance.
     *
     * @param registryURL the {@link URL} to connect the registry
     * @return the {@link ServiceDiscovery} supplied by the factory
     */
    private ServiceDiscovery getServiceDiscovery(URL registryURL) {
        return getExtension(registryURL).getServiceDiscovery(registryURL);
    }

    /**
     * Decides whether the given URL is to be registered.
     * <p>
     * Only provider-side URLs that this registry accepts are registered.
     *
     * @param providerURL the URL to check
     * @return {@code true} if the URL is on the provider side and is acceptable for this registry
     */
    protected boolean shouldRegister(URL providerURL) {
        // Only register the Provider.
        boolean providerSide = PROVIDER_SIDE.equals(providerURL.getSide());

        if (!providerSide && logger.isDebugEnabled()) {
            logger.debug(String.format("The URL[%s] should not be registered.", providerURL));
        }

        if (acceptable(providerURL)) {
            return providerSide;
        }

        logger.info("URL " + providerURL + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
        return false;
    }

    /**
     * Decides whether the given URL is to be subscribed: exactly when it is not to be registered.
     *
     * @param subscribedURL the URL to check
     * @return the negation of {@link #shouldRegister(URL)} for the same URL
     */
    protected boolean shouldSubscribe(URL subscribedURL) {
        boolean registrable = shouldRegister(subscribedURL);
        return !registrable;
    }

    /**
     * Registers the URL if {@link #shouldRegister(URL)} allows it; otherwise does nothing.
     *
     * @param url the URL to register
     */
    @Override
    public final void register(URL url) {
        if (shouldRegister(url)) {
            doRegister(url);
        }
    }

    /**
     * Registers the URL with the service discovery after adding the registry-cluster parameter.
     *
     * @param url the URL to register
     */
    @Override
    public void doRegister(URL url) {
        // fixme, add registry-cluster is not necessary anymore
        URL clusterAwareURL = addRegistryClusterKey(url);
        serviceDiscovery.register(clusterAwareURL);
    }

    /**
     * Unregisters the URL if {@link #shouldRegister(URL)} holds for it; otherwise does nothing.
     *
     * @param url the URL to unregister
     */
    @Override
    public final void unregister(URL url) {
        if (shouldRegister(url)) {
            doUnregister(url);
        }
    }

    /**
     * Unregisters the URL from the service discovery after adding the registry-cluster parameter.
     *
     * @param url the URL to unregister
     */
    @Override
    public void doUnregister(URL url) {
        // fixme, add registry-cluster is not necessary anymore
        URL clusterAwareURL = addRegistryClusterKey(url);
        serviceDiscovery.unregister(clusterAwareURL);
    }

    /**
     * Subscribes the listener if {@link #shouldSubscribe(URL)} allows it; otherwise does nothing.
     *
     * @param url      the URL to subscribe
     * @param listener the listener to notify
     */
    @Override
    public final void subscribe(URL url, NotifyListener listener) {
        if (shouldSubscribe(url)) {
            doSubscribe(url, listener);
        }
    }

    /**
     * Subscribes {@code listener} to the applications mapped to the interface described by {@code url}.
     * <p>
     * Under the mapping lock of the URL's mapping key, the method installs a mapping listener, resolves the
     * application names and, if any are known, subscribes through {@link #subscribeURLs(URL, NotifyListener, Set)}.
     * When no application is known yet, it returns and leaves the subscription to the mapping listener callback.
     *
     * @param url      the URL to subscribe
     * @param listener the listener to notify
     */
    @Override
    public void doSubscribe(URL url, NotifyListener listener) {
        url = addRegistryClusterKey(url);

        serviceDiscovery.subscribe(url, listener);

        String key = ServiceNameMapping.buildMappingKey(url);
        Lock mappingLock = serviceNameMapping.getMappingLock(key);
        // Acquire before entering try: if lock() fails, finally must not unlock a lock that was never held.
        mappingLock.lock();
        try {
            // Start from the cached mapping; it stays in effect if the lookup below fails.
            Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
            try {
                MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
                subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
                mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
            } catch (Exception e) {
                // Best effort: a failed lookup is logged and the cached mapping (possibly empty) is used.
                logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
            }

            if (CollectionUtils.isEmpty(subscribedServices)) {
                // The URL's "check" flag is intentionally not enforced here: instead of failing,
                // the subscription is deferred until the mapping listener reports the applications.
                logger.info("No interface-apps mapping found in local cache, stop subscribing, will automatically wait for mapping listener callback: " + url);
                return;
            }

            subscribeURLs(url, listener, subscribedServices);
        } finally {
            mappingLock.unlock();
        }
    }

    /**
     * Unsubscribes the listener if {@link #shouldSubscribe(URL)} holds for the URL; otherwise does nothing.
     * The registry-cluster parameter is added to the URL before delegating to {@link #doUnsubscribe(URL, NotifyListener)}.
     *
     * @param url      the URL to unsubscribe
     * @param listener the listener to remove
     */
    @Override
    public final void unsubscribe(URL url, NotifyListener listener) {
        if (shouldSubscribe(url)) {
            doUnsubscribe(addRegistryClusterKey(url), listener);
        }
    }

    /**
     * Copies the registry-cluster parameter of the service discovery URL onto {@code url},
     * unless the discovery URL has none or {@code url} already carries one.
     *
     * @param url the URL to tag
     * @return the tagged URL, or {@code url} itself when nothing had to be added
     */
    private URL addRegistryClusterKey(URL url) {
        String registryCluster = serviceDiscovery.getUrl().getParameter(REGISTRY_CLUSTER_KEY);
        if (registryCluster == null) {
            return url;
        }
        boolean alreadyTagged = url.getParameter(REGISTRY_CLUSTER_KEY) != null;
        return alreadyTagged ? url : url.addParameter(REGISTRY_CLUSTER_KEY, registryCluster);
    }

    /**
     * Unsubscribes {@code listener} from the service discovery, stops the mapping listener registered for
     * the URL, and detaches the listener from the instance listener of the mapped applications.
     * <p>
     * When the instance listener has no listeners left, it is destroyed and removed together with its
     * app subscription lock.
     *
     * @param url      the URL to unsubscribe
     * @param listener the listener to remove
     */
    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
        // TODO: remove service name mapping listener
        serviceDiscovery.unsubscribe(url, listener);
        String protocolServiceKey = url.getProtocolServiceKey();
        // Read the cached mapping before stopListen(...); presumably stopListen may evict it, so keep this order.
        Set<String> serviceNames = serviceNameMapping.getCachedMapping(url);
        // NOTE(review): the removed mapping listener may be null if none was registered; confirm stopListen tolerates it.
        serviceNameMapping.stopListen(url, mappingListeners.remove(protocolServiceKey));
        if (CollectionUtils.isEmpty(serviceNames)) {
            return;
        }

        // Build the key from the sorted names, exactly as subscribeURLs does, so the lookup hits the same entry.
        String serviceNamesKey = toStringKeys(toTreeSet(serviceNames));
        Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
        // Acquire before entering try: if lock() fails, finally must not unlock a lock that was never held.
        appSubscriptionLock.lock();
        try {
            ServiceInstancesChangedListener instancesChangedListener = serviceListeners.get(serviceNamesKey);
            if (instancesChangedListener != null) {
                instancesChangedListener.removeListener(protocolServiceKey, listener);
                if (!instancesChangedListener.hasListeners()) {
                    // Last listener gone: tear down the instance listener and its lock.
                    instancesChangedListener.destroy();
                    serviceListeners.remove(serviceNamesKey);
                    removeAppSubscriptionLock(serviceNamesKey);
                }
            }
        } finally {
            appSubscriptionLock.unlock();
        }
    }

    /**
     * Not supported by this registry.
     *
     * @param url the URL to look up (ignored)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<URL> lookup(URL url) {
        // Give the caller an actionable message instead of an empty one.
        throw new UnsupportedOperationException("lookup is not supported by ServiceDiscoveryRegistry");
    }

    /**
     * Reports whether this registry is available.
     *
     * @return {@code true} for a {@link NopServiceDiscovery}; otherwise {@code true} only if the
     * service discovery is not destroyed and reports at least one service
     */
    @Override
    public boolean isAvailable() {
        // NopServiceDiscovery is designed for compatibility, check availability is meaningless, just return true
        if (serviceDiscovery instanceof NopServiceDiscovery) {
            return true;
        }
        if (serviceDiscovery.isDestroy()) {
            return false;
        }
        return !serviceDiscovery.getServices().isEmpty();
    }

    /**
     * Removes this registry from the registry manager, destroys the service discovery and every
     * instance listener, then clears the lock and listener maps.
     */
    @Override
    public void destroy() {
        registryManager.removeDestroyedRegistry(this);
        // stop ServiceDiscovery
        execute(serviceDiscovery::destroy);
        // destroy all event listener
        serviceListeners.values().forEach(instanceListener -> instanceListener.destroy());
        appSubscriptionLocks.clear();
        serviceListeners.clear();
        mappingListeners.clear();
    }

    /**
     * Marks this registry as a service-discovery registry.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isServiceDiscovery() {
        return true;
    }

    /**
     * Attaches {@code listener} to the instance listener that watches the given applications.
     * <p>
     * One {@link ServiceInstancesChangedListener} is kept per sorted application-name key. A missing one is
     * created, fed with the instances currently known for each application, and stored. The notify listener
     * is then added to it, unless the stored instance listener turns out to be destroyed, in which case it is
     * only removed from the map.
     *
     * @param url          the subscribed URL
     * @param listener     the listener to notify
     * @param serviceNames the application names providing the subscribed interface
     */
    protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
        // Sort the names so that the same app list always yields the same key.
        serviceNames = toTreeSet(serviceNames);
        String serviceNamesKey = toStringKeys(serviceNames);
        String serviceKey = url.getServiceKey();
        logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));

        // register ServiceInstancesChangedListener
        Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
        // Acquire before entering try: if lock() fails, finally must not unlock a lock that was never held.
        appSubscriptionLock.lock();
        try {
            ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
            if (serviceInstancesChangedListener == null) {
                serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
                serviceInstancesChangedListener.setUrl(url);
                // Seed the new listener with the instances that are already known.
                for (String serviceName : serviceNames) {
                    List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
                    if (CollectionUtils.isNotEmpty(serviceInstances)) {
                        serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
                    }
                }
                serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
            }

            if (!serviceInstancesChangedListener.isDestroyed()) {
                serviceInstancesChangedListener.setUrl(url);
                listener.addServiceListener(serviceInstancesChangedListener);
                serviceInstancesChangedListener.addListenerAndNotify(url, listener);
                serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
            } else {
                // NOTE(review): the current subscription is not retried on this path; only the stale entry is dropped.
                logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
                serviceListeners.remove(serviceNamesKey);
            }
        } finally {
            appSubscriptionLock.unlock();
        }
    }

    /**
     * Tells whether the registry {@link URL} asks for this kind of registry, i.e. whether its
     * registry-type parameter equals the service registry type, ignoring case.
     *
     * @param registryURL the {@link URL url} of registry
     * @return <code>true</code> if supported, <code>false</code> otherwise (including a missing parameter)
     */
    public static boolean supports(URL registryURL) {
        String registryType = registryURL.getParameter(REGISTRY_TYPE_KEY);
        return SERVICE_REGISTRY_TYPE.equalsIgnoreCase(registryType);
    }

    /**
     * Returns the instance listeners keyed by application-name key.
     *
     * @return the live internal map (not a copy)
     */
    public Map<String, ServiceInstancesChangedListener> getServiceListeners() {
        return this.serviceListeners;
    }

    /**
     * Mapping listener of one subscribed URL: when the applications mapped to the interface change,
     * it refreshes the cached mapping and re-subscribes the notify listener to the new application list.
     */
    private class DefaultMappingListener implements MappingListener {
        private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DefaultMappingListener.class);
        private final URL url;
        private final NotifyListener listener;
        // Application list this listener last subscribed with; only touched inside the synchronized onEvent.
        private Set<String> oldApps;
        private volatile boolean stopped;

        /**
         * @param subscribedURL the subscribed URL
         * @param serviceNames  the application names known at construction time (may be empty or null)
         * @param listener      the notify listener to re-subscribe on mapping changes
         */
        public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, NotifyListener listener) {
            this.url = subscribedURL;
            this.oldApps = serviceNames;
            this.listener = listener;
        }

        /**
         * Handles a mapping change. Nothing happens when the listener is stopped, when the new application
         * list is empty or equal to the old one, or when it contains no application unknown to the old list.
         *
         * @param event the mapping change notification
         */
        @Override
        public synchronized void onEvent(MappingChangedEvent event) {
            logger.info("Received mapping notification from meta server, " + event);

            if (stopped) {
                logger.warn("Listener has been stopped, ignore mapping notification, check why listener is not removed.");
                return;
            }
            Set<String> newApps = event.getApps();
            Set<String> tempOldApps = oldApps;

            if (CollectionUtils.isEmpty(newApps) || CollectionUtils.equals(newApps, tempOldApps)) {
                return;
            }

            logger.info("Mapping of service " + event.getServiceKey() + " changed from " + tempOldApps + " to " + newApps);

            Lock mappingLock = serviceNameMapping.getMappingLock(event.getServiceKey());
            // Acquire before entering try: if lock() fails, finally must not unlock a lock that was never held.
            mappingLock.lock();
            try {
                // newApps is known to be non-empty here, so only the old list needs checking.
                if (CollectionUtils.isEmpty(tempOldApps)) {
                    // First mapping seen for this URL: cache it and subscribe.
                    serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
                    subscribeURLs(url, listener, newApps);
                    oldApps = newApps;
                    return;
                }

                if (tempOldApps.containsAll(newApps)) {
                    // No application that was not already known: keep the current subscription.
                    return;
                }

                String mappingKey = ServiceNameMapping.buildMappingKey(url);
                serviceNameMapping.removeCachedMapping(mappingKey);
                serviceNameMapping.putCachedMapping(mappingKey, newApps);
                // The instance listener of the old app list is detached before the subscription is refreshed.
                detachFromOldInstanceListener(tempOldApps);

                subscribeURLs(url, listener, newApps);
                oldApps = newApps;
            } finally {
                mappingLock.unlock();
            }
        }

        /**
         * Removes the notify listener from the instance listener it is currently attached to and, if that
         * leaves the instance listener without listeners, destroys it and drops its map entry and lock.
         *
         * @param previousApps the application list the notify listener was subscribed with
         */
        private void detachFromOldInstanceListener(Set<String> previousApps) {
            ServiceInstancesChangedListener oldListener = listener.getServiceListener();
            if (oldListener == null) {
                return;
            }
            String appKey = toStringKeys(toTreeSet(previousApps));
            Lock appSubscriptionLock = getAppSubscription(appKey);
            appSubscriptionLock.lock();
            try {
                // NOTE(review): doUnsubscribe removes by url.getProtocolServiceKey(); confirm getServiceKey() is the intended key here.
                oldListener.removeListener(url.getServiceKey(), listener);
                if (!oldListener.hasListeners()) {
                    oldListener.destroy();
                    // Drop the destroyed listener so a later subscribeURLs for this key creates a fresh one
                    // instead of finding a destroyed entry; remove only if the entry is this very instance.
                    serviceListeners.remove(appKey, oldListener);
                    removeAppSubscriptionLock(appKey);
                }
            } finally {
                appSubscriptionLock.unlock();
            }
        }

        /**
         * Marks this listener as stopped; later notifications are ignored by {@link #onEvent(MappingChangedEvent)}.
         */
        @Override
        public void stop() {
            stopped = true;
        }
    }

    /**
     * Returns the subscription lock of the given application-list key, creating it on first use.
     *
     * @param key the application-list key
     * @return the lock registered for {@code key}
     */
    public Lock getAppSubscription(String key) {
        return appSubscriptionLocks.computeIfAbsent(key, ignored -> new ReentrantLock());
    }

    /**
     * Removes the subscription lock of the given application-list key, if there is one.
     * The lock is held while its entry is removed.
     *
     * @param key the application-list key
     */
    public void removeAppSubscriptionLock(String key) {
        Lock lock = appSubscriptionLocks.get(key);
        if (lock == null) {
            return;
        }
        // Acquire before entering try: if lock() fails, finally must not unlock a lock that was never held.
        lock.lock();
        try {
            // Remove only the instance we hold, never a newer lock created for the same key in the meantime.
            appSubscriptionLocks.remove(key, lock);
        } finally {
            lock.unlock();
        }
    }
}

相关信息

dubbo 源码目录

相关文章

dubbo AbstractServiceDiscovery 源码

dubbo AbstractServiceDiscoveryFactory 源码

dubbo DefaultRegistryClusterIdentifier 源码

dubbo DefaultServiceDiscoveryFactory 源码

dubbo DefaultServiceInstance 源码

dubbo FileSystemServiceDiscovery 源码

dubbo InstanceAddressURL 源码

dubbo NopServiceDiscovery 源码

dubbo OverrideInstanceAddressURL 源码

dubbo ReflectionBasedServiceDiscovery 源码

0  赞