kubernetes expand_controller 源码
kubernetes expand_controller 代码
文件路径:/pkg/controller/volume/expand/expand_controller.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expand
import (
"context"
"fmt"
"net"
"time"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
authenticationv1 "k8s.io/api/authentication/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/features"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/subpath"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
// defaultWorkerCount is the number of worker goroutines that Run starts to
// drain the volume expansion queue.
const defaultWorkerCount = 10
// ExpandController is the public face of the volume expansion controller.
// NewExpandController returns a value of this type.
type ExpandController interface {
// Run starts the controller and blocks until ctx is cancelled.
Run(ctx context.Context)
}
// CSINameTranslator maps an in-tree volume plugin name to the name of the CSI
// driver that replaces it. The controller uses it when a volume is handled by
// CSI migration, to record which external resizer owns the claim.
type CSINameTranslator interface {
// GetCSINameFromInTreeName returns the CSI driver name for pluginName, or
// an error when no translation is available.
GetCSINameFromInTreeName(pluginName string) (string, error)
}
// expandController is the ExpandController implementation. It is also handed
// to the volume plugin manager as the plugins' host (see NewExpandController),
// which is why it carries the VolumeHost-style getter methods defined in this
// file.
type expandController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
kubeClient clientset.Interface
// pvcLister is the shared PVC lister used to fetch and store PVC
// objects from the API server. It is shared with other controllers and
// therefore the PVC objects in its store should be treated as immutable.
pvcLister corelisters.PersistentVolumeClaimLister
// pvcsSynced reports whether the PVC informer cache has synced; Run waits
// on it before starting workers.
pvcsSynced cache.InformerSynced
// pvLister is the shared PV lister taken from the PV informer.
// NOTE(review): getPersistentVolume reads PVs through kubeClient, not
// through this lister.
pvLister corelisters.PersistentVolumeLister
// pvSynced reports whether the PV informer cache has synced; Run waits on
// it before starting workers.
pvSynced cache.InformerSynced
// cloud provider used by volume host
cloud cloudprovider.Interface
// volumePluginMgr used to initialize and fetch volume plugins
volumePluginMgr volume.VolumePluginMgr
// recorder is used to record events in the API server
recorder record.EventRecorder
// operationGenerator builds the expand operations that expand() runs.
operationGenerator operationexecutor.OperationGenerator
// queue holds the namespace/name keys of PVCs waiting to be processed.
queue workqueue.RateLimitingInterface
// translator resolves in-tree plugin names to CSI driver names for
// migrated volumes.
translator CSINameTranslator
// csiMigratedPluginManager tells whether a volume spec is subject to CSI
// migration and which in-tree plugin it belongs to.
csiMigratedPluginManager csimigration.PluginManager
// filteredDialOptions is returned unchanged by GetFilteredDialOptions.
filteredDialOptions *proxyutil.FilteredDialOptions
}
// NewExpandController wires up a volume expansion controller.
//
// It initializes the given volume plugins with the controller itself acting as
// their host, creates an event recorder that reports under the "volume_expand"
// component, builds the operation generator, and registers PVC informer
// handlers that feed the work queue. It returns an error only when plugin
// initialization fails.
func NewExpandController(
	kubeClient clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	cloud cloudprovider.Interface,
	plugins []volume.VolumePlugin,
	translator CSINameTranslator,
	csiMigratedPluginManager csimigration.PluginManager,
	filteredDialOptions *proxyutil.FilteredDialOptions) (ExpandController, error) {
	expc := &expandController{
		kubeClient:               kubeClient,
		cloud:                    cloud,
		pvcLister:                pvcInformer.Lister(),
		pvcsSynced:               pvcInformer.Informer().HasSynced,
		pvLister:                 pvInformer.Lister(),
		pvSynced:                 pvInformer.Informer().HasSynced,
		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
		translator:               translator,
		csiMigratedPluginManager: csiMigratedPluginManager,
		filteredDialOptions:      filteredDialOptions,
	}

	// The controller is passed as the plugins' host.
	initErr := expc.volumePluginMgr.InitPlugins(plugins, nil, expc)
	if initErr != nil {
		return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", initErr)
	}

	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(0)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	expc.recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})

	blockPathHandler := volumepathhandler.NewBlockVolumePathHandler()
	expc.operationGenerator = operationexecutor.NewOperationGenerator(
		kubeClient,
		&expc.volumePluginMgr,
		expc.recorder,
		blockPathHandler)

	// onUpdate enqueues the claim under 2 circumstances:
	// 1. User has increased PVC's request capacity --> volume needs to be expanded
	// 2. PVC status capacity has been expanded --> claim's bound PV has likely
	//    recently gone through filesystem resize, so remove the
	//    AnnPreResizeCapacity annotation from the PV
	onUpdate := func(oldObj, newObj interface{}) {
		oldPVC, oldIsPVC := oldObj.(*v1.PersistentVolumeClaim)
		if !oldIsPVC {
			return
		}
		newPVC, newIsPVC := newObj.(*v1.PersistentVolumeClaim)
		if !newIsPVC {
			return
		}

		previousRequest := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
		previousCapacity := oldPVC.Status.Capacity[v1.ResourceStorage]
		currentRequest := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
		currentCapacity := newPVC.Status.Capacity[v1.ResourceStorage]

		// Neither the request nor the reported capacity grew: nothing to do.
		if currentRequest.Cmp(previousRequest) <= 0 && currentCapacity.Cmp(previousCapacity) <= 0 {
			return
		}
		expc.enqueuePVC(newObj)
	}

	pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    expc.enqueuePVC,
		UpdateFunc: onUpdate,
		DeleteFunc: expc.enqueuePVC,
	})

	return expc, nil
}
// enqueuePVC adds the namespace/name key of obj to the work queue when obj is
// a PersistentVolumeClaim in the Bound phase. Anything else is ignored. A
// failure to compute the key is reported through runtime.HandleError and the
// object is dropped.
func (expc *expandController) enqueuePVC(obj interface{}) {
	claim, isClaim := obj.(*v1.PersistentVolumeClaim)
	if !isClaim {
		return
	}
	if claim.Status.Phase != v1.ClaimBound {
		return
	}

	queueKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(claim)
	if keyErr != nil {
		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", claim, keyErr))
		return
	}
	expc.queue.Add(queueKey)
}
// processNextWorkItem takes one key off the queue and runs syncHandler on it.
// It returns false only when the queue has been shut down. On a sync error the
// key is re-added with rate limiting; on success its rate-limit history is
// forgotten.
func (expc *expandController) processNextWorkItem(ctx context.Context) bool {
	item, quit := expc.queue.Get()
	if quit {
		return false
	}
	defer expc.queue.Done(item)

	if syncErr := expc.syncHandler(ctx, item.(string)); syncErr != nil {
		runtime.HandleError(fmt.Errorf("%v failed with : %v", item, syncErr))
		expc.queue.AddRateLimited(item)
		return true
	}

	expc.queue.Forget(item)
	return true
}
// syncHandler reconciles the PVC identified by key ("namespace/name").
//
// It looks the claim up in the PVC lister, fetches the bound PV, and then:
//   - returns nil when no expansion is pending (the request is not larger than
//     the reported capacity and the PV carries no pre-resize annotation);
//   - for volumes subject to CSI migration, records the CSI driver as the
//     claim's resizer and leaves the work to an external resizer;
//   - returns nil after emitting an event when no in-tree expandable plugin
//     is found, since an external controller is expected to act;
//   - otherwise hands the claim to expand().
//
// A non-nil return value causes the caller to requeue the key.
func (expc *expandController) syncHandler(ctx context.Context, key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
	if errors.IsNotFound(err) {
		// The claim is no longer in the lister; there is nothing to resize.
		return nil
	}
	if err != nil {
		klog.V(5).Infof("Error getting PVC %q from informer : %v", key, err)
		return err
	}

	pv, err := expc.getPersistentVolume(ctx, pvc)
	if err != nil {
		klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", key, pvc.UID, err)
		return err
	}

	if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
		err = fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", key)
		klog.V(4).Infof("%v", err)
		return err
	}

	pvcRequestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]

	// call expand operation only under two condition
	// 1. pvc's request size has been expanded and is larger than pvc's current status size
	// 2. pv has an pre-resize capacity annotation
	if pvcRequestSize.Cmp(pvcStatusSize) <= 0 && !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
		return nil
	}

	volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
	migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec)
	if err != nil {
		// The error is logged only; returning nil means the key is not requeued.
		klog.V(4).Infof("failed to check CSI migration status for PVC: %s with error: %v", key, err)
		return nil
	}

	// handle CSI migration scenarios before invoking FindExpandablePluginBySpec for in-tree
	if migratable {
		inTreePluginName, err := expc.csiMigratedPluginManager.GetInTreePluginNameFromSpec(volumeSpec.PersistentVolume, volumeSpec.Volume)
		if err != nil {
			klog.V(4).Infof("Error getting in-tree plugin name from persistent volume %s: %v", volumeSpec.PersistentVolume.Name, err)
			return err
		}

		msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", inTreePluginName)
		expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)

		csiResizerName, err := expc.translator.GetCSINameFromInTreeName(inTreePluginName)
		if err != nil {
			// Build the error with a constant format string so that '%' in
			// the key or the cause is not re-interpreted as a verb, and wrap
			// the cause so it stays inspectable.
			err = fmt.Errorf("error getting CSI driver name for pvc %s, with error %w", key, err)
			expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, err.Error())
			return err
		}

		// The updated claim returned by SetClaimResizer is not needed here.
		// Discarding it keeps the failure event attached to the claim we
		// fetched, rather than to whatever the helper returns on error.
		if _, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient); err != nil {
			err = fmt.Errorf("error setting resizer annotation to pvc %s, with error %w", key, err)
			expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, err.Error())
			return err
		}
		return nil
	}

	volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
	if err != nil || volumePlugin == nil {
		const msg = "didn't find a plugin capable of expanding the volume; " +
			"waiting for an external controller to process this PVC"
		eventType := v1.EventTypeNormal
		if err != nil {
			eventType = v1.EventTypeWarning
		}
		expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
		klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", key, pvc.UID, msg)
		// If we are expecting that an external plugin will handle resizing this volume then
		// there is no point in requeuing this PVC.
		return nil
	}

	volumeResizerName := volumePlugin.GetPluginName()
	return expc.expand(pvc, pv, volumeResizerName)
}
// expand runs the controller-side expansion for pvc/pv using the named resizer.
//
// When node expansion is already complete and the PV still carries the
// pre-resize capacity annotation, it only removes that annotation. Otherwise
// it generates the expand operation and runs it synchronously, returning the
// operation's detailed error. Which generator is used depends on the
// RecoverVolumeExpansionFailure feature gate; without the gate, the claim is
// first marked as resize-in-progress.
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
	// if node expand is complete and pv's annotation can be removed, remove the annotation from pv and return
	if expc.isNodeExpandComplete(pvc, pv) && metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
		return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient())
	}

	var (
		operation volumetypes.GeneratedOperations
		genErr    error
	)
	// claimForOp is the claim object passed to the generator; it is replaced by
	// the patched claim on the legacy path.
	claimForOp := pvc

	if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
		operation, genErr = expc.operationGenerator.GenerateExpandAndRecoverVolumeFunc(claimForOp, pv, resizerName)
	} else {
		var markErr error
		claimForOp, markErr = util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
		if markErr != nil {
			klog.Errorf("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(claimForOp), markErr)
			return markErr
		}
		operation, genErr = expc.operationGenerator.GenerateExpandVolumeFunc(claimForOp, pv)
	}
	if genErr != nil {
		klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(claimForOp), genErr)
		return genErr
	}

	klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
	_, detailedErr := operation.Run()
	return detailedErr
}
// Run waits for the PVC and PV informer caches to sync, starts
// defaultWorkerCount workers, and then blocks until ctx is cancelled. The work
// queue is shut down when Run returns.
//
// TODO make concurrency configurable (workers argument). previously, nestedpendingoperations spawned unlimited goroutines
func (expc *expandController) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer expc.queue.ShutDown()

	klog.Infof("Starting expand controller")
	defer klog.Infof("Shutting down expand controller")

	synced := cache.WaitForNamedCacheSync("expand", ctx.Done(), expc.pvcsSynced, expc.pvSynced)
	if !synced {
		return
	}

	for started := 0; started < defaultWorkerCount; started++ {
		go wait.UntilWithContext(ctx, expc.runWorker, time.Second)
	}

	<-ctx.Done()
}
// runWorker processes queue items one after another and returns once
// processNextWorkItem reports that the queue has shut down.
func (expc *expandController) runWorker(ctx context.Context) {
	for {
		if more := expc.processNextWorkItem(ctx); !more {
			return
		}
	}
}
// getPersistentVolume fetches the PV named by pvc.Spec.VolumeName through the
// API client and returns a deep copy of it.
//
// On failure the underlying client error is wrapped with %w, so the cause
// remains available to errors.Is / errors.As in callers; the message text is
// the same as it would be with %v.
func (expc *expandController) getPersistentVolume(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
	volumeName := pvc.Spec.VolumeName
	pv, err := expc.kubeClient.CoreV1().PersistentVolumes().Get(ctx, volumeName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get PV %q: %w", volumeName, err)
	}
	return pv.DeepCopy(), nil
}
// isNodeExpandComplete reports whether the capacity in the PVC's status has
// caught up with both the PVC's requested storage and the PV's spec capacity,
// i.e. pvc.Status.Capacity >= pvc.Spec.Resources.Requests and
// pvc.Status.Capacity >= pv.Spec.Capacity.
func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
	klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage])

	requested := pvc.Spec.Resources.Requests.Storage()
	reported := pvc.Status.Capacity[v1.ResourceStorage]
	provisioned := pv.Spec.Capacity[v1.ResourceStorage]

	// since we allow shrinking volumes, we must compare both pvc status and capacity
	// with pv spec capacity.
	return reported.Cmp(*requested) >= 0 && reported.Cmp(provisioned) >= 0
}
// Implementing VolumeHost interface
//
// The controller is passed to the volume plugin manager as the plugins' host.
// The directory and pod related getters below have nothing to offer in that
// role and return zero values.

// GetPluginDir always returns the empty string.
func (expc *expandController) GetPluginDir(pluginName string) string {
return ""
}
// GetVolumeDevicePluginDir always returns the empty string.
func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string {
return ""
}
// GetPodsDir always returns the empty string.
func (expc *expandController) GetPodsDir() string {
return ""
}
// GetHostIDsForPod always returns nil IDs and a nil error.
func (expc *expandController) GetHostIDsForPod(pod *v1.Pod, containerUID, containerGID *int64) (hostUID, hostGID *int64, err error) {
return nil, nil, nil
}
// GetPodVolumeDir always returns the empty string.
func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
return ""
}
// GetPodVolumeDeviceDir always returns the empty string.
func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return ""
}
// GetPodPluginDir always returns the empty string.
func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
// GetKubeClient returns the API client the controller was constructed with.
func (expc *expandController) GetKubeClient() clientset.Interface {
return expc.kubeClient
}
// NewWrapperMounter is not supported by this host and always returns an error.
func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation")
}
// NewWrapperUnmounter is not supported by this host and always returns an error.
func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation")
}
// GetCloudProvider returns the cloud provider the controller was constructed with.
func (expc *expandController) GetCloudProvider() cloudprovider.Interface {
return expc.cloud
}
// GetMounter always returns nil; this host provides no mounter.
func (expc *expandController) GetMounter(pluginName string) mount.Interface {
return nil
}
// GetExec returns a fresh exec interface created with utilexec.New on every call.
func (expc *expandController) GetExec(pluginName string) utilexec.Interface {
return utilexec.New()
}
// GetHostName always returns the empty string.
func (expc *expandController) GetHostName() string {
return ""
}
// GetHostIP is not supported by this host and always returns an error.
func (expc *expandController) GetHostIP() (net.IP, error) {
return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation")
}
// GetNodeAllocatable always returns an empty resource list and a nil error.
func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) {
return v1.ResourceList{}, nil
}
// GetSecretFunc returns a function that always fails: this host cannot look up
// secrets.
func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return func(_, _ string) (*v1.Secret, error) {
return nil, fmt.Errorf("GetSecret unsupported in expandController")
}
}
// GetConfigMapFunc returns a function that always fails: this host cannot look
// up config maps.
func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
return func(_, _ string) (*v1.ConfigMap, error) {
return nil, fmt.Errorf("GetConfigMap unsupported in expandController")
}
}
// GetAttachedVolumesFromNodeStatus always returns an empty map and a nil error.
func (expc *expandController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}
// GetServiceAccountTokenFunc returns a function that always fails: this host
// cannot request service account tokens.
func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
}
}
// DeleteServiceAccountTokenFunc returns a function that only logs an error;
// token deletion is not supported by this host.
func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) {
return func(types.UID) {
klog.Errorf("DeleteServiceAccountToken unsupported in expandController")
}
}
// GetNodeLabels is not supported by this host and always returns an error.
func (expc *expandController) GetNodeLabels() (map[string]string, error) {
return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
}
// GetNodeName always returns the empty node name.
func (expc *expandController) GetNodeName() types.NodeName {
return ""
}
// GetEventRecorder returns the controller's event recorder.
func (expc *expandController) GetEventRecorder() record.EventRecorder {
return expc.recorder
}
// GetSubpather always returns nil.
func (expc *expandController) GetSubpather() subpath.Interface {
// not needed for expand controller
return nil
}
// GetFilteredDialOptions returns the dial options the controller was
// constructed with.
func (expc *expandController) GetFilteredDialOptions() *proxyutil.FilteredDialOptions {
return expc.filteredDialOptions
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦