本章我们学习在vscode
上的remote container
插件基础上,尝试debug
和学习kubesphere
后端模块架构。
前提
安装好 vscode
以及remote container
插件;在远程主机上安装好 kubenertes
容器"操作系统"和kubesphere >= v3.1.0
云“控制面板”;安装 go >=1.16
;在 kubesphere
上使用ks-installer
安装了需要debug
的插件,如devops
、kubeedge
或者whatever
。
配置launch文件
$ cat .vscode/launch.json{// 使用 IntelliSense 了解相关属性。// 悬停以查看现有属性的描述。// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387"version": "0.2.0","configurations": [{"name": "Launch Package","type": "go","request": "launch","mode": "auto","program": "${workspaceFolder}/cmd/ks-apiserver/apiserver.go"}]}
ks-apiserver调试依赖文件
在相对路径cmd/ks-apiserver/
下配置kubesphere.yaml
。
首先,查看集群之中的cm
配置文件:
$ kubectl -n kubesphere-system get cm kubesphere-config -oyamlapiVersion: v1data:kubesphere.yaml: |authentication:authenticateRateLimiterMaxTries: 10authenticateRateLimiterDuration: 10m0sloginHistoryRetentionPeriod: 168hmaximumClockSkew: 10smultipleLogin: TruekubectlImage: kubesphere/kubectl:v1.20.0jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"oauthOptions:clients:- name: kubespheresecret: kubesphereredirectURIs:- '*'network:ippoolType: nonemonitoring:endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090enableGPUMonitoring: falsegpu:kinds:- resourceName: nvidia.com/gpuresourceType: GPUdefault: Truenotification:endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093kubeedge:endpoint: http://edge-watcher.kubeedge.svc/api/gateway:watchesPath: /var/helm-charts/watches.yamlnamespace: kubesphere-controls-systemkind: ConfigMapmetadata:name: kubesphere-confignamespace: kubesphere-system
以上是我安装的或ks
默认激活的几个组件,将其中的yaml
文件拷贝过来,并加上kubeconfig
文件路径的配置。
之所以需要添加云主机上的kubeconfig
文件,主要是因为我们是远程主机去debug
, 而容器中会用到inclusterconfig
就不需要添加了。
$ cat ./cmd/ks-apiserver/kubesphere.yamlkubernetes:kubeconfig: "/root/.kube/config"master: https://192.168.88.6:6443$qps: 1e+06burst: 1000000authentication:authenticateRateLimiterMaxTries: 10authenticateRateLimiterDuration: 10m0sloginHistoryRetentionPeriod: 168hmaximumClockSkew: 10smultipleLogin: TruekubectlImage: kubesphere/kubectl:v1.20.0jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"oauthOptions:clients:- name: kubespheresecret: kubesphereredirectURIs:- '*'network:ippoolType: nonemonitoring:endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090enableGPUMonitoring: falsegpu:kinds:- resourceName: nvidia.com/gpuresourceType: GPUdefault: Truenotification:endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093kubeedge:endpoint: http://edge-watcher.kubeedge.svc/api/gateway:watchesPath: /var/helm-charts/watches.yamlnamespace: kubesphere-controls-system
现在就可以通过F5
来启动debug
了。
在debug
之前,你可能会问,这个配置文件为啥要放在/cmd/ks-apiserver/kubesphere.yaml
?
我们先来探索一波ks-apiserver
的运行逻辑。
启动ks-apiserver
查看cmd/ks-apiserver/app/server.go
的逻辑:
// Load configuration from fileconf, err := apiserverconfig.TryLoadFromDisk()
TryLoadFromDisk
的逻辑如下:
viper.SetConfigName(defaultConfigurationName) // defaultConfigurationName = "kubesphere"// AddConfigPath adds a path for Viper to search for the config file in.// Can be called multiple times to define multiple search paths.viper.AddConfigPath(defaultConfigurationPath) // defaultConfigurationPath = "/etc/kubesphere"// Load from current working directory, only used for debuggingviper.AddConfigPath(".")// Load from Environment variables// E.g. if your prefix is "spf", the env registry will look for env variables that start with "SPF_".viper.SetEnvPrefix("kubesphere")viper.AutomaticEnv()viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))// 单步调试,这一步读取的文件路径是 jww.INFO.Println("Searching for config in ", v.configPaths)// v.configPaths:["/etc/kubesphere","/root/go/src/kubesphere.io/kubesphere/cmd/ks-apiserver"]if err := viper.ReadInConfig(); err != nil {if _, ok := err.(viper.ConfigFileNotFoundError); ok {return nil, err} else {return nil, fmt.Errorf("error parsing configuration file %s", err)}}conf := New() // 获取各个组件的默认配置if err := viper.Unmarshal(conf); err != nil {return nil, err}return conf, nil
上面的注释,解释了需要在指定路径下添加kubesphere.yaml
来debug
。
我们接着往下撸,这里使用cobra.Command
来调用:
func Run(s *options.ServerRunOptions, ctx context.Context) error {// NewAPIServer 通过给定的配置启动apiserver实例,绑定实例化的各个模块的client,绑定RuntimeClient和RuntimeCacheapiserver, err := s.NewAPIServer(ctx.Done())if err != nil {return err}// PrepareRun 主要是使用resful-go集成kapis, 也就是代理服务err = apiserver.PrepareRun(ctx.Done())if err != nil {return nil}// Start listening, 开启监听return apiserver.Run(ctx)}
上面的注释已经阐述, s.NewAPIServer(ctx.Done())
主要是启动apiserver
实例, 集成了apis
功能,PrepareRun
主要是使用resful-go
集成kapis
, 也就是代理服务。下面分开阐述说明。
NewAPIServer
client
实例化完毕后会启动一个server
来响应请求:
...server := &http.Server{Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),}if s.GenericServerRunOptions.SecurePort != 0 {certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)if err != nil {return nil, err}server.TLSConfig = &tls.Config{Certificates: []tls.Certificate{certificate},}server.Addr = fmt.Sprintf(":%d", s.GenericServerRunOptions.SecurePort)}sch := scheme.Schemeif err := apis.AddToScheme(sch); err != nil {klog.Fatalf("unable add APIs to scheme: %v", err)}...
注意这一步apis.AddToScheme(sch)
, 看下怎么注入apis
接口的:
// AddToSchemes may be used to add all resources defined in the project to a Schemevar AddToSchemes runtime.SchemeBuilder// AddToScheme adds all Resources to the Schemefunc AddToScheme(s *runtime.Scheme) error {return AddToSchemes.AddToScheme(s)}
而AddToSchemes
这个类型的是[]func(*Scheme) error
的别名,只需要在package apis
下的接口文件中实现init()
方法来导入实现的版本API
,就可以注入对应的API
,举个例子:
$ cat pkg/apis/addtoscheme_dashboard_v1alpha2.gopackage apisimport monitoringdashboardv1alpha2 "kubesphere.io/monitoring-dashboard/api/v1alpha2"func init() { AddToSchemes = append(AddToSchemes, monitoringdashboardv1alpha2.SchemeBuilder.AddToScheme)}
也就是,我们开发的插件集成的版本化API
,必须集成xxx.SchemeBuilder.AddToScheme
功能。
至此,所有子模块对应的client
已经与这个apiserver
绑定,apis
也注入了。
PrepareRun
我们探讨下PrepareRun
是怎么注册kapis
以及绑定handler
的。
主要是通过restful-go
框架来实现的,前文提到该框架是使用container
来hold
住拥有特定GVR
的webservice
, 而一个webserver
可以绑定多个router
,container
或者webserver
还能自定义拦截器,也就是调用filter
方法。
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error { s.container = restful.NewContainer() / 添加请求Request日志拦截器 s.container.Filter(logRequestAndResponse) s.container.Router(restful.CurlyRouter{}) / RecoverHandler changes the default function (logStackOnRecover) to be called / when a panic is detected. DoNotRecover must be have its default value (=false). s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) { logStackOnRecover(panicReason, httpWriter) }) / 注册、绑定kapi路由和回调函数,发生在所有informers启动之前 s.installKubeSphereAPIs() / 注册metrics指标: ks_server_request_total、ks_server_request_duration_seconds / 绑定API kapis/metrics以及handle, 主要是调用prometheus collector集成,在pkg/utils/metrics/metrics.go s.installMetricsAPI() / 过滤无效监控请求 s.container.Filter(monitorRequest) for _, ws := range s.container.RegisteredWebServices() { klog.V(2).Infof("%s", ws.RootPath()) } / 将container绑定给s.Server.Handler s.Server.Handler = s.container / 添加k8s api资源、auditing资源、登录验证、凭据验证等各个调用链的拦截器 s.buildHandlerChain(stopCh) return nil}
上面主要使用restful-go
框架给s.Server.handler
绑定了一个container
, 添加了各种拦截器。
在s.installKubeSphereAPIS()
这一步安装GVR
绑定了kapis
代理,具体是这样实现的:
// 调用各api组的AddToContainer方法来向container注册api:urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))// 详细来说,各个组件实现的AddToContainer方法则实现了带组和版本信息子路由和handler的绑定:// 首先给router绑定/kapis的父router。ws := runtime.NewWebService(GroupVersion)// 给子路由绑定回调函数ws.Route(ws.GET("/kubesphere").To(h.handleKubeSphereMetricsQuery).Doc("Get platform-level metric data.").Metadata(restfulspec.KeyOpenAPITags, []string{constants.KubeSphereMetricsTag}).Writes(model.Metrics{}).Returns(http.StatusOK, respOK, model.Metrics{})).Produces(restful.MIME_JSON)
至此,ks-apiserver
就启动了,我们做一下简单总结:
根据配置文件验证并创建 ks-apiserver
实例, 该实例启动调用了两个关键方法,分别是NewAPIServer
和PrepareRun
;NewAPIServer
通过给定的配置启动实例,绑定实例化的各个模块的client
,绑定RuntimeClient
和RuntimeCache
;PrepareRun
通过restful-go
框架来注册、绑定kapi
路由和回调函数, 注意oauth
模块是不包含kapis
组和版本信息的,同时实现了本身的metrics
接口,以及添加各调用链的拦截器;最后, 调用 Run
方法启动ks-apiserver
服务;
假定远程云主机的服务已经启动,服务端口在9090
,下面介绍下各个模块的逻辑。
显然,我们只需要关注各模块的AddToContainer
方法就行了。
iam.kubesphere.io
pkg/kapis/iam/v1alpha2/register.go
从代码注释来看,这个模块管理着users
、clustermembers
、globalroles
、clusterroles
、workspaceroles
、roles
、workspaces groups
、workspace members
、devops members
等账号角色的CRUD
。
现在我们可以在handler
中打上断点,去请求这些api
。
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users"$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/clustermembers"$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users/admin/globalroles"...
oauth.kubesphere.io
pkg/kapis/oauth/register.go
主要进行身份验证和授权,通过第三方服务平台登录或退出,感兴趣可以深入了解。
顺带一提,oauth
里集成了/kapis/iam.kubesphere.io/v1alpha2/login
接口,我们尝试请求一个login API
:
$ curl -X POST -H "Content-Type: application/json" -d '{"username":"admin","password":"P@88w0rd"}' "http://localhost:9090/kapis/iam.kubesphere.io/v1alpha2/login"
最后会通过PasswordVerify(user.Spec.EncryptedPassword, password)
来比对验证的密码是否正确来返回access_token
。
kubeedge.kubesphere.io
pkg/kapis/kubeedge/v1alpha1/register.go
代码里面使用的代理转发请求:
func AddToContainer(container *restful.Container, endpoint string) error { proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version) {if err != nil {return nil}return proxy.AddToContainer(container)}
也就是 kapis/kubeedge.kubesphere.io
的请求会转发到http://edge-watcher.kubeedge.svc/api/
,也就是kubeedge
下的service
,相关的接口集成在那里。
关于边缘计算平台的集成,顺带一提,如果需要支撑不止一家的平台,可以集成一个edge-shim
的适配器,大概需要集成以下几个接口:
check
接口:确保云端的组件已经成功部署;join
接口:用于边缘节点与云端节点加入的命令行返回,用来添加边缘节点到集群;验证接口:用于检查边缘节点名称和
ip
是否合法,目前耦合在join
接口中;监控接口:提供边缘
prometheus
的采集指标,或者使用metrics-server
来集成,像kubeedge
低版本依赖iptables
的转发,看能否需要将此扩展成接口,其他边缘计算平台或许有类似的需求;代理
endpoint
: 现在的kubeedge
就是使用代理模式转发,对应的代理服务至少需要实现join
接口;
notification.kubesphere.io
pkg/kapis/notification/v2beta1/register.go
这个组下的api
主要实现了notification
的全局或租户级别的config
和receivers
资源的CRUD
。
config资源
用于配置对接通知渠道相关参数的一些配置,分为全局的和租户级别的
config
资源;
reciever资源
用于配置接收者的一些配置信息,区分全局的和租户级别的接收者;
我们挑选一个回调函数进行剖析:
func (h *handler) ListResource(req *restful.Request, resp *restful.Response) {// 租户或用户的名称user := req.PathParameter("user")// 资源类型,configs/recievers/secretsresource := req.PathParameter("resources")// 通知渠道 dingtalk/slack/email/webhook/wechatsubresource := req.QueryParameter("type")q := query.ParseQueryParameter(req)if !h.operator.IsKnownResource(resource, subresource) {api.HandleBadRequest(resp, req, servererr.New("unknown resource type %s/%s", resource, subresource))return}objs, err := h.operator.List(user, resource, subresource, q)handleResponse(req, resp, objs, err)}
我们看下list object
的逻辑:
// List objectsfunc (o *operator) List(user, resource, subresource string, q *query.Query) (*api.ListResult, error) {if len(q.LabelSelector) > 0 {q.LabelSelector = q.LabelSelector + ","}filter := "" // 如果没有给定租户的名称,则获取全局的对象if user == "" {if isConfig(o.GetObject(resource)) {// type=default对config资源来说是全局的filter = "type=default"} else {// type=global对receiever资源来说是全局的filter = "type=global"}} else {// 否则就给过滤器绑定租户名称filter = "type=tenant,user=" + user}// 组装过滤标签q.LabelSelector = q.LabelSelector + filter...// 通过过滤标签获取cluster或者namespace下的指定资源res, err := o.resourceGetter.List(resource, ns, q)if err != nil {return nil, err}if subresource == "" || resource == Secret {return res, nil}results := &api.ListResult{}...}
这样一来,就实现了租户级别的通知告警CR
配置的CRUD
,这些CR
是这么分类的:
config
分为全局type = default
, 租户type = tenant
两种级别;reciever
分为全局type = global
, 租户type = tenant
两种级别;
那么config
和reciever
怎么相互绑定、告警是如何通过渠道给租户发消息的?
https://github.com/kubesphere/notification-manager/blob/master/pkg/webhook/v1/handler.go#L45
https://github.com/kubesphere/notification-manager/blob/master/pkg/notify/notify.go#L66
notification-manager
简称nm
,我这里断章取义地简要回答一下。
功能方面:
全局配置 reciever
通过配置的渠道将所有的alerts
发送给其定义好的接收者名单, 配置了租户信息的reciever
只能通过渠道发送当前ns
下的alerts
;reciever
中可以通过配置alertSelector
参数来进一步过滤告警消息;通过修改名为 notification-manager-template
的confimap
来定制发送消息模板;
告警到通知的流程:
nm
使用端口19093
和API
路径/api/v2/alerts
接收从Alertmanager
发送的告警;回调函数接受 alerts
转换为notification
模板数据,按照namespace
区分告警数据,没有ns
则将其key
置为""
;遍历所有 Recievers
,每个ns
下启动一个协程调用Notify
接口方法发送消息, 这里每个ns
对应多个通知渠道,也使用waitgroup
来并发完成任务;
monitoring.kubesphere.io
pkg/kapis/monitoring/v1alpha3/register.go
将监控指标分为平台级、节点级、workspaces
、namespaces
、pods
等级别,不仅可以获取总的统计,还能获取nodes/namespaces/workspaces
下的所有pods/containers
等监控指标。
我们查看回调函数,以handleNamedMetricsQuery
为例分析:
遍历给定指标级别下的合法 metric
指标,根据请求参数中metricFilter
的来过滤指标名;判断为范围查询还是实时查询,来调取 monitoring
包中相关方法,通过对应的client
请求后端获取结果返回;
代码如下:
func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions) {var res model.Metricsvar metrics []string// q.namedMetrics 是一组按照监控指标级别分类好的拥有promsql expr定义的完整指标名数组// 监控指标级别分类是根据 monitoring.Levelxxx在上一个栈里细分的,i.e: monitoring.LevelPodfor _, metric := range q.namedMetrics {if strings.HasPrefix(metric, model.MetricMeterPrefix) {// skip meter metriccontinue}// 根据请求参数中的指标名来过滤ok, _ := regexp.MatchString(q.metricFilter, metric)if ok {metrics = append(metrics, metric)}}if len(metrics) == 0 {resp.WriteAsJson(res)return}// 判断是否是范围查询还是实时查询,继续调用相关函数// 主要还是用prometheus client去查询promsql, 边缘节点的指标目前通过metrics server来查询if q.isRangeQuery() {res = h.mo.GetNamedMetricsOverTime(metrics, q.start, q.end, q.step, q.option)} else {res = h.mo.GetNamedMetrics(metrics, q.time, q.option)if q.shouldSort() {res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)}}resp.WriteAsJson(res)}
现在,我们将视角移植到:
pkg/models/monitoring/monitoring.go:156
以GetNamedMetricsOverTime
为例,这里阐述了会合并prometheus
和metrics-server
的查询结果进行返回:
func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {// 获取prometheus client查询结果,主要使用sync.WaitGroup并发查询,每个指标启动一个goroutine,最后将结果和并返回ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)// 如果metrics-server激活了if mo.metricsserver != nil {//合并边缘节点数据edgeMetrics := make(map[string]monitoring.MetricData)for i, ressMetric := range ress {metricName := ressMetric.MetricNameressMetricValues := ressMetric.MetricData.MetricValuesif len(ressMetricValues) == 0 {// this metric has no prometheus metrics dataif len(edgeMetrics) == 0 {// start to request monintoring metricsApi datamr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)for _, mrMetric := range mr {edgeMetrics[mrMetric.MetricName] = mrMetric.MetricData}}if val, ok := edgeMetrics[metricName]; ok {ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)}}}}return Metrics{Results: ress}}
此外,monitoring
包还定义了各监控查询 client
的接口方法,可以按需探索:
GetMetric(expr string, time time.Time) MetricGetMetricOverTime(expr string, start, end time.Time, step time.Duration) MetricGetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []MetricGetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []MetricGetMetadata(namespace string) []MetadataGetMetricLabelSet(expr string, start, end time.Time) []map[string]string
tenant.kubesphere.io
再聊api
之前,顺带一提多租户在隔离的安全程度上,我们可以将其分为软隔离 (Soft Multi-tenancy) 和硬隔离 (Hard Multi-tenancy) 两种。
软隔离更多的是面向企业内部的多租需求; 硬隔离面向的更多是对外提供服务的服务供应商,需要更严格的隔离作为安全保障。
这个group
下比较重要的部分是实现租户查询logs/audits/events
:
以查询日志为例:
func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response) {// 查询上下文中携带的租户信息user, ok := request.UserFrom(req.Request.Context())if !ok {err := fmt.Errorf("cannot obtain user info")klog.Errorln(err)api.HandleForbidden(resp, req, err)return}// 解析查询的参数,比如确定属于哪个ns/workload/pod/container的查询、时间段,是否为柱状查询等queryParam, err := loggingv1alpha2.ParseQueryParameter(req)if err != nil {klog.Errorln(err)api.HandleInternalError(resp, req, err)return}// 导出数据if queryParam.Operation == loggingv1alpha2.OperationExport {resp.Header().Set(restful.HEADER_ContentType, "text/plain")resp.Header().Set("Content-Disposition", "attachment")// 验证账号是否有权限// admin账号可以导出所有ns的日志,租户只能导出本ns的日志// 组装loggingclient进行日志导出err := h.tenant.ExportLogs(user, queryParam, resp)if err != nil {klog.Errorln(err)api.HandleInternalError(resp, req, err)return}} else {// 验证账号是否有权限// admin账号可以查看所有ns的日志,租户只能查看本ns的日志// 组装loggingclient进行日志返回result, err := h.tenant.QueryLogs(user, queryParam)if err != nil {klog.Errorln(err)api.HandleInternalError(resp, req, err)return}resp.WriteAsJson(result)}}
这里顺带一提,关于租户权限验证这块,可以看下这里接口定义:
// Authorizer makes an authorization decision based on information gained by making// zero or more calls to methods of the Attributes interface. It returns nil when an action is// authorized, otherwise it returns an error.type Authorizer interface {Authorize(a Attributes) (authorized Decision, reason string, err error)}
alwaysAllowAuthorizer/alwaysDenyAuthorizer/rabc
都实现该方法。
前面在ks-apiserver
启动的时候,有个buildHandlerChain
方法:
var authorizers authorizer.Authorizer// 默认是RBAC,可以通过authorization.mode在启动配置文件修改switch s.Config.AuthorizationOptions.Mode {case authorizationoptions.AlwaysAllow:authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()case authorizationoptions.AlwaysDeny:authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()default:fallthroughcase authorizationoptions.RBAC:excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)amOperator := am.NewReadOnlyOperator(s.InformerFactory)authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))}
由于篇幅有限,只对以上GVR
进行了调试,感兴趣可以深入了解~
码字不易,记得三连哦




