kubernetes binder source code

  • 2022-09-18

kubernetes binder code — the volume binder used by the scheduler's VolumeBinding plugin to handle PVC/PV binding and dynamic provisioning during pod scheduling.

File path: /pkg/scheduler/framework/plugins/volumebinding/binder.go
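
Before reading the file, here is a hedged sketch of how a binder instance might be wired up from a client-go SharedInformerFactory. The helper name newBinderForClient and the 10-minute bind timeout are illustrative assumptions; NewVolumeBinder, CapacityCheck and the informer getters appear in the code below.

package volumebinding

import (
	"time"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
)

// newBinderForClient is a hypothetical helper showing only the wiring of
// NewVolumeBinder; it is not part of binder.go.
func newBinderForClient(kubeClient clientset.Interface) SchedulerVolumeBinder {
	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	return NewVolumeBinder(
		kubeClient,
		factory.Core().V1().Pods(),
		factory.Core().V1().Nodes(),
		factory.Storage().V1().CSINodes(),
		factory.Core().V1().PersistentVolumeClaims(),
		factory.Core().V1().PersistentVolumes(),
		factory.Storage().V1().StorageClasses(),
		CapacityCheck{
			CSIDriverInformer:          factory.Storage().V1().CSIDrivers(),
			CSIStorageCapacityInformer: factory.Storage().V1().CSIStorageCapacities(),
		},
		10*time.Minute, // bindTimeout: how long BindPodVolumes waits for the PV controller
	)
}

Note that in real use the informer factory still has to be started and its caches synced before scheduling begins; in kube-scheduler this wiring is done by the VolumeBinding plugin from the framework handle's informer factory.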

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumebinding

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/storage"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	coreinformers "k8s.io/client-go/informers/core/v1"
	storageinformers "k8s.io/client-go/informers/storage/v1"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	storagelisters "k8s.io/client-go/listers/storage/v1"
	"k8s.io/component-helpers/storage/ephemeral"
	"k8s.io/component-helpers/storage/volume"
	csitrans "k8s.io/csi-translation-lib"
	csiplugins "k8s.io/csi-translation-lib/plugins"
	"k8s.io/klog/v2"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
)

// ConflictReason is used for the special strings which explain why
// volume binding is impossible for a node.
type ConflictReason string

// ConflictReasons contains all reasons that explain why volume binding is impossible for a node.
type ConflictReasons []ConflictReason

func (reasons ConflictReasons) Len() int           { return len(reasons) }
func (reasons ConflictReasons) Less(i, j int) bool { return reasons[i] < reasons[j] }
func (reasons ConflictReasons) Swap(i, j int)      { reasons[i], reasons[j] = reasons[j], reasons[i] }

const (
	// ErrReasonBindConflict is used for VolumeBindingNoMatch predicate error.
	ErrReasonBindConflict ConflictReason = "node(s) didn't find available persistent volumes to bind"
	// ErrReasonNodeConflict is used for VolumeNodeAffinityConflict predicate error.
	ErrReasonNodeConflict ConflictReason = "node(s) had volume node affinity conflict"
	// ErrReasonNotEnoughSpace is used when a pod cannot start on a node because not enough storage space is available.
	ErrReasonNotEnoughSpace = "node(s) did not have enough free storage"
	// ErrReasonPVNotExist is used when a pod has one or more PVC(s) bound to non-existent persistent volume(s)"
	ErrReasonPVNotExist = "node(s) unavailable due to one or more pvc(s) bound to non-existent pv(s)"
)

// BindingInfo holds a binding between PV and PVC.
type BindingInfo struct {
	// PVC that needs to be bound
	pvc *v1.PersistentVolumeClaim

	// Proposed PV to bind to this PVC
	pv *v1.PersistentVolume
}

// StorageClassName returns the name of the storage class.
func (b *BindingInfo) StorageClassName() string {
	return b.pv.Spec.StorageClassName
}

// StorageResource represents storage resource.
type StorageResource struct {
	Requested int64
	Capacity  int64
}

// StorageResource returns storage resource.
func (b *BindingInfo) StorageResource() *StorageResource {
	// both fields are mandatory
	requestedQty := b.pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
	capacityQty := b.pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)]
	return &StorageResource{
		Requested: requestedQty.Value(),
		Capacity:  capacityQty.Value(),
	}
}

// PodVolumes holds pod's volumes information used in volume scheduling.
type PodVolumes struct {
	// StaticBindings are binding decisions for PVCs which can be bound to
	// pre-provisioned static PVs.
	StaticBindings []*BindingInfo
	// DynamicProvisions are PVCs that require dynamic provisioning
	DynamicProvisions []*v1.PersistentVolumeClaim
}

// InTreeToCSITranslator contains methods required to check migratable status
// and perform translations from InTree PV's to CSI
type InTreeToCSITranslator interface {
	IsPVMigratable(pv *v1.PersistentVolume) bool
	GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
	TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
}

// SchedulerVolumeBinder is used by the scheduler VolumeBinding plugin to
// handle PVC/PV binding and dynamic provisioning. The binding decisions are
// integrated into the pod scheduling workflow so that the PV NodeAffinity is
// also considered along with the pod's other scheduling requirements.
//
// This integrates into the existing scheduler workflow as follows:
//  1. The scheduler takes a Pod off the scheduler queue and processes it serially:
//     a. Invokes all pre-filter plugins for the pod. GetPodVolumes() is invoked
//     here, pod volume information will be saved in current scheduling cycle state for later use.
//     b. Invokes all filter plugins, parallelized across nodes.  FindPodVolumes() is invoked here.
//     c. Invokes all score plugins.  Future/TBD
//     d. Selects the best node for the Pod.
//     e. Invokes all reserve plugins. AssumePodVolumes() is invoked here.
//     i.  If PVC binding is required, cache in-memory only:
//     * For manual binding: update PV objects for prebinding to the corresponding PVCs.
//     * For dynamic provisioning: update PVC object with a selected node from c)
//     * For the pod, which PVCs and PVs need API updates.
//     ii. Afterwards, the main scheduler caches the Pod->Node binding in the scheduler's pod cache,
//     This is handled in the scheduler and not here.
//     f. Asynchronously bind volumes and pod in a separate goroutine
//     i.  BindPodVolumes() is called first in PreBind phase. It makes all the necessary API updates and waits for
//     PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent
//     back through the scheduler.
//     ii. After BindPodVolumes() is complete, then the scheduler does the final Pod->Node binding.
//  2. Once all the assume operations are done in e), the scheduler processes the next Pod in the scheduler queue
//     while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
	// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
	// and unbound with immediate binding (including prebound)
	GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error)

	// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the
	// node and returns pod's volumes information.
	//
	// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
	// Otherwise, it tries to find an available PV to bind to the PVC.
	//
	// It returns an error when something went wrong or a list of reasons why the node is
	// (currently) not usable for the pod.
	//
	// If the CSIStorageCapacity feature is enabled, then it also checks for sufficient storage
	// for volumes that still need to be created.
	//
	// This function is called by the scheduler VolumeBinding plugin and can be called in parallel
	FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error)

	// AssumePodVolumes will:
	// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
	// that the PV is prebound to the PVC.
	// 2. Take the PVCs that need provisioning and update the PVC cache with related
	// annotations set.
	//
	// It returns true if all volumes are fully bound
	//
	// This function is called serially.
	AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error)

	// RevertAssumedPodVolumes will revert assumed PV and PVC cache.
	RevertAssumedPodVolumes(podVolumes *PodVolumes)

	// BindPodVolumes will:
	// 1. Initiate the volume binding by making the API call to prebind the PV
	// to its matching PVC.
	// 2. Trigger the volume provisioning by making the API call to set related
	// annotations on the PVC
	// 3. Wait for PVCs to be completely bound by the PV controller
	//
	// This function can be called in parallel.
	BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) error
}

type volumeBinder struct {
	kubeClient clientset.Interface

	classLister   storagelisters.StorageClassLister
	podLister     corelisters.PodLister
	nodeLister    corelisters.NodeLister
	csiNodeLister storagelisters.CSINodeLister

	pvcCache PVCAssumeCache
	pvCache  PVAssumeCache

	// Amount of time to wait for the bind operation to succeed
	bindTimeout time.Duration

	translator InTreeToCSITranslator

	csiDriverLister          storagelisters.CSIDriverLister
	csiStorageCapacityLister storagelisters.CSIStorageCapacityLister
}

// CapacityCheck contains additional parameters for NewVolumeBinder that
// are only needed when checking volume sizes against available storage
// capacity is desired.
type CapacityCheck struct {
	CSIDriverInformer          storageinformers.CSIDriverInformer
	CSIStorageCapacityInformer storageinformers.CSIStorageCapacityInformer
}

// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
//
// capacityCheck determines how storage capacity is checked (CSIStorageCapacity feature).
func NewVolumeBinder(
	kubeClient clientset.Interface,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	csiNodeInformer storageinformers.CSINodeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer,
	capacityCheck CapacityCheck,
	bindTimeout time.Duration) SchedulerVolumeBinder {
	b := &volumeBinder{
		kubeClient:    kubeClient,
		podLister:     podInformer.Lister(),
		classLister:   storageClassInformer.Lister(),
		nodeLister:    nodeInformer.Lister(),
		csiNodeLister: csiNodeInformer.Lister(),
		pvcCache:      NewPVCAssumeCache(pvcInformer.Informer()),
		pvCache:       NewPVAssumeCache(pvInformer.Informer()),
		bindTimeout:   bindTimeout,
		translator:    csitrans.New(),
	}

	b.csiDriverLister = capacityCheck.CSIDriverInformer.Lister()
	b.csiStorageCapacityLister = capacityCheck.CSIStorageCapacityInformer.Lister()

	return b
}

// FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs
// for the given pod and node. If the node does not fit, conflict reasons are
// returned.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
	podVolumes = &PodVolumes{}

	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
	klog.V(5).InfoS("FindPodVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node))

	// Initialize to true for pods that don't have volumes. These
	// booleans get translated into reason strings when the function
	// returns without an error.
	unboundVolumesSatisfied := true
	boundVolumesSatisfied := true
	sufficientStorage := true
	boundPVsFound := true
	defer func() {
		if err != nil {
			return
		}
		if !boundVolumesSatisfied {
			reasons = append(reasons, ErrReasonNodeConflict)
		}
		if !unboundVolumesSatisfied {
			reasons = append(reasons, ErrReasonBindConflict)
		}
		if !sufficientStorage {
			reasons = append(reasons, ErrReasonNotEnoughSpace)
		}
		if !boundPVsFound {
			reasons = append(reasons, ErrReasonPVNotExist)
		}
	}()

	defer func() {
		if err != nil {
			metrics.VolumeSchedulingStageFailed.WithLabelValues("predicate").Inc()
		}
	}()

	var (
		staticBindings    []*BindingInfo
		dynamicProvisions []*v1.PersistentVolumeClaim
	)
	defer func() {
		// Although we do not distinguish nil from empty in this function, for
		// easier testing, we normalize empty to nil.
		if len(staticBindings) == 0 {
			staticBindings = nil
		}
		if len(dynamicProvisions) == 0 {
			dynamicProvisions = nil
		}
		podVolumes.StaticBindings = staticBindings
		podVolumes.DynamicProvisions = dynamicProvisions
	}()

	// Check PV node affinity on bound volumes
	if len(boundClaims) > 0 {
		boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(boundClaims, node, pod)
		if err != nil {
			return
		}
	}

	// Find matching volumes and node for unbound claims
	if len(claimsToBind) > 0 {
		var (
			claimsToFindMatching []*v1.PersistentVolumeClaim
			claimsToProvision    []*v1.PersistentVolumeClaim
		)

		// Filter out claims to provision
		for _, claim := range claimsToBind {
			if selectedNode, ok := claim.Annotations[volume.AnnSelectedNode]; ok {
				if selectedNode != node.Name {
					// Fast path, skip unmatched node.
					unboundVolumesSatisfied = false
					return
				}
				claimsToProvision = append(claimsToProvision, claim)
			} else {
				claimsToFindMatching = append(claimsToFindMatching, claim)
			}
		}

		// Find matching volumes
		if len(claimsToFindMatching) > 0 {
			var unboundClaims []*v1.PersistentVolumeClaim
			unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
			if err != nil {
				return
			}
			claimsToProvision = append(claimsToProvision, unboundClaims...)
		}

		// Check for claims to provision. This is the first time where we potentially
		// find out that storage is not sufficient for the node.
		if len(claimsToProvision) > 0 {
			unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
			if err != nil {
				return
			}
		}
	}

	return
}

// AssumePodVolumes will take the matching PVs and PVCs to provision in pod's
// volume information for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// 3. Update PodVolumes again with cached API updates for PVs and PVCs.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
	klog.V(4).InfoS("AssumePodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
	defer func() {
		if err != nil {
			metrics.VolumeSchedulingStageFailed.WithLabelValues("assume").Inc()
		}
	}()

	if allBound := b.arePodVolumesBound(assumedPod); allBound {
		klog.V(4).InfoS("AssumePodVolumes: all PVCs bound and nothing to do", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
		return true, nil
	}

	// Assume PV
	newBindings := []*BindingInfo{}
	for _, binding := range podVolumes.StaticBindings {
		newPV, dirty, err := volume.GetBindVolumeToClaim(binding.pv, binding.pvc)
		klog.V(5).InfoS("AssumePodVolumes: GetBindVolumeToClaim",
			"pod", klog.KObj(assumedPod),
			"PV", klog.KObj(binding.pv),
			"PVC", klog.KObj(binding.pvc),
			"newPV", klog.KObj(newPV),
			"dirty", dirty,
		)
		if err != nil {
			klog.ErrorS(err, "AssumePodVolumes: fail to GetBindVolumeToClaim")
			b.revertAssumedPVs(newBindings)
			return false, err
		}
		// TODO: can we assume every time?
		if dirty {
			err = b.pvCache.Assume(newPV)
			if err != nil {
				b.revertAssumedPVs(newBindings)
				return false, err
			}
		}
		newBindings = append(newBindings, &BindingInfo{pv: newPV, pvc: binding.pvc})
	}

	// Assume PVCs
	newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
	for _, claim := range podVolumes.DynamicProvisions {
		// The claims from method args can be pointing to watcher cache. We must not
		// modify these, therefore create a copy.
		claimClone := claim.DeepCopy()
		metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, volume.AnnSelectedNode, nodeName)
		err = b.pvcCache.Assume(claimClone)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			b.revertAssumedPVCs(newProvisionedPVCs)
			return
		}

		newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
	}

	podVolumes.StaticBindings = newBindings
	podVolumes.DynamicProvisions = newProvisionedPVCs
	return
}

// RevertAssumedPodVolumes will revert assumed PV and PVC cache.
func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
	b.revertAssumedPVs(podVolumes.StaticBindings)
	b.revertAssumedPVCs(podVolumes.DynamicProvisions)
}

// BindPodVolumes gets the cached bindings and PVCs to provision in pod's volumes information,
// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound
// by the PV controller.
func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
	klog.V(4).InfoS("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName))

	defer func() {
		if err != nil {
			metrics.VolumeSchedulingStageFailed.WithLabelValues("bind").Inc()
		}
	}()

	bindings := podVolumes.StaticBindings
	claimsToProvision := podVolumes.DynamicProvisions

	// Start API operations
	err = b.bindAPIUpdate(ctx, assumedPod, bindings, claimsToProvision)
	if err != nil {
		return err
	}

	err = wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
		b, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
		return b, err
	})
	if err != nil {
		return fmt.Errorf("binding volumes: %w", err)
	}
	return nil
}

func getPodName(pod *v1.Pod) string {
	return pod.Namespace + "/" + pod.Name
}

func getPVCName(pvc *v1.PersistentVolumeClaim) string {
	return pvc.Namespace + "/" + pvc.Name
}

// bindAPIUpdate makes the API update for those PVs/PVCs.
func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
	podName := getPodName(pod)
	if bindings == nil {
		return fmt.Errorf("failed to get cached bindings for pod %q", podName)
	}
	if claimsToProvision == nil {
		return fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
	}

	lastProcessedBinding := 0
	lastProcessedProvisioning := 0
	defer func() {
		// only revert assumed cached updates for volumes we haven't successfully bound
		if lastProcessedBinding < len(bindings) {
			b.revertAssumedPVs(bindings[lastProcessedBinding:])
		}
		// only revert assumed cached updates for claims we haven't updated,
		if lastProcessedProvisioning < len(claimsToProvision) {
			b.revertAssumedPVCs(claimsToProvision[lastProcessedProvisioning:])
		}
	}()

	var (
		binding *BindingInfo
		i       int
		claim   *v1.PersistentVolumeClaim
	)

	// Do the actual prebinding. Let the PV controller take care of the rest
	// There is no API rollback if the actual binding fails
	for _, binding = range bindings {
		klog.V(5).InfoS("bindAPIUpdate: binding PV to PVC", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
		// TODO: does it hurt if we make an api call and nothing needs to be updated?
		klog.V(2).InfoS("Claim bound to volume", "PVC", klog.KObj(binding.pvc), "PV", klog.KObj(binding.pv))
		newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(ctx, binding.pv, metav1.UpdateOptions{})
		if err != nil {
			klog.V(4).InfoS("Updating PersistentVolume: binding to claim failed", "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err)
			return err
		}
		klog.V(4).InfoS("Updating PersistentVolume: bound to claim", "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
		// Save updated object from apiserver for later checking.
		binding.pv = newPV
		lastProcessedBinding++
	}

	// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
	// PV controller is expected to signal back by removing related annotations if actual provisioning fails
	for i, claim = range claimsToProvision {
		klog.V(5).InfoS("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim))
		newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
		if err != nil {
			return err
		}
		// Save updated object from apiserver for later checking.
		claimsToProvision[i] = newClaim
		lastProcessedProvisioning++
	}

	return nil
}

var (
	versioner = storage.APIObjectVersioner{}
)

// checkBindings runs through all the PVCs in the Pod and checks:
// * if the PVC is fully bound
// * if there are any conditions that require binding to fail and be retried
//
// It returns true when all of the Pod's PVCs are fully bound, and error if
// binding (and scheduling) needs to be retried
// Note that it checks on API objects not PV/PVC cache, this is because
// PV/PVC cache can be assumed again in main scheduler loop, we must check
// latest state in API server which are shared with PV controller and
// provisioners
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
	podName := getPodName(pod)
	if bindings == nil {
		return false, fmt.Errorf("failed to get cached bindings for pod %q", podName)
	}
	if claimsToProvision == nil {
		return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
	}

	node, err := b.nodeLister.Get(pod.Spec.NodeName)
	if err != nil {
		return false, fmt.Errorf("failed to get node %q: %w", pod.Spec.NodeName, err)
	}

	csiNode, err := b.csiNodeLister.Get(node.Name)
	if err != nil {
		// TODO: return the error once CSINode is created by default
		klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
	}

	// Check for any conditions that might require scheduling retry

	// When pod is deleted, binding operation should be cancelled. There is no
	// need to check PV/PVC bindings any more.
	_, err = b.podLister.Pods(pod.Namespace).Get(pod.Name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return false, fmt.Errorf("pod does not exist any more: %w", err)
		}
		klog.ErrorS(err, "Failed to get pod from the lister", "pod", klog.KObj(pod))
	}

	for _, binding := range bindings {
		pv, err := b.pvCache.GetAPIPV(binding.pv.Name)
		if err != nil {
			return false, fmt.Errorf("failed to check binding: %w", err)
		}

		pvc, err := b.pvcCache.GetAPIPVC(getPVCName(binding.pvc))
		if err != nil {
			return false, fmt.Errorf("failed to check binding: %w", err)
		}

		// Because we updated PV in apiserver, skip if API object is older
		// and wait for new API object propagated from apiserver.
		if versioner.CompareResourceVersion(binding.pv, pv) > 0 {
			return false, nil
		}

		pv, err = b.tryTranslatePVToCSI(pv, csiNode)
		if err != nil {
			return false, fmt.Errorf("failed to translate pv to csi: %w", err)
		}

		// Check PV's node affinity (the node might not have the proper label)
		if err := volume.CheckNodeAffinity(pv, node.Labels); err != nil {
			return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %w", pv.Name, node.Name, err)
		}

		// Check if pv.ClaimRef got dropped by unbindVolume()
		if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
			return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
		}

		// Check if pvc is fully bound
		if !b.isPVCFullyBound(pvc) {
			return false, nil
		}
	}

	for _, claim := range claimsToProvision {
		pvc, err := b.pvcCache.GetAPIPVC(getPVCName(claim))
		if err != nil {
			return false, fmt.Errorf("failed to check provisioning pvc: %w", err)
		}

		// Because we updated PVC in apiserver, skip if API object is older
		// and wait for new API object propagated from apiserver.
		if versioner.CompareResourceVersion(claim, pvc) > 0 {
			return false, nil
		}

		// Check if selectedNode annotation is still set
		if pvc.Annotations == nil {
			return false, fmt.Errorf("selectedNode annotation reset for PVC %q", pvc.Name)
		}
		selectedNode := pvc.Annotations[volume.AnnSelectedNode]
		if selectedNode != pod.Spec.NodeName {
			// If provisioner fails to provision a volume, selectedNode
			// annotation will be removed to signal back to the scheduler to
			// retry.
			return false, fmt.Errorf("provisioning failed for PVC %q", pvc.Name)
		}

		// If the PVC is bound to a PV, check its node affinity
		if pvc.Spec.VolumeName != "" {
			pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
			if err != nil {
				if _, ok := err.(*errNotFound); ok {
					// We tolerate NotFound error here, because PV is possibly
					// not found because of API delay, we can check next time.
					// And if PV does not exist because it's deleted, PVC will
					// be unbound eventually.
					return false, nil
				}
				return false, fmt.Errorf("failed to get pv %q from cache: %w", pvc.Spec.VolumeName, err)
			}

			pv, err = b.tryTranslatePVToCSI(pv, csiNode)
			if err != nil {
				return false, err
			}

			if err := volume.CheckNodeAffinity(pv, node.Labels); err != nil {
				return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %w", pv.Name, node.Name, err)
			}
		}

		// Check if pvc is fully bound
		if !b.isPVCFullyBound(pvc) {
			return false, nil
		}
	}

	// All pvs and pvcs that we operated on are bound
	klog.V(4).InfoS("All PVCs for pod are bound", "pod", klog.KObj(pod))
	return true, nil
}

func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) {
	pvcName := ""
	isEphemeral := false
	switch {
	case vol.PersistentVolumeClaim != nil:
		pvcName = vol.PersistentVolumeClaim.ClaimName
	case vol.Ephemeral != nil:
		// Generic ephemeral inline volumes also use a PVC,
		// just with a computed name, and...
		pvcName = ephemeral.VolumeClaimName(pod, vol)
		isEphemeral = true
	default:
		return true, nil, nil
	}

	bound, pvc, err = b.isPVCBound(pod.Namespace, pvcName)
	// ... the PVC must be owned by the pod.
	if isEphemeral && err == nil && pvc != nil {
		if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
			return false, nil, err
		}
	}
	return
}

func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
	claim := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      pvcName,
			Namespace: namespace,
		},
	}
	pvcKey := getPVCName(claim)
	pvc, err := b.pvcCache.GetPVC(pvcKey)
	if err != nil || pvc == nil {
		return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcKey, err)
	}

	fullyBound := b.isPVCFullyBound(pvc)
	if fullyBound {
		klog.V(5).InfoS("PVC is fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
	} else {
		if pvc.Spec.VolumeName != "" {
			klog.V(5).InfoS("PVC is not fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
		} else {
			klog.V(5).InfoS("PVC is not bound", "PVC", klog.KObj(pvc))
		}
	}
	return fullyBound, pvc, nil
}

func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
	return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, volume.AnnBindCompleted)
}

// arePodVolumesBound returns true if all volumes are fully bound
func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
	for _, vol := range pod.Spec.Volumes {
		if isBound, _, _ := b.isVolumeBound(pod, &vol); !isBound {
			// Pod has at least one PVC that needs binding
			return false
		}
	}
	return true
}

// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// and unbound with immediate binding (including prebound)
func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaimsDelayBinding []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
	boundClaims = []*v1.PersistentVolumeClaim{}
	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
	unboundClaimsDelayBinding = []*v1.PersistentVolumeClaim{}

	for _, vol := range pod.Spec.Volumes {
		volumeBound, pvc, err := b.isVolumeBound(pod, &vol)
		if err != nil {
			return nil, nil, nil, err
		}
		if pvc == nil {
			continue
		}
		if volumeBound {
			boundClaims = append(boundClaims, pvc)
		} else {
			delayBindingMode, err := volume.IsDelayBindingMode(pvc, b.classLister)
			if err != nil {
				return nil, nil, nil, err
			}
			// Prebound PVCs are treated as unbound immediate binding
			if delayBindingMode && pvc.Spec.VolumeName == "" {
				// Scheduler path
				unboundClaimsDelayBinding = append(unboundClaimsDelayBinding, pvc)
			} else {
				// !delayBindingMode || pvc.Spec.VolumeName != ""
				// Immediate binding should have already been bound
				unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
			}
		}
	}
	return boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate, nil
}

func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, pod *v1.Pod) (bool, bool, error) {
	csiNode, err := b.csiNodeLister.Get(node.Name)
	if err != nil {
		// TODO: return the error once CSINode is created by default
		klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
	}

	for _, pvc := range claims {
		pvName := pvc.Spec.VolumeName
		pv, err := b.pvCache.GetPV(pvName)
		if err != nil {
			if _, ok := err.(*errNotFound); ok {
				err = nil
			}
			return true, false, err
		}

		pv, err = b.tryTranslatePVToCSI(pv, csiNode)
		if err != nil {
			return false, true, err
		}

		err = volume.CheckNodeAffinity(pv, node.Labels)
		if err != nil {
			klog.V(4).InfoS("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err)
			return false, true, nil
		}
		klog.V(5).InfoS("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod))
	}

	klog.V(4).InfoS("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node))
	return true, true, nil
}

// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
	// Sort all the claims by increasing size request to get the smallest fits
	sort.Sort(byPVCSize(claimsToBind))

	chosenPVs := map[string]*v1.PersistentVolume{}

	foundMatches = true

	for _, pvc := range claimsToBind {
		// Get storage class name from each PVC
		storageClassName := volume.GetPersistentVolumeClaimClass(pvc)
		allPVs := b.pvCache.ListPVs(storageClassName)

		// Find a matching PV
		pv, err := volume.FindMatchingVolume(pvc, allPVs, node, chosenPVs, true)
		if err != nil {
			return false, nil, nil, err
		}
		if pv == nil {
			klog.V(4).InfoS("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node))
			unboundClaims = append(unboundClaims, pvc)
			foundMatches = false
			continue
		}

		// matching PV needs to be excluded so we don't select it again
		chosenPVs[pv.Name] = pv
		bindings = append(bindings, &BindingInfo{pv: pv, pvc: pvc})
		klog.V(5).InfoS("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod))
	}

	if foundMatches {
		klog.V(4).InfoS("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node))
	}

	return
}

// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
	dynamicProvisions = []*v1.PersistentVolumeClaim{}

	// We return early with provisionedClaims == nil if a check
	// fails or we encounter an error.
	for _, claim := range claimsToProvision {
		pvcName := getPVCName(claim)
		className := volume.GetPersistentVolumeClaimClass(claim)
		if className == "" {
			return false, false, nil, fmt.Errorf("no class for claim %q", pvcName)
		}

		class, err := b.classLister.Get(className)
		if err != nil {
			return false, false, nil, fmt.Errorf("failed to find storage class %q", className)
		}
		provisioner := class.Provisioner
		if provisioner == "" || provisioner == volume.NotSupportedProvisioner {
			klog.V(4).InfoS("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim))
			return false, true, nil, nil
		}

		// Check if the node can satisfy the topology requirement in the class
		if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
			klog.V(4).InfoS("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim))
			return false, true, nil, nil
		}

		// Check storage capacity.
		sufficient, err := b.hasEnoughCapacity(provisioner, claim, class, node)
		if err != nil {
			return false, false, nil, err
		}
		if !sufficient {
			// hasEnoughCapacity logs an explanation.
			return true, false, nil, nil
		}

		dynamicProvisions = append(dynamicProvisions, claim)

	}
	klog.V(4).InfoS("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))

	return true, true, dynamicProvisions, nil
}

func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) {
	for _, BindingInfo := range bindings {
		b.pvCache.Restore(BindingInfo.pv.Name)
	}
}

func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
	for _, claim := range claims {
		b.pvcCache.Restore(getPVCName(claim))
	}
}

// hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size
// that is available from the node.
func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
	quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage]
	if !ok {
		// No capacity to check for.
		return true, nil
	}

	// Only enabled for CSI drivers which opt into it.
	driver, err := b.csiDriverLister.Get(provisioner)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Either the provisioner is not a CSI driver or the driver does not
			// opt into storage capacity scheduling. Either way, skip
			// capacity checking.
			return true, nil
		}
		return false, err
	}
	if driver.Spec.StorageCapacity == nil || !*driver.Spec.StorageCapacity {
		return true, nil
	}

	// Look for a matching CSIStorageCapacity object(s).
	// TODO (for beta): benchmark this and potentially introduce some kind of lookup structure (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-654356718).
	capacities, err := b.csiStorageCapacityLister.List(labels.Everything())
	if err != nil {
		return false, err
	}

	sizeInBytes := quantity.Value()
	for _, capacity := range capacities {
		if capacity.StorageClassName == storageClass.Name &&
			capacitySufficient(capacity, sizeInBytes) &&
			b.nodeHasAccess(node, capacity) {
			// Enough capacity found.
			return true, nil
		}
	}

	// TODO (?): this doesn't give any information about which pools where considered and why
	// they had to be rejected. Log that above? But that might be a lot of log output...
	klog.V(4).InfoS("Node has no accessible CSIStorageCapacity with enough capacity for PVC",
		"node", klog.KObj(node), "PVC", klog.KObj(claim), "size", sizeInBytes, "storageClass", klog.KObj(storageClass))
	return false, nil
}

func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int64) bool {
	limit := capacity.Capacity
	if capacity.MaximumVolumeSize != nil {
		// Prefer MaximumVolumeSize if available, it is more precise.
		limit = capacity.MaximumVolumeSize
	}
	return limit != nil && limit.Value() >= sizeInBytes
}

func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool {
	if capacity.NodeTopology == nil {
		// Unavailable
		return false
	}
	// Only matching by label is supported.
	selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
	if err != nil {
		klog.ErrorS(err, "Unexpected error converting to a label selector", "nodeTopology", capacity.NodeTopology)
		return false
	}
	return selector.Matches(labels.Set(node.Labels))
}

type byPVCSize []*v1.PersistentVolumeClaim

func (a byPVCSize) Len() int {
	return len(a)
}

func (a byPVCSize) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

func (a byPVCSize) Less(i, j int) bool {
	iSize := a[i].Spec.Resources.Requests[v1.ResourceStorage]
	jSize := a[j].Spec.Resources.Requests[v1.ResourceStorage]
	// return true if iSize is less than jSize
	return iSize.Cmp(jSize) == -1
}

// isCSIMigrationOnForPlugin checks if CSI migration is enabled for a given plugin.
func isCSIMigrationOnForPlugin(pluginName string) bool {
	switch pluginName {
	case csiplugins.AWSEBSInTreePluginName:
		return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
	case csiplugins.GCEPDInTreePluginName:
		return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
	case csiplugins.AzureDiskInTreePluginName:
		return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
	case csiplugins.PortworxVolumePluginName:
		return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
	case csiplugins.RBDVolumePluginName:
		return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationRBD)
	}
	return false
}

// isPluginMigratedToCSIOnNode checks if an in-tree plugin has been migrated to a CSI driver on the node.
func isPluginMigratedToCSIOnNode(pluginName string, csiNode *storagev1.CSINode) bool {
	if csiNode == nil {
		return false
	}

	csiNodeAnn := csiNode.GetAnnotations()
	if csiNodeAnn == nil {
		return false
	}

	var mpaSet sets.String
	mpa := csiNodeAnn[v1.MigratedPluginsAnnotationKey]
	if len(mpa) == 0 {
		mpaSet = sets.NewString()
	} else {
		tok := strings.Split(mpa, ",")
		mpaSet = sets.NewString(tok...)
	}

	return mpaSet.Has(pluginName)
}

// tryTranslatePVToCSI will translate the in-tree PV to CSI if it meets the criteria. If not, it returns the unmodified in-tree PV.
func (b *volumeBinder) tryTranslatePVToCSI(pv *v1.PersistentVolume, csiNode *storagev1.CSINode) (*v1.PersistentVolume, error) {
	if !b.translator.IsPVMigratable(pv) {
		return pv, nil
	}

	pluginName, err := b.translator.GetInTreePluginNameFromSpec(pv, nil)
	if err != nil {
		return nil, fmt.Errorf("could not get plugin name from pv: %v", err)
	}

	if !isCSIMigrationOnForPlugin(pluginName) {
		return pv, nil
	}

	if !isPluginMigratedToCSIOnNode(pluginName, csiNode) {
		return pv, nil
	}

	transPV, err := b.translator.TranslateInTreePVToCSI(pv)
	if err != nil {
		return nil, fmt.Errorf("could not translate pv: %v", err)
	}

	return transPV, nil
}
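
The file above only defines the binder; the order in which the scheduler's VolumeBinding plugin calls it is described in the SchedulerVolumeBinder doc comment. The following is a minimal, hypothetical sketch of that flow (PreFilter → Filter → Reserve → PreBind). The function name scheduleOnePod, the node-selection loop and the error handling are illustrative assumptions; only the interface methods and their signatures come from the listing.

package volumebinding

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// scheduleOnePod is a hypothetical driver, not part of binder.go. It shows the
// call order PreFilter -> Filter -> Reserve -> PreBind using only the
// SchedulerVolumeBinder methods defined above.
func scheduleOnePod(ctx context.Context, binder SchedulerVolumeBinder, pod *v1.Pod, candidates []*v1.Node) error {
	// PreFilter: classify the pod's PVCs once per scheduling cycle.
	boundClaims, claimsToBind, unboundImmediate, err := binder.GetPodVolumes(pod)
	if err != nil {
		return err
	}
	if len(unboundImmediate) > 0 {
		// Immediate-binding PVCs should already have been bound by the PV controller.
		return fmt.Errorf("pod %s/%s has unbound immediate PersistentVolumeClaims", pod.Namespace, pod.Name)
	}

	// Filter: check every candidate node (the real scheduler runs this in parallel
	// and keeps the per-node PodVolumes in the scheduling cycle state).
	var chosenNode *v1.Node
	var chosenVolumes *PodVolumes
	for _, node := range candidates {
		podVolumes, reasons, err := binder.FindPodVolumes(pod, boundClaims, claimsToBind, node)
		if err != nil {
			return err
		}
		if len(reasons) == 0 {
			chosenNode, chosenVolumes = node, podVolumes
			break // illustration only; the real scheduler scores all feasible nodes
		}
	}
	if chosenNode == nil {
		return fmt.Errorf("no candidate node satisfies the volume requirements of pod %s/%s", pod.Namespace, pod.Name)
	}

	// Reserve: record the decisions in the in-memory PV/PVC assume caches only.
	allBound, err := binder.AssumePodVolumes(pod, chosenNode.Name, chosenVolumes)
	if err != nil {
		return err
	}

	// PreBind: issue the PV/PVC API updates and wait for the PV controller,
	// unless every volume was already fully bound.
	if !allBound {
		if err := binder.BindPodVolumes(ctx, pod, chosenVolumes); err != nil {
			binder.RevertAssumedPodVolumes(chosenVolumes)
			return err
		}
	}
	return nil
}

In the actual plugin the per-node PodVolumes is stored in the cycle state during Filter, scoring picks among all feasible nodes, and RevertAssumedPodVolumes is invoked from the Unreserve extension point when any later phase fails.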
