kubectl top 命令原理

Dec 28, 2024 23:45 · 3578 words · 8 minute read Kubernetes Container Linux

部署了 kube-metrics-server 之后,我们能够通过 kubectl top 命令来查看 Kubernetes 集群中节点和 Pod 的计算资源使用情况:

$ kubectl top nodes vtester0
NAME       CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
vtester0   118m         2%     1859Mi          24%

$ kubectl top po kube-apiserver-vtester0 -n kube-system
NAME                      CPU(cores)   MEMORY(bytes)
kube-apiserver-vtester0   34m          632Mi

本文将顺着调用链探索该命令的完整调用路径,从 kubernetesmetrics-servercadvisor 以及 runc/libcontainer 项目的源码分析该过程的原理。

kubectl

我们首先要找出 kubectl 向 kube-apiserver 发送的 HTTP API 请求:

  • top Node:

    $ kubectl top nodes vtester0 -v 8
    I1226 04:38:00.472140 3448741 round_trippers.go:463] GET https://10.211.55.171:6443/apis/metrics.k8s.io/v1beta1/nodes/vtester0
    # a lot of log here
    
  • top Pod:

    $ kubectl top po kube-apiserver-vtester0 -n kube-system -v 8
    I1226 04:40:55.324492 3449842 round_trippers.go:463] GET https://10.211.55.171:6443/apis/metrics.k8s.io/v1beta1/namespaces/kube-system/pods/kube-apiserver-vtester0
    # a lot of log here
    

请求路径前缀都是 /apis/metrics.k8s.io/v1beta1/,后面的路径节点和 Pod 有所不同但都遵循 Kubernetes API 的“规则”。

通过 -v 选项调高 kubectl 的日志粒度是分析 kubectl 调用了哪些 HTTP API 最简单明了的方法,往往一个 kubectl 命令封装了多个 API 调用。

metrics-server

大家都知道 metrics-server 项目的原理是通过 Kubernetes API 的 Aggregation Layer 将请求从 kube-apiserver 转发至扩展 apiserver 的(即 metrics-server),已经有很多文章分析过 Kubernetes 的这个能力了,这里不多赘述。

如何使用 Aggregation Layer 的官方文档:https://kubernetes.io/docs/tasks/extend-kubernetes/configure-aggregation-layer

Node

kubectl top node 请求被 kube-apiserver 转发到 metrics-server,API handler 是 nodeMetrics.Get 方法:

func (m *nodeMetrics) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
	node, err := m.nodeLister.Get(name)
	// a lot of code here
	ms, err := m.getMetrics(node)
	// a lot of code here
}

顺着调用链 nodeMetrics.Get -> nodeMetrics.getMetrics -> storage.GetNodeMetrics -> nodeStorage.GetMetrics 来到 nodeStorage.GetMetrics 方法:

func (s *nodeStorage) GetMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error) {
	results := make([]metrics.NodeMetrics, 0, len(nodes))
	for _, node := range nodes {
		last, found := s.last[node.Name]
		if !found {
			continue
		}

		prev, found := s.prev[node.Name]
		if !found {
			continue
		}
		rl, ti, err := resourceUsage(last, prev)
		// a lot of code here
	}
	return results, nil
}

nodeStorage 是 metrics-server 中节点指标的缓存,会定时访问各节点上的 kubelet 读取指标并更新。而 nodeMetrics 直接从缓存中读取请求中指定节点的指标并返回,避免同步地访问节点上的 kubelet,虽然指标的时效性滞后一些但性能相对更好

Pod

kubectl top pod 请求被 kube-apiserver 转发到 metrics-server,API handler 是 podMetrics.Get 方法:

func (m *podMetrics) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
	// ...
	pod, err := m.podLister.ByNamespace(namespace).Get(name)
	// a lot of code here
	ms, err := m.getMetrics(pod)
	// a lot of code here
}

也顺着调用链 podMetrics.Get -> podMetrics.getMetrics -> storage.GetPodMetrics -> podStorage.GetMetrics 来到 podStorage.GetMetrics 方法:podStoragenodeStorage 一样,是 Pod 指标数据的缓存。

缓存

storage 结构集合了 Node 与 Pod 指标数据缓存,除了 GetNodeMetricsGetPodMetrics 还有一个 Store 方法:

func (s *storage) Store(batch *MetricsBatch) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nodes.Store(batch)
	s.pods.Store(batch)
}

该方法分别向 nodeStoragepodStorage 两个缓存中存储最新的相关指标数据。而 Store 方法的调用者是一个定时器方法 tick

func (s *server) tick(ctx context.Context, startTime time.Time) {
	// a lot of code here
	data := s.scraper.Scrape(ctx)

	klog.V(6).InfoS("Storing metrics")
	s.storage.Store(data)
	// a lot of code here
}

定期维护着缓存。

爬虫

metrics-server 中的“爬虫” scraper 访问集群中所有节点上的 kubelet 以获取 Node 和 Pod 指标:

type scraper struct {
	nodeLister    v1listers.NodeLister
	kubeletClient client.KubeletMetricsGetter
	scrapeTimeout time.Duration
	labelSelector labels.Selector
}

通过 Scrape 方法抓取 Node 和 Pod 指标:scraper.Scrape -> scraper.collectNode -> kubeletClient.GetMetrics -> kubeletClient.getMetrics -> decodeBatch

func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
	port := kc.defaultPort
	path := "/metrics/resource"
	nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
	// a lot of code here
	url := url.URL{
		Scheme: kc.scheme,
		Host:   net.JoinHostPort(addr, strconv.Itoa(port)),
		Path:   path,
	}
	return kc.getMetrics(ctx, url.String(), node.Name)
}

10250 是 kubelet 的 metrics 端口,其 HTTP API 路径为 /metrics/resource,大家可以尝试自行访问该接口来查看返回值。

获取到指标数据后下一步必然是解析(反序列化),指标解析的函数 decodeBatch 这部分代码中能够看出指标名:

var (
	nodeCpuUsageMetricName       = []byte("node_cpu_usage_seconds_total")
	nodeMemUsageMetricName       = []byte("node_memory_working_set_bytes")
	containerCpuUsageMetricName  = []byte("container_cpu_usage_seconds_total")
	containerMemUsageMetricName  = []byte("container_memory_working_set_bytes")
	containerStartTimeMetricName = []byte("container_start_time_seconds")
)

func decodeBatch(b []byte, defaultTime time.Time, nodeName string) (*storage.MetricsBatch, error) {
	// a lot of code here
		switch {
		case timeseriesMatchesName(timeseries, nodeCpuUsageMetricName):
			parseNodeCpuUsageMetrics(*maybeTimestamp, value, node)
		case timeseriesMatchesName(timeseries, nodeMemUsageMetricName):
			parseNodeMemUsageMetrics(*maybeTimestamp, value, node)
		case timeseriesMatchesName(timeseries, containerCpuUsageMetricName):
			namespaceName, containerName := parseContainerLabels(timeseries[len(containerCpuUsageMetricName):])
			parseContainerCpuMetrics(namespaceName, containerName, *maybeTimestamp, value, pods)
		case timeseriesMatchesName(timeseries, containerMemUsageMetricName):
			namespaceName, containerName := parseContainerLabels(timeseries[len(containerMemUsageMetricName):])
			parseContainerMemMetrics(namespaceName, containerName, *maybeTimestamp, value, pods)
		default:
			continue
	// a lot of code here
}

返回的指标中包含了:

  • 节点 CPU 使用率:node_cpu_usage_seconds_total
  • 节点内存使用率:node_memory_working_set_bytes
  • 容器 CPU 使用率:container_cpu_usage_seconds_total
  • 容器内存使用率:container_memory_working_set_bytes
  • 容器启动时间:container_start_time_seconds

通过 prometheus 也能够看到这些 kubelet 指标的明细。

kubelet

接下来到 kubelet 如何通过内置的 cAdvisor 从机器上获取上述节点和容器计算资源相关的指标。

https://github.com/kubernetes/kubernetes/blob/64af1adaceba4db8d0efdb91453bce7073973771/pkg/kubelet/metrics/collectors/resource_metrics.go 代码文件中有上述指标的定义:

  • node_cpu_usage_seconds_total

    Cumulative cpu time consumed by the node in core-seconds

  • node_memory_working_set_bytes:

    Current working set of the node in bytes

  • container_cpu_usage_seconds_total

    Cumulative cpu time consumed by the container in core-seconds

  • container_memory_working_set_bytes

    Current working set of the container in bytes

  1. 节点计算资源使用率

    func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
    	// a lot of code here
    	ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
    		metrics.NewLazyConstMetric(nodeCPUUsageDesc, metrics.CounterValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second)))
    }
    
    func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
    	// a lot of code here
    	ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
    		metrics.NewLazyConstMetric(nodeMemoryUsageDesc, metrics.GaugeValue, float64(*s.Memory.WorkingSetBytes)))
    }
    
  2. 容器计算资源使用率

    func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
    	// a lot of code here
    	ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
    		metrics.NewLazyConstMetric(containerCPUUsageDesc, metrics.CounterValue,
    			float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
    }
    
    func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
    	// a lot of code here
    	ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
    		metrics.NewLazyConstMetric(containerMemoryUsageDesc, metrics.GaugeValue,
    			float64(*s.Memory.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
    }
    

无论节点还是容器,计算资源的使用率的值都来自于 NodeStats 对象的 CPUMemory 字段:

// NodeStats holds node-level unprocessed sample stats.
type NodeStats struct {
	// Stats pertaining to CPU resources.
	// +optional
	CPU *CPUStats `json:"cpu,omitempty"`
	// Stats pertaining to memory (RAM) resources.
	// +optional
	Memory *MemoryStats `json:"memory,omitempty"`
}

而 collectNode/ContainerCPU/MemoryMetrics 方法都由 resourceMetricsCollector.CollectWithStability 调用:

func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- metrics.Metric) {
	// a lot of code here
	statsSummary, err := rc.provider.GetCPUAndMemoryStats(ctx)
	if err != nil {
		errorCount = 1
		klog.ErrorS(err, "Error getting summary for resourceMetric prometheus endpoint")
		return
	}

	rc.collectNodeCPUMetrics(ch, statsSummary.Node)
	rc.collectNodeMemoryMetrics(ch, statsSummary.Node)

	for _, pod := range statsSummary.Pods {
		for _, container := range pod.Containers {
			rc.collectContainerStartTime(ch, pod, container)
			rc.collectContainerCPUMetrics(ch, pod, container)
			rc.collectContainerMemoryMetrics(ch, pod, container)
		}
	}
}

collectNode/ContainerCPU/MemoryMetrics 都依赖 statsSummary 对象,由 provider.GetCPUAndMemoryStats 返回:

func (sp *summaryProviderImpl) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
	node, err := sp.provider.GetNode()
	if err != nil {
		return nil, fmt.Errorf("failed to get node info: %v", err)
	}
	nodeConfig := sp.provider.GetNodeConfig()
	rootStats, err := sp.provider.GetCgroupCPUAndMemoryStats("/", false)
	if err != nil {
		return nil, fmt.Errorf("failed to get root cgroup stats: %v", err)
	}

	podStats, err := sp.provider.ListPodCPUAndMemoryStats(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list pod stats: %v", err)
	}

	nodeStats := statsapi.NodeStats{
		NodeName:         node.Name,
		CPU:              rootStats.CPU,
		Memory:           rootStats.Memory,
		StartTime:        rootStats.StartTime,
		SystemContainers: sp.GetSystemContainersCPUAndMemoryStats(nodeConfig, podStats, false),
	}
	summary := statsapi.Summary{
		Node: nodeStats,
		Pods: podStats,
	}
	return &summary, nil
}

summary 分为两部分:Node 和 Pod 数据。Node 数据来自 rootStats 对象,由 provider.GetCgroupCPUAndMemoryStats 方法提供:provider.GetCgroupCPUAndMemoryStats -> cadvisorInfoToContainerCPUAndMemoryStats -> cadvisorInfoToCPUandMemoryStats

func cadvisorInfoToCPUandMemoryStats(info *cadvisorapiv2.ContainerInfo) (*statsapi.CPUStats, *statsapi.MemoryStats) {
	cstat, found := latestContainerStats(info)
	// a lot of code here
	if info.Spec.HasCpu {
		// ...
		if cstat.Cpu != nil {
			cpuStats.UsageCoreNanoSeconds = &cstat.Cpu.Usage.Total
		}
	}
	if info.Spec.HasMemory && cstat.Memory != nil {
		pageFaults := cstat.Memory.ContainerData.Pgfault
		majorPageFaults := cstat.Memory.ContainerData.Pgmajfault
		memoryStats = &statsapi.MemoryStats{
			Time:            metav1.NewTime(cstat.Timestamp),
			UsageBytes:      &cstat.Memory.Usage,
			WorkingSetBytes: &cstat.Memory.WorkingSet,
			RSSBytes:        &cstat.Memory.RSS,
			PageFaults:      &pageFaults,
			MajorPageFaults: &majorPageFaults,
		}
		// ...
	}
	return cpuStats, memoryStats
}

UsageCoreNanoSecondsWorkingSetBytes 都来自于 cstat 对象(container stat)。

cadvisorInfoToCPUandMemoryStats -> latestContainerStats

func latestContainerStats(info *cadvisorapiv2.ContainerInfo) (*cadvisorapiv2.ContainerStats, bool) {
	stats := info.Stats
	if len(stats) < 1 {
		return nil, false
	}
	latest := stats[len(stats)-1]
	// a lot of code here
}

info.Stats 列表最后一个元素也就是最近(新)的数据。

回到 info 的来源处 Provider.GetCgroupCPUAndMemoryStats

func (p *Provider) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
	info, err := getCgroupInfo(p.cadvisor, cgroupName, updateStats)
	if err != nil {
		return nil, fmt.Errorf("failed to get cgroup stats for %q: %v", cgroupName, err)
	}
	// Rootfs and imagefs doesn't make sense for raw cgroup.
	s := cadvisorInfoToContainerCPUAndMemoryStats(cgroupName, info)
	return s, nil
}

Provider.GetCgroupCPUAndMemoryStats -> getCgroupInfo

func getCgroupInfo(cadvisor cadvisor.Interface, containerName string, updateStats bool) (*cadvisorapiv2.ContainerInfo, error) {
	// ...
	infoMap, err := cadvisor.ContainerInfoV2(containerName, cadvisorapiv2.RequestOptions{
		IdType:    cadvisorapiv2.TypeName,
		Count:     2, // 2 samples are needed to compute "instantaneous" CPU
		Recursive: false,
		MaxAge:    maxAge,
	})
	// ...
	info := infoMap[containerName]
	return &info, nil
}

从函数名也能看出来跟 cgroup 有关,这里入参 containerName 的值为 /,接下来要找到 cadvisor.Interface 的实现。

cAdvisor

cAdvisor(Container Advisor)是一个 Google 的开源项目,用于收集、汇总、处理运行中容器的信息。根据文档 https://github.com/google/cadvisor/blob/v0.46.1/docs/api_v2.md#container-name/ 是 root container,表示宿主机

google/cadvisor 项目中搜索 GetContainerInfoV2 方法的实现:

func (m *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
	containers, err := m.getRequestedContainers(containerName, options)
	if err != nil {
		return nil, err
	}

	var errs partialFailure
	var nilTime time.Time // Ignored.

	infos := make(map[string]v2.ContainerInfo, len(containers))
	for name, container := range containers {
		result := v2.ContainerInfo{}
		cinfo, err := container.GetInfo(false)
		if err != nil {
			errs.append(name, "GetInfo", err)
			infos[name] = result
			continue
		}
		result.Spec = m.getV2Spec(cinfo)

		stats, err := m.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
		if err != nil {
			errs.append(name, "RecentStats", err)
			infos[name] = result
			continue
		}

		result.Stats = v2.ContainerStatsFromV1(containerName, &cinfo.Spec, stats)
		infos[name] = result
	}

	return infos, errs.OrNil()
}

manager.GetContainerInfoV2 -> m.memoryCache.RecentStats 从缓存中拿到“容器数据” info.ContainerStats

从 metrics-server 到 kubelet(cadvisor)查询相关指标数据,都是从各自的缓存中获取,而非同步式调用;而缓存则各自定期维护,这也是 exporter 的设计哲学,牺牲指标数据的时效性换取更好的性能。接下来看 cadvisor 如何维护缓存,已知 containerName 的值为 /,直接根据该信息去搜索代码,定位至 manager.Start 方法:

// Start the container manager.
func (m *manager) Start() error {
	// a lot of code here
	// Create root and then recover all containers.
	err = m.createContainer("/", watcher.Raw)
	if err != nil {
		return err
	}
	// ...
}

Container Manager manager.Start -> manager.createContainer -> manager.createContainerLocked -> containerData.Start 为 root container 启动采集 containerData.housekeeping -> containerData.housekeepingTick -> containerData.updateStats,CPU 时间相关数据(CpuStats)则通过 runc/libcontainer 获取:

type Stats struct {
	CpuStats    CpuStats    `json:"cpu_stats,omitempty"`
	CPUSetStats CPUSetStats `json:"cpuset_stats,omitempty"`
	MemoryStats MemoryStats `json:"memory_stats,omitempty"`
	PidsStats   PidsStats   `json:"pids_stats,omitempty"`
	BlkioStats  BlkioStats  `json:"blkio_stats,omitempty"`
	// the map is in the format "size of hugepage: stats of the hugepage"
	HugetlbStats map[string]HugetlbStats `json:"hugetlb_stats,omitempty"`
	RdmaStats    RdmaStats               `json:"rdma_stats,omitempty"`
}

type CpuStats struct {
	CpuUsage       CpuUsage       `json:"cpu_usage,omitempty"`
	ThrottlingData ThrottlingData `json:"throttling_data,omitempty"`
}

type CpuUsage struct {
	// Total CPU time consumed.
	// Units: nanoseconds.
	TotalUsage uint64 `json:"total_usage,omitempty"`
	// Total CPU time consumed per core.
	// Units: nanoseconds.
	PercpuUsage []uint64 `json:"percpu_usage,omitempty"`
	// CPU time consumed per core in kernel mode
	// Units: nanoseconds.
	PercpuUsageInKernelmode []uint64 `json:"percpu_usage_in_kernelmode"`
	// CPU time consumed per core in user mode
	// Units: nanoseconds.
	PercpuUsageInUsermode []uint64 `json:"percpu_usage_in_usermode"`
	// Time spent by tasks of the cgroup in kernel mode.
	// Units: nanoseconds.
	UsageInKernelmode uint64 `json:"usage_in_kernelmode"`
	// Time spent by tasks of the cgroup in user mode.
	// Units: nanoseconds.
	UsageInUsermode uint64 `json:"usage_in_usermode"`
}

containerData.updateStats -> containerdContainerHandler.GetStats -> Handler.GetStats

func (h *Handler) GetStats() (*info.ContainerStats, error) {
	// a lot of code here
	cgroupStats, err := h.cgroupManager.GetStats()
	// ...
	libcontainerStats := &libcontainer.Stats{
		CgroupStats: cgroupStats,
	}
	stats := newContainerStats(libcontainerStats, h.includedMetrics)
	// ...
}

无论是 Docker、Containerd、CRI-O 甚至 Mesos,最终都指向了 libcontainer 的 Handler.GetStats

runc/libcontainer

最终来到 libcontainer 中的 CgroupManager注意 cgroup 分为 v1/v2 两个版本,所以 CgroupManager 也有两个实现,这里选看 cgroup v1 https://github.com/opencontainers/runc/blob/5fd4c4d144137e991c4acebb2146ab1483a97925/libcontainer/cgroups/fs/fs.go#L153-L167

func (m *manager) GetStats() (*cgroups.Stats, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	stats := cgroups.NewStats()
	for _, sys := range subsystems {
		path := m.paths[sys.Name()]
		if path == "" {
			continue
		}
		if err := sys.GetStats(path, stats); err != nil {
			return nil, err
		}
	}
	return stats, nil
}

每个 cgroup 子系统都分别实现了 subsystem 这个接口,而 CPU 使用时间相关的数据在 cpuacct 子系统中,其 GetStats 方法:

func (s *CpuacctGroup) GetStats(path string, stats *cgroups.Stats) error {
	if !cgroups.PathExists(path) {
		return nil
	}
	userModeUsage, kernelModeUsage, err := getCpuUsageBreakdown(path)
	if err != nil {
		return err
	}

	totalUsage, err := fscommon.GetCgroupParamUint(path, "cpuacct.usage")
	if err != nil {
		return err
	}

	percpuUsage, err := getPercpuUsage(path)
	if err != nil {
		return err
	}

	percpuUsageInKernelmode, percpuUsageInUsermode, err := getPercpuUsageInModes(path)
	if err != nil {
		return err
	}

	stats.CpuStats.CpuUsage.TotalUsage = totalUsage
	stats.CpuStats.CpuUsage.PercpuUsage = percpuUsage
	stats.CpuStats.CpuUsage.PercpuUsageInKernelmode = percpuUsageInKernelmode
	stats.CpuStats.CpuUsage.PercpuUsageInUsermode = percpuUsageInUsermode
	stats.CpuStats.CpuUsage.UsageInUsermode = userModeUsage
	stats.CpuStats.CpuUsage.UsageInKernelmode = kernelModeUsage
	return nil
}

即读取 /sys/fs/cgroup/cpuacct/cpuacct.usage 文件的值就是 root container 使用 CPU 的总时间,这就是 node_cpu_usage_seconds_total 指标的数据源,有经验的同学应该一开始就能猜出来。由于篇幅过长,节点内存和 Pod 计算资源就不详写了,顺着调用链分析即可。