dubbo MigrationInvoker 源码

  • 2022-10-20
  • 浏览 (210)

dubbo MigrationInvoker 代码

文件路径:/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.client.migration;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.status.reporter.FrameworkStatusReportService;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.client.migration.model.MigrationRule;
import org.apache.dubbo.registry.client.migration.model.MigrationStep;
import org.apache.dubbo.registry.integration.DynamicDirectory;
import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.registry.client.migration.model.MigrationStep.APPLICATION_FIRST;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;

public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
    private Logger logger = LoggerFactory.getLogger(MigrationInvoker.class);

    private URL url;
    private URL consumerUrl;
    private Cluster cluster;
    private Registry registry;
    private Class<T> type;
    private RegistryProtocol registryProtocol;
    private MigrationRuleListener migrationRuleListener;
    private ConsumerModel consumerModel;
    private FrameworkStatusReportService reportService;

    private volatile ClusterInvoker<T> invoker;
    private volatile ClusterInvoker<T> serviceDiscoveryInvoker;
    private volatile ClusterInvoker<T> currentAvailableInvoker;
    private volatile MigrationStep step;
    private volatile MigrationRule rule;
    private volatile int promotion = 100;

    public MigrationInvoker(RegistryProtocol registryProtocol,
                            Cluster cluster,
                            Registry registry,
                            Class<T> type,
                            URL url,
                            URL consumerUrl) {
        this(null, null, registryProtocol, cluster, registry, type, url, consumerUrl);
    }

    public MigrationInvoker(ClusterInvoker<T> invoker,
                            ClusterInvoker<T> serviceDiscoveryInvoker,
                            RegistryProtocol registryProtocol,
                            Cluster cluster,
                            Registry registry,
                            Class<T> type,
                            URL url,
                            URL consumerUrl) {
        this.invoker = invoker;
        this.serviceDiscoveryInvoker = serviceDiscoveryInvoker;
        this.registryProtocol = registryProtocol;
        this.cluster = cluster;
        this.registry = registry;
        this.type = type;
        this.url = url;
        this.consumerUrl = consumerUrl;
        this.consumerModel = (ConsumerModel) consumerUrl.getServiceModel();
        this.reportService = consumerUrl.getOrDefaultApplicationModel().getBeanFactory().getBean(FrameworkStatusReportService.class);

        if (consumerModel != null) {
            Object object = consumerModel.getServiceMetadata().getAttribute(CommonConstants.CURRENT_CLUSTER_INVOKER_KEY);
            Map<Registry, MigrationInvoker<?>> invokerMap;
            if (object instanceof Map) {
                invokerMap = (Map<Registry, MigrationInvoker<?>>) object;
            } else {
                invokerMap = new ConcurrentHashMap<>();
            }
            invokerMap.put(registry, this);
            consumerModel.getServiceMetadata().addAttribute(CommonConstants.CURRENT_CLUSTER_INVOKER_KEY, invokerMap);
        }
    }

    public ClusterInvoker<T> getInvoker() {
        return invoker;
    }

    public void setInvoker(ClusterInvoker<T> invoker) {
        this.invoker = invoker;
    }

    public ClusterInvoker<T> getServiceDiscoveryInvoker() {
        return serviceDiscoveryInvoker;
    }

    public void setServiceDiscoveryInvoker(ClusterInvoker<T> serviceDiscoveryInvoker) {
        this.serviceDiscoveryInvoker = serviceDiscoveryInvoker;
    }

    public ClusterInvoker<T> getCurrentAvailableInvoker() {
        return currentAvailableInvoker;
    }

    @Override
    public Class<T> getInterface() {
        return type;
    }

    @Override
    public void reRefer(URL newSubscribeUrl) {
        // update url to prepare for migration refresh
        this.url = url.addParameter(REFER_KEY, StringUtils.toQueryString(newSubscribeUrl.getParameters()));

        // re-subscribe immediately
        if (invoker != null && !invoker.isDestroyed()) {
            doReSubscribe(invoker, newSubscribeUrl);
        }
        if (serviceDiscoveryInvoker != null && !serviceDiscoveryInvoker.isDestroyed()) {
            doReSubscribe(serviceDiscoveryInvoker, newSubscribeUrl);
        }
    }

    private void doReSubscribe(ClusterInvoker<T> invoker, URL newSubscribeUrl) {
        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
        URL oldSubscribeUrl = directory.getRegisteredConsumerUrl();
        Registry registry = directory.getRegistry();
        registry.unregister(directory.getRegisteredConsumerUrl());
        directory.unSubscribe(RegistryProtocol.toSubscribeUrl(oldSubscribeUrl));
        if (directory.isShouldRegister()) {
            registry.register(directory.getRegisteredConsumerUrl());
            directory.setRegisteredConsumerUrl(newSubscribeUrl);
        }
        directory.buildRouterChain(newSubscribeUrl);
        directory.subscribe(RegistryProtocol.toSubscribeUrl(newSubscribeUrl));
    }

    @Override
    public boolean migrateToForceInterfaceInvoker(MigrationRule newRule) {
        CountDownLatch latch = new CountDownLatch(1);
        refreshInterfaceInvoker(latch);

        if (serviceDiscoveryInvoker == null) {
            // serviceDiscoveryInvoker is absent, ignore threshold check
            this.currentAvailableInvoker = invoker;
            return true;
        }

        // wait and compare threshold
        waitAddressNotify(newRule, latch);

        if (newRule.getForce(consumerUrl)) {
            // force migrate, ignore threshold check
            this.currentAvailableInvoker = invoker;
            this.destroyServiceDiscoveryInvoker();
            return true;
        }

        Set<MigrationAddressComparator> detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
            .getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
        if (CollectionUtils.isNotEmpty(detectors)) {
            if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(invoker, serviceDiscoveryInvoker, newRule))) {
                this.currentAvailableInvoker = invoker;
                this.destroyServiceDiscoveryInvoker();
                return true;
            }
        }

        // compare failed, will not change state
        if (step == MigrationStep.FORCE_APPLICATION) {
            destroyInterfaceInvoker();
        }
        return false;
    }

    @Override
    public boolean migrateToForceApplicationInvoker(MigrationRule newRule) {
        CountDownLatch latch = new CountDownLatch(1);
        refreshServiceDiscoveryInvoker(latch);

        if (invoker == null) {
            // invoker is absent, ignore threshold check
            this.currentAvailableInvoker = serviceDiscoveryInvoker;
            return true;
        }

        // wait and compare threshold
        waitAddressNotify(newRule, latch);

        if (newRule.getForce(consumerUrl)) {
            // force migrate, ignore threshold check
            this.currentAvailableInvoker = serviceDiscoveryInvoker;
            this.destroyInterfaceInvoker();
            return true;
        }

        Set<MigrationAddressComparator> detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
            .getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
        if (CollectionUtils.isNotEmpty(detectors)) {
            if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, newRule))) {
                this.currentAvailableInvoker = serviceDiscoveryInvoker;
                this.destroyInterfaceInvoker();
                return true;
            }
        }

        // compare failed, will not change state
        if (step == MigrationStep.FORCE_INTERFACE) {
            destroyServiceDiscoveryInvoker();
        }
        return false;
    }

    @Override
    public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
        CountDownLatch latch = new CountDownLatch(0);
        refreshInterfaceInvoker(latch);
        refreshServiceDiscoveryInvoker(latch);

        // directly calculate preferred invoker, will not wait until address notify
        // calculation will re-occurred when address notify later
        calcPreferredInvoker(newRule);
    }

    private void waitAddressNotify(MigrationRule newRule, CountDownLatch latch) {
        // wait and compare threshold
        int delay = newRule.getDelay(consumerUrl);
        if (delay > 0) {
            try {
                Thread.sleep(delay * 1000L);
            } catch (InterruptedException e) {
                logger.error("Interrupted when waiting for address notify!" + e);
            }
        } else {
            // do not wait address notify by default
            delay = 0;
        }
        try {
            latch.await(delay, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("Interrupted when waiting for address notify!" + e);
        }
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        if (currentAvailableInvoker != null) {
            if (step == APPLICATION_FIRST) {
                // call ratio calculation based on random value
                if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                    // fall back to interface mode
                    return invoker.invoke(invocation);
                }
                // check if invoker available for each time
                return decideInvoker().invoke(invocation);
            }
            return currentAvailableInvoker.invoke(invocation);
        }

        switch (step) {
            case APPLICATION_FIRST:
                currentAvailableInvoker = decideInvoker();
                break;
            case FORCE_APPLICATION:
                currentAvailableInvoker = serviceDiscoveryInvoker;
                break;
            case FORCE_INTERFACE:
            default:
                currentAvailableInvoker = invoker;
        }

        return currentAvailableInvoker.invoke(invocation);
    }

    private ClusterInvoker<T> decideInvoker() {
        if (currentAvailableInvoker == serviceDiscoveryInvoker) {
            if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
                return serviceDiscoveryInvoker;
            }
            return invoker;
        } else {
            return currentAvailableInvoker;
        }
    }

    @Override
    public boolean isAvailable() {
        return currentAvailableInvoker != null
            ? currentAvailableInvoker.isAvailable()
            : (invoker != null && invoker.isAvailable()) || (serviceDiscoveryInvoker != null && serviceDiscoveryInvoker.isAvailable());
    }

    @Override
    public void destroy() {
        if (migrationRuleListener != null) {
            migrationRuleListener.removeMigrationInvoker(this);
        }
        if (invoker != null) {
            invoker.destroy();
        }
        if (serviceDiscoveryInvoker != null) {
            serviceDiscoveryInvoker.destroy();
        }
        if (consumerModel != null) {
            Object object = consumerModel.getServiceMetadata().getAttribute(CommonConstants.CURRENT_CLUSTER_INVOKER_KEY);
            Map<Registry, MigrationInvoker<?>> invokerMap;
            if (object instanceof Map) {
                invokerMap = (Map<Registry, MigrationInvoker<?>>) object;
                invokerMap.remove(registry);
                if (invokerMap.isEmpty()) {
                    consumerModel.getServiceMetadata().getAttributeMap().remove(CommonConstants.CURRENT_CLUSTER_INVOKER_KEY);
                }
            }
        }
    }

    @Override
    public URL getUrl() {
        if (currentAvailableInvoker != null) {
            return currentAvailableInvoker.getUrl();
        } else if (invoker != null) {
            return invoker.getUrl();
        } else if (serviceDiscoveryInvoker != null) {
            return serviceDiscoveryInvoker.getUrl();
        }

        return consumerUrl;
    }

    @Override
    public URL getRegistryUrl() {
        if (currentAvailableInvoker != null) {
            return currentAvailableInvoker.getRegistryUrl();
        } else if (invoker != null) {
            return invoker.getRegistryUrl();
        } else if (serviceDiscoveryInvoker != null) {
            return serviceDiscoveryInvoker.getRegistryUrl();
        }
        return url;
    }

    @Override
    public Directory<T> getDirectory() {
        if (currentAvailableInvoker != null) {
            return currentAvailableInvoker.getDirectory();
        } else if (invoker != null) {
            return invoker.getDirectory();
        } else if (serviceDiscoveryInvoker != null) {
            return serviceDiscoveryInvoker.getDirectory();
        }
        return null;
    }

    @Override
    public boolean isDestroyed() {
        return currentAvailableInvoker != null
            ? currentAvailableInvoker.isDestroyed()
            : (invoker == null || invoker.isDestroyed()) && (serviceDiscoveryInvoker == null || serviceDiscoveryInvoker.isDestroyed());
    }

    @Override
    public boolean isServiceDiscovery() {
        return false;
    }

    @Override
    public MigrationStep getMigrationStep() {
        return step;
    }

    @Override
    public void setMigrationStep(MigrationStep step) {
        this.step = step;
    }

    @Override
    public MigrationRule getMigrationRule() {
        return rule;
    }

    @Override
    public void setMigrationRule(MigrationRule rule) {
        this.rule = rule;
        promotion = rule.getProportion(consumerUrl);
    }

    protected void destroyServiceDiscoveryInvoker() {
        if (this.invoker != null) {
            this.currentAvailableInvoker = this.invoker;
        }
        if (serviceDiscoveryInvoker != null && !serviceDiscoveryInvoker.isDestroyed()) {
            if (logger.isInfoEnabled()) {
                logger.info("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName());
            }
            serviceDiscoveryInvoker.destroy();
            serviceDiscoveryInvoker = null;
        }
    }

    protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {
        clearListener(serviceDiscoveryInvoker);
        if (needRefresh(serviceDiscoveryInvoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
            }

            if (serviceDiscoveryInvoker != null) {
                serviceDiscoveryInvoker.destroy();
            }
            serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
        }
        setListener(serviceDiscoveryInvoker, () -> {
            latch.countDown();
            if (reportService.hasReporter()) {
                reportService.reportConsumptionStatus(
                    reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app"));
            }
            if (step == APPLICATION_FIRST) {
                calcPreferredInvoker(rule);
            }
        });
    }

    protected void refreshInterfaceInvoker(CountDownLatch latch) {
        clearListener(invoker);
        if (needRefresh(invoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Re-subscribing interface addresses for interface " + type.getName());
            }

            if (invoker != null) {
                invoker.destroy();
            }
            invoker = registryProtocol.getInvoker(cluster, registry, type, url);
        }
        setListener(invoker, () -> {
            latch.countDown();
            if (reportService.hasReporter()) {
                reportService.reportConsumptionStatus(
                    reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));
            }
            if (step == APPLICATION_FIRST) {
                calcPreferredInvoker(rule);
            }
        });
    }

    private synchronized void calcPreferredInvoker(MigrationRule migrationRule) {
        if (serviceDiscoveryInvoker == null || invoker == null) {
            return;
        }
        Set<MigrationAddressComparator> detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
            .getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
        if (CollectionUtils.isNotEmpty(detectors)) {
            // pick preferred invoker
            // the real invoker choice in invocation will be affected by promotion
            if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule))) {
                this.currentAvailableInvoker = serviceDiscoveryInvoker;
            } else {
                this.currentAvailableInvoker = invoker;
            }
        }
    }

    protected void destroyInterfaceInvoker() {
        if (this.serviceDiscoveryInvoker != null) {
            this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
        }
        if (invoker != null && !invoker.isDestroyed()) {
            if (logger.isInfoEnabled()) {
                logger.info("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName());
            }
            invoker.destroy();
            invoker = null;
        }
    }

    private void clearListener(ClusterInvoker<T> invoker) {
        if (invoker == null) {
            return;
        }
        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
        directory.setInvokersChangedListener(null);
    }

    private void setListener(ClusterInvoker<T> invoker, InvokersChangedListener listener) {
        if (invoker == null) {
            return;
        }
        DynamicDirectory<T> directory = (DynamicDirectory<T>) invoker.getDirectory();
        directory.setInvokersChangedListener(listener);
    }

    private boolean needRefresh(ClusterInvoker<T> invoker) {
        return invoker == null || invoker.isDestroyed() || !invoker.hasProxyInvokers();
    }

    public boolean checkInvokerAvailable(ClusterInvoker<T> invoker) {
        return invoker != null && !invoker.isDestroyed() && invoker.isAvailable();
    }

    protected void setCurrentAvailableInvoker(ClusterInvoker<T> currentAvailableInvoker) {
        this.currentAvailableInvoker = currentAvailableInvoker;
    }

    protected void setMigrationRuleListener(MigrationRuleListener migrationRuleListener) {
        this.migrationRuleListener = migrationRuleListener;
    }

    public Cluster getCluster() {
        return cluster;
    }

    public URL getConsumerUrl() {
        return consumerUrl;
    }
}

相关信息

dubbo 源码目录

相关文章

dubbo DefaultMigrationAddressComparator 源码

dubbo InvokersChangedListener 源码

dubbo MigrationAddressComparator 源码

dubbo MigrationClusterInvoker 源码

dubbo MigrationRuleHandler 源码

dubbo MigrationRuleListener 源码

dubbo PreMigratingConditionChecker 源码

dubbo ServiceDiscoveryMigrationInvoker 源码

0  赞