暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kubernetes client-go简介

零君聊软件 2018-09-01
715

众所周知,apiserver是Kubernetes最核心的组件之一,它提供了REST API用于集群管理。我们可以通过REST API直接与apiserver交互,但实际中,更多的是使用kubectl或SDK。Kubernetes提供了多种语言的SDK,本文主要是通过几个小例子简介针对Go语言的SDK,也就是client-go。


概述

Kubernetes client-go在github上的repository如下:

https://github.com/kubernetes/client-go


总的来说,通过client-go有三种方式与apiserver交互:

1、Clientset;

2、dynamicClient;

3、RESTClient;


Clientset用于对kubernetes原生的各种资源进行操作,例如deployment、node、daemonset等等。而dynamicClient则用于对扩展类型进行管理。不管是Clientset,还是dynamicClient,最终都是通过RESTClient来与apiserver进行交互。


kubeconfig

当使用命令行工具kubectl的时候,需要配置kubeconfig,也就是配置环境变量KUBECONFIG,使其指向kube config文件,例如:

export KUBECONFIG=/tmp/config


同理,使用client-go的时候,也需要配置kubeconfig。如果程序直接运行在k8s cluster内,那么可以直接通过如下代码获取kubeconfig,

package kubeconfig


import (

    "log"

    "k8s.io/client-go/rest"

)


func GetKubeconfig() *rest.Config {

    cfg, err := rest.InClusterConfig()

    if err != nil {

     log.Fatalf("Failed to get the kubeconfig in cluster, error: %v", err)

    }

    return cfg

}


如果程序运行在k8s cluster之外,则需要提供kube config文件的路径,代码如下:

package kubeconfig


import (

"log"

        "k8s.io/client-go/rest"

"k8s.io/client-go/tools/clientcmd"

)


func BuildKubeconfig(configFile string) *rest.Config {

    cfg, err := clientcmd.BuildConfigFromFlags("", configFile)

    if err != nil {

log.Fatalf("Failed to build kubeconfig: %v", err)

    }

    return cfg

}


实际上,当给上面的函数BuildKubeconfig传入的参数为空字符串“”时,clientcmd.BuildConfigFromFlags最终调用的还是rest.InClusterConfig()。


Clientset

绝大多数情况下,都是对kubernetes原生的资源进行操作,这时只需要Clientset就足够了。下面通过两个实际的小例子来说明如下使用Clientset。


例子1:获取cluster中所有node的信息

使用kubectl非常简单,命令如下:

kubectl get nodes


使用Clientset,同样很简单,代码如下。这里直接使用上面的BuildKubeconfig函数。kube config文件的路径是“/tmp/config”。

package main


import (

    "examples/kube-client-go/kubeconfig"

    "log"


    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/client-go/kubernetes"

)


func main() {

    cfg := kubeconfig.BuildKubeconfig("/tmp/config")


    kubeClient, err := kubernetes.NewForConfig(cfg)

    if err != nil {

log.Fatalf("Failed to build kubernetes clientset: %v", err)

    }


    log.Printf("Built kube clientset successfully")

    nodeObj := kubeClient.CoreV1().Nodes()

    nl, err := nodeObj.List(metav1.ListOptions{})


    if err != nil {

log.Fatalf("Failed to get node, error: %v", err)

    }


    log.Printf("Node count: %d", len(nl.Items))

    for i, item := range nl.Items {

log.Printf("node %d, %s", i, item.ObjectMeta.Name)

    }

}


从Clientset的名字也可以看出,Clientset里面包含了许多client,分别用于操作不同的资源。上面的例子中,是使用CoreV1Client操作node。如果操作deployment、StatefulSet、Daemonset等,则需要使用AppV1Client,例如使用下面的语句获取所有的deployment,

dpm := kubeClient.AppsV1().Deployments()


例子2:监控集群中某些特定的deployment的变化

该例子监控集群中所有带有label "mylabelkey"的deployment的增、删、改事件。onAddResource/onUpdateResource/onDeleteResource分别是对应增、改、删事件的回调处理函数。


Kubernetes的list-watch机制是实现controler的核心。Kubernetes的核心组件,例如kube-controler、kube-scheduler以及kubelet都是通过list-watch机制来管理各种资源。关于list-watch,网上已经有很多讲解了,这里就不啰嗦了。

package main


import (

"examples/kube-client-go/kubeconfig"

"log"


appsv1 "k8s.io/api/apps/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

pkgrt "k8s.io/apimachinery/pkg/runtime"

"k8s.io/apimachinery/pkg/watch"

"k8s.io/client-go/kubernetes"

"k8s.io/client-go/tools/cache"

)


func main() {

cfg := kubeconfig.BuildKubeconfig("/tmp/config")

kubeClient, err := kubernetes.NewForConfig(cfg)

if err != nil {

log.Fatalf("Failed to build kubernetes clientset: %v", err)

}


log.Printf("Built kube clientset successfully")

deploymentWatcher := newListWatchFromClient(kubeClient.AppsV1().RESTClient(), "deployments", "mylabelkey")

_, deploymentInformer := cache.NewIndexerInformer(deploymentWatcher, &appsv1.Deployment{}, 0, cache.ResourceEventHandlerFuncs{

AddFunc:    onAddResource,

UpdateFunc: onUpdateResource,

DeleteFunc: onDeleteResource,

}, cache.Indexers{})


stopCh := make(chan struct{})

go deploymentInformer.Run(stopCh)


<-stopCh

}


func onAddResource(obj interface{}) {

key, err := cache.MetaNamespaceKeyFunc(obj)

if err != nil {

log.Printf("Failed to get key when processing the OnAdd callback, error: %v", err)

return

}

log.Printf("New deployment was added, %s", key)

}


func onUpdateResource(old interface{}, newObj interface{}) {

key, err := cache.MetaNamespaceKeyFunc(newObj)

if err != nil {

log.Printf("Failed to get key when processing the OnUpdate callback, error: %v", err)

return

}

log.Printf("Deployment was updated, %s", key)

}


func onDeleteResource(obj interface{}) {

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)

if err != nil {

log.Printf("Failed to get key when processing the OnDelete callback, error: %v", err)

return

}

log.Printf("Deployment was deleted, %s", key)

}


func newListWatchFromClient(c cache.Getter, resource string, lbs string) *cache.ListWatch {

listFunc := func(options metav1.ListOptions) (pkgrt.Object, error) {

if len(lbs) > 0 {

options.LabelSelector = lbs

}

return c.Get().

Resource(resource).

VersionedParams(&options, metav1.ParameterCodec).

Do().

Get()

}

watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {

if len(lbs) > 0 {

options.LabelSelector = lbs

}

return c.Get().

Prefix("watch").

Resource(resource).

VersionedParams(&options, metav1.ParameterCodec).

Watch()

}

return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}

}


上面的代码逻辑比较简单,一目了然,所以就不啰嗦解释了。


dynamicClient

如果需要对扩展资源进行操作,则需要借助于dynamicClient。这里通过流行的监控系统Prometheus为例,来做简要的说明。


部署Prometheus的YAML文件大致如下:

apiVersion: monitoring.coreos.com/v1

kind: Prometheus

metadata:

  name: k8s

  labels:

    prometheus: k8s

spec:

  replicas: 1

  version: "v2.0.0"

  serviceMonitorSelector:

    matchExpressions:

    - {key: k8s-app, operator: Exists}


很显然,kind为"Prometheus"的资源并不是Kubernetes原生支持的资源类型。另外,"ServiceMonitor"也不是Kubernetes支持的类型,例如:

apiVersion: monitoring.coreos.com/v1

kind: ServiceMonitor

metadata:

  name: test1-metrics

......


实际上"Prometheus"和"ServiceMonitor"都是prometheus-operator实现的Kubernetes扩展类型。它们的CRD定义大致如下。

Prometheus CRD:

apiVersion: apiextensions.k8s.io/v1

kind: CustomResourceDefinition

metadata:

  creationTimestamp: null

  name: prometheuses.monitoring.coreos.com

spec:

  group: monitoring.coreos.com

  names:

    kind: Prometheus

    plural: prometheuses

  scope: Namespaced

......


ServiceMonitor CRD:

apiVersion: apiextensions.k8s.io/v1

kind: CustomResourceDefinition

metadata:

  creationTimestamp: null

  name: servicemonitors.monitoring.coreos.com

spec:

  group: monitoring.coreos.com

  names:

    kind: ServiceMonitor

    plural: servicemonitors

  scope: Cluster


通过如下命令注册CRD:

kubectl apply -f prometheus_crd.yaml

kubectl apply -f servicemonitor_crd_yaml


Prometheus-operator中对kubernetes client-go中的dynamicClient的处理主要封装在下面的文件中,

https://github.com/coreos/prometheus-operator/blob/master/pkg/client/monitoring/v1/client.go


其中关键函数如下:

package v1


import (

"fmt"

"strings"


"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/apimachinery/pkg/runtime/serializer"

dynamic "k8s.io/client-go/deprecated-dynamic"

"k8s.io/client-go/kubernetes/scheme"

"k8s.io/client-go/rest"

)


func NewForConfig(crdKinds *CrdKinds, apiGroup string, c *rest.Config) (*MonitoringV1Client, error) {

config := *c

SetConfigDefaults(apiGroup, &config)

client, err := rest.RESTClientFor(&config)

if err != nil {

return nil, err

}


dynamicClient, err := dynamic.NewClient(&config, schema.GroupVersion{

Group:   apiGroup,

Version: Version,

})

if err != nil {

return nil, err

}


return &MonitoringV1Client{client, dynamicClient, crdKinds}, nil

}


在上面的函数中,创建了kubernetes client-go中dynamicClient的实例。之后与apiserver的交互就是通过这个dynamicClient的实例进行的。不过有两点值得注意:

1、与apiserver的list-watch的交互,是通过restclient进行的;其它操作则是通过dynamiclient进行的;

2、kubernetes client-go对dynamic包进行了更新,之前的dynamic改名为deprecated-dynamic。所以,以后要使用新的dynamic包。


小结

本文通过几个小例子简要介绍了kubernetes client-go的用法,以及如下几个重要的概念:

1、list-watch机制:是实现controller的关键;

2、通过CustomResourceDefinition可以对kubernetes进行扩展,这时就需要使用dynamicClient来管理这些扩展资源。

文章转载自零君聊软件,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论