kubernetes instances 源码

  • 2022-09-18
  • 浏览 (209)

kubernetes instances 代码

文件路径:/staging/src/k8s.io/legacy-cloud-providers/aws/instances.go

//go:build !providerless
// +build !providerless

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
	"fmt"
	"net/url"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
	"k8s.io/klog/v2"

	"k8s.io/api/core/v1"
)

// awsInstanceRegMatch matches a bare AWS instance ID: the literal prefix "i-"
// followed by any run of characters that contains no "/".
var awsInstanceRegMatch = regexp.MustCompile(`^i-[^/]*$`)

// InstanceID is the identifier the AWS API uses for an instance.
// Two formats are known: the traditional "i-12345678" and the newer, longer
// "i-12345678abcdef01". Nothing should depend on the length or exact format,
// although it seems reasonable to expect IDs to keep starting with "i-".
type InstanceID string

// awsString returns the ID as a *string, the form used when building AWS SDK
// requests such as the DescribeInstancesInput in describeInstance.
func (i InstanceID) awsString() *string {
	raw := string(i)
	return aws.String(raw)
}

// KubernetesInstanceID is the ID of an instance as it appears in the
// Kubernetes API. It takes one of the following forms:
//   - aws:///<zone>/<awsInstanceId>
//   - aws:////<awsInstanceId>
//   - <awsInstanceId>
type KubernetesInstanceID string

// MapToAWSInstanceID extracts the AWS InstanceID from the KubernetesInstanceID.
//
// A value that does not start with "aws://" is treated as a bare instance ID
// and is parsed as if it had been written "aws:////<id>". Once parsed, the URL
// path must hold either "<instanceId>" or "<zone>/<instanceId>", and the
// instance ID must match awsInstanceRegMatch.
//
// An error is returned when the value cannot be parsed as a URL, when the
// scheme is not "aws", or when no well-formed instance ID is found. A URL
// parse failure is wrapped with %w, so callers can inspect the cause with
// errors.Is / errors.As; the message text is the same as with %v.
func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) {
	s := string(name)

	if !strings.HasPrefix(s, "aws://") {
		// Assume a bare AWS instance ID (i-1234...)
		// Build a URL with an empty host (AZ)
		s = "aws://" + "/" + "/" + s
	}

	// Named parsed (not url) so the net/url package is not shadowed.
	parsed, err := url.Parse(s)
	if err != nil {
		return "", fmt.Errorf("Invalid instance name (%s): %w", name, err)
	}
	if parsed.Scheme != "aws" {
		return "", fmt.Errorf("Invalid scheme for AWS instance (%s)", name)
	}

	// Any other number of path segments leaves awsID empty and is rejected below.
	awsID := ""
	tokens := strings.Split(strings.Trim(parsed.Path, "/"), "/")
	switch len(tokens) {
	case 1:
		// instanceId
		awsID = tokens[0]
	case 2:
		// az/instanceId
		awsID = tokens[1]
	}

	// We sanity check the resulting instance ID; the two known formats are
	// i-12345678 and i-12345678abcdef01
	if awsID == "" || !awsInstanceRegMatch.MatchString(awsID) {
		return "", fmt.Errorf("Invalid format for AWS instance (%s)", name)
	}

	return InstanceID(awsID), nil
}

// mapToAWSInstanceIDs extracts the InstanceIDs from the Nodes, in input order.
// It stops at the first Node whose Spec.ProviderID is empty or cannot be
// mapped, returning a nil slice and an error. The mapping failure is wrapped
// with %w so the reason the ProviderID was rejected reaches the caller.
func mapToAWSInstanceIDs(nodes []*v1.Node) ([]InstanceID, error) {
	var instanceIDs []InstanceID
	for _, node := range nodes {
		if node.Spec.ProviderID == "" {
			return nil, fmt.Errorf("node %q did not have ProviderID set", node.Name)
		}
		instanceID, err := KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
		if err != nil {
			return nil, fmt.Errorf("unable to parse ProviderID %q for node %q: %w", node.Spec.ProviderID, node.Name, err)
		}
		instanceIDs = append(instanceIDs, instanceID)
	}

	return instanceIDs, nil
}

// mapToAWSInstanceIDsTolerant extracts the InstanceIDs from the Nodes, skipping
// Nodes that cannot be mapped. A Node with an empty Spec.ProviderID, or one
// whose ProviderID fails to map, is logged as a warning and left out of the
// result; the mapping error is included in the warning so the log shows why
// the Node was skipped.
func mapToAWSInstanceIDsTolerant(nodes []*v1.Node) []InstanceID {
	var instanceIDs []InstanceID
	for _, node := range nodes {
		if node.Spec.ProviderID == "" {
			klog.Warningf("node %q did not have ProviderID set", node.Name)
			continue
		}
		instanceID, err := KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
		if err != nil {
			klog.Warningf("unable to parse ProviderID %q for node %q: %v", node.Spec.ProviderID, node.Name, err)
			continue
		}
		instanceIDs = append(instanceIDs, instanceID)
	}

	return instanceIDs
}

// describeInstance looks up a single instance by ID through the given EC2
// client. It returns the client's error unchanged, and reports an error of its
// own unless the lookup yields exactly one instance.
func describeInstance(ec2Client EC2, instanceID InstanceID) (*ec2.Instance, error) {
	input := &ec2.DescribeInstancesInput{
		InstanceIds: []*string{instanceID.awsString()},
	}

	matches, err := ec2Client.DescribeInstances(input)
	if err != nil {
		return nil, err
	}

	switch len(matches) {
	case 0:
		return nil, fmt.Errorf("no instances found for instance: %s", instanceID)
	case 1:
		return matches[0], nil
	default:
		return nil, fmt.Errorf("multiple instances found for instance: %s", instanceID)
	}
}

// instanceCache manages the cache of DescribeInstances results.
type instanceCache struct {
	// TODO: Get rid of this field, send all calls through the instanceCache
	// cloud is what describeAllInstancesUncached calls to list instances.
	cloud *Cloud

	// mutex is held by the methods in this file while they read or replace snapshot.
	// snapshot is the cached result, or nil if none has been stored.
	mutex    sync.Mutex
	snapshot *allInstancesSnapshot
}

// describeAllInstancesUncached fetches the instance list through the cloud
// object, bypassing the cache, and returns it as a new snapshot keyed by
// instance ID. The snapshot's timestamp is taken before the request is issued.
// The new snapshot replaces the cached one unless the cache already holds a
// snapshot with a later timestamp; either way the newly fetched snapshot is
// what the caller receives.
func (c *instanceCache) describeAllInstancesUncached() (*allInstancesSnapshot, error) {
	fetchStart := time.Now()

	klog.V(4).Infof("EC2 DescribeInstances - fetching all instances")

	// No filters are supplied to the describe call.
	var noFilters []*ec2.Filter
	fetched, err := c.cloud.describeInstances(noFilters)
	if err != nil {
		return nil, err
	}

	byID := make(map[InstanceID]*ec2.Instance)
	for _, inst := range fetched {
		byID[InstanceID(aws.StringValue(inst.InstanceId))] = inst
	}

	fresh := &allInstancesSnapshot{
		timestamp: fetchStart,
		instances: byID,
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.snapshot == nil || !fresh.olderThan(c.snapshot) {
		c.snapshot = fresh
	} else {
		// Another call stored a newer snapshot while this one was in flight.
		// If this happens a lot, we could run this function in a mutex and only return one result
		klog.Infof("Not caching concurrent AWS DescribeInstances results")
	}

	return fresh, nil
}

// cacheCriteria holds the conditions a cached snapshot must satisfy before it
// may be used in place of a fresh fetch.
type cacheCriteria struct {
	// MaxAge indicates the maximum age of a cached snapshot we can accept.
	// If set to 0 (i.e. unset), cached values will not time out because of age;
	// MeetsCriteria only enforces the limit when MaxAge is greater than zero.
	MaxAge time.Duration

	// HasInstances is a list of InstanceIDs that must be in a cached snapshot for it to be considered valid.
	// If an instance is not found in the cached snapshot, the snapshot will be ignored and we will re-fetch.
	HasInstances []InstanceID
}

// describeAllInstancesCached returns a snapshot of all instances. The cached
// snapshot is returned when one exists and satisfies criteria; otherwise a
// fresh snapshot is fetched via describeAllInstancesUncached.
func (c *instanceCache) describeAllInstancesCached(criteria cacheCriteria) (*allInstancesSnapshot, error) {
	// Cache hit: a snapshot exists and satisfies the caller's criteria.
	if cached := c.getSnapshot(); cached != nil && cached.MeetsCriteria(criteria) {
		klog.V(6).Infof("EC2 DescribeInstances - using cached results")
		return cached, nil
	}

	// Cache miss, or the cached snapshot was rejected: fetch again.
	fresh, err := c.describeAllInstancesUncached()
	if err != nil {
		return nil, err
	}
	return fresh, nil
}

// getSnapshot returns the currently cached snapshot, or nil when there is
// none. The field is read while holding the cache mutex.
func (c *instanceCache) getSnapshot() *allInstancesSnapshot {
	c.mutex.Lock()
	current := c.snapshot
	c.mutex.Unlock()

	return current
}

// olderThan reports whether s was timestamped strictly before other.
func (s *allInstancesSnapshot) olderThan(other *allInstancesSnapshot) bool {
	// The comparison is only as reliable as the clock readings behind the two
	// timestamps: if either one lacks a monotonic reading, a wall-clock
	// adjustment between the two fetches can reorder them.
	return s.timestamp.Before(other.timestamp)
}

// MeetsCriteria reports whether the snapshot satisfies criteria: it must be no
// older than criteria.MaxAge (checked only when MaxAge is positive), and it
// must hold a non-nil entry for every ID in criteria.HasInstances.
func (s *allInstancesSnapshot) MeetsCriteria(criteria cacheCriteria) bool {
	// Age check. The elapsed time is only as reliable as the clock readings:
	// without a monotonic reading, a wall-clock adjustment skews it.
	if criteria.MaxAge > 0 && time.Since(s.timestamp) > criteria.MaxAge {
		klog.V(6).Infof("instanceCache snapshot cannot be used as is older than MaxAge=%s", criteria.MaxAge)
		return false
	}

	// Membership check; an empty HasInstances list imposes no requirement.
	for _, required := range criteria.HasInstances {
		if s.instances[required] == nil {
			klog.V(6).Infof("instanceCache snapshot cannot be used as does not contain instance %s", required)
			return false
		}
	}

	return true
}

// allInstancesSnapshot holds the results from querying for all instances,
// along with the timestamp for cache-invalidation purposes.
type allInstancesSnapshot struct {
	// timestamp is compared by olderThan and checked against MaxAge by MeetsCriteria.
	// instances maps each instance ID to its EC2 description.
	timestamp time.Time
	instances map[InstanceID]*ec2.Instance
}

// FindInstances looks up each of the given ids in the snapshot and returns the
// ones that are present, keyed by ID. IDs with no (or a nil) entry are
// silently left out; the returned map is never nil.
func (s *allInstancesSnapshot) FindInstances(ids []InstanceID) map[InstanceID]*ec2.Instance {
	found := make(map[InstanceID]*ec2.Instance)
	for _, wanted := range ids {
		if inst := s.instances[wanted]; inst != nil {
			found[wanted] = inst
		}
	}
	return found
}

相关信息

kubernetes 源码目录

相关文章

kubernetes aws 源码

kubernetes aws_assumerole_provider 源码

kubernetes aws_assumerole_provider_test 源码

kubernetes aws_fakes 源码

kubernetes aws_instancegroups 源码

kubernetes aws_loadbalancer 源码

kubernetes aws_loadbalancer_test 源码

kubernetes aws_metrics 源码

kubernetes aws_routes 源码

kubernetes aws_test 源码

0  赞