kubernetes in_tree_volume 源码

  • 2022-09-18
  • 浏览 (198)

kubernetes in_tree_volume 代码

文件路径:/staging/src/k8s.io/csi-translation-lib/plugins/in_tree_volume.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

import (
	"errors"
	"fmt"
	"sort"
	"strings"

	v1 "k8s.io/api/core/v1"
	storage "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// InTreePlugin handles translations between CSI and in-tree volume sources
// for a single in-tree plugin / CSI driver pair.
type InTreePlugin interface {

	// TranslateInTreeStorageClassToCSI takes a StorageClass carrying in-tree
	// volume options and translates it into a StorageClass whose options are
	// consumable by the CSI plugin.
	TranslateInTreeStorageClassToCSI(sc *storage.StorageClass) (*storage.StorageClass, error)

	// TranslateInTreeInlineVolumeToCSI takes an inline volume and translates
	// the in-tree inline volume source to a CSIPersistentVolumeSource.
	// A PV object containing the CSIPersistentVolumeSource in its spec is returned.
	// podNamespace is only needed for azurefile to fetch the secret namespace;
	// it does not need to be set for other plugins.
	TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)

	// TranslateInTreePVToCSI takes a persistent volume and translates
	// the in-tree PV source to a CSI source. The input persistent volume may be modified.
	TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)

	// TranslateCSIPVToInTree takes a PV with a CSI PersistentVolume source and
	// translates it to an in-tree PersistentVolume source for the in-tree volume
	// identified by the `Driver` field in the CSI source. The input PV object may be modified.
	TranslateCSIPVToInTree(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)

	// CanSupport reports whether the plugin supports the given persistent volume
	// specification from the API.
	CanSupport(pv *v1.PersistentVolume) bool

	// CanSupportInline reports whether the plugin supports the given inline volume
	// specification from the API.
	CanSupportInline(vol *v1.Volume) bool

	// GetInTreePluginName returns the name of the in-tree plugin that this
	// implementation migrates.
	GetInTreePluginName() string

	// GetCSIPluginName returns the name of the CSI plugin that supersedes the in-tree plugin.
	GetCSIPluginName() string

	// RepairVolumeHandle generates a correct volume handle based on node ID information.
	RepairVolumeHandle(volumeHandle, nodeID string) (string, error)
}

// Storage class parameter keys shared by the plugin translators.
const (
	// fsTypeKey is the deprecated storage class parameter key for the filesystem type.
	fsTypeKey = "fstype"
	// csiFsTypeKey is the storage class parameter key for the CSI filesystem type.
	csiFsTypeKey = "csi.storage.k8s.io/fstype"
	// zoneKey is the deprecated storage class parameter key for a single zone.
	zoneKey = "zone"
	// zonesKey is the deprecated storage class parameter key for multiple zones.
	zonesKey = "zones"
)

// replaceTopology renames every NodeAffinity match expression whose key is
// oldKey so that it uses newKey instead. The rename is done in place on pv.
//
// Expressions that already use newKey are left untouched and are not merged
// with the renamed ones, so a term may end up with more than one expression
// for newKey.
//
// A nil pv, or a pv without required node selector terms, is a no-op.
// The returned error is always nil.
func replaceTopology(pv *v1.PersistentVolume, oldKey, newKey string) error {
	if pv == nil || pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
		return nil
	}

	// terms shares its backing array with the PV, so writes below land in pv.
	terms := pv.Spec.NodeAffinity.Required.NodeSelectorTerms
	for t := range terms {
		exprs := terms[t].MatchExpressions
		for e := range exprs {
			if exprs[e].Key == oldKey {
				exprs[e].Key = newKey
			}
		}
	}

	return nil
}

// getTopologyValues collects the values of every NodeAffinity match expression
// in pv whose key equals key, across all required node selector terms.
// The result is de-duplicated and sorted; it is nil when the PV has no
// required terms or no expression uses the key.
//
// Values from different terms and expressions are flattened into a single
// ORed list. This assumes that the plugin does not support a constraint like
// key in "zone1" AND "zone2".
//
// pv must be non-nil.
func getTopologyValues(pv *v1.PersistentVolume, key string) []string {
	affinity := pv.Spec.NodeAffinity
	if affinity == nil || affinity.Required == nil || len(affinity.Required.NodeSelectorTerms) == 0 {
		return nil
	}

	seen := map[string]struct{}{}
	var unique []string
	for _, term := range affinity.Required.NodeSelectorTerms {
		for _, req := range term.MatchExpressions {
			if req.Key != key {
				continue
			}
			for _, val := range req.Values {
				if _, dup := seen[val]; dup {
					continue
				}
				seen[val] = struct{}{}
				unique = append(unique, val)
			}
		}
	}

	// Sorted output keeps the result deterministic for callers.
	sort.Strings(unique)
	return unique
}

// addTopology appends an "In" requirement for topologyKey, carrying the given
// zones, to every required node selector term of pv.
//
// Zones are trimmed of surrounding whitespace, de-duplicated and sorted before
// use; entries that are empty after trimming are dropped. If nothing remains,
// an error is returned and pv is left unmodified.
//
// Missing NodeAffinity, Required and NodeSelectorTerms fields are created as
// needed, with a single empty term when the PV has none.
func addTopology(pv *v1.PersistentVolume, topologyKey string, zones []string) error {
	cleaned := sets.NewString()
	for _, raw := range zones {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			cleaned.Insert(trimmed)
		}
	}
	if cleaned.Len() == 0 {
		return errors.New("there are no valid zones to add to pv")
	}

	// Create whatever part of the affinity structure is still missing.
	if pv.Spec.NodeAffinity == nil {
		pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{}
	}
	if pv.Spec.NodeAffinity.Required == nil {
		pv.Spec.NodeAffinity.Required = &v1.NodeSelector{}
	}
	required := pv.Spec.NodeAffinity.Required
	if len(required.NodeSelectorTerms) == 0 {
		required.NodeSelectorTerms = []v1.NodeSelectorTerm{{}}
	}

	requirement := v1.NodeSelectorRequirement{
		Key:      topologyKey,
		Operator: v1.NodeSelectorOpIn,
		Values:   cleaned.List(),
	}

	// Every term receives the same requirement.
	for idx := range required.NodeSelectorTerms {
		term := &required.NodeSelectorTerms[idx]
		term.MatchExpressions = append(term.MatchExpressions, requirement)
	}

	return nil
}

// translateTopologyFromInTreeToCSI rewrites the zone topology of an in-tree PV
// so that it is expressed with csiTopologyKey.
//
// In-tree NodeAffinity takes precedence over PV labels: if the PV's
// NodeAffinity already carries values for the Kubernetes zone label, that key
// is renamed to csiTopologyKey. Otherwise, if the PV has a zone label, its
// value is split on labelMultiZoneDelimiter and added as CSI topology.
//
// When the PV uses the Beta region label and NodeAffinity has values for it,
// the key is additionally renamed to the GA region label so the volume can be
// scheduled onto nodes that only carry GA topology labels. The zone label
// needs no such step because it has already been replaced above.
//
// The function always returns nil.
func translateTopologyFromInTreeToCSI(pv *v1.PersistentVolume, csiTopologyKey string) error {
	zoneLabel, regionLabel := getTopologyLabel(pv)

	// NOTE(review): errors from replaceTopology/addTopology are discarded below,
	// exactly as before; confirm whether this best-effort behaviour is intended.
	if len(getTopologyValues(pv, zoneLabel)) > 0 {
		// Zone topology is present in NodeAffinity: just rename the key.
		_ = replaceTopology(pv, zoneLabel, csiTopologyKey)
	} else if label, ok := pv.Labels[zoneLabel]; ok {
		// Nothing in NodeAffinity: fall back to the zone label on the PV.
		if zones := strings.Split(label, labelMultiZoneDelimiter); len(zones) > 0 {
			_ = addTopology(pv, csiTopologyKey, zones)
		}
	}

	// Upgrade a Beta region key to its GA equivalent.
	if regionLabel == v1.LabelFailureDomainBetaRegion && len(getTopologyValues(pv, regionLabel)) > 0 {
		_ = replaceTopology(pv, regionLabel, v1.LabelTopologyRegion)
	}

	return nil
}

// getTopologyLabel reports which generation of Kubernetes topology labels
// (GA or Beta) the PV uses and returns the matching zone and region label keys.
//
// The first match in the following order wins:
//  1. NodeAffinity contains the GA zone key   -> GA labels
//  2. NodeAffinity contains the Beta zone key -> Beta labels
//  3. PV labels contain the GA zone key       -> GA labels
//  4. PV labels contain the Beta zone key     -> Beta labels
//
// If none match, the GA labels are returned. pv must be non-nil.
func getTopologyLabel(pv *v1.PersistentVolume) (zoneLabel string, regionLabel string) {
	affinity := pv.Spec.NodeAffinity
	hasLabel := func(name string) bool {
		_, found := pv.Labels[name]
		return found
	}

	switch {
	case TopologyKeyExist(v1.LabelTopologyZone, affinity):
		return v1.LabelTopologyZone, v1.LabelTopologyRegion
	case TopologyKeyExist(v1.LabelFailureDomainBetaZone, affinity):
		return v1.LabelFailureDomainBetaZone, v1.LabelFailureDomainBetaRegion
	case hasLabel(v1.LabelTopologyZone):
		return v1.LabelTopologyZone, v1.LabelTopologyRegion
	case hasLabel(v1.LabelFailureDomainBetaZone):
		return v1.LabelFailureDomainBetaZone, v1.LabelFailureDomainBetaRegion
	default:
		// Neither NodeAffinity nor labels mention a zone: default to GA.
		return v1.LabelTopologyZone, v1.LabelTopologyRegion
	}
}

// TopologyKeyExist reports whether any match expression in the required node
// selector terms of vna uses the given key. It returns false when vna is nil
// or has no required terms.
func TopologyKeyExist(key string, vna *v1.VolumeNodeAffinity) bool {
	if vna == nil || vna.Required == nil {
		return false
	}

	terms := vna.Required.NodeSelectorTerms
	for t := range terms {
		for _, req := range terms[t].MatchExpressions {
			if req.Key == key {
				return true
			}
		}
	}
	return false
}

// regionParserFn is the signature of the provider-supplied callback used by
// regionTopologyHandler: it receives the zone values collected from one node
// selector term and returns the region value to add for that term, or an error.
type regionParserFn func([]string) (string, error)

// translateTopologyFromCSIToInTree translates a CSI topology to Kubernetes
// topology and adds topology labels to the PV. Note that this function only
// works for plugins with a single topologyKey that translates to the
// Kubernetes zone (and region, if regionParser is passed in). A plugin with
// more than one topologyKey needs to process them separately.
//
// If regionParser is nil, no region NodeAffinity is added. If it is not nil,
// it is passed to regionTopologyHandler, which adds region topology
// NodeAffinity and labels for the given PV. That step assumes the zone
// NodeAffinity already exists.
//
// In short this function will:
//  1. Replace all CSI topology keys with the Kubernetes zone topology label.
//  2. Process and generate region topology if a regionParser is passed.
//  3. Add the Kubernetes zone label to the PV if it does not exist yet.
//
// Errors from steps 1 and 2 are returned wrapped, so callers can inspect the
// underlying cause with errors.Is / errors.As. pv must be non-nil.
func translateTopologyFromCSIToInTree(pv *v1.PersistentVolume, csiTopologyKey string, regionParser regionParserFn) error {
	zoneLabel, _ := getTopologyLabel(pv)

	// 1. Replace all CSI topology to Kubernetes Zone label
	if err := replaceTopology(pv, csiTopologyKey, zoneLabel); err != nil {
		return fmt.Errorf("Failed to replace CSI topology to Kubernetes topology, error: %w", err)
	}

	// 2. Take care of region topology if a regionParser is passed.
	// A failure in region processing aborts the translation.
	if regionParser != nil {
		if err := regionTopologyHandler(pv, regionParser); err != nil {
			return fmt.Errorf("Failed to handle region topology. error: %w", err)
		}
	}

	// 3. Add labels about Kubernetes Topology
	zoneVals := getTopologyValues(pv, zoneLabel)
	if len(zoneVals) == 0 {
		return nil
	}
	if pv.Labels == nil {
		pv.Labels = make(map[string]string)
	}
	// Never overwrite a zone label that is already set on the PV.
	if _, zoneOK := pv.Labels[zoneLabel]; !zoneOK {
		pv.Labels[zoneLabel] = strings.Join(zoneVals, labelMultiZoneDelimiter)
	}

	return nil
}

// translateAllowedTopologies returns a copy of the allowed topologies of a
// storage class or PV in which every expression keyed by a Kubernetes zone
// label (Beta or GA) is re-keyed to the given CSI topology key. Expressions
// with any other key are passed through unchanged.
//
// A nil input yields a nil result; otherwise the result is a non-nil slice
// with one term per input term. The returned error is always nil.
func translateAllowedTopologies(terms []v1.TopologySelectorTerm, key string) ([]v1.TopologySelectorTerm, error) {
	if terms == nil {
		return nil, nil
	}

	translated := make([]v1.TopologySelectorTerm, 0, len(terms))
	for _, term := range terms {
		var out v1.TopologySelectorTerm
		for _, req := range term.MatchLabelExpressions {
			switch req.Key {
			case v1.LabelFailureDomainBetaZone, v1.LabelTopologyZone:
				// Zone expression: keep the values, swap in the CSI key.
				req = v1.TopologySelectorLabelRequirement{
					Key:    key,
					Values: req.Values,
				}
			}
			out.MatchLabelExpressions = append(out.MatchLabelExpressions, req)
		}
		translated = append(translated, out)
	}
	return translated, nil
}

// regionTopologyHandler adds the Kubernetes region topology to the PV's
// NodeAffinity and labels. It assumes the zone NodeAffinity already exists.
// Each provider is responsible for supplying its own regionParser.
//
// For every required node selector term that has no region expression yet,
// the zone values of that term are handed to regionParser and the region it
// returns is appended to the term as an "In" requirement. An error from
// regionParser is returned immediately; terms processed before the failure
// keep their added region requirement.
//
// Afterwards, if the NodeAffinity holds exactly one distinct region value and
// the PV has no region label yet, that value is set as the region label.
//
// A nil pv, or a pv without required node selector terms, is a no-op.
func regionTopologyHandler(pv *v1.PersistentVolume, regionParser regionParserFn) error {
	if pv == nil || pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
		return nil
	}
	// terms shares its backing array with the PV, so writes below land in pv.
	terms := pv.Spec.NodeAffinity.Required.NodeSelectorTerms
	if len(terms) == 0 {
		return nil
	}

	zoneLabel, regionLabel := getTopologyLabel(pv)

	for idx := range terms {
		// Scan the term: stop at the first region expression, otherwise
		// gather the zone values seen along the way.
		hasRegion := false
		var zones []string
		for _, req := range terms[idx].MatchExpressions {
			if req.Key == regionLabel {
				hasRegion = true
				break
			}
			if req.Key == zoneLabel {
				zones = append(zones, req.Values...)
			}
		}
		if hasRegion {
			// This term already carries a region; leave it alone.
			continue
		}

		// Derive the region from the zones collected in this term.
		region, err := regionParser(zones)
		if err != nil {
			return err
		}
		terms[idx].MatchExpressions = append(terms[idx].MatchExpressions, v1.NodeSelectorRequirement{
			Key:      regionLabel,
			Operator: v1.NodeSelectorOpIn,
			Values:   []string{region},
		})
	}

	// Only label the PV when the region is unambiguous.
	regions := getTopologyValues(pv, regionLabel)
	if len(regions) != 1 {
		return nil
	}
	if pv.Labels == nil {
		pv.Labels = map[string]string{}
	}
	if _, present := pv.Labels[regionLabel]; !present {
		pv.Labels[regionLabel] = regions[0]
	}

	return nil
}

相关信息

kubernetes 源码目录

相关文章

kubernetes aws_ebs 源码

kubernetes aws_ebs_test 源码

kubernetes azure_disk 源码

kubernetes azure_disk_test 源码

kubernetes azure_file 源码

kubernetes azure_file_test 源码

kubernetes const 源码

kubernetes gce_pd 源码

kubernetes gce_pd_test 源码

kubernetes in_tree_volume_test 源码

0  赞