kubernetes cri_stats_provider source code

  • 2022-09-18

kubernetes cri_stats_provider code

File path: /pkg/kubelet/stats/cri_stats_provider.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stats

import (
	"errors"
	"fmt"
	"path"
	"sort"
	"strings"
	"sync"
	"time"

	cadvisorfs "github.com/google/cadvisor/fs"
	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	internalapi "k8s.io/cri-api/pkg/apis"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	"k8s.io/kubernetes/pkg/kubelet/server/stats"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/utils/clock"
)

var (
	// defaultCachePeriod is the default cache period for each cpuUsage.
	defaultCachePeriod = 10 * time.Minute
)

// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
type cpuUsageRecord struct {
	stats          *runtimeapi.CpuUsage
	usageNanoCores *uint64
}

// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
	// cadvisor is used to get the node root filesystem's stats (such as the
	// capacity/available bytes/inodes) that will be populated in per container
	// filesystem stats.
	cadvisor cadvisor.Interface
	// resourceAnalyzer is used to get the volume stats of the pods.
	resourceAnalyzer stats.ResourceAnalyzer
	// runtimeService is used to get the status and stats of the pods and
	// their managed containers.
	runtimeService internalapi.RuntimeService
	// imageService is used to get the stats of the image filesystem.
	imageService internalapi.ImageManagerService
	// hostStatsProvider is used to get the status of the host filesystem consumed by pods.
	hostStatsProvider HostStatsProvider
	// windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows
	windowsNetworkStatsProvider interface{} //nolint:unused // U1000 We can't import hcsshim due to Build constraints in hcsshim
	// clock is used to report the current time
	clock clock.Clock

	// cpuUsageCache caches the cpu usage for containers.
	cpuUsageCache               map[string]*cpuUsageRecord
	mutex                       sync.RWMutex
	podAndContainerStatsFromCRI bool
}

// newCRIStatsProvider returns a containerStatsProvider implementation that
// provides container stats using CRI.
func newCRIStatsProvider(
	cadvisor cadvisor.Interface,
	resourceAnalyzer stats.ResourceAnalyzer,
	runtimeService internalapi.RuntimeService,
	imageService internalapi.ImageManagerService,
	hostStatsProvider HostStatsProvider,
	podAndContainerStatsFromCRI bool,
) containerStatsProvider {
	return &criStatsProvider{
		cadvisor:                    cadvisor,
		resourceAnalyzer:            resourceAnalyzer,
		runtimeService:              runtimeService,
		imageService:                imageService,
		hostStatsProvider:           hostStatsProvider,
		cpuUsageCache:               make(map[string]*cpuUsageRecord),
		podAndContainerStatsFromCRI: podAndContainerStatsFromCRI,
		clock:                       clock.RealClock{},
	}
}
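
// A hypothetical construction sketch (editorial, not part of the upstream
// file) to make the dependency set above concrete; all argument names are
// placeholders for values the kubelet wires in during stats-provider setup:
//
//	provider := newCRIStatsProvider(
//		cadvisorInterface, // cadvisor.Interface
//		resourceAnalyzer,  // stats.ResourceAnalyzer
//		runtimeService,    // internalapi.RuntimeService
//		imageService,      // internalapi.ImageManagerService
//		hostStatsProvider, // HostStatsProvider
//		false,             // PodAndContainerStatsFromCRI feature gate
//	)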

// ListPodStats returns the stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
	// Don't update CPU nano core usage.
	return p.listPodStats(false)
}

// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// This is a workaround because CRI runtimes do not supply nano core usages,
// so this function calculates the difference between the current and the last
// (cached) cpu stats to derive this metric. The implementation assumes a
// single caller to periodically invoke this function to update the metrics. If
// multiple callers exist, the period used to compute the cpu usage may vary
// and the usage could be incoherent (e.g., spiky). If no caller calls this
// function, the cpu usage will stay nil. Right now, eviction manager is the
// only caller, and it calls this function every 10s.
func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
	// Update CPU nano core usage.
	return p.listPodStats(true)
}

func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
	// Gets node root filesystem information, which will be used to populate
	// the available and capacity bytes/inodes in container stats.
	rootFsInfo, err := p.cadvisor.RootFsInfo()
	if err != nil {
		return nil, fmt.Errorf("failed to get rootFs info: %v", err)
	}

	containerMap, podSandboxMap, err := p.getPodAndContainerMaps()
	if err != nil {
		return nil, fmt.Errorf("failed to get pod or container map: %v", err)
	}

	if p.podAndContainerStatsFromCRI {
		result, err := p.listPodStatsStrictlyFromCRI(updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
		if err == nil {
			// Call succeeded; return the CRI-provided stats directly.
			return result, nil
		}
		s, ok := status.FromError(err)
		// A legitimate failure, rather than the CRI implementation not supporting ListPodSandboxStats.
		if !ok || s.Code() != codes.Unimplemented {
			return nil, err
		}
		// The CRI implementation doesn't support ListPodSandboxStats; warn and fall back.
		klog.V(5).ErrorS(err,
			"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
		)
	}
	return p.listPodStatsPartiallyFromCRI(updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
}
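
// Editorial sketch (not part of the upstream file): the gate above treats
// only gRPC codes.Unimplemented as "the runtime does not support
// ListPodSandboxStats, fall back to cAdvisor"; any other error is surfaced
// to the caller. The helper name below is hypothetical.
func exampleIsUnimplemented(err error) bool {
	// status.FromError recovers a gRPC status from an error returned by a
	// CRI call; ok is false for non-gRPC errors, which count as real failures.
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.Unimplemented
}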

func (p *criStatsProvider) listPodStatsPartiallyFromCRI(updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
	// fsIDtoInfo is a map from filesystem id to its stats. This will be used
	// as a cache to avoid querying cAdvisor for the filesystem stats with the
	// same filesystem id many times.
	fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)

	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
	sandboxIDToPodStats := make(map[string]*statsapi.PodStats)

	resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ContainerStatsFilter{})
	if err != nil {
		return nil, fmt.Errorf("failed to list all container stats: %v", err)
	}
	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
	}
	caInfos, allInfos := getCRICadvisorStats(allInfos)

	// get network stats for containers.
	// This is only used on Windows. For other platforms, (nil, nil) should be returned.
	containerNetworkStats, err := p.listContainerNetworkStats()
	if err != nil {
		return nil, fmt.Errorf("failed to list container network stats: %v", err)
	}

	for _, stats := range resp {
		containerID := stats.Attributes.Id
		container, found := containerMap[containerID]
		if !found {
			continue
		}

		podSandboxID := container.PodSandboxId
		podSandbox, found := podSandboxMap[podSandboxID]
		if !found {
			continue
		}

		// Create the stats of the pod to which the container belongs,
		// if not created yet.
		ps, found := sandboxIDToPodStats[podSandboxID]
		if !found {
			ps = buildPodStats(podSandbox)
			sandboxIDToPodStats[podSandboxID] = ps
		}

		// Fill available stats for full set of required pod stats
		cs := p.makeContainerStats(stats, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
		p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
		p.addProcessStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)

		// If cadvisor stats are available for the container, use them to
		// populate the container stats.
		caStats, caFound := caInfos[containerID]
		if !caFound {
			klog.V(5).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
		} else {
			p.addCadvisorContainerStats(cs, &caStats)
		}
		ps.Containers = append(ps.Containers, *cs)
	}
	// cleanup outdated caches.
	p.cleanupOutdatedCaches()

	result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
	for _, s := range sandboxIDToPodStats {
		makePodStorageStats(s, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
		result = append(result, *s)
	}
	return result, nil
}

func (p *criStatsProvider) listPodStatsStrictlyFromCRI(updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
	criSandboxStats, err := p.runtimeService.ListPodSandboxStats(&runtimeapi.PodSandboxStatsFilter{})
	if err != nil {
		return nil, err
	}

	fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
	summarySandboxStats := make([]statsapi.PodStats, 0, len(podSandboxMap))
	for _, criSandboxStat := range criSandboxStats {
		if criSandboxStat == nil || criSandboxStat.Attributes == nil {
			klog.V(5).InfoS("Unable to find CRI stats for sandbox")
			continue
		}
		podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
		if !found {
			continue
		}
		ps := buildPodStats(podSandbox)
		// Generated getters are nil-safe, so a sandbox without Linux stats
		// simply yields no container entries here.
		for _, criContainerStat := range criSandboxStat.GetLinux().GetContainers() {
			container, found := containerMap[criContainerStat.Attributes.Id]
			if !found {
				continue
			}
			// Fill available stats for full set of required pod stats
			cs := p.makeContainerStats(criContainerStat, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
			ps.Containers = append(ps.Containers, *cs)
		}
		addCRIPodNetworkStats(ps, criSandboxStat)
		addCRIPodCPUStats(ps, criSandboxStat)
		addCRIPodMemoryStats(ps, criSandboxStat)
		addCRIPodProcessStats(ps, criSandboxStat)
		makePodStorageStats(ps, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
		summarySandboxStats = append(summarySandboxStats, *ps)
	}
	return summarySandboxStats, nil
}
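
// Editorial note (not in the upstream file): listPodStatsStrictlyFromCRI
// sources pod-level CPU, memory, network, and process stats from the CRI
// ListPodSandboxStats call, whereas listPodStatsPartiallyFromCRI above takes
// per-container stats from CRI and fills the pod-level numbers in from
// cAdvisor. The PodAndContainerStatsFromCRI feature gate selects between the
// two paths in listPodStats.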

// ListPodCPUAndMemoryStats returns the CPU and Memory stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
	sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
	containerMap, podSandboxMap, err := p.getPodAndContainerMaps()
	if err != nil {
		return nil, fmt.Errorf("failed to get pod or container map: %v", err)
	}

	result := make([]statsapi.PodStats, 0, len(podSandboxMap))
	if p.podAndContainerStatsFromCRI {
		criSandboxStats, err := p.runtimeService.ListPodSandboxStats(&runtimeapi.PodSandboxStatsFilter{})
		// Call succeeded
		if err == nil {
			for _, criSandboxStat := range criSandboxStats {
				podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
				if !found {
					continue
				}
				ps := buildPodStats(podSandbox)
				addCRIPodCPUStats(ps, criSandboxStat)
				addCRIPodMemoryStats(ps, criSandboxStat)
				result = append(result, *ps)
			}
			return result, nil
		}
		// Call failed, why?
		s, ok := status.FromError(err)
		// A legitimate failure, rather than the CRI implementation not supporting ListPodSandboxStats.
		if !ok || s.Code() != codes.Unimplemented {
			return nil, err
		}
		// The CRI implementation doesn't support ListPodSandboxStats; warn and fall back.
		klog.ErrorS(err,
			"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
		)
	}

	resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ContainerStatsFilter{})
	if err != nil {
		return nil, fmt.Errorf("failed to list all container stats: %v", err)
	}

	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
	}
	caInfos, allInfos := getCRICadvisorStats(allInfos)

	for _, stats := range resp {
		containerID := stats.Attributes.Id
		container, found := containerMap[containerID]
		if !found {
			continue
		}

		podSandboxID := container.PodSandboxId
		podSandbox, found := podSandboxMap[podSandboxID]
		if !found {
			continue
		}

		// Create the stats of the pod to which the container belongs,
		// if not created yet.
		ps, found := sandboxIDToPodStats[podSandboxID]
		if !found {
			ps = buildPodStats(podSandbox)
			sandboxIDToPodStats[podSandboxID] = ps
		}

		// Fill available CPU and memory stats for full set of required pod stats
		cs := p.makeContainerCPUAndMemoryStats(stats, container)
		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)

		// If cadvisor stats are available for the container, use them to
		// populate the container stats.
		caStats, caFound := caInfos[containerID]
		if !caFound {
			klog.V(4).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
		} else {
			p.addCadvisorContainerCPUAndMemoryStats(cs, &caStats)
		}
		ps.Containers = append(ps.Containers, *cs)
	}
	// cleanup outdated caches.
	p.cleanupOutdatedCaches()

	for _, s := range sandboxIDToPodStats {
		result = append(result, *s)
	}
	return result, nil
}

func (p *criStatsProvider) getPodAndContainerMaps() (map[string]*runtimeapi.Container, map[string]*runtimeapi.PodSandbox, error) {
	containers, err := p.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to list all containers: %v", err)
	}

	// Create a map from pod sandbox ID to the PodSandbox object.
	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
	podSandboxes, err := p.runtimeService.ListPodSandbox(&runtimeapi.PodSandboxFilter{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
	}
	podSandboxes = removeTerminatedPods(podSandboxes)
	for _, s := range podSandboxes {
		podSandboxMap[s.Id] = s
	}

	containers = removeTerminatedContainers(containers)
	// Create a map from container ID to the Container object.
	containerMap := make(map[string]*runtimeapi.Container)
	for _, c := range containers {
		containerMap[c.Id] = c
	}
	return containerMap, podSandboxMap, nil
}
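
// A minimal editorial sketch (not in the upstream file) of how callers join
// the two maps returned above: a container references its sandbox through
// PodSandboxId, and stats for containers whose sandbox is missing (e.g.
// filtered out as terminated) are skipped. The function name is hypothetical.
func exampleResolveSandbox(
	containerMap map[string]*runtimeapi.Container,
	podSandboxMap map[string]*runtimeapi.PodSandbox,
	containerID string,
) *runtimeapi.PodSandbox {
	c, found := containerMap[containerID]
	if !found {
		return nil
	}
	// A missing sandbox yields nil, which callers treat as "skip".
	return podSandboxMap[c.PodSandboxId]
}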

// ImageFsStats returns the stats of the image filesystem.
func (p *criStatsProvider) ImageFsStats() (*statsapi.FsStats, error) {
	resp, err := p.imageService.ImageFsInfo()
	if err != nil {
		return nil, err
	}

	// CRI may return the stats of multiple image filesystems but we only
	// return the first one.
	//
	// TODO(yguo0905): Support returning stats of multiple image filesystems.
	if len(resp) == 0 {
		return nil, fmt.Errorf("imageFs information is unavailable")
	}
	fs := resp[0]
	s := &statsapi.FsStats{
		Time:      metav1.NewTime(time.Unix(0, fs.Timestamp)),
		UsedBytes: &fs.UsedBytes.Value,
	}
	if fs.InodesUsed != nil {
		s.InodesUsed = &fs.InodesUsed.Value
	}
	// getFsInfo returns nil if the image filesystem id is unknown to the
	// local node or there was an error retrieving the stats. In those cases
	// we omit the filesystem stats below and return a best-effort partial
	// result. See https://github.com/kubernetes/heapster/issues/1793.
	imageFsInfo := p.getFsInfo(fs.GetFsId())
	if imageFsInfo != nil {
		s.AvailableBytes = &imageFsInfo.Available
		s.CapacityBytes = &imageFsInfo.Capacity
		s.InodesFree = imageFsInfo.InodesFree
		s.Inodes = imageFsInfo.Inodes
	}
	return s, nil
}

// ImageFsDevice returns the name of the device on which the image filesystem
// is located, e.g. /dev/sda1.
func (p *criStatsProvider) ImageFsDevice() (string, error) {
	resp, err := p.imageService.ImageFsInfo()
	if err != nil {
		return "", err
	}
	for _, fs := range resp {
		fsInfo := p.getFsInfo(fs.GetFsId())
		if fsInfo != nil {
			return fsInfo.Device, nil
		}
	}
	return "", errors.New("imagefs device is not found")
}

// getFsInfo returns the information of the filesystem with the specified
// fsID. If any error occurs, this function logs the error and returns
// nil.
func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) *cadvisorapiv2.FsInfo {
	if fsID == nil {
		klog.V(2).InfoS("Failed to get filesystem info: fsID is nil")
		return nil
	}
	mountpoint := fsID.GetMountpoint()
	fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint)
	if err != nil {
		msg := "Failed to get the info of the filesystem with mountpoint"
		if err == cadvisorfs.ErrNoSuchDevice {
			klog.V(2).InfoS(msg, "mountpoint", mountpoint, "err", err)
		} else {
			klog.ErrorS(err, msg, "mountpoint", mountpoint)
		}
		return nil
	}
	return &fsInfo
}

// buildPodStats returns a PodStats that identifies the pod owning the given
// sandbox.
func buildPodStats(podSandbox *runtimeapi.PodSandbox) *statsapi.PodStats {
	return &statsapi.PodStats{
		PodRef: statsapi.PodReference{
			Name:      podSandbox.Metadata.Name,
			UID:       podSandbox.Metadata.Uid,
			Namespace: podSandbox.Metadata.Namespace,
		},
		// The StartTime in the summary API is the pod creation time.
		StartTime: metav1.NewTime(time.Unix(0, podSandbox.CreatedAt)),
	}
}

func (p *criStatsProvider) addPodNetworkStats(
	ps *statsapi.PodStats,
	podSandboxID string,
	caInfos map[string]cadvisorapiv2.ContainerInfo,
	cs *statsapi.ContainerStats,
	netStats *statsapi.NetworkStats,
) {
	caPodSandbox, found := caInfos[podSandboxID]
	// Try to get network stats from cadvisor first.
	if found {
		networkStats := cadvisorInfoToNetworkStats(&caPodSandbox)
		if networkStats != nil {
			ps.Network = networkStats
			return
		}
	}

	// Not found in cadvisor; fall back to the supplied netStats.
	if netStats != nil {
		ps.Network = netStats
		return
	}

	// TODO: sum Pod network stats from container stats.
	klog.V(4).InfoS("Unable to find network stats for sandbox", "sandboxID", podSandboxID)
}

func (p *criStatsProvider) addPodCPUMemoryStats(
	ps *statsapi.PodStats,
	podUID types.UID,
	allInfos map[string]cadvisorapiv2.ContainerInfo,
	cs *statsapi.ContainerStats,
) {
	// Try to get cpu and memory stats from cadvisor first.
	podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
	if podCgroupInfo != nil {
		cpu, memory := cadvisorInfoToCPUandMemoryStats(podCgroupInfo)
		ps.CPU = cpu
		ps.Memory = memory
		return
	}

	// Sum pod cpu and memory stats from the container stats.
	if cs.CPU != nil {
		if ps.CPU == nil {
			ps.CPU = &statsapi.CPUStats{}
		}

		ps.CPU.Time = cs.CPU.Time
		usageCoreNanoSeconds := getUint64Value(cs.CPU.UsageCoreNanoSeconds) + getUint64Value(ps.CPU.UsageCoreNanoSeconds)
		usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores)
		ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds
		ps.CPU.UsageNanoCores = &usageNanoCores
	}

	if cs.Memory != nil {
		if ps.Memory == nil {
			ps.Memory = &statsapi.MemoryStats{}
		}

		ps.Memory.Time = cs.Memory.Time
		availableBytes := getUint64Value(cs.Memory.AvailableBytes) + getUint64Value(ps.Memory.AvailableBytes)
		usageBytes := getUint64Value(cs.Memory.UsageBytes) + getUint64Value(ps.Memory.UsageBytes)
		workingSetBytes := getUint64Value(cs.Memory.WorkingSetBytes) + getUint64Value(ps.Memory.WorkingSetBytes)
		rSSBytes := getUint64Value(cs.Memory.RSSBytes) + getUint64Value(ps.Memory.RSSBytes)
		pageFaults := getUint64Value(cs.Memory.PageFaults) + getUint64Value(ps.Memory.PageFaults)
		majorPageFaults := getUint64Value(cs.Memory.MajorPageFaults) + getUint64Value(ps.Memory.MajorPageFaults)
		ps.Memory.AvailableBytes = &availableBytes
		ps.Memory.UsageBytes = &usageBytes
		ps.Memory.WorkingSetBytes = &workingSetBytes
		ps.Memory.RSSBytes = &rSSBytes
		ps.Memory.PageFaults = &pageFaults
		ps.Memory.MajorPageFaults = &majorPageFaults
	}
}
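
// A minimal editorial sketch (not in the upstream file) of the accumulation
// pattern used above when no pod-level cgroup info is available from
// cAdvisor: nil pointers count as zero (getUint64Value, defined in this
// package's helpers, returns 0 for nil), and each container's usage is added
// into the pod total. The function name is hypothetical.
func exampleAccumulatePodCPU(ps *statsapi.PodStats, cs *statsapi.ContainerStats) {
	if cs.CPU == nil {
		return
	}
	if ps.CPU == nil {
		ps.CPU = &statsapi.CPUStats{}
	}
	total := getUint64Value(ps.CPU.UsageNanoCores) + getUint64Value(cs.CPU.UsageNanoCores)
	ps.CPU.UsageNanoCores = &total
}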

func (p *criStatsProvider) addProcessStats(
	ps *statsapi.PodStats,
	podUID types.UID,
	allInfos map[string]cadvisorapiv2.ContainerInfo,
	cs *statsapi.ContainerStats,
) {
	// Process stats are only available from cadvisor.
	info := getCadvisorPodInfoFromPodUID(podUID, allInfos)
	if info != nil {
		ps.ProcessStats = cadvisorInfoToProcessStats(info)
		return
	}
}

func (p *criStatsProvider) makeContainerStats(
	stats *runtimeapi.ContainerStats,
	container *runtimeapi.Container,
	rootFsInfo *cadvisorapiv2.FsInfo,
	fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
	meta *runtimeapi.PodSandboxMetadata,
	updateCPUNanoCoreUsage bool,
) *statsapi.ContainerStats {
	result := &statsapi.ContainerStats{
		Name: stats.Attributes.Metadata.Name,
		// The StartTime in the summary API is the container creation time.
		StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
		CPU:       &statsapi.CPUStats{},
		Memory:    &statsapi.MemoryStats{},
		Rootfs:    &statsapi.FsStats{},
		// UserDefinedMetrics is not supported by CRI.
	}
	if stats.Cpu != nil {
		result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
		if stats.Cpu.UsageCoreNanoSeconds != nil {
			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
		}
		var usageNanoCores *uint64
		if updateCPUNanoCoreUsage {
			usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
		} else {
			usageNanoCores = p.getContainerUsageNanoCores(stats)
		}
		if usageNanoCores != nil {
			result.CPU.UsageNanoCores = usageNanoCores
		}
	} else {
		result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
		result.CPU.UsageNanoCores = uint64Ptr(0)
	}
	if stats.Memory != nil {
		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
		if stats.Memory.WorkingSetBytes != nil {
			result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
		}
	} else {
		result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.Memory.WorkingSetBytes = uint64Ptr(0)
	}
	if stats.WritableLayer != nil {
		result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
		if stats.WritableLayer.UsedBytes != nil {
			result.Rootfs.UsedBytes = &stats.WritableLayer.UsedBytes.Value
		}
		if stats.WritableLayer.InodesUsed != nil {
			result.Rootfs.InodesUsed = &stats.WritableLayer.InodesUsed.Value
		}
	}
	fsID := stats.GetWritableLayer().GetFsId()
	if fsID != nil {
		imageFsInfo, found := fsIDtoInfo[*fsID]
		if !found {
			imageFsInfo = p.getFsInfo(fsID)
			fsIDtoInfo[*fsID] = imageFsInfo
		}
		// getFsInfo returns nil if the filesystem id is unknown to the local
		// node or there was an error retrieving the stats. In those cases we
		// omit the filesystem stats and return a best-effort partial result.
		// See https://github.com/kubernetes/heapster/issues/1793.
		if imageFsInfo != nil {
			result.Rootfs.AvailableBytes = &imageFsInfo.Available
			result.Rootfs.CapacityBytes = &imageFsInfo.Capacity
			result.Rootfs.InodesFree = imageFsInfo.InodesFree
			result.Rootfs.Inodes = imageFsInfo.Inodes
		}
	}
	// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
	// using old log path, empty log stats are returned. This is fine, because we don't
	// officially support in-place upgrade anyway.
	var err error
	result.Logs, err = p.hostStatsProvider.getPodContainerLogStats(meta.GetNamespace(), meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName(), rootFsInfo)
	if err != nil {
		klog.ErrorS(err, "Unable to fetch container log stats", "containerName", container.GetMetadata().GetName())
	}
	return result
}
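
// Editorial note (not in the upstream file): fsIDtoInfo is passed in by the
// callers so that, within a single listing pass, the cAdvisor filesystem
// lookup for a given FilesystemIdentifier runs at most once and is reused
// for every container on that filesystem.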

func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
	stats *runtimeapi.ContainerStats,
	container *runtimeapi.Container,
) *statsapi.ContainerStats {
	result := &statsapi.ContainerStats{
		Name: stats.Attributes.Metadata.Name,
		// The StartTime in the summary API is the container creation time.
		StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
		CPU:       &statsapi.CPUStats{},
		Memory:    &statsapi.MemoryStats{},
		// UserDefinedMetrics is not supported by CRI.
	}
	if stats.Cpu != nil {
		result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
		if stats.Cpu.UsageCoreNanoSeconds != nil {
			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
		}

		usageNanoCores := p.getContainerUsageNanoCores(stats)
		if usageNanoCores != nil {
			result.CPU.UsageNanoCores = usageNanoCores
		}
	} else {
		result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
		result.CPU.UsageNanoCores = uint64Ptr(0)
	}
	if stats.Memory != nil {
		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
		if stats.Memory.WorkingSetBytes != nil {
			result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
		}
	} else {
		result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
		result.Memory.WorkingSetBytes = uint64Ptr(0)
	}

	return result
}

// getContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
// by the CRI. If it is unable to, it gets the information from the cache instead.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
	if stats == nil || stats.Attributes == nil {
		return nil
	}

	// Bypass the cache if the CRI implementation specified the UsageNanoCores.
	if stats.Cpu != nil && stats.Cpu.UsageNanoCores != nil {
		return &stats.Cpu.UsageNanoCores.Value
	}

	p.mutex.RLock()
	defer p.mutex.RUnlock()

	cached, ok := p.cpuUsageCache[stats.Attributes.Id]
	if !ok || cached.usageNanoCores == nil {
		return nil
	}
	// return a copy of the usage
	latestUsage := *cached.usageNanoCores
	return &latestUsage
}

// getAndUpdateContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
// by the CRI. If it is unable to, it computes usageNanoCores based on the given and the cached usageCoreNanoSeconds,
// updates the cache with the computed usageNanoCores, and returns the usageNanoCores.
func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
	if stats == nil || stats.Attributes == nil || stats.Cpu == nil {
		return nil
	}
	// Bypass the cache if the CRI implementation specified the UsageNanoCores.
	if stats.Cpu.UsageNanoCores != nil {
		return &stats.Cpu.UsageNanoCores.Value
	}
	// If there is neither UsageNanoCores nor UsageCoreNanoSeconds, there is no information to use
	if stats.Cpu.UsageCoreNanoSeconds == nil {
		return nil
	}
	id := stats.Attributes.Id
	usage, err := func() (*uint64, error) {
		p.mutex.Lock()
		defer p.mutex.Unlock()

		cached, ok := p.cpuUsageCache[id]
		if !ok || cached.stats.UsageCoreNanoSeconds == nil || stats.Cpu.UsageCoreNanoSeconds.Value < cached.stats.UsageCoreNanoSeconds.Value {
			// Cannot compute the usage now, but update the cached stats anyway
			p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
			return nil, nil
		}

		newStats := stats.Cpu
		cachedStats := cached.stats
		nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
		if nanoSeconds <= 0 {
			return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
		}
		usageNanoCores := uint64(float64(newStats.UsageCoreNanoSeconds.Value-cachedStats.UsageCoreNanoSeconds.Value) /
			float64(nanoSeconds) * float64(time.Second/time.Nanosecond))

		// Update cache with new value.
		usageToUpdate := usageNanoCores
		p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}

		return &usageNanoCores, nil
	}()

	if err != nil {
		// This should not happen. Log it to raise visibility.
		klog.ErrorS(err, "Failed updating cpu usage nano core")
	}
	return usage
}
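
// A worked editorial example (not in the upstream file) of the rate
// computation above: if a container accumulated 5e9 ns of cpu time over a
// 10s (1e10 ns) sampling window, then
//
//	usageNanoCores = 5e9 / 1e10 * 1e9 = 5e8
//
// i.e. an average of 0.5 cores. The function name is hypothetical.
func exampleUsageNanoCores(curUsage, prevUsage uint64, curTimestamp, prevTimestamp int64) uint64 {
	nanoSeconds := curTimestamp - prevTimestamp
	if nanoSeconds <= 0 {
		// Mirrors the zero-or-negative-interval error path above.
		return 0
	}
	// Scale the per-nanosecond usage rate to nano-cores (1e9 per core).
	return uint64(float64(curUsage-prevUsage) / float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
}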

func (p *criStatsProvider) cleanupOutdatedCaches() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	for k, v := range p.cpuUsageCache {
		if v == nil {
			delete(p.cpuUsageCache, k)
			continue
		}

		if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
			delete(p.cpuUsageCache, k)
		}
	}
}

// removeTerminatedPods returns pods with terminated ones removed.
// It only removes a terminated pod when there is a running instance
// of the pod with the same name and namespace.
// This is needed because:
// 1) PodSandbox may be recreated;
// 2) Pod may be recreated with the same name and namespace.
func removeTerminatedPods(pods []*runtimeapi.PodSandbox) []*runtimeapi.PodSandbox {
	podMap := make(map[statsapi.PodReference][]*runtimeapi.PodSandbox)
	// Sort by creation time, oldest first.
	sort.Slice(pods, func(i, j int) bool {
		return pods[i].CreatedAt < pods[j].CreatedAt
	})
	for _, pod := range pods {
		refID := statsapi.PodReference{
			Name:      pod.GetMetadata().GetName(),
			Namespace: pod.GetMetadata().GetNamespace(),
			// UID is intentionally left empty.
		}
		podMap[refID] = append(podMap[refID], pod)
	}

	result := make([]*runtimeapi.PodSandbox, 0)
	for _, refs := range podMap {
		if len(refs) == 1 {
			result = append(result, refs[0])
			continue
		}
		found := false
		for i := 0; i < len(refs); i++ {
			if refs[i].State == runtimeapi.PodSandboxState_SANDBOX_READY {
				found = true
				result = append(result, refs[i])
			}
		}
		if !found {
			result = append(result, refs[len(refs)-1])
		}
	}
	return result
}
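
// Editorial walk-through (not in the upstream file): if a pod was recreated
// and thus has two sandboxes with the same name and namespace, an older
// NOTREADY one and a newer READY one, only the READY sandbox is kept. If no
// sandbox is READY, the most recently created one is kept (the slice is
// sorted by CreatedAt ascending), so the pod is still represented.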

// removeTerminatedContainers removes all terminated containers since they should
// not be used for usage calculations.
func removeTerminatedContainers(containers []*runtimeapi.Container) []*runtimeapi.Container {
	containerMap := make(map[containerID][]*runtimeapi.Container)
	// Sort by creation time, oldest first.
	sort.Slice(containers, func(i, j int) bool {
		return containers[i].CreatedAt < containers[j].CreatedAt
	})
	for _, container := range containers {
		refID := containerID{
			podRef:        buildPodRef(container.Labels),
			containerName: kubetypes.GetContainerName(container.Labels),
		}
		containerMap[refID] = append(containerMap[refID], container)
	}

	result := make([]*runtimeapi.Container, 0)
	for _, refs := range containerMap {
		for i := 0; i < len(refs); i++ {
			if refs[i].State == runtimeapi.ContainerState_CONTAINER_RUNNING {
				result = append(result, refs[i])
			}
		}
	}
	return result
}

func (p *criStatsProvider) addCadvisorContainerStats(
	cs *statsapi.ContainerStats,
	caPodStats *cadvisorapiv2.ContainerInfo,
) {
	if caPodStats.Spec.HasCustomMetrics {
		cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
	}

	cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
	if cpu != nil {
		cs.CPU = cpu
	}
	if memory != nil {
		cs.Memory = memory
	}
}

func (p *criStatsProvider) addCadvisorContainerCPUAndMemoryStats(
	cs *statsapi.ContainerStats,
	caPodStats *cadvisorapiv2.ContainerInfo,
) {
	if caPodStats.Spec.HasCustomMetrics {
		cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
	}

	cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
	if cpu != nil {
		cs.CPU = cpu
	}
	if memory != nil {
		cs.Memory = memory
	}
}

func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) (map[string]cadvisorapiv2.ContainerInfo, map[string]cadvisorapiv2.ContainerInfo) {
	stats := make(map[string]cadvisorapiv2.ContainerInfo)
	filteredInfos, cinfosByPodCgroupKey := filterTerminatedContainerInfoAndAssembleByPodCgroupKey(infos)
	for key, info := range filteredInfos {
		// On systemd using devicemapper each mount into the container has an
		// associated cgroup. We ignore them to ensure we do not get duplicate
		// entries in our summary. For details on .mount units:
		// http://man7.org/linux/man-pages/man5/systemd.mount.5.html
		if strings.HasSuffix(key, ".mount") {
			continue
		}
		// Skip containers that are not managed by a pod.
		if !isPodManagedContainer(&info) {
			continue
		}
		stats[extractIDFromCgroupPath(key)] = info
	}
	return stats, cinfosByPodCgroupKey
}

func extractIDFromCgroupPath(cgroupPath string) string {
	// case0 == cgroupfs: "/kubepods/burstable/pod2fc932ce-fdcc-454b-97bd-aadfdeb4c340/9be25294016e2dc0340dd605ce1f57b492039b267a6a618a7ad2a7a58a740f32"
	id := path.Base(cgroupPath)

	// case1 == systemd: "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod2fc932ce_fdcc_454b_97bd_aadfdeb4c340.slice/cri-containerd-aaefb9d8feed2d453b543f6d928cede7a4dbefa6a0ae7c9b990dd234c56e93b9.scope"
	// trim anything before the final '-' and suffix .scope
	systemdSuffix := ".scope"
	if strings.HasSuffix(id, systemdSuffix) {
		id = strings.TrimSuffix(id, systemdSuffix)
		components := strings.Split(id, "-")
		if len(components) > 1 {
			id = components[len(components)-1]
		}
	}
	return id
}
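
// Editorial usage sketch (not in the upstream file) for the two layouts
// handled above, with the IDs abbreviated:
//
//	extractIDFromCgroupPath("/kubepods/burstable/pod2fc932ce.../9be25294...")
//	// => "9be25294..." (cgroupfs: the path base is already the container ID)
//
//	extractIDFromCgroupPath("/kubepods.slice/.../cri-containerd-aaefb9d8....scope")
//	// => "aaefb9d8..." (systemd: trim ".scope", then take the segment after
//	// the last '-')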

func addCRIPodNetworkStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Network == nil {
		return
	}
	criNetwork := criPodStat.Linux.Network
	iStats := statsapi.NetworkStats{
		Time:           metav1.NewTime(time.Unix(0, criNetwork.Timestamp)),
		InterfaceStats: criInterfaceToSummary(criNetwork.DefaultInterface),
		Interfaces:     make([]statsapi.InterfaceStats, 0, len(criNetwork.Interfaces)),
	}
	for _, iface := range criNetwork.Interfaces {
		iStats.Interfaces = append(iStats.Interfaces, criInterfaceToSummary(iface))
	}
	ps.Network = &iStats
}

func criInterfaceToSummary(criIface *runtimeapi.NetworkInterfaceUsage) statsapi.InterfaceStats {
	return statsapi.InterfaceStats{
		Name:     criIface.Name,
		RxBytes:  valueOfUInt64Value(criIface.RxBytes),
		RxErrors: valueOfUInt64Value(criIface.RxErrors),
		TxBytes:  valueOfUInt64Value(criIface.TxBytes),
		TxErrors: valueOfUInt64Value(criIface.TxErrors),
	}
}

func addCRIPodCPUStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Cpu == nil {
		return
	}
	criCPU := criPodStat.Linux.Cpu
	ps.CPU = &statsapi.CPUStats{
		Time:                 metav1.NewTime(time.Unix(0, criCPU.Timestamp)),
		UsageNanoCores:       valueOfUInt64Value(criCPU.UsageNanoCores),
		UsageCoreNanoSeconds: valueOfUInt64Value(criCPU.UsageCoreNanoSeconds),
	}
}

func addCRIPodMemoryStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Memory == nil {
		return
	}
	criMemory := criPodStat.Linux.Memory
	ps.Memory = &statsapi.MemoryStats{
		Time:            metav1.NewTime(time.Unix(0, criMemory.Timestamp)),
		AvailableBytes:  valueOfUInt64Value(criMemory.AvailableBytes),
		UsageBytes:      valueOfUInt64Value(criMemory.UsageBytes),
		WorkingSetBytes: valueOfUInt64Value(criMemory.WorkingSetBytes),
		RSSBytes:        valueOfUInt64Value(criMemory.RssBytes),
		PageFaults:      valueOfUInt64Value(criMemory.PageFaults),
		MajorPageFaults: valueOfUInt64Value(criMemory.MajorPageFaults),
	}
}

func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Process == nil {
		return
	}
	ps.ProcessStats = &statsapi.ProcessStats{
		ProcessCount: valueOfUInt64Value(criPodStat.Linux.Process.ProcessCount),
	}
}

func valueOfUInt64Value(value *runtimeapi.UInt64Value) *uint64 {
	if value == nil {
		return nil
	}
	return &value.Value
}
