kubernetes cri_stats_provider source code
File path: /pkg/kubelet/stats/cri_stats_provider.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"errors"
"fmt"
"path"
"sort"
"strings"
"sync"
"time"
cadvisorfs "github.com/google/cadvisor/fs"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/clock"
)
var (
// defaultCachePeriod is the default cache period for each cpuUsage.
defaultCachePeriod = 10 * time.Minute
)
// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
type cpuUsageRecord struct {
stats *runtimeapi.CpuUsage
usageNanoCores *uint64
}
// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
// cadvisor is used to get the node root filesystem's stats (such as the
// capacity/available bytes/inodes) that will be populated in per container
// filesystem stats.
cadvisor cadvisor.Interface
// resourceAnalyzer is used to get the volume stats of the pods.
resourceAnalyzer stats.ResourceAnalyzer
// runtimeService is used to get the status and stats of the pods and their
// managed containers.
runtimeService internalapi.RuntimeService
// imageService is used to get the stats of the image filesystem.
imageService internalapi.ImageManagerService
// hostStatsProvider is used to get the status of the host filesystem consumed by pods.
hostStatsProvider HostStatsProvider
// windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows
windowsNetworkStatsProvider interface{} //nolint:unused // U1000 We can't import hcsshim due to Build constraints in hcsshim
// clock is used to report the current time
clock clock.Clock
// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord
mutex sync.RWMutex
podAndContainerStatsFromCRI bool
}
// newCRIStatsProvider returns a containerStatsProvider implementation that
// provides container stats using CRI.
func newCRIStatsProvider(
cadvisor cadvisor.Interface,
resourceAnalyzer stats.ResourceAnalyzer,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
hostStatsProvider HostStatsProvider,
podAndContainerStatsFromCRI bool,
) containerStatsProvider {
return &criStatsProvider{
cadvisor: cadvisor,
resourceAnalyzer: resourceAnalyzer,
runtimeService: runtimeService,
imageService: imageService,
hostStatsProvider: hostStatsProvider,
cpuUsageCache: make(map[string]*cpuUsageRecord),
podAndContainerStatsFromCRI: podAndContainerStatsFromCRI,
clock: clock.RealClock{},
}
}
// ListPodStats returns the stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
// Don't update CPU nano core usage.
return p.listPodStats(false)
}
// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// This is a workaround because CRI runtimes do not supply nano core usages,
// so this function calculates the difference between the current and the last
// (cached) cpu stats to derive this metric. The implementation assumes a
// single caller periodically invokes this function to update the metric. If
// multiple callers exist, the period used to compute the cpu usage may vary
// and the usage could be incoherent (e.g., spiky). If no caller calls this
// function, the cpu usage will stay nil. Right now, the eviction manager is
// the only caller, and it calls this function every 10s.
func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
// Update CPU nano core usage.
return p.listPodStats(true)
}
func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
// Gets node root filesystem information, which will be used to populate
// the available and capacity bytes/inodes in container stats.
rootFsInfo, err := p.cadvisor.RootFsInfo()
if err != nil {
return nil, fmt.Errorf("failed to get rootFs info: %v", err)
}
containerMap, podSandboxMap, err := p.getPodAndContainerMaps()
if err != nil {
return nil, fmt.Errorf("failed to get pod or container map: %v", err)
}
if p.podAndContainerStatsFromCRI {
result, err := p.listPodStatsStrictlyFromCRI(updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
if err == nil {
// Call succeeded; return the CRI-provided stats directly.
return result, nil
}
s, ok := status.FromError(err)
// A legitimate failure, rather than the CRI implementation not supporting ListPodSandboxStats.
if !ok || s.Code() != codes.Unimplemented {
return nil, err
}
// The CRI implementation doesn't support ListPodSandboxStats; warn and fall back.
klog.V(5).ErrorS(err,
"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
)
}
return p.listPodStatsPartiallyFromCRI(updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
}
func (p *criStatsProvider) listPodStatsPartiallyFromCRI(updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
// fsIDtoInfo is a map from filesystem id to its stats. This will be used
// as a cache to avoid querying cAdvisor for the filesystem stats with the
// same filesystem id many times.
fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ContainerStatsFilter{})
if err != nil {
return nil, fmt.Errorf("failed to list all container stats: %v", err)
}
allInfos, err := getCadvisorContainerInfo(p.cadvisor)
if err != nil {
return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
}
caInfos, allInfos := getCRICadvisorStats(allInfos)
// Get network stats for containers.
// This is only used on Windows; on other platforms, (nil, nil) is returned.
containerNetworkStats, err := p.listContainerNetworkStats()
if err != nil {
return nil, fmt.Errorf("failed to list container network stats: %v", err)
}
for _, stats := range resp {
containerID := stats.Attributes.Id
container, found := containerMap[containerID]
if !found {
continue
}
podSandboxID := container.PodSandboxId
podSandbox, found := podSandboxMap[podSandboxID]
if !found {
continue
}
// Create the stats of the pod (if not created yet) to which the
// container belongs.
ps, found := sandboxIDToPodStats[podSandboxID]
if !found {
ps = buildPodStats(podSandbox)
sandboxIDToPodStats[podSandboxID] = ps
}
// Fill available stats for full set of required pod stats
cs := p.makeContainerStats(stats, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
p.addProcessStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
// If cadvisor stats are available for the container, use them to populate
// container stats.
caStats, caFound := caInfos[containerID]
if !caFound {
klog.V(5).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
} else {
p.addCadvisorContainerStats(cs, &caStats)
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated caches.
p.cleanupOutdatedCaches()
result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
makePodStorageStats(s, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
result = append(result, *s)
}
return result, nil
}
func (p *criStatsProvider) listPodStatsStrictlyFromCRI(updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
criSandboxStats, err := p.runtimeService.ListPodSandboxStats(&runtimeapi.PodSandboxStatsFilter{})
if err != nil {
return nil, err
}
fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
summarySandboxStats := make([]statsapi.PodStats, 0, len(podSandboxMap))
for _, criSandboxStat := range criSandboxStats {
if criSandboxStat == nil || criSandboxStat.Attributes == nil {
klog.V(5).InfoS("Unable to find CRI stats for sandbox")
continue
}
podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
if !found {
continue
}
ps := buildPodStats(podSandbox)
for _, criContainerStat := range criSandboxStat.Linux.Containers {
container, found := containerMap[criContainerStat.Attributes.Id]
if !found {
continue
}
// Fill available stats for full set of required pod stats
cs := p.makeContainerStats(criContainerStat, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
ps.Containers = append(ps.Containers, *cs)
}
addCRIPodNetworkStats(ps, criSandboxStat)
addCRIPodCPUStats(ps, criSandboxStat)
addCRIPodMemoryStats(ps, criSandboxStat)
addCRIPodProcessStats(ps, criSandboxStat)
makePodStorageStats(ps, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
summarySandboxStats = append(summarySandboxStats, *ps)
}
return summarySandboxStats, nil
}
// ListPodCPUAndMemoryStats returns the CPU and Memory stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
containerMap, podSandboxMap, err := p.getPodAndContainerMaps()
if err != nil {
return nil, fmt.Errorf("failed to get pod or container map: %v", err)
}
result := make([]statsapi.PodStats, 0, len(podSandboxMap))
if p.podAndContainerStatsFromCRI {
criSandboxStats, err := p.runtimeService.ListPodSandboxStats(&runtimeapi.PodSandboxStatsFilter{})
// Call succeeded
if err == nil {
for _, criSandboxStat := range criSandboxStats {
podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
if !found {
continue
}
ps := buildPodStats(podSandbox)
addCRIPodCPUStats(ps, criSandboxStat)
addCRIPodMemoryStats(ps, criSandboxStat)
result = append(result, *ps)
}
return result, nil
}
// The call failed; determine why.
s, ok := status.FromError(err)
// A legitimate failure, rather than the CRI implementation not supporting ListPodSandboxStats.
if !ok || s.Code() != codes.Unimplemented {
return nil, err
}
// The CRI implementation doesn't support ListPodSandboxStats; warn and fall back.
klog.ErrorS(err,
"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
)
}
resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ContainerStatsFilter{})
if err != nil {
return nil, fmt.Errorf("failed to list all container stats: %v", err)
}
allInfos, err := getCadvisorContainerInfo(p.cadvisor)
if err != nil {
return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
}
caInfos, allInfos := getCRICadvisorStats(allInfos)
for _, stats := range resp {
containerID := stats.Attributes.Id
container, found := containerMap[containerID]
if !found {
continue
}
podSandboxID := container.PodSandboxId
podSandbox, found := podSandboxMap[podSandboxID]
if !found {
continue
}
// Create the stats of the pod (if not created yet) to which the
// container belongs.
ps, found := sandboxIDToPodStats[podSandboxID]
if !found {
ps = buildPodStats(podSandbox)
sandboxIDToPodStats[podSandboxID] = ps
}
// Fill available CPU and memory stats for full set of required pod stats
cs := p.makeContainerCPUAndMemoryStats(stats, container)
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
// If cadvisor stats are available for the container, use them to populate
// container stats.
caStats, caFound := caInfos[containerID]
if !caFound {
klog.V(4).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
} else {
p.addCadvisorContainerCPUAndMemoryStats(cs, &caStats)
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated caches.
p.cleanupOutdatedCaches()
for _, s := range sandboxIDToPodStats {
result = append(result, *s)
}
return result, nil
}
func (p *criStatsProvider) getPodAndContainerMaps() (map[string]*runtimeapi.Container, map[string]*runtimeapi.PodSandbox, error) {
containers, err := p.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
if err != nil {
return nil, nil, fmt.Errorf("failed to list all containers: %v", err)
}
// Creates pod sandbox map between the pod sandbox ID and the PodSandbox object.
podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
podSandboxes, err := p.runtimeService.ListPodSandbox(&runtimeapi.PodSandboxFilter{})
if err != nil {
return nil, nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
}
podSandboxes = removeTerminatedPods(podSandboxes)
for _, s := range podSandboxes {
podSandboxMap[s.Id] = s
}
containers = removeTerminatedContainers(containers)
// Creates container map between the container ID and the Container object.
containerMap := make(map[string]*runtimeapi.Container)
for _, c := range containers {
containerMap[c.Id] = c
}
return containerMap, podSandboxMap, nil
}
// ImageFsStats returns the stats of the image filesystem.
func (p *criStatsProvider) ImageFsStats() (*statsapi.FsStats, error) {
resp, err := p.imageService.ImageFsInfo()
if err != nil {
return nil, err
}
// CRI may return the stats of multiple image filesystems but we only
// return the first one.
//
// TODO(yguo0905): Support returning stats of multiple image filesystems.
if len(resp) == 0 {
return nil, fmt.Errorf("imageFs information is unavailable")
}
fs := resp[0]
s := &statsapi.FsStats{
Time: metav1.NewTime(time.Unix(0, fs.Timestamp)),
UsedBytes: &fs.UsedBytes.Value,
}
if fs.InodesUsed != nil {
s.InodesUsed = &fs.InodesUsed.Value
}
imageFsInfo := p.getFsInfo(fs.GetFsId())
// When the image filesystem id is unknown to the local node, or there was
// an error retrieving the stats, imageFsInfo is nil. In that case we omit
// those stats and return the best-effort partial result. See
// https://github.com/kubernetes/heapster/issues/1793.
if imageFsInfo != nil {
s.AvailableBytes = &imageFsInfo.Available
s.CapacityBytes = &imageFsInfo.Capacity
s.InodesFree = imageFsInfo.InodesFree
s.Inodes = imageFsInfo.Inodes
}
return s, nil
}
// ImageFsDevice returns the name of the device where the image filesystem is
// located, e.g. /dev/sda1.
func (p *criStatsProvider) ImageFsDevice() (string, error) {
resp, err := p.imageService.ImageFsInfo()
if err != nil {
return "", err
}
for _, fs := range resp {
fsInfo := p.getFsInfo(fs.GetFsId())
if fsInfo != nil {
return fsInfo.Device, nil
}
}
return "", errors.New("imagefs device is not found")
}
// getFsInfo returns the information of the filesystem with the specified
// fsID. If any error occurs, this function logs the error and returns
// nil.
func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) *cadvisorapiv2.FsInfo {
if fsID == nil {
klog.V(2).InfoS("Failed to get filesystem info: fsID is nil")
return nil
}
mountpoint := fsID.GetMountpoint()
fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint)
if err != nil {
msg := "Failed to get the info of the filesystem with mountpoint"
if err == cadvisorfs.ErrNoSuchDevice {
klog.V(2).InfoS(msg, "mountpoint", mountpoint, "err", err)
} else {
klog.ErrorS(err, msg, "mountpoint", mountpoint)
}
return nil
}
return &fsInfo
}
// buildPodStats returns a PodStats that identifies the pod the given sandbox belongs to.
func buildPodStats(podSandbox *runtimeapi.PodSandbox) *statsapi.PodStats {
return &statsapi.PodStats{
PodRef: statsapi.PodReference{
Name: podSandbox.Metadata.Name,
UID: podSandbox.Metadata.Uid,
Namespace: podSandbox.Metadata.Namespace,
},
// The StartTime in the summary API is the pod creation time.
StartTime: metav1.NewTime(time.Unix(0, podSandbox.CreatedAt)),
}
}
func (p *criStatsProvider) addPodNetworkStats(
ps *statsapi.PodStats,
podSandboxID string,
caInfos map[string]cadvisorapiv2.ContainerInfo,
cs *statsapi.ContainerStats,
netStats *statsapi.NetworkStats,
) {
caPodSandbox, found := caInfos[podSandboxID]
// Try to get network stats from cadvisor first.
if found {
networkStats := cadvisorInfoToNetworkStats(&caPodSandbox)
if networkStats != nil {
ps.Network = networkStats
return
}
}
// Not found in cadvisor; fall back to netStats.
if netStats != nil {
ps.Network = netStats
return
}
// TODO: sum Pod network stats from container stats.
klog.V(4).InfoS("Unable to find network stats for sandbox", "sandboxID", podSandboxID)
}
func (p *criStatsProvider) addPodCPUMemoryStats(
ps *statsapi.PodStats,
podUID types.UID,
allInfos map[string]cadvisorapiv2.ContainerInfo,
cs *statsapi.ContainerStats,
) {
// Try to get cpu and memory stats from cadvisor first.
podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
if podCgroupInfo != nil {
cpu, memory := cadvisorInfoToCPUandMemoryStats(podCgroupInfo)
ps.CPU = cpu
ps.Memory = memory
return
}
// Sum Pod cpu and memory stats from container stats.
if cs.CPU != nil {
if ps.CPU == nil {
ps.CPU = &statsapi.CPUStats{}
}
ps.CPU.Time = cs.CPU.Time
usageCoreNanoSeconds := getUint64Value(cs.CPU.UsageCoreNanoSeconds) + getUint64Value(ps.CPU.UsageCoreNanoSeconds)
usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores)
ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds
ps.CPU.UsageNanoCores = &usageNanoCores
}
if cs.Memory != nil {
if ps.Memory == nil {
ps.Memory = &statsapi.MemoryStats{}
}
ps.Memory.Time = cs.Memory.Time
availableBytes := getUint64Value(cs.Memory.AvailableBytes) + getUint64Value(ps.Memory.AvailableBytes)
usageBytes := getUint64Value(cs.Memory.UsageBytes) + getUint64Value(ps.Memory.UsageBytes)
workingSetBytes := getUint64Value(cs.Memory.WorkingSetBytes) + getUint64Value(ps.Memory.WorkingSetBytes)
rSSBytes := getUint64Value(cs.Memory.RSSBytes) + getUint64Value(ps.Memory.RSSBytes)
pageFaults := getUint64Value(cs.Memory.PageFaults) + getUint64Value(ps.Memory.PageFaults)
majorPageFaults := getUint64Value(cs.Memory.MajorPageFaults) + getUint64Value(ps.Memory.MajorPageFaults)
ps.Memory.AvailableBytes = &availableBytes
ps.Memory.UsageBytes = &usageBytes
ps.Memory.WorkingSetBytes = &workingSetBytes
ps.Memory.RSSBytes = &rSSBytes
ps.Memory.PageFaults = &pageFaults
ps.Memory.MajorPageFaults = &majorPageFaults
}
}
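Note: the nil-safe summation above relies on getUint64Value, a small helper defined elsewhere in this package. A minimal sketch of its assumed behavior, shown here for reference: a nil pointer counts as zero, so absent per-container values simply do not contribute to the pod totals.

// Sketch of the package helper getUint64Value (assumed behavior):
// treat a nil *uint64 as zero so missing stats are ignored in sums.
func getUint64Value(value *uint64) uint64 {
    if value == nil {
        return 0
    }
    return *value
}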
func (p *criStatsProvider) addProcessStats(
ps *statsapi.PodStats,
podUID types.UID,
allInfos map[string]cadvisorapiv2.ContainerInfo,
cs *statsapi.ContainerStats,
) {
// Try to get process stats from cadvisor; there is no CRI fallback.
info := getCadvisorPodInfoFromPodUID(podUID, allInfos)
if info != nil {
ps.ProcessStats = cadvisorInfoToProcessStats(info)
return
}
}
func (p *criStatsProvider) makeContainerStats(
stats *runtimeapi.ContainerStats,
container *runtimeapi.Container,
rootFsInfo *cadvisorapiv2.FsInfo,
fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
meta *runtimeapi.PodSandboxMetadata,
updateCPUNanoCoreUsage bool,
) *statsapi.ContainerStats {
result := &statsapi.ContainerStats{
Name: stats.Attributes.Metadata.Name,
// The StartTime in the summary API is the container creation time.
StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
CPU: &statsapi.CPUStats{},
Memory: &statsapi.MemoryStats{},
Rootfs: &statsapi.FsStats{},
// UserDefinedMetrics is not supported by CRI.
}
if stats.Cpu != nil {
result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
var usageNanoCores *uint64
if updateCPUNanoCoreUsage {
usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
} else {
usageNanoCores = p.getContainerUsageNanoCores(stats)
}
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
} else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
result.CPU.UsageNanoCores = uint64Ptr(0)
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
if stats.Memory.WorkingSetBytes != nil {
result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
}
} else {
result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.Memory.WorkingSetBytes = uint64Ptr(0)
}
if stats.WritableLayer != nil {
result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
if stats.WritableLayer.UsedBytes != nil {
result.Rootfs.UsedBytes = &stats.WritableLayer.UsedBytes.Value
}
if stats.WritableLayer.InodesUsed != nil {
result.Rootfs.InodesUsed = &stats.WritableLayer.InodesUsed.Value
}
}
fsID := stats.GetWritableLayer().GetFsId()
if fsID != nil {
imageFsInfo, found := fsIDtoInfo[*fsID]
if !found {
imageFsInfo = p.getFsInfo(fsID)
fsIDtoInfo[*fsID] = imageFsInfo
}
// When the image filesystem id is unknown to the local node, or there was
// an error retrieving the stats, imageFsInfo is nil. In that case we omit
// these stats and return the best-effort partial result. See
// https://github.com/kubernetes/heapster/issues/1793.
if imageFsInfo != nil {
result.Rootfs.AvailableBytes = &imageFsInfo.Available
result.Rootfs.CapacityBytes = &imageFsInfo.Capacity
result.Rootfs.InodesFree = imageFsInfo.InodesFree
result.Rootfs.Inodes = imageFsInfo.Inodes
}
}
// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
// using old log path, empty log stats are returned. This is fine, because we don't
// officially support in-place upgrade anyway.
var err error
result.Logs, err = p.hostStatsProvider.getPodContainerLogStats(meta.GetNamespace(), meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName(), rootFsInfo)
if err != nil {
klog.ErrorS(err, "Unable to fetch container log stats", "containerName", container.GetMetadata().GetName())
}
return result
}
func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
stats *runtimeapi.ContainerStats,
container *runtimeapi.Container,
) *statsapi.ContainerStats {
result := &statsapi.ContainerStats{
Name: stats.Attributes.Metadata.Name,
// The StartTime in the summary API is the container creation time.
StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
CPU: &statsapi.CPUStats{},
Memory: &statsapi.MemoryStats{},
// UserDefinedMetrics is not supported by CRI.
}
if stats.Cpu != nil {
result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
} else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
result.CPU.UsageNanoCores = uint64Ptr(0)
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
if stats.Memory.WorkingSetBytes != nil {
result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
}
} else {
result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.Memory.WorkingSetBytes = uint64Ptr(0)
}
return result
}
// getContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
// by the CRI. If it is unable to, it gets the information from the cache instead.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil {
return nil
}
// Bypass the cache if the CRI implementation specified the UsageNanoCores.
if stats.Cpu != nil && stats.Cpu.UsageNanoCores != nil {
return &stats.Cpu.UsageNanoCores.Value
}
p.mutex.RLock()
defer p.mutex.RUnlock()
cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.usageNanoCores == nil {
return nil
}
// return a copy of the usage
latestUsage := *cached.usageNanoCores
return &latestUsage
}
// getAndUpdateContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
// by the CRI. If it is unable to, it computes usageNanoCores based on the given and the cached usageCoreNanoSeconds,
// updates the cache with the computed usageNanoCores, and returns the usageNanoCores.
func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil || stats.Cpu == nil {
return nil
}
// Bypass the cache if the CRI implementation specified the UsageNanoCores.
if stats.Cpu.UsageNanoCores != nil {
return &stats.Cpu.UsageNanoCores.Value
}
// If there is neither UsageNanoCores nor UsageCoreNanoSeconds, there is no information to use
if stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}
id := stats.Attributes.Id
usage, err := func() (*uint64, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
cached, ok := p.cpuUsageCache[id]
if !ok || cached.stats.UsageCoreNanoSeconds == nil || stats.Cpu.UsageCoreNanoSeconds.Value < cached.stats.UsageCoreNanoSeconds.Value {
// Cannot compute the usage now, but update the cached stats anyway
p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
return nil, nil
}
newStats := stats.Cpu
cachedStats := cached.stats
nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
if nanoSeconds <= 0 {
return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
}
usageNanoCores := uint64(float64(newStats.UsageCoreNanoSeconds.Value-cachedStats.UsageCoreNanoSeconds.Value) /
float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
// Update cache with new value.
usageToUpdate := usageNanoCores
p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
return &usageNanoCores, nil
}()
if err != nil {
// This should not happen. Log now to raise visibility
klog.ErrorS(err, "Failed updating cpu usage nano core")
}
return usage
}
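For intuition, here is a minimal standalone sketch (not part of the kubelet source) of the rate computation above: the delta of the cumulative UsageCoreNanoSeconds counter is divided by the elapsed nanoseconds and scaled back to nano-cores. With the eviction manager's 10s period, two samples whose cumulative usage differs by 5e9 core-nanoseconds work out to 0.5 cores, i.e. 500000000 nano-cores.

package main

import (
    "fmt"
    "time"
)

// nanoCores mirrors the computation in getAndUpdateContainerUsageNanoCores:
// (delta cumulative core-nanoseconds) / (delta time in ns) * 1e9.
func nanoCores(newUsage, oldUsage uint64, newTS, oldTS int64) uint64 {
    elapsed := newTS - oldTS // nanoseconds between the two samples
    if elapsed <= 0 {
        return 0 // the real code reports an error for a non-positive interval
    }
    return uint64(float64(newUsage-oldUsage) / float64(elapsed) * float64(time.Second/time.Nanosecond))
}

func main() {
    now := time.Now().UnixNano()
    // 5e9 core-nanoseconds consumed over a 10s window => 0.5 cores.
    fmt.Println(nanoCores(15e9, 10e9, now, now-10*int64(time.Second))) // 500000000
}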
func (p *criStatsProvider) cleanupOutdatedCaches() {
p.mutex.Lock()
defer p.mutex.Unlock()
for k, v := range p.cpuUsageCache {
if v == nil {
delete(p.cpuUsageCache, k)
continue
}
if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
}
// removeTerminatedPods returns pods with terminated ones removed.
// It only removes a terminated pod when there is a running instance
// of the pod with the same name and namespace.
// This is needed because:
// 1) PodSandbox may be recreated;
// 2) Pod may be recreated with the same name and namespace.
func removeTerminatedPods(pods []*runtimeapi.PodSandbox) []*runtimeapi.PodSandbox {
podMap := make(map[statsapi.PodReference][]*runtimeapi.PodSandbox)
// Sort by creation time.
sort.Slice(pods, func(i, j int) bool {
return pods[i].CreatedAt < pods[j].CreatedAt
})
for _, pod := range pods {
refID := statsapi.PodReference{
Name: pod.GetMetadata().GetName(),
Namespace: pod.GetMetadata().GetNamespace(),
// UID is intentionally left empty.
}
podMap[refID] = append(podMap[refID], pod)
}
result := make([]*runtimeapi.PodSandbox, 0)
for _, refs := range podMap {
if len(refs) == 1 {
result = append(result, refs[0])
continue
}
found := false
for i := 0; i < len(refs); i++ {
if refs[i].State == runtimeapi.PodSandboxState_SANDBOX_READY {
found = true
result = append(result, refs[i])
}
}
if !found {
result = append(result, refs[len(refs)-1])
}
}
return result
}
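A hypothetical illustration of the rule above (IDs invented for the example; assumes the imports and types from this file are in scope): when a sandbox is recreated for the same pod name and namespace, only the READY instance is kept, so the recreated pod's usage is not double-counted against its terminated predecessor.

// Two sandboxes for the same pod; only the READY one survives.
terminated := &runtimeapi.PodSandbox{
    Id:        "sandbox-old",
    CreatedAt: 100,
    State:     runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
    Metadata:  &runtimeapi.PodSandboxMetadata{Name: "nginx", Namespace: "default"},
}
recreated := &runtimeapi.PodSandbox{
    Id:        "sandbox-new",
    CreatedAt: 200,
    State:     runtimeapi.PodSandboxState_SANDBOX_READY,
    Metadata:  &runtimeapi.PodSandboxMetadata{Name: "nginx", Namespace: "default"},
}
pods := removeTerminatedPods([]*runtimeapi.PodSandbox{terminated, recreated})
fmt.Println(len(pods), pods[0].Id) // 1 sandbox-new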
// removeTerminatedContainers removes all terminated containers since they should
// not be used for usage calculations.
func removeTerminatedContainers(containers []*runtimeapi.Container) []*runtimeapi.Container {
containerMap := make(map[containerID][]*runtimeapi.Container)
// Sort by creation time.
sort.Slice(containers, func(i, j int) bool {
return containers[i].CreatedAt < containers[j].CreatedAt
})
for _, container := range containers {
refID := containerID{
podRef: buildPodRef(container.Labels),
containerName: kubetypes.GetContainerName(container.Labels),
}
containerMap[refID] = append(containerMap[refID], container)
}
result := make([]*runtimeapi.Container, 0)
for _, refs := range containerMap {
for i := 0; i < len(refs); i++ {
if refs[i].State == runtimeapi.ContainerState_CONTAINER_RUNNING {
result = append(result, refs[i])
}
}
}
return result
}
func (p *criStatsProvider) addCadvisorContainerStats(
cs *statsapi.ContainerStats,
caPodStats *cadvisorapiv2.ContainerInfo,
) {
if caPodStats.Spec.HasCustomMetrics {
cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
}
cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
if cpu != nil {
cs.CPU = cpu
}
if memory != nil {
cs.Memory = memory
}
}
func (p *criStatsProvider) addCadvisorContainerCPUAndMemoryStats(
cs *statsapi.ContainerStats,
caPodStats *cadvisorapiv2.ContainerInfo,
) {
if caPodStats.Spec.HasCustomMetrics {
cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
}
cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
if cpu != nil {
cs.CPU = cpu
}
if memory != nil {
cs.Memory = memory
}
}
func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) (map[string]cadvisorapiv2.ContainerInfo, map[string]cadvisorapiv2.ContainerInfo) {
stats := make(map[string]cadvisorapiv2.ContainerInfo)
filteredInfos, cinfosByPodCgroupKey := filterTerminatedContainerInfoAndAssembleByPodCgroupKey(infos)
for key, info := range filteredInfos {
// On systemd using devicemapper each mount into the container has an
// associated cgroup. We ignore them to ensure we do not get duplicate
// entries in our summary. For details on .mount units:
// http://man7.org/linux/man-pages/man5/systemd.mount.5.html
if strings.HasSuffix(key, ".mount") {
continue
}
// Skip containers that are not managed by a pod.
if !isPodManagedContainer(&info) {
continue
}
stats[extractIDFromCgroupPath(key)] = info
}
return stats, cinfosByPodCgroupKey
}
func extractIDFromCgroupPath(cgroupPath string) string {
// case0 == cgroupfs: "/kubepods/burstable/pod2fc932ce-fdcc-454b-97bd-aadfdeb4c340/9be25294016e2dc0340dd605ce1f57b492039b267a6a618a7ad2a7a58a740f32"
id := path.Base(cgroupPath)
// case1 == systemd: "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod2fc932ce_fdcc_454b_97bd_aadfdeb4c340.slice/cri-containerd-aaefb9d8feed2d453b543f6d928cede7a4dbefa6a0ae7c9b990dd234c56e93b9.scope"
// Trim the ".scope" suffix, then keep only the segment after the final '-'.
systemdSuffix := ".scope"
if strings.HasSuffix(id, systemdSuffix) {
id = strings.TrimSuffix(id, systemdSuffix)
components := strings.Split(id, "-")
if len(components) > 1 {
id = components[len(components)-1]
}
}
return id
}
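A minimal sketch exercising the two path shapes documented in the comments above (assuming extractIDFromCgroupPath is in scope):

// cgroupfs driver: the final path element is already the container ID.
fmt.Println(extractIDFromCgroupPath(
    "/kubepods/burstable/pod2fc932ce-fdcc-454b-97bd-aadfdeb4c340/9be25294016e2dc0340dd605ce1f57b492039b267a6a618a7ad2a7a58a740f32"))
// => 9be25294016e2dc0340dd605ce1f57b492039b267a6a618a7ad2a7a58a740f32

// systemd driver: ".scope" is trimmed and the segment after the last '-' remains.
fmt.Println(extractIDFromCgroupPath(
    "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod2fc932ce_fdcc_454b_97bd_aadfdeb4c340.slice/cri-containerd-aaefb9d8feed2d453b543f6d928cede7a4dbefa6a0ae7c9b990dd234c56e93b9.scope"))
// => aaefb9d8feed2d453b543f6d928cede7a4dbefa6a0ae7c9b990dd234c56e93b9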
func addCRIPodNetworkStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Network == nil {
return
}
criNetwork := criPodStat.Linux.Network
iStats := statsapi.NetworkStats{
Time: metav1.NewTime(time.Unix(0, criNetwork.Timestamp)),
InterfaceStats: criInterfaceToSummary(criNetwork.DefaultInterface),
Interfaces: make([]statsapi.InterfaceStats, 0, len(criNetwork.Interfaces)),
}
for _, iface := range criNetwork.Interfaces {
iStats.Interfaces = append(iStats.Interfaces, criInterfaceToSummary(iface))
}
ps.Network = &iStats
}
func criInterfaceToSummary(criIface *runtimeapi.NetworkInterfaceUsage) statsapi.InterfaceStats {
return statsapi.InterfaceStats{
Name: criIface.Name,
RxBytes: valueOfUInt64Value(criIface.RxBytes),
RxErrors: valueOfUInt64Value(criIface.RxErrors),
TxBytes: valueOfUInt64Value(criIface.TxBytes),
TxErrors: valueOfUInt64Value(criIface.TxErrors),
}
}
func addCRIPodCPUStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Cpu == nil {
return
}
criCPU := criPodStat.Linux.Cpu
ps.CPU = &statsapi.CPUStats{
Time: metav1.NewTime(time.Unix(0, criCPU.Timestamp)),
UsageNanoCores: valueOfUInt64Value(criCPU.UsageNanoCores),
UsageCoreNanoSeconds: valueOfUInt64Value(criCPU.UsageCoreNanoSeconds),
}
}
func addCRIPodMemoryStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Memory == nil {
return
}
criMemory := criPodStat.Linux.Memory
ps.Memory = &statsapi.MemoryStats{
Time: metav1.NewTime(time.Unix(0, criMemory.Timestamp)),
AvailableBytes: valueOfUInt64Value(criMemory.AvailableBytes),
UsageBytes: valueOfUInt64Value(criMemory.UsageBytes),
WorkingSetBytes: valueOfUInt64Value(criMemory.WorkingSetBytes),
RSSBytes: valueOfUInt64Value(criMemory.RssBytes),
PageFaults: valueOfUInt64Value(criMemory.PageFaults),
MajorPageFaults: valueOfUInt64Value(criMemory.MajorPageFaults),
}
}
func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Process == nil {
return
}
ps.ProcessStats = &statsapi.ProcessStats{
ProcessCount: valueOfUInt64Value(criPodStat.Linux.Process.ProcessCount),
}
}
func valueOfUInt64Value(value *runtimeapi.UInt64Value) *uint64 {
if value == nil {
return nil
}
return &value.Value
}
Related articles:
kubernetes cadvisor_stats_provider source code
kubernetes cadvisor_stats_provider_test source code
kubernetes cri_stats_provider_others source code
kubernetes cri_stats_provider_test source code
kubernetes cri_stats_provider_windows source code
kubernetes cri_stats_provider_windows_test source code