Kubernetes apiserver Source Code Walkthrough

Sep 5, 2018 00:30 · Kubernetes Golang

A working Go environment must be set up beforehand. This article is based on v1.15.2.

Kubernetes source code: https://github.com/kubernetes/kubernetes

Building from source

go get -d k8s.io/kubernetes
cd $GOPATH/src/k8s.io/kubernetes
git checkout v1.15.2
make

The compiled binaries are written to _output/bin/:

$ ls _output/bin/
apiextensions-apiserver   defaulter-gen  genkubedocs         ginkgo      kubeadm                  kubelet         linkcheck
cloud-controller-manager  e2e_node.test  genman              go2make     kube-apiserver           kubemark        mounter
conversion-gen            e2e.test       genswaggertypedocs  go-bindata  kube-controller-manager  kube-proxy      openapi-gen
deepcopy-gen              gendocs        genyaml             hyperkube   kubectl                  kube-scheduler

Source layout

The kube-apiserver source lives under cmd/kube-apiserver:

$ tree cmd/kube-apiserver
cmd/kube-apiserver
├── BUILD
├── OWNERS
├── apiserver.go
└── app
    ├── BUILD
    ├── aggregator.go
    ├── apiextensions.go
    ├── options
    │   ├── BUILD
    │   ├── globalflags.go
    │   ├── globalflags_test.go
    │   ├── options.go
    │   ├── options_test.go
    │   └── validation.go
    ├── server.go
    └── testing
        ├── BUILD
        ├── testdata
        │   ├── 127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.crt
        │   └── 127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.key
        └── testserver.go

apiserver.go contains main() and is the entry point of the kube-apiserver program. The app.NewAPIServerCommand() constructor builds a cobra command object.

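Cobra is a library that provides a simple interface for building powerful, modern CLIs in the style of the git and go tools. Cobra is also a program in its own right, used to generate CLI applications.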

Here is the definition of app.NewAPIServerCommand():

// app/server.go L93
func NewAPIServerCommand() *cobra.Command {
    // Section 1
    s := options.NewServerRunOptions()
    cmd := &cobra.Command{
        // ...
        RunE: func(cmd *cobra.Command, args []string) error {
            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())

            // Section 2
            // set default options
            completedOptions, err := Complete(s)
            if err != nil {
                return err
            }

            // Section 3
            // validate options
            if errs := completedOptions.Validate(); len(errs) != 0 {
                return utilerrors.NewAggregate(errs)
            }

            // Section 4
            return Run(completedOptions, genericapiserver.SetupSignalHandler())
        },
    }
    // ...
}
  1. Build the options object.
  2. Complete the options by filling in defaults.
  3. Validate the options.
// app/options/options.go L39
type ServerRunOptions struct {
    // ...
}

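The ServerRunOptions struct defines the configuration parameters used to start the apiserver. It nests other structs and has a very large number of fields, which matches the long list of flags printed by kube-apiserver -h.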
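The nesting described above can be sketched with the standard library alone. This is a minimal illustration, not the real implementation: the actual code uses pflag through cobra, and the two option groups, their fields, and the etcd default value below are simplified assumptions made for the example.

```go
package main

import (
	"flag"
	"fmt"
)

// EtcdOptions and SecureServingOptions are hypothetical, heavily simplified
// stand-ins for the option groups nested inside ServerRunOptions.
type EtcdOptions struct {
	Servers string
}

type SecureServingOptions struct {
	BindPort int
}

// ServerRunOptions nests the option groups instead of holding flat fields.
type ServerRunOptions struct {
	Etcd          *EtcdOptions
	SecureServing *SecureServingOptions
}

// NewServerRunOptions returns options pre-populated with illustrative defaults.
func NewServerRunOptions() *ServerRunOptions {
	return &ServerRunOptions{
		Etcd:          &EtcdOptions{Servers: "http://127.0.0.1:2379"},
		SecureServing: &SecureServingOptions{BindPort: 6443},
	}
}

// AddFlags lets each option group register its own flags.
func (o *EtcdOptions) AddFlags(fs *flag.FlagSet) {
	fs.StringVar(&o.Servers, "etcd-servers", o.Servers, "comma-separated etcd endpoints")
}

func (o *SecureServingOptions) AddFlags(fs *flag.FlagSet) {
	fs.IntVar(&o.BindPort, "secure-port", o.BindPort, "port to serve HTTPS on")
}

// AddFlags on the top-level struct delegates to every nested group.
func (s *ServerRunOptions) AddFlags(fs *flag.FlagSet) {
	s.Etcd.AddFlags(fs)
	s.SecureServing.AddFlags(fs)
}

// parse builds default options and applies the given command-line arguments.
func parse(args []string) (*ServerRunOptions, error) {
	s := NewServerRunOptions()
	fs := flag.NewFlagSet("kube-apiserver-sketch", flag.ContinueOnError)
	s.AddFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return s, nil
}

func main() {
	s, err := parse([]string{"--secure-port=8443"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.Etcd.Servers, s.SecureServing.BindPort)
}
```

Because every group owns its flags, adding a new group only requires one more AddFlags call in the top-level method, which is one plausible reason the flag list can grow so long without the top-level struct becoming unmanageable.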

  4. Call Run(), which creates and runs the API server object:
// app/server.go L145
// Run runs the specified APIServer.  This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())

    server, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

    return server.PrepareRun().Run(stopCh)
}
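The four steps (build, complete, validate, run) can be sketched end to end as follows. This is only an illustration under stated assumptions: the options type, its two fields, and the lowercase helper names are hypothetical, and the real Run() builds a server chain rather than just waiting on the channel.

```go
package main

import "fmt"

// options is a hypothetical, minimal stand-in for ServerRunOptions.
type options struct {
	BindAddress string
	Port        int
}

// completedOptions wraps options so that run only accepts values
// that have been through complete first.
type completedOptions struct {
	*options
}

// complete fills in defaults for anything the user left unset.
func complete(o *options) (completedOptions, error) {
	if o.BindAddress == "" {
		o.BindAddress = "0.0.0.0"
	}
	return completedOptions{o}, nil
}

// validate collects every problem instead of stopping at the first one.
func (c completedOptions) validate() []error {
	var errs []error
	if c.Port < 1 || c.Port > 65535 {
		errs = append(errs, fmt.Errorf("port %d is out of range", c.Port))
	}
	return errs
}

// run blocks until stopCh is closed, as a long-running server would.
func run(c completedOptions, stopCh <-chan struct{}) error {
	fmt.Printf("serving on %s:%d\n", c.BindAddress, c.Port)
	<-stopCh
	return nil
}

// start strings the steps together in the same order as RunE.
func start(o *options, stopCh <-chan struct{}) error {
	c, err := complete(o)
	if err != nil {
		return err
	}
	if errs := c.validate(); len(errs) != 0 {
		return fmt.Errorf("invalid options: %v", errs)
	}
	return run(c, stopCh)
}

func main() {
	stopCh := make(chan struct{})
	close(stopCh) // closed up front so the example returns immediately
	fmt.Println(start(&options{Port: 6443}, stopCh))
	fmt.Println(start(&options{Port: 0}, stopCh))
}
```

Wrapping the options in a distinct completed type means the compiler rejects any attempt to run with options that skipped the completion step, which appears to be the motivation for completedServerRunOptions in the signatures above.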

The server object

// app/server.go L158
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
    // Section 1
    nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    if err != nil {
        return nil, err
    }

    // Section 2
    kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // Section 3
    // If additional API servers are added, they should be gated.
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    if err != nil {
        return nil, err
    }
    // Section 4
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
    if err != nil {
        return nil, err
    }

    // Section 5
    // otherwise go down the normal path of standing the aggregator up in front of the API server
    // this wires up openapi
    kubeAPIServer.GenericAPIServer.PrepareRun()

    // This will wire up openapi for extension api server
    apiExtensionsServer.GenericAPIServer.PrepareRun()

    // Section 6
    // aggregator comes last in the chain
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
        return nil, err
    }

    // Section 7
    if insecureServingInfo != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }

    return aggregatorServer.GenericAPIServer, nil
}
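  1. CreateNodeDialer() builds the SSH tunnel (node tunneler) and the proxy transport that the API server uses to communicate with nodes.
  2. CreateKubeAPIServerConfig() builds the resources (configuration) the API server needs in order to run.
  3. createAPIExtensionsConfig() builds the configuration for the extensions API server.
  4. createAPIExtensionsServer() and CreateKubeAPIServer() build the extensions API server and the native API server. The extensions server is given an empty delegate, and the native server delegates to the extensions server.
  5. PrepareRun() is called on both the native and the extensions API server.
  6. createAggregatorConfig() builds the aggregator server's configuration; createAggregatorServer() creates the aggregator server object, which unifies access to the native and extensions API servers.
  7. Extra handling is done for the insecure (HTTP) serving case.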

What is ultimately returned here is the aggregator server.
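The delegation order built by CreateServerChain() (aggregator first, then the native API server, then the extensions API server, then an empty delegate) can be sketched with plain net/http handlers. This is a rough illustration only: the newServer helper and the prefix-based routing are assumptions made for the example, and the real servers register individual API paths and the aggregator can also proxy to external services.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newServer returns a handler that serves paths under prefix itself and
// hands everything else to delegate. The helper is hypothetical.
func newServer(name, prefix string, delegate http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasPrefix(r.URL.Path, prefix) {
			fmt.Fprintf(w, "handled by %s", name)
			return
		}
		delegate.ServeHTTP(w, r)
	})
}

// buildChain wires the servers from the end of the chain to the front,
// mirroring the construction order in CreateServerChain.
func buildChain() http.Handler {
	empty := http.NotFoundHandler() // plays the role of the empty delegate
	apiExtensions := newServer("apiextensions", "/apis/apiextensions.k8s.io", empty)
	kubeAPIServer := newServer("kube-apiserver", "/api/", apiExtensions)
	aggregator := newServer("aggregator", "/apis/apiregistration.k8s.io", kubeAPIServer)
	return aggregator
}

// get sends a GET request through the chain and returns status and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	paths := []string{"/api/v1/pods", "/apis/apiextensions.k8s.io/v1beta1", "/unknown"}
	for _, p := range paths {
		code, body := get(chain, p)
		fmt.Println(p, code, body)
	}
}
```

A request enters at the aggregator and falls through each server until one claims it; a path that nobody claims reaches the empty delegate and gets a 404. Only the head of the chain needs to be returned and run, which is consistent with CreateServerChain() returning the aggregator's GenericAPIServer.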