How the kubectl top Command Works
Dec 28, 2024 23:45 · 3578 words · 8 minute read
After deploying metrics-server, we can use the kubectl top
command to view the compute resource usage of nodes and Pods in a Kubernetes cluster:
$ kubectl top nodes vtester0
NAME       CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
vtester0   118m         2%     1859Mi          24%
$ kubectl top po kube-apiserver-vtester0 -n kube-system
NAME                      CPU(cores)   MEMORY(bytes)
kube-apiserver-vtester0   34m          632Mi
This article follows the call chain of this command end to end, analyzing how it works from the source code of the kubernetes, metrics-server, cadvisor, and runc/libcontainer projects.
kubectl
First, let's find out which HTTP API requests kubectl sends to kube-apiserver:
- top Node:
  $ kubectl top nodes vtester0 -v 8
  I1226 04:38:00.472140 3448741 round_trippers.go:463] GET https://10.211.55.171:6443/apis/metrics.k8s.io/v1beta1/nodes/vtester0
  # a lot of log here
- top Pod:
  $ kubectl top po kube-apiserver-vtester0 -n kube-system -v 8
  I1226 04:40:55.324492 3449842 round_trippers.go:463] GET https://10.211.55.171:6443/apis/metrics.k8s.io/v1beta1/namespaces/kube-system/pods/kube-apiserver-vtester0
  # a lot of log here
Both request paths share the prefix /apis/metrics.k8s.io/v1beta1/; the rest of the path differs between nodes and Pods, but both follow the usual Kubernetes API conventions.
Raising kubectl's log verbosity with the -v flag is the simplest and clearest way to see which HTTP APIs a kubectl command calls; a single kubectl command often wraps multiple API calls.
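You can also hit these endpoints yourself: kubectl get --raw sends a raw GET to the apiserver using your kubeconfig credentials, which is handy for inspecting the JSON that kubectl top renders (the node and Pod names below are from the example cluster):
$ kubectl get --raw /apis/metrics.k8s.io/v1beta1/nodes/vtester0
$ kubectl get --raw /apis/metrics.k8s.io/v1beta1/namespaces/kube-system/pods/kube-apiserver-vtester0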
metrics-server
As is widely known, metrics-server works through the Aggregation Layer of the Kubernetes API: kube-apiserver forwards matching requests to the extension apiserver (i.e., metrics-server). Many articles have already analyzed this Kubernetes capability, so I won't repeat it here.
Official documentation on using the Aggregation Layer: https://kubernetes.io/docs/tasks/extend-kubernetes/configure-aggregation-layer
Node
kubectl top node requests are forwarded by kube-apiserver to metrics-server, where the API handler is the nodeMetrics.Get method:
func (m *nodeMetrics) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
node, err := m.nodeLister.Get(name)
// a lot of code here
ms, err := m.getMetrics(node)
// a lot of code here
}
Following the call chain nodeMetrics.Get -> nodeMetrics.getMetrics -> storage.GetNodeMetrics -> nodeStorage.GetMetrics, we arrive at the nodeStorage.GetMetrics method:
func (s *nodeStorage) GetMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error) {
results := make([]metrics.NodeMetrics, 0, len(nodes))
for _, node := range nodes {
last, found := s.last[node.Name]
if !found {
continue
}
prev, found := s.prev[node.Name]
if !found {
continue
}
rl, ti, err := resourceUsage(last, prev)
// a lot of code here
}
return results, nil
}
nodeStorage is metrics-server's cache of node metrics; it is refreshed by periodically scraping the kubelet on each node. nodeMetrics reads the requested node's metrics straight from this cache and returns them, avoiding a synchronous call to the node's kubelet: the metrics lag a little in freshness, but performance is much better.
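The resourceUsage call above derives usage from the two cached samples, last and prev. Below is a minimal sketch of the idea, with illustrative types rather than metrics-server's real ones: CPU is exposed as a cumulative counter, so the rate is the delta between the two samples divided by the sampling window, while the memory working set is simply taken from the latest sample.
// sample is a hypothetical cached data point: cumulative CPU time plus
// the memory working set observed at a given timestamp.
type sample struct {
	Timestamp          time.Time
	CumulativeCPUNanos uint64 // counter: total CPU time consumed so far
	WorkingSetBytes    uint64 // gauge: current memory working set
}

// cpuCores returns the average CPU usage in cores over the window
// between the previous and the latest sample. (A real implementation
// would also guard against counter resets.)
func cpuCores(last, prev sample) float64 {
	window := last.Timestamp.Sub(prev.Timestamp).Seconds()
	if window <= 0 {
		return 0
	}
	deltaSeconds := float64(last.CumulativeCPUNanos-prev.CumulativeCPUNanos) / 1e9
	return deltaSeconds / window
}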
Pod
kubectl top pod requests are likewise forwarded by kube-apiserver to metrics-server, where the API handler is the podMetrics.Get method:
func (m *podMetrics) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
// ...
pod, err := m.podLister.ByNamespace(namespace).Get(name)
// a lot of code here
ms, err := m.getMetrics(pod)
// a lot of code here
}
Again following the call chain podMetrics.Get -> podMetrics.getMetrics -> storage.GetPodMetrics -> podStorage.GetMetrics, we arrive at the podStorage.GetMetrics method. podStorage, like nodeStorage, is a cache of Pod metrics data.
Cache
The storage struct bundles the Node and Pod metric caches; besides GetNodeMetrics and GetPodMetrics, it also has a Store method:
func (s *storage) Store(batch *MetricsBatch) {
s.mu.Lock()
defer s.mu.Unlock()
s.nodes.Store(batch)
s.pods.Store(batch)
}
This method stores the latest metrics into the nodeStorage and podStorage caches respectively. The caller of Store is a timer-driven method, tick:
func (s *server) tick(ctx context.Context, startTime time.Time) {
// a lot of code here
data := s.scraper.Scrape(ctx)
klog.V(6).InfoS("Storing metrics")
s.storage.Store(data)
// a lot of code here
}
which keeps the caches up to date on a fixed interval.
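A minimal sketch of that timer pattern, with the scrape and store parameters standing in for metrics-server's real scraper.Scrape and storage.Store, and MetricsBatch standing in for its storage.MetricsBatch:
// runScrapeLoop is an illustrative periodic loop: scrape all kubelets,
// then store the resulting batch into the cache, once per interval.
func runScrapeLoop(ctx context.Context, resolution time.Duration,
	scrape func(context.Context) *MetricsBatch, store func(*MetricsBatch)) {
	ticker := time.NewTicker(resolution)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			store(scrape(ctx))
		}
	}
}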
Scraper
The "scraper" in metrics-server visits the kubelet on every node in the cluster to fetch Node and Pod metrics:
type scraper struct {
nodeLister v1listers.NodeLister
kubeletClient client.KubeletMetricsGetter
scrapeTimeout time.Duration
labelSelector labels.Selector
}
Node and Pod metrics are scraped via the Scrape method, following scraper.Scrape -> scraper.collectNode -> kubeletClient.GetMetrics -> kubeletClient.getMetrics -> decodeBatch:
func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
port := kc.defaultPort
path := "/metrics/resource"
nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
// a lot of code here
url := url.URL{
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: path,
}
return kc.getMetrics(ctx, url.String(), node.Name)
}
10250 is the kubelet's metrics port, and the HTTP API path is /metrics/resource; you can try querying this endpoint yourself to see what it returns.
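One convenient way to do that, without handling kubelet client certificates yourself, is to go through the apiserver's node proxy (the node name below is from the example cluster):
$ kubectl get --raw /api/v1/nodes/vtester0/proxy/metrics/resource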
After the metrics are fetched, the next step is naturally parsing (deserialization). The metric names can be seen in this part of the parsing function decodeBatch:
var (
nodeCpuUsageMetricName = []byte("node_cpu_usage_seconds_total")
nodeMemUsageMetricName = []byte("node_memory_working_set_bytes")
containerCpuUsageMetricName = []byte("container_cpu_usage_seconds_total")
containerMemUsageMetricName = []byte("container_memory_working_set_bytes")
containerStartTimeMetricName = []byte("container_start_time_seconds")
)
func decodeBatch(b []byte, defaultTime time.Time, nodeName string) (*storage.MetricsBatch, error) {
// a lot of code here
switch {
case timeseriesMatchesName(timeseries, nodeCpuUsageMetricName):
parseNodeCpuUsageMetrics(*maybeTimestamp, value, node)
case timeseriesMatchesName(timeseries, nodeMemUsageMetricName):
parseNodeMemUsageMetrics(*maybeTimestamp, value, node)
case timeseriesMatchesName(timeseries, containerCpuUsageMetricName):
namespaceName, containerName := parseContainerLabels(timeseries[len(containerCpuUsageMetricName):])
parseContainerCpuMetrics(namespaceName, containerName, *maybeTimestamp, value, pods)
case timeseriesMatchesName(timeseries, containerMemUsageMetricName):
namespaceName, containerName := parseContainerLabels(timeseries[len(containerMemUsageMetricName):])
parseContainerMemMetrics(namespaceName, containerName, *maybeTimestamp, value, pods)
default:
continue
// a lot of code here
}
The returned metrics include:
- node CPU usage: node_cpu_usage_seconds_total
- node memory usage: node_memory_working_set_bytes
- container CPU usage: container_cpu_usage_seconds_total
- container memory usage: container_memory_working_set_bytes
- container start time: container_start_time_seconds
These kubelet metrics can also be browsed in detail through Prometheus.
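For reference, the endpoint speaks the Prometheus text exposition format; the sample below is only illustrative (the values and timestamps are made up):
node_cpu_usage_seconds_total 3577.318 1735345140000
node_memory_working_set_bytes 1.949e+09 1735345140000
container_cpu_usage_seconds_total{container="kube-apiserver",namespace="kube-system",pod="kube-apiserver-vtester0"} 845.52 1735345140000
container_memory_working_set_bytes{container="kube-apiserver",namespace="kube-system",pod="kube-apiserver-vtester0"} 6.627e+08 1735345140000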
kubelet
Next, let's look at how kubelet, through its embedded cAdvisor, obtains the node and container compute-resource metrics above from the machine.
- node_cpu_usage_seconds_total: Cumulative cpu time consumed by the node in core-seconds
- node_memory_working_set_bytes: Current working set of the node in bytes
- container_cpu_usage_seconds_total: Cumulative cpu time consumed by the container in core-seconds
- container_memory_working_set_bytes: Current working set of the container in bytes
- Node compute resource usage:

func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
	// a lot of code here
	ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
		metrics.NewLazyConstMetric(nodeCPUUsageDesc, metrics.CounterValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second)))
}

func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
	// a lot of code here
	ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
		metrics.NewLazyConstMetric(nodeMemoryUsageDesc, metrics.GaugeValue, float64(*s.Memory.WorkingSetBytes)))
}
- Container compute resource usage:

func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
	// a lot of code here
	ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
		metrics.NewLazyConstMetric(containerCPUUsageDesc, metrics.CounterValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}

func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
	// a lot of code here
	ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
		metrics.NewLazyConstMetric(containerMemoryUsageDesc, metrics.GaugeValue, float64(*s.Memory.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}
For both nodes and containers, the usage values come from the CPU and Memory fields of the stats object (NodeStats for nodes, ContainerStats for containers); NodeStats, for example, looks like this:
// NodeStats holds node-level unprocessed sample stats.
type NodeStats struct {
// Stats pertaining to CPU resources.
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources.
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
}
All of the collectNode/ContainerCPU/MemoryMetrics methods are invoked by resourceMetricsCollector.CollectWithStability:
func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- metrics.Metric) {
// a lot of code here
statsSummary, err := rc.provider.GetCPUAndMemoryStats(ctx)
if err != nil {
errorCount = 1
klog.ErrorS(err, "Error getting summary for resourceMetric prometheus endpoint")
return
}
rc.collectNodeCPUMetrics(ch, statsSummary.Node)
rc.collectNodeMemoryMetrics(ch, statsSummary.Node)
for _, pod := range statsSummary.Pods {
for _, container := range pod.Containers {
rc.collectContainerStartTime(ch, pod, container)
rc.collectContainerCPUMetrics(ch, pod, container)
rc.collectContainerMemoryMetrics(ch, pod, container)
}
}
}
The collectNode/ContainerCPU/MemoryMetrics methods all depend on the statsSummary object, which is returned by provider.GetCPUAndMemoryStats:
func (sp *summaryProviderImpl) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
node, err := sp.provider.GetNode()
if err != nil {
return nil, fmt.Errorf("failed to get node info: %v", err)
}
nodeConfig := sp.provider.GetNodeConfig()
rootStats, err := sp.provider.GetCgroupCPUAndMemoryStats("/", false)
if err != nil {
return nil, fmt.Errorf("failed to get root cgroup stats: %v", err)
}
podStats, err := sp.provider.ListPodCPUAndMemoryStats(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list pod stats: %v", err)
}
nodeStats := statsapi.NodeStats{
NodeName: node.Name,
CPU: rootStats.CPU,
Memory: rootStats.Memory,
StartTime: rootStats.StartTime,
SystemContainers: sp.GetSystemContainersCPUAndMemoryStats(nodeConfig, podStats, false),
}
summary := statsapi.Summary{
Node: nodeStats,
Pods: podStats,
}
return &summary, nil
}
The summary has two parts: Node data and Pod data. The Node data comes from the rootStats object, produced by provider.GetCgroupCPUAndMemoryStats. The chain provider.GetCgroupCPUAndMemoryStats -> cadvisorInfoToContainerCPUAndMemoryStats -> cadvisorInfoToCPUandMemoryStats leads to:
func cadvisorInfoToCPUandMemoryStats(info *cadvisorapiv2.ContainerInfo) (*statsapi.CPUStats, *statsapi.MemoryStats) {
cstat, found := latestContainerStats(info)
// a lot of code here
if info.Spec.HasCpu {
// ...
if cstat.Cpu != nil {
cpuStats.UsageCoreNanoSeconds = &cstat.Cpu.Usage.Total
}
}
if info.Spec.HasMemory && cstat.Memory != nil {
pageFaults := cstat.Memory.ContainerData.Pgfault
majorPageFaults := cstat.Memory.ContainerData.Pgmajfault
memoryStats = &statsapi.MemoryStats{
Time: metav1.NewTime(cstat.Timestamp),
UsageBytes: &cstat.Memory.Usage,
WorkingSetBytes: &cstat.Memory.WorkingSet,
RSSBytes: &cstat.Memory.RSS,
PageFaults: &pageFaults,
MajorPageFaults: &majorPageFaults,
}
// ...
}
return cpuStats, memoryStats
}
Both UsageCoreNanoSeconds and WorkingSetBytes come from the cstat (container stats) object.
cadvisorInfoToCPUandMemoryStats -> latestContainerStats:
func latestContainerStats(info *cadvisorapiv2.ContainerInfo) (*cadvisorapiv2.ContainerStats, bool) {
stats := info.Stats
if len(stats) < 1 {
return nil, false
}
latest := stats[len(stats)-1]
// a lot of code here
}
It takes the last element of the info.Stats list, i.e., the most recent data.
Back to where info comes from, the Provider.GetCgroupCPUAndMemoryStats method:
func (p *Provider) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
info, err := getCgroupInfo(p.cadvisor, cgroupName, updateStats)
if err != nil {
return nil, fmt.Errorf("failed to get cgroup stats for %q: %v", cgroupName, err)
}
// Rootfs and imagefs doesn't make sense for raw cgroup.
s := cadvisorInfoToContainerCPUAndMemoryStats(cgroupName, info)
return s, nil
}
Provider.GetCgroupCPUAndMemoryStats -> getCgroupInfo:
func getCgroupInfo(cadvisor cadvisor.Interface, containerName string, updateStats bool) (*cadvisorapiv2.ContainerInfo, error) {
// ...
infoMap, err := cadvisor.ContainerInfoV2(containerName, cadvisorapiv2.RequestOptions{
IdType: cadvisorapiv2.TypeName,
Count: 2, // 2 samples are needed to compute "instantaneous" CPU
Recursive: false,
MaxAge: maxAge,
})
// ...
info := infoMap[containerName]
return &info, nil
}
As the function name suggests, this is cgroup-related; here the containerName argument is /. Next we need to find the implementation of cadvisor.Interface.
cAdvisor
cAdvisor (Container Advisor) is a Google open-source project that collects, aggregates, and processes information about running containers. According to the documentation https://github.com/google/cadvisor/blob/v0.46.1/docs/api_v2.md#container-name, / is the root container, which represents the host machine.
Searching the google/cadvisor project for the implementation of the GetContainerInfoV2 method:
func (m *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
containers, err := m.getRequestedContainers(containerName, options)
if err != nil {
return nil, err
}
var errs partialFailure
var nilTime time.Time // Ignored.
infos := make(map[string]v2.ContainerInfo, len(containers))
for name, container := range containers {
result := v2.ContainerInfo{}
cinfo, err := container.GetInfo(false)
if err != nil {
errs.append(name, "GetInfo", err)
infos[name] = result
continue
}
result.Spec = m.getV2Spec(cinfo)
stats, err := m.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
if err != nil {
errs.append(name, "RecentStats", err)
infos[name] = result
continue
}
result.Stats = v2.ContainerStatsFromV1(containerName, &cinfo.Spec, stats)
infos[name] = result
}
return infos, errs.OrNil()
}
manager.GetContainerInfoV2 -> m.memoryCache.RecentStats fetches the "container data" info.ContainerStats from a cache.
From metrics-server down to kubelet (cAdvisor), metric queries are all served from each component's own cache rather than by synchronous calls, and each cache is maintained periodically on its own. This is the design philosophy of exporters: trade some metric freshness for better performance. Next, let's see how cAdvisor maintains its cache. We already know containerName is /, so searching the code for it leads to the manager.Start method:
// Start the container manager.
func (m *manager) Start() error {
// a lot of code here
// Create root and then recover all containers.
err = m.createContainer("/", watcher.Raw)
if err != nil {
return err
}
// ...
}
The Container Manager chain manager.Start -> manager.createContainer -> manager.createContainerLocked -> containerData.Start kicks off collection for the root container: containerData.housekeeping -> containerData.housekeepingTick -> containerData.updateStats. The CPU-time data (CpuStats) is obtained through runc/libcontainer:
type Stats struct {
CpuStats CpuStats `json:"cpu_stats,omitempty"`
CPUSetStats CPUSetStats `json:"cpuset_stats,omitempty"`
MemoryStats MemoryStats `json:"memory_stats,omitempty"`
PidsStats PidsStats `json:"pids_stats,omitempty"`
BlkioStats BlkioStats `json:"blkio_stats,omitempty"`
// the map is in the format "size of hugepage: stats of the hugepage"
HugetlbStats map[string]HugetlbStats `json:"hugetlb_stats,omitempty"`
RdmaStats RdmaStats `json:"rdma_stats,omitempty"`
}
type CpuStats struct {
CpuUsage CpuUsage `json:"cpu_usage,omitempty"`
ThrottlingData ThrottlingData `json:"throttling_data,omitempty"`
}
type CpuUsage struct {
// Total CPU time consumed.
// Units: nanoseconds.
TotalUsage uint64 `json:"total_usage,omitempty"`
// Total CPU time consumed per core.
// Units: nanoseconds.
PercpuUsage []uint64 `json:"percpu_usage,omitempty"`
// CPU time consumed per core in kernel mode
// Units: nanoseconds.
PercpuUsageInKernelmode []uint64 `json:"percpu_usage_in_kernelmode"`
// CPU time consumed per core in user mode
// Units: nanoseconds.
PercpuUsageInUsermode []uint64 `json:"percpu_usage_in_usermode"`
// Time spent by tasks of the cgroup in kernel mode.
// Units: nanoseconds.
UsageInKernelmode uint64 `json:"usage_in_kernelmode"`
// Time spent by tasks of the cgroup in user mode.
// Units: nanoseconds.
UsageInUsermode uint64 `json:"usage_in_usermode"`
}
containerData.updateStats -> containerdContainerHandler.GetStats -> Handler.GetStats:
func (h *Handler) GetStats() (*info.ContainerStats, error) {
// a lot of code here
cgroupStats, err := h.cgroupManager.GetStats()
// ...
libcontainerStats := &libcontainer.Stats{
CgroupStats: cgroupStats,
}
stats := newContainerStats(libcontainerStats, h.includedMetrics)
// ...
}
Whether the runtime is Docker, Containerd, CRI-O, or even Mesos, everything ultimately points to libcontainer's Handler.GetStats.
runc/libcontainer
We finally arrive at the CgroupManager in libcontainer. Note that cgroup comes in two versions, v1 and v2, so CgroupManager also has two implementations; here we look at the cgroup v1 one: https://github.com/opencontainers/runc/blob/5fd4c4d144137e991c4acebb2146ab1483a97925/libcontainer/cgroups/fs/fs.go#L153-L167:
func (m *manager) GetStats() (*cgroups.Stats, error) {
m.mu.Lock()
defer m.mu.Unlock()
stats := cgroups.NewStats()
for _, sys := range subsystems {
path := m.paths[sys.Name()]
if path == "" {
continue
}
if err := sys.GetStats(path, stats); err != nil {
return nil, err
}
}
return stats, nil
}
Each cgroup subsystem implements the subsystem interface; the CPU-time data lives in the cpuacct subsystem, whose GetStats method reads:
func (s *CpuacctGroup) GetStats(path string, stats *cgroups.Stats) error {
if !cgroups.PathExists(path) {
return nil
}
userModeUsage, kernelModeUsage, err := getCpuUsageBreakdown(path)
if err != nil {
return err
}
totalUsage, err := fscommon.GetCgroupParamUint(path, "cpuacct.usage")
if err != nil {
return err
}
percpuUsage, err := getPercpuUsage(path)
if err != nil {
return err
}
percpuUsageInKernelmode, percpuUsageInUsermode, err := getPercpuUsageInModes(path)
if err != nil {
return err
}
stats.CpuStats.CpuUsage.TotalUsage = totalUsage
stats.CpuStats.CpuUsage.PercpuUsage = percpuUsage
stats.CpuStats.CpuUsage.PercpuUsageInKernelmode = percpuUsageInKernelmode
stats.CpuStats.CpuUsage.PercpuUsageInUsermode = percpuUsageInUsermode
stats.CpuStats.CpuUsage.UsageInUsermode = userModeUsage
stats.CpuStats.CpuUsage.UsageInKernelmode = kernelModeUsage
return nil
}
In other words, the value read from the /sys/fs/cgroup/cpuacct/cpuacct.usage file is the total CPU time (in nanoseconds) consumed by the root container, and that is the data source of the node_cpu_usage_seconds_total
metric; experienced readers probably guessed this from the start. To keep this article from growing even longer, node memory and Pod compute resources are not covered in detail; just follow their call chains the same way.
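To close the loop, here is a minimal sketch (assuming cgroup v1 mounted at the conventional path) that reads the same file and converts it to the seconds value that node_cpu_usage_seconds_total ultimately reports:
// readRootCPUSeconds reads the root cgroup's cumulative CPU time from
// cpuacct.usage (nanoseconds) and converts it to seconds, mirroring how
// kubelet divides UsageCoreNanoSeconds by time.Second.
func readRootCPUSeconds() (float64, error) {
	data, err := os.ReadFile("/sys/fs/cgroup/cpuacct/cpuacct.usage")
	if err != nil {
		return 0, err
	}
	nanos, err := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64)
	if err != nil {
		return 0, err
	}
	return float64(nanos) / float64(time.Second), nil
}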