The apiserver is, as everyone knows, one of the most central components of Kubernetes: it exposes a REST API for managing the cluster. You can talk to the apiserver directly through that REST API, but in practice kubectl or an SDK is used far more often. Kubernetes provides SDKs for several languages; this article walks through a few small examples of the Go SDK, client-go.
Overview
The client-go repository on GitHub:
https://github.com/kubernetes/client-go
Broadly speaking, there are three ways to interact with the apiserver through client-go:
1. Clientset
2. dynamicClient
3. RESTClient
Clientset operates on Kubernetes' native resource types, such as deployment, node, and daemonset, while dynamicClient manages extension types. Both Clientset and dynamicClient ultimately interact with the apiserver through RESTClient, which you can also use directly, as in the sketch below.
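For example, here is a minimal sketch of listing nodes through the RESTClient underlying a Clientset. It assumes a kube config file at /tmp/config and, like all code in this article, uses the pre-0.18 client-go API in which Do() takes no context argument:

package main

import (
	"log"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/tmp/config")
	if err != nil {
		log.Fatalf("Failed to build kubeconfig: %v", err)
	}
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Failed to build kubernetes clientset: %v", err)
	}

	// Issue a raw GET against /api/v1/nodes through the underlying RESTClient
	// and decode the response into a typed NodeList.
	result := &corev1.NodeList{}
	err = kubeClient.CoreV1().RESTClient().
		Get().
		Resource("nodes").
		Do().
		Into(result)
	if err != nil {
		log.Fatalf("Failed to list nodes via RESTClient: %v", err)
	}
	log.Printf("Node count: %d", len(result.Items))
}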
kubeconfig
When using the command-line tool kubectl, you need a kubeconfig, i.e., you set the environment variable KUBECONFIG to point at a kube config file, for example:
export KUBECONFIG=/tmp/config
Likewise, client-go needs a kubeconfig. If the program runs inside the k8s cluster, the kubeconfig can be obtained directly with the following code (rest.InClusterConfig reads the service account token and CA certificate mounted into the pod):
package kubeconfig

import (
	"log"

	"k8s.io/client-go/rest"
)

// GetKubeconfig returns the rest.Config for a program running inside the cluster.
func GetKubeconfig() *rest.Config {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("Failed to get the kubeconfig in cluster, error: %v", err)
	}
	return cfg
}
If the program runs outside the k8s cluster, the path to a kube config file must be supplied instead:
package kubeconfig

import (
	"log"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// BuildKubeconfig builds a rest.Config from the given kube config file path.
func BuildKubeconfig(configFile string) *rest.Config {
	cfg, err := clientcmd.BuildConfigFromFlags("", configFile)
	if err != nil {
		log.Fatalf("Failed to build kubeconfig: %v", err)
	}
	return cfg
}
In fact, when the BuildKubeconfig function above is passed the empty string "" as the path, clientcmd.BuildConfigFromFlags ends up calling rest.InClusterConfig() anyway.
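A minimal sketch of the usual pattern that exploits this fallback: accept the kubeconfig path as a command-line flag, so the same binary runs both inside and outside the cluster (the flag name here is this article's choice, not a client-go convention):

package main

import (
	"flag"
	"log"

	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// An empty path makes BuildConfigFromFlags fall back to rest.InClusterConfig().
	kubeconfigPath := flag.String("kubeconfig", "", "path to a kubeconfig file; empty means in-cluster")
	flag.Parse()

	cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfigPath)
	if err != nil {
		log.Fatalf("Failed to build kubeconfig: %v", err)
	}
	log.Printf("API server: %s", cfg.Host)
}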
Clientset
In the vast majority of cases you are operating on Kubernetes' native resources, and Clientset alone is enough. The following two small examples show how to use it.
Example 1: get information about all nodes in the cluster
With kubectl this is a one-liner:
kubectl get nodes
With Clientset it is just as simple; the code is below. It reuses the BuildKubeconfig function from above, and the kube config file path is "/tmp/config".
package main

import (
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"

	"examples/kube-client-go/kubeconfig"
)

func main() {
	cfg := kubeconfig.BuildKubeconfig("/tmp/config")
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Failed to build kubernetes clientset: %v", err)
	}
	log.Printf("Built kube clientset successfully")

	// List all nodes: the programmatic equivalent of `kubectl get nodes`.
	nl, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
	if err != nil {
		log.Fatalf("Failed to get nodes, error: %v", err)
	}
	log.Printf("Node count: %d", len(nl.Items))
	for i, item := range nl.Items {
		log.Printf("node %d, %s", i, item.Name)
	}
}
As the name suggests, a Clientset bundles many clients, each operating on a different group of resources. The example above uses CoreV1Client to work with nodes. For Deployment, StatefulSet, DaemonSet, and so on you need AppsV1Client; for example, the following statement returns the deployment interface (note that Deployments takes a namespace argument, where metav1.NamespaceAll, the empty string, means all namespaces):
dpm := kubeClient.AppsV1().Deployments(metav1.NamespaceAll)
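Building on that, a small sketch (using the same kubeClient as in example 1) that lists the deployments in every namespace:

// metav1.NamespaceAll is the empty string and selects every namespace.
dpmList, err := kubeClient.AppsV1().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
	log.Fatalf("Failed to list deployments: %v", err)
}
for _, d := range dpmList.Items {
	log.Printf("deployment %s/%s", d.Namespace, d.Name)
}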
Example 2: watch changes to specific deployments in the cluster
This example watches the add, update, and delete events of every deployment in the cluster carrying the label "mylabelkey". onAddResource/onUpdateResource/onDeleteResource are the callbacks handling the add, update, and delete events respectively.
Kubernetes' list-watch mechanism is the core of implementing a controller; core components such as kube-controller-manager, kube-scheduler, and kubelet all manage resources through it. There are already plenty of good explanations of list-watch online, so it is not repeated here.
package main

import (
	"log"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	pkgrt "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	"examples/kube-client-go/kubeconfig"
)

func main() {
	cfg := kubeconfig.BuildKubeconfig("/tmp/config")
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Failed to build kubernetes clientset: %v", err)
	}
	log.Printf("Built kube clientset successfully")

	// Watch deployments in all namespaces that carry the label "mylabelkey".
	deploymentWatcher := newListWatchFromClient(kubeClient.AppsV1().RESTClient(), "deployments", "mylabelkey")
	_, deploymentInformer := cache.NewIndexerInformer(deploymentWatcher, &appsv1.Deployment{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddResource,
		UpdateFunc: onUpdateResource,
		DeleteFunc: onDeleteResource,
	}, cache.Indexers{})

	stopCh := make(chan struct{})
	go deploymentInformer.Run(stopCh)
	<-stopCh
}

func onAddResource(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		log.Printf("Failed to get key when processing the OnAdd callback, error: %v", err)
		return
	}
	log.Printf("New deployment was added, %s", key)
}

func onUpdateResource(old interface{}, newObj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(newObj)
	if err != nil {
		log.Printf("Failed to get key when processing the OnUpdate callback, error: %v", err)
		return
	}
	log.Printf("Deployment was updated, %s", key)
}

func onDeleteResource(obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc also handles the tombstone objects
	// the informer delivers when a delete event was missed.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		log.Printf("Failed to get key when processing the OnDelete callback, error: %v", err)
		return
	}
	log.Printf("Deployment was deleted, %s", key)
}

// newListWatchFromClient builds a cache.ListWatch whose list and watch calls
// both filter by the given label selector.
func newListWatchFromClient(c cache.Getter, resource string, lbs string) *cache.ListWatch {
	listFunc := func(options metav1.ListOptions) (pkgrt.Object, error) {
		if len(lbs) > 0 {
			options.LabelSelector = lbs
		}
		return c.Get().
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do().
			Get()
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		if len(lbs) > 0 {
			options.LabelSelector = lbs
		}
		// Setting Watch here adds watch=true to the query, the supported way
		// to watch (the legacy "watch" URL prefix is deprecated).
		options.Watch = true
		return c.Get().
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch()
	}
	return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
The logic above is straightforward: build a ListWatch that filters by label, hand it to an informer, and let the informer drive the callbacks as events arrive.
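It is worth knowing that client-go's cache package ships a helper covering the same need; newListWatchFromClient above could most likely be replaced by cache.NewFilteredListWatchFromClient, whose options-modifier callback injects the label selector into both the list and the watch call:

// Equivalent ListWatch built with client-go's own helper.
deploymentWatcher := cache.NewFilteredListWatchFromClient(
	kubeClient.AppsV1().RESTClient(),
	"deployments",
	metav1.NamespaceAll,
	func(options *metav1.ListOptions) {
		options.LabelSelector = "mylabelkey"
	},
)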
dynamicClient
Operating on extension resources requires dynamicClient. The popular monitoring system Prometheus serves as a brief illustration here.
The YAML file for deploying Prometheus looks roughly like this:
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: k8s
  labels:
    prometheus: k8s
spec:
  replicas: 1
  version: "v2.0.0"
  serviceMonitorSelector:
    matchExpressions:
    - {key: k8s-app, operator: Exists}
Clearly, a resource of kind "Prometheus" is not a type Kubernetes supports natively. Neither is "ServiceMonitor", for example:
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: test1-metrics
......
In fact, both "Prometheus" and "ServiceMonitor" are Kubernetes extension types implemented by prometheus-operator. Their CRD definitions look roughly as follows.
Prometheus CRD:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  creationTimestamp: null
  name: prometheuses.monitoring.coreos.com
spec:
  group: monitoring.coreos.com
  names:
    kind: Prometheus
    plural: prometheuses
  scope: Namespaced
......
ServiceMonitor CRD:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  creationTimestamp: null
  name: servicemonitors.monitoring.coreos.com
spec:
  group: monitoring.coreos.com
  names:
    kind: ServiceMonitor
    plural: servicemonitors
  scope: Namespaced
Register the CRDs with the following commands:
kubectl apply -f prometheus_crd.yaml
kubectl apply -f servicemonitor_crd.yaml
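If registration succeeded, the new types show up in the CRD list:

kubectl get crd | grep monitoring.coreos.com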
In prometheus-operator, the handling of client-go's dynamicClient is mainly encapsulated in this file:
https://github.com/coreos/prometheus-operator/blob/master/pkg/client/monitoring/v1/client.go
The key function is as follows:
package v1

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	dynamic "k8s.io/client-go/deprecated-dynamic"
	"k8s.io/client-go/rest"
)

func NewForConfig(crdKinds *CrdKinds, apiGroup string, c *rest.Config) (*MonitoringV1Client, error) {
	config := *c
	SetConfigDefaults(apiGroup, &config)

	// The RESTClient handles the list-watch traffic for the custom types.
	client, err := rest.RESTClientFor(&config)
	if err != nil {
		return nil, err
	}

	// The dynamicClient handles the remaining operations on the custom types.
	dynamicClient, err := dynamic.NewClient(&config, schema.GroupVersion{
		Group:   apiGroup,
		Version: Version,
	})
	if err != nil {
		return nil, err
	}

	return &MonitoringV1Client{client, dynamicClient, crdKinds}, nil
}
The function above creates an instance of client-go's dynamicClient, and all subsequent interaction with the apiserver goes through that instance. Two points are worth noting:
1. The list-watch interaction with the apiserver goes through the RESTClient; all other operations go through the dynamicClient.
2. client-go has since reworked the dynamic package, renaming the old one to deprecated-dynamic, so new code should use the new dynamic package, as sketched below.
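As an illustration of the new dynamic package, here is a minimal sketch that lists the ServiceMonitor objects defined above. The "monitoring" namespace is an assumption made for this example, and the code uses the pre-0.18 client-go API without context arguments, consistent with the rest of this article:

package main

import (
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"

	"examples/kube-client-go/kubeconfig"
)

func main() {
	cfg := kubeconfig.BuildKubeconfig("/tmp/config")

	// Build a dynamic client from the same rest.Config used for the Clientset.
	dynClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Failed to build dynamic client: %v", err)
	}

	// The GroupVersionResource matches the CRD registered above:
	// group monitoring.coreos.com, version v1, plural name "servicemonitors".
	gvr := schema.GroupVersionResource{
		Group:    "monitoring.coreos.com",
		Version:  "v1",
		Resource: "servicemonitors",
	}

	// List the ServiceMonitor objects in the "monitoring" namespace; the
	// results come back as unstructured objects rather than typed structs.
	list, err := dynClient.Resource(gvr).Namespace("monitoring").List(metav1.ListOptions{})
	if err != nil {
		log.Fatalf("Failed to list servicemonitors: %v", err)
	}
	for _, item := range list.Items {
		log.Printf("ServiceMonitor: %s/%s", item.GetNamespace(), item.GetName())
	}
}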
Summary
This article introduced the basics of client-go through a few small examples, along with these important concepts:
1. The list-watch mechanism, which is the key to implementing a controller.
2. Kubernetes can be extended through CustomResourceDefinitions, in which case dynamicClient is used to manage the extension resources.