貔貅云原生

貔貅云原生

Admission 准入控制器

153
0
0
2023-09-26
Admission 准入控制器

准入控制器

什么是准入控制器?就是 API 请求进来后,准许它进入或者丰富进来的 API 请求的控制器。

如图所示,MutatingAdmissionWebhook 和 ValidatingAdmissionWebhook 是由人为设置的。
Mutating 是人为丰富 API 接口数据,比如现在流行的 sidecar 边车服务,就是通过在准入控制器阶段,读到了 API 接口请求,然后再往请求里添加了自己的服务然后再持久化
Validating 是校验 API 接口数据,它会检查 API 中有没有特定的值或者检查格式对不对

k8s-api-request-lifecycle_1686797759.png

在 Kubernetes apiserver 中包含两个特殊的准入控制器:MutatingAdmissionWebhook 和 ValidatingAdmissionWebhook,这两个控制器将发送准入请求到外部的 HTTP 回调服务并接收一个准入响应。如果启用了这两个准入控制器,Kubernetes 管理员可以在集群中创建和配置一个 admission webhook。

创建配置一个 Admission Webhook

通过 kubeadm 部署的 kubernetes 默认开启了准入控制器,这个时候我们只需要写一个 http 服务,用来处理转发而来的请求就可以了。那么为什么 api 请求会转发给 http 服务呢?需要编写 admission 的编排文件,定义好哪些请求需要转发到 http 服务。

编写一个 Admission 服务

现在拟定一个简单的功能需求:对 deployment 服务做准入控制,如果 deployment 有注解为 changeImage: "yes",那么我就将它的镜像改为 nginx:1.7.9,如果注解为 checkImage: "yes",那么判断它的镜像版本是不是latest,不是则不通过。

定义一个 main.go 作为程序入口

package main

import (
	"flag"
	"go-learning/ch37/pkg"
	"k8s.io/klog"
	"net/http"
)

const (
	HealthzHost = "127.0.0.1"
	HealthzPort = "10259"

	CertFile = "/run/secrets/tls/tls.crt"
	KeyFile  = "/run/secrets/tls/tls.key"
)

func main() {
	klog.InitFlags(nil)
	// 接收参数
	flag.Parse()

	// HTTP 服务
	// 重点在于 pkg.Serve
	mux := http.NewServeMux()
	mux.HandleFunc("/mutate", pkg.Serve)
	mux.HandleFunc("/validate", pkg.Serve)
	server := &http.Server{
		Addr:    ":8443",
		Handler: mux,
	}

	// Start Heathz Check
	go pkg.StartHealthzServer(healthzHost, healthzPort)

	klog.Fatal(server.ListenAndServeTLS(CertFile, KeyFile))
}

var (
	healthzHost string // The host of Healthz
	healthzPort string // The port of Healthz to listen on
	certFile    string // path to the x509 certificate for https
	keyFile     string // path to the x509 private key matching `CertFile`
)

func init() {
	flag.StringVar(&healthzHost, "healthz-host", HealthzHost, "The host of Healthz.")
	flag.StringVar(&healthzPort, "healthz-port", HealthzPort, "The port of Healthz to listen on.")
	flag.StringVar(&certFile, "cert-file", CertFile, "File containing the x509 Certificate for HTTPS.")
	flag.StringVar(&keyFile, "key-file", KeyFile, "File containing the x509 private key to --tlsCertFile.")
}

创建一个 pkg 目录,定义一个 helper.go,用作健康检查服务

package pkg

import (
	"k8s.io/klog"
	"net/http"
)

// 启动健康检测
func StartHealthzServer(healthzHost string, healthzPort string) {
	http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(200)
		w.Write([]byte("ok"))
	})

	klog.Infof("Starting Healthz Server...")
	klog.Fatal(http.ListenAndServe(healthzHost+":"+healthzPort, nil))
}

创建一个 pkg 目录,定义一个 webhook.go,用作核心程序

package pkg

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	v1 "k8s.io/api/apps/v1"
	"net/http"
	"strings"

	"k8s.io/api/admission/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/klog"
)

var (
	universalDeserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
)

// 定义注解名,它的 key 是 yes、no
const (
	admissionWebhookAnnotationValidateKey = "checkImage"
	admissionWebhookAnnotationMutateKey   = "changeImage"
)

// 定义那些命名空间不走准入控制器
var (
	ignoredNamespaces = []string{
		metav1.NamespaceSystem,
		metav1.NamespacePublic,
	}
)

// 定义 api 路由格式
type patchOperation struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

// 创建路由
func createPatch(deploymentSpec *v1.DeploymentSpec) ([]byte, error) {
	var patches []patchOperation
	containers := deploymentSpec.Template.Spec.Containers

	for i := range containers {
		path := fmt.Sprintf("/spec/template/spec/containers/%d/image", i)
		op := patchOperation{
			Op:    "replace",
			Path:  path,
			Value: "nginx:1.7.9",
		}
		patches = append(patches, op)
	}
	return json.Marshal(patches)
}

// 检查注解的值是不是 yes 或 no
func admissionRequired(ignoredList []string, admissionAnnotationKey string, metadata *metav1.ObjectMeta) bool {
	// skip special kubernetes system namespaces
	for _, namespace := range ignoredList {
		if metadata.Namespace == namespace {
			klog.Infof("Skip validation for %v for it's in special namespace:%v", metadata.Name, metadata.Namespace)
			return false
		}
	}

	annotations := metadata.GetAnnotations()
	switch strings.ToLower(annotations[admissionAnnotationKey]) {
	case "n", "no", "false", "off":
		return false
	case "y", "yes", "true", "on":
		return true
	}
	return false
}

// 检查忽略哪些命名空间
func mutationRequired(ignoredList []string, metadata *metav1.ObjectMeta) bool {
	required := admissionRequired(ignoredList, admissionWebhookAnnotationMutateKey, metadata)
	klog.Infof("Mutation policy for %v/%v: required:%v", metadata.Namespace, metadata.Name, required)
	return required
}

// 检查忽略哪些命名空间
func validationRequired(ignoredList []string, metadata *metav1.ObjectMeta) bool {
	required := admissionRequired(ignoredList, admissionWebhookAnnotationValidateKey, metadata)
	klog.Infof("Validation policy for %v/%v: required:%v", metadata.Namespace, metadata.Name, required)
	return required
}

// main mutation process
// 准入函数
func mutate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
	req := ar.Request
	var (
		deploymentSpec                  *v1.DeploymentSpec
		objectMeta                      *metav1.ObjectMeta
		resourceNamespace, resourceName string
	)

	klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v patchOperation=%v UserInfo=%v",
		req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)

	// 判断是不是 deployment,不是则忽略准入修改
	switch req.Kind.Kind {
	case "Deployment":
		var dp v1.Deployment
		if err := json.Unmarshal(req.Object.Raw, &dp); err != nil {
			klog.Errorf("Could not unmarshal raw object: %v", err)
			return &v1beta1.AdmissionResponse{
				Result: &metav1.Status{
					Message: err.Error(),
				},
			}
		}
		resourceName, resourceNamespace = dp.Name, dp.Namespace
		objectMeta, deploymentSpec = &dp.ObjectMeta, &dp.Spec
	}

	if !mutationRequired(ignoredNamespaces, objectMeta) {
		klog.Infof("Skipping validation for %s/%s due to policy check", resourceNamespace, resourceName)
		return &v1beta1.AdmissionResponse{
			Allowed: true,
		}
	}

	patchBytes, err := createPatch(deploymentSpec)
	if err != nil {
		return &v1beta1.AdmissionResponse{
			Result: &metav1.Status{
				Message: err.Error(),
			},
		}
	}

	klog.Infof("AdmissionResponse: patch=%v\n", string(patchBytes))
	// 返回了修改后的 api
	return &v1beta1.AdmissionResponse{
		Allowed: true,
		Patch:   patchBytes,
		PatchType: func() *v1beta1.PatchType {
			pt := v1beta1.PatchTypeJSONPatch
			return &pt
		}(),
	}
}

// 准入校验函数
// validate deployments and services
func validate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
	req := ar.Request
	var (
		deploymentSpec                  *v1.DeploymentSpec
		objectMeta                      *metav1.ObjectMeta
		resourceNamespace, resourceName string
	)

	klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v patchOperation=%v UserInfo=%v",
		req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)

	switch req.Kind.Kind {
	case "Deployment":
		var dp v1.Deployment
		if err := json.Unmarshal(req.Object.Raw, &dp); err != nil {
			klog.Errorf("Could not unmarshal raw object: %v", err)
			return &v1beta1.AdmissionResponse{
				Result: &metav1.Status{
					Message: err.Error(),
				},
			}
		}
		resourceName, resourceNamespace = dp.Name, dp.Namespace
		objectMeta, deploymentSpec = &dp.ObjectMeta, &dp.Spec
	}

	if !validationRequired(ignoredNamespaces, objectMeta) {
		klog.Infof("Skipping validation for %s/%s due to policy check", resourceNamespace, resourceName)
		return &v1beta1.AdmissionResponse{
			Allowed: true,
		}
	}

	allowed := true
	var result *metav1.Status

	for _, v := range deploymentSpec.Template.Spec.Containers {
		if !(v.Image == "nginx:latest") {
			allowed = false
			result = &metav1.Status{
				Reason: "this image is not latest",
			}
			break
		}
	}

	return &v1beta1.AdmissionResponse{
		Allowed: allowed,
		Result:  result,
	}
}

// Serve method for webhook server
// 核心程序,固定写法
func Serve(w http.ResponseWriter, r *http.Request) {
	var body []byte
	if r.Body != nil {
		if data, err := ioutil.ReadAll(r.Body); err == nil {
			body = data
		}
	}
	if len(body) == 0 {
		klog.Error("empty body")
		http.Error(w, "empty body", http.StatusBadRequest)
		return
	}

	// verify the content type is accurate
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/json" {
		klog.Errorf("Content-Type=%s, expect application/json", contentType)
		http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
		return
	}

	var admissionResponse *v1beta1.AdmissionResponse
	ar := v1beta1.AdmissionReview{}
	if _, _, err := universalDeserializer.Decode(body, nil, &ar); err != nil {
		klog.Errorf("Can't decode body: %v", err)
		admissionResponse = &v1beta1.AdmissionResponse{
			Result: &metav1.Status{
				Message: err.Error(),
			},
		}
	} else {
		if r.URL.Path == "/mutate" {
			admissionResponse = mutate(&ar)
		} else if r.URL.Path == "/validate" {
			admissionResponse = validate(&ar)
		}
	}

	admissionReview := v1beta1.AdmissionReview{}
	if admissionResponse != nil {
		admissionReview.Response = admissionResponse
		if ar.Request != nil {
			admissionReview.Response.UID = ar.Request.UID
		}
	}

	resp, err := json.Marshal(admissionReview)
	if err != nil {
		klog.Errorf("Can't encode response: %v", err)
		http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
	}
	klog.Infof("Ready to write reponse ...")
	if _, err := w.Write(resp); err != nil {
		klog.Errorf("Can't write response: %v", err)
		http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
	}
}

代码写完后还需要有 TLS 证书以及准入控制器的资源,需要现有证书然后创建 secret 给服务使用,通过 deployment 部署此服务,下面是我整理的创建证书、secret、deployment 的脚本

# cat generate-keys.sh

#!/usr/bin/env bash

: ${1?'missing key directory'}

key_dir="$1"

chmod 0700 "$key_dir"
cd "$key_dir"

# Generate the CA cert and private key
openssl req -nodes -new -x509 -keyout ca.key -out ca.crt -subj "/CN= Admission Controller Webhook CA"
# Generate the private key for the webhook server
openssl genrsa -out webhook-server-tls.key 2048
# Generate a Certificate Signing Request (CSR) for the private key, and sign it with the private key of the CA.
openssl req -new -key webhook-server-tls.key -subj "/CN=image-admission-webhook.kube-system.svc" \
    | openssl x509 -req -CA ca.crt -CAkey ca.key -CAcreateserial \
     -extfile <(printf "subjectAltName=DNS:image-admission-webhook,DNS:image-admission-webhook.kube-system,DNS:image-admission-webhook.kube-system.svc") \
     -out webhook-server-tls.crt

创建部署准入控制器的资源

# cat image-admission-webhook.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: image-admission-webhook
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: image-admission-webhook
  template:
    metadata:
      labels:
        app: image-admission-webhook
    spec:
      containers:
        - name: image-admission-webhook
          image: image:v0.0.1
          command:
            - image-admission-webhook
            - --cert-file=/run/secrets/tls/tls.crt
            - --key-file=/run/secrets/tls/tls.key
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: webhook-tls-certs
              mountPath: /run/secrets/tls
              readOnly: true
      volumes:
        - name: webhook-tls-certs
          secret:
            secretName: webhook-server-tls
---
apiVersion: v1
kind: Service
metadata:
  name: image-admission-webhook
  namespace: kube-system
spec:
  selector:
    app: image-admission-webhook
  ports:
    - port: 443
      targetPort: 8443
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: image-webhook-mutate
webhooks:
  - name: image-admission-webhook.kube-system.svc
    clientConfig:
      service:
        name: image-admission-webhook
        namespace: kube-system
        path: "/mutate"
      caBundle: ${CA_PEM_B64}
    rules:
      - operations: [ "CREATE","UPDATE" ]
        apiGroups: ["apps"]
        apiVersions: ["v1"]
        resources: ["deployments"]
    sideEffects: None
    admissionReviewVersions: [ "v1beta1" ]
    timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: image-webhook-validata
webhooks:
  - name: image-admission-webhook.kube-system.svc
    clientConfig:
      service:
        name: image-admission-webhook
        namespace: kube-system
        path: "/validate"
      caBundle: ${CA_PEM_B64}
    rules:
      - operations: [ "CREATE","UPDATE" ]
        apiGroups: ["apps"]
        apiVersions: ["v1"]
        resources: ["deployments"]
    sideEffects: None
    admissionReviewVersions: [ "v1beta1" ]
    timeoutSeconds: 5

创建一键部署的脚本

# cat deploy.sh
#!/usr/bin/env bash

set -euo pipefail

keydir="$(mktemp -d)"

# Generate keys into a temporary directory.
echo "Generating TLS keys ..."
"./generate-keys.sh" "$keydir"

# Create the TLS secret for the generated keys.
kubectl -n kube-system create secret tls webhook-server-tls \
    --cert "${keydir}/webhook-server-tls.crt" \
    --key "${keydir}/webhook-server-tls.key" \
    --dry-run=client -o yaml | kubectl apply -f -

# Read the PEM-encoded CA certificate, base64 encode it, and replace the `${CA_PEM_B64}` placeholder in the YAML
# template with it. Then, create the Kubernetes resources.
ca_pem_b64="$(openssl base64 -A <"${keydir}/ca.crt")"
sed -e 's@${CA_PEM_B64}@'"$ca_pem_b64"'@g' <"image-admission-webhook.yaml" | kubectl apply -f -

# Delete the key directory to prevent abuse (DO NOT USE THESE KEYS ANYWHERE ELSE).
rm -rf "$keydir"

echo "The webhook server has been deployed and configured!"

现在有三个文件,分别为 generate-keys.sh、deploy.sh、 image-admission-webhook.yaml,把他们放到同一个目录下面执行 deploy.sh 命令,在执行之前,需要用代码构建一个为 image:v0.0.1 的镜像

测试

部署完成项目后,部署一个 deployment 服务查看效果

apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-nginx
  annotations:
    changeImage: "yes"
spec:
  selector:
    matchLabels:
      apps: test-nginx
  template:
    metadata:
      labels:
        apps: test-nginx
    spec:
      containers:
      - name: test-nginx
        image: nginx:latest

部署服务发现部署完成,查看服务,发现镜像版本已经被修改了。

[root@kube01 hpa]# kubectl get pod -l test-nginx -o yaml
apiVersion: v1
items: []
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
[root@kube01 hpa]# kubectl apply -f deployment.yaml ^C
[root@kube01 hpa]# kubectl get pod -l apps: test-nginx -o yaml
error: name cannot be provided when a selector is specified
[root@kube01 hpa]# kubectl get pod -l apps=test-nginx -o yaml
apiVersion: v1
items:
- apiVersion: v1
  kind: Pod
  metadata:
    creationTimestamp: "2023-06-15T08:03:25Z"
    generateName: test-nginx-6c6fb7db4b-
    labels:
      apps: test-nginx
      pod-template-hash: 6c6fb7db4b
    name: test-nginx-6c6fb7db4b-nkkht
    namespace: default
    ownerReferences:
    - apiVersion: apps/v1
      blockOwnerDeletion: true
      controller: true
      kind: ReplicaSet
      name: test-nginx-6c6fb7db4b
      uid: 8354a36d-3acd-4ca7-897b-eaf429aaedaa
    resourceVersion: "1581971"
    uid: 01367ac9-5457-486f-bdb7-da55e11eebc4
  spec:
    containers:
    - image: nginx:1.7.9
      imagePullPolicy: Always