Kubernetes-CRD开发
CRD
Custom Resource Define
简称 CRD,是 Kubernetes(v1.7+)为提高可扩展性,让开发者去自定义资源的一种方式。CRD 资源可以动态注册到集群中,注册完毕后,用户可以通过 kubectl 来创建访问这个自定义的资源对象,类似于操作 Pod 一样。不过需要注意的是 CRD 仅仅是资源的定义而已,这个资源定义在 ETCD 数据库中。还需要一个 Controller 去监听 CRD 的各种事件来添加自定义的业务逻辑。
如果说只是对 CRD 资源本身进行 CRUD 操作的话,不需要 Controller 也是可以实现的,相当于就是只有数据存入了 etcd 中,而没有对这个数据的相关操作而已。
在开发自定义控制器是需要想好是需求,如此时需求就是:我需要定义一个控制器,它创建一个 Pod 的同时把 SVC 也创建完成。
定义期望状态的资源模板
首先我们要定义一下最终 yaml 效果与要实现的功能
最终目的是只在一个 yaml 里填写 image 与 port 就可以完成 pod 与 service 的创建,下面是根据期望目的设计的资源模板
# appservice-demo.yaml
apiVersion: stable.example.com/v1beta1
kind: AppService
metadata:
name: nginx-app
spec:
image: nginx:latest
labels:
app: nginx
ports:
- port: 80
targetPort: 80
nodePort: 30001
通过 CRD 创建资源
那么接下来就要定义一下 CRD
# crd-demo.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
# name 必须匹配下面的spec字段:<plural>.<group>
name: appservices.stable.example.com
spec:
# group 名用于 REST API 中的定义:/apis/<group>/<version>
group: stable.example.com
# 列出自定义资源的所有 API 版本
versions:
- name: v1beta1 # 版本名称,比如 v1、v2beta1 等等
served: true # 是否开启通过 REST APIs 访问 `/apis/<group>/<version>/...`
storage: true # 必须将一个且只有一个版本标记为存储版本
schema: # 定义自定义对象的声明规范
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
image:
type: string
labels:
type: object
additionalProperties:
type: string
ports:
type: array
items:
type: object
properties:
port:
type: integer
minimum: 1
maximum: 65535
targetPort:
type: integer
minimum: 1
maximum: 65535
nodePort:
type: integer
minimum: 30000
maximum: 32767
# 定义作用范围:Namespaced(命名空间级别)或者 Cluster(整个集群)
scope: Namespaced
names:
# kind 是 sigular 的一个驼峰形式定义,在资源清单中会使用
kind: AppService
# plural 名字用于 REST API 中的定义:/apis/<group>/<version>/<plural>
plural: appservices
# singular 名称用于 CLI 操作或显示的一个别名
singular: appservice
# shortNames 相当于缩写形式
shortNames:
- as
创建
先创建 crd 资源,发现资源定义已经创建成功了。
[root@kube01 crd]# kubectl apply -f crd-demo.yaml
customresourcedefinition.apiextensions.k8s.io/appservices.stable.example.com created
再执行创建 appservice 的资源,资源创建完成后可以通过 appservice 查看,也可以通过 as 缩写查看,这个时候资源已经创建了,但是没有实际的效果,因为没有 controller 控制器或者这个资源并进行操作,这个时候只是把信息存入了 etcd。所以后续要开发控制器。
[root@kube01 crd]# kubectl apply -f appservice-demo.yaml
appservice.stable.example.com/nginx-app created
[root@kube01 crd]# kubectl get appservice
NAME AGE
nginx-app 86s
[root@kube01 crd]# kubectl get as
NAME AGE
nginx-app 89s
控制器开发
先再开发之前我们需要看一张图,了解一下自定义控制器开发流程
通过中间横线,可以区分出控制器开发分为两个部分
Client-go部分
功能概要:
- Reflector:采用 List/Watch 机制, 可以 Watch 任何资源包括 CRD 添加 object 对象到 FIFO 队列,然后 Informer 会从队列里面取数据
- Informer:controller 机制的基础,循环处理 object 对象 从 Reflector 取出数据,然后将数据给到 Indexer 去缓存,提供对象事件的 handler 接口,只要给 Informer 添加 ResourceEventHandler 实例的回调函数,去实现
OnAdd(obj interface{})
、OnUpdate(oldObj, newObj interface{})
和OnDelete(obj interface{})
这三个方法,就可以处理好资源的创建、更新和删除操作了。 - Indexer:提供 object 对象的索引,是线程安全的,缓存对象信息
Controller部分
功能概要:
- Informer reference: controller 需要创建合适的 Informer 才能通过 Informer reference 操作资源对象
- Indexer reference: controller 创建 Indexer reference 然后去利用索引做相关处理
- Resource Event Handlers:Informer 会回调这些 handlers
- Work queue: Resource Event Handlers 被回调后将 key 写到工作队列,这里的 key 相当于事件通知,后面根据取出事件后,做后续的处理
- Process Item:从工作队列中取出 key 后进行后续处理,具体处理可以通过 Indexer reference controller 可以直接创建上述两个引用对象去处理,也可以采用工厂模式,官方都有相关示例
自主开发的逻辑部分
图上的三个黄色区域
就是自主开发部分,其他的可以用官方库实现。
具体开发代码
我们这里不通过开源的 Operator 工具开发,如 Kube Builder、Operator-SDK,而是根据官方案例全面了解开发过程。
创建项目
创建项目并进行初始化
# mkdir AppService-Controller
# cd AppService-Controller
# go mod init AppService-Controller
按照官方控制器案例创建文件夹
按照官方项目格式创建两个目录,hack 是存放执行代码生成器的脚本,pkg 是存放资源类型与代码生成器生成的代码。
# mkdir hack
# mkdir pkg
在 pkg 目录下创建文件夹与文件:
- apis:是根据 kubernetes 项目保持一致,文件结构也是 api 路由结构 /apis/stable/v1beta1/
- stable:是组名
- v1beta1:是版本名
- doc.go:存放全局 tag
- register.go:注册资源进 scheme
- types.go:存放资源类型
[root@localhost pkg]# tree
.
├── apis
│ └── stable
│ └── v1beta1
│ ├── doc.go
│ ├── register.go
│ └── types.go
在 hack 目录下创建文件:
- boilerplate.go.txt:开源协议
- tools.go:导入 code-generator 包
- update-codegen.sh:代码生成器执行脚本
[root@localhost hack]# tree
.
├── boilerplate.go.txt
├── tools.go
└── update-codegen.sh
doc.go
创建全局的代码生成器 tag
// 为包中任何类型生成深拷贝方法,可以在局部 tag 覆盖此默认行为
// +k8s:deepcopy-gen=package
// +groupName=stable.example.com
package v1beta1
types.go
types 中创建与 crd 对应的资源结构体,并且添加代码生成器,代码生成器功能如下:
- +genclient: 为这个 package 创建 client。
- +genclient:noStatus: 当创建 client 时,不存储 status。
- +k8s:deepcopy-- gen:interfaces=http://k8s.io/apimachinery/pkg/runtime.Object: 为结构体生成 deepcopy 的代码,实现了 runtime.Object 的 Interface。
package v1beta1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type AppService struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec AppServiceSpec `json:"spec"`
}
type AppServiceSpec struct {
Image string `json:"image"`
Labels map[string]string `json:"labels"`
Ports []Port `json:"ports"`
}
type Port struct {
Port int `json:"port"`
TargetPort int `json:"targetPort"`
NodePort int `json:"nodePort"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type AppServiceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []AppService `json:"items"`
}
register.go
用来注册 crd 到 scheme,需要注意的是SchemeGroupVersion
里的组与版本换成自己的,addKnownTypes
注册资源与资源列表
package v1beta1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: "stable.example.com", Version: "v1beta1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// SchemeBuilder initializes a scheme builder
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&AppService{},
&AppServiceList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
在完成三个文件创建完成之后,通过idea
编辑器会发现register.go
有红色报错,资源类型没有深拷贝方法,他们注册到scheme
中没办法通过接口检测。(鸭子类型)
tools.go
导入 code-generator 包
package tools
import _ "k8s.io/code-generator"
update-codegen.sh
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
# 此处为变量,是 generate-groups.sh 的文件夹
CODEGEN_PKG="/root/go/pkg/mod/k8s.io/code-generator@v0.27.3"
# generate the code with:
# --output-base because this script should also be able to run inside the vendor dir of
# k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
# instead of the $GOPATH directly. For normal projects this can be dropped.
# 执行代码生成功能,生成 deepcopy,client,informer,lister
# AppService-Controller 是 go.mod 中的项目地址
# stable:v1beta1 根据自己设计的组与版本填写
# output-base 为输出目录
# go-header-file 为每个文件添加一个头文件,就是开源协议
"${CODEGEN_PKG}/generate-groups.sh" "deepcopy,client,informer,lister" \
AppService-Controller/pkg/generated \
AppService-Controller/pkg/apis \
stable:v1beta1 \
--output-base ../pkg/ \
--go-header-file ./boilerplate.go.txt
# To use your own boilerplate text append:
# --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
boilerplate.go.txt
开源协议
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
执行 hack 里的脚本生成代码
[root@localhost hack]# sh update-codegen.sh
Generating deepcopy funcs
Generating clientset for stable:v1beta1 at AppService-Controller/pkg/generated/clientset
Generating listers for stable:v1beta1 at AppService-Controller/pkg/generated/listers
Generating informers for stable:v1beta1 at AppService-Controller/pkg/generated/informers
查看项目结构
[root@localhost AppService-Controller]# tree
.
├── go.mod
├── go.sum
├── hack
│ ├── boilerplate.go.txt
│ ├── tools.go
│ └── update-codegen.sh
└── pkg
├── apis
│ └── stable
│ └── v1beta1
│ ├── doc.go
│ ├── register.go
│ └── types.go
└── AppService-Controller
└── pkg
├── apis
│ └── stable
│ └── v1beta1
│ └── zz_generated.deepcopy.go
└── generated
├── clientset
│ └── versioned
│ ├── clientset.go
│ ├── fake
│ │ ├── clientset_generated.go
│ │ ├── doc.go
│ │ └── register.go
│ ├── scheme
│ │ ├── doc.go
│ │ └── register.go
│ └── typed
│ └── stable
│ └── v1beta1
│ ├── appservice.go
│ ├── doc.go
│ ├── fake
│ │ ├── doc.go
│ │ ├── fake_appservice.go
│ │ └── fake_stable_client.go
│ ├── generated_expansion.go
│ └── stable_client.go
├── informers
│ └── externalversions
│ ├── factory.go
│ ├── generic.go
│ ├── internalinterfaces
│ │ └── factory_interfaces.go
│ └── stable
│ ├── interface.go
│ └── v1beta1
│ ├── appservice.go
│ └── interface.go
└── listers
└── stable
└── v1beta1
├── appservice.go
└── expansion_generated.go
改变文件路径
需要把zz_generated.deepcopy.go
移动到 pkg/apis/stable/v1beta1 下面,移动完成后register.go
就不会报错了。
[root@localhost AppService-Controller]# mv pkg/AppService-Controller/pkg/apis/stable/v1beta1/zz_generated.deepcopy.go pkg/apis/stable/v1beta1/
需要把generated
目录移动到 pkg 下面
[root@localhost AppService-Controller]# mv pkg/AppService-Controller/pkg/generated/ pkg/
删除一些文件夹优化文件结构,因为里面内容已经移走了,所以可以删除
[root@localhost AppService-Controller]# rm -rf pkg/AppService-Controller/
编写代码
创建 main.go 文件
通过编写一个 demo 实现了监听自定义资源的作用,如果想要实现监听到资源创建 deployment 则可以通过完善 HandlerObject
完成。
package main
import (
"flag"
"fmt"
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
clientset "AppService-Controller/pkg/generated/clientset/versioned"
"AppService-Controller/pkg/generated/informers/externalversions"
v1beta12 "AppService-Controller/pkg/generated/informers/externalversions/stable/v1beta1"
"AppService-Controller/pkg/generated/listers/stable/v1beta1"
)
// 定义一个 Controller 结构体
type Controller struct {
queue workqueue.RateLimitingInterface
appServiceLister v1beta1.AppServiceLister
appServiceSynced cache.InformerSynced
}
// New一个结构体
func NewController(queue workqueue.RateLimitingInterface, informer v1beta12.AppServiceInformer) *Controller {
controller := &Controller{
queue: queue,
appServiceLister: informer.Lister(),
appServiceSynced: informer.Informer().HasSynced,
}
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
}
controller.queue.Add(key)
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
runtime.HandleError(err)
}
controller.queue.Add(key)
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
}
controller.queue.Add(key)
},
})
return controller
}
// 携程执行初始化函数
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()
// 关闭queue
defer c.queue.ShutDown()
fmt.Printf("Start Custom Controller")
//等待Informer刷新缓存
if !cache.WaitForCacheSync(stopCh, c.appServiceSynced) {
fmt.Printf("Time out waiting caches to sync")
return
}
//携程处理 queue 的程序数量
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
//chan 使 run 卡在这
<-stopCh
fmt.Printf("Stop Custom Controller")
}
func (c *Controller) runWorker() {
// 一直for 循环拿 queue 里面的 key,如果queue 数组没东西了,会卡住知道有数据进入 queue
for c.ProcessItem() {
}
}
// 获取 key
func (c *Controller) ProcessItem() bool {
// 从 queue 中拿 key
key, quit := c.queue.Get()
if quit {
return false
}
// 函数执行结束删除这个 key
defer c.queue.Done(key)
//执行函数的具体处理功能,如果执行失败,就放进 queue 等待下一次取出 queue 执行
if err := c.HandlerObject(key.(string)); err != nil {
if c.queue.NumRequeues(key) < 5 {
c.queue.Add(key)
}
}
return true
}
// 这边就是通过 queue 里面的 key 获取 indexer 里面的 object
func (c *Controller) HandlerObject(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
appService, err := c.appServiceLister.AppServices(namespace).Get(name)
if err != nil {
return err
}
fmt.Println(appService)
return nil
}
// 初始化 client
func initClient() (*kubernetes.Clientset, *clientset.Clientset, error) {
var err error
var config *rest.Config
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径")
}
flag.Parse()
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
return nil, nil, err
}
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, err
}
customClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, nil, err
}
return k8sClient, customClient, nil
}
func main() {
_, customClient, err := initClient()
if err != nil {
klog.Fatal(err)
}
appServiceSharedInformerFactory := externalversions.NewSharedInformerFactory(customClient, 30*time.Second)
//生成默认的 queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
//实例化
controller := NewController(queue, appServiceSharedInformerFactory.Stable().V1beta1().AppServices())
stopCh := make(chan struct{})
//启动 informer
appServiceSharedInformerFactory.Start(stopCh)
//执行 Run
go controller.Run(1, stopCh)
defer close(stopCh)
//select 卡住函数
select {}
}
FAQ
types.go 里类型不能有 interface{}
其中要注意的是如果结构体中有interface{}
类型,通过代码生成器会生成不了代码报错如下:
F0713 17:28:43.623119 15553 deepcopy.go:753] DeepCopy of "interface{}" is unsupported. Instead, use named interfaces with DeepCopy<named-interface> as one of the methods.
遇到这种问题,需要自己在 types 里面实现深拷贝方法
if in == nil {
return
}
b, err := yaml.Marshal(in.Values)
if err != nil {
return
}
var values helm.Values
err = yaml.Unmarshal(b, &values)
if err != nil {
return
}
out.Values = values
}```