准入控制器
什么是准入控制器?就是 API 请求进来后,准许它进入或者丰富进来的 API 请求的控制器。
如图所示,MutatingAdmissionWebhook 和 ValidatingAdmissionWebhook 是由人为设置的。
Mutating 是人为丰富 API 接口数据,比如现在流行的 sidecar 边车服务,就是通过在准入控制器阶段,读到了 API 接口请求,然后再往请求里添加了自己的服务然后再持久化
Validating 是校验 API 接口数据,它会检查 API 中有没有特定的值或者检查格式对不对
在 Kubernetes apiserver 中包含两个特殊的准入控制器:MutatingAdmissionWebhook 和 ValidatingAdmissionWebhook,这两个控制器将发送准入请求到外部的 HTTP 回调服务并接收一个准入响应。如果启用了这两个准入控制器,Kubernetes 管理员可以在集群中创建和配置一个 admission webhook。
创建配置一个 Admission Webhook
通过 kubeadm 部署的 kubernetes 默认开启了准入控制器,这个时候我们只需要写一个 http 服务,用来处理转发而来的请求就可以了。那么为什么 api 请求会转发给 http 服务呢?需要编写 admission 的编排文件,定义好哪些请求需要转发到 http 服务。
编写一个 Admission 服务
现在拟定一个简单的功能需求:对 deployment 服务做准入控制,如果 deployment 有注解为 changeImage: "yes",那么我就将它的镜像改为 nginx:1.7.9,如果注解为 checkImage: "yes",那么判断它的镜像版本是不是latest,不是则不通过。
定义一个 main.go 作为程序入口
package main
import (
"flag"
"go-learning/ch37/pkg"
"k8s.io/klog"
"net/http"
)
const (
HealthzHost = "127.0.0.1"
HealthzPort = "10259"
CertFile = "/run/secrets/tls/tls.crt"
KeyFile = "/run/secrets/tls/tls.key"
)
func main() {
klog.InitFlags(nil)
// 接收参数
flag.Parse()
// HTTP 服务
// 重点在于 pkg.Serve
mux := http.NewServeMux()
mux.HandleFunc("/mutate", pkg.Serve)
mux.HandleFunc("/validate", pkg.Serve)
server := &http.Server{
Addr: ":8443",
Handler: mux,
}
// Start Heathz Check
go pkg.StartHealthzServer(healthzHost, healthzPort)
klog.Fatal(server.ListenAndServeTLS(CertFile, KeyFile))
}
var (
healthzHost string // The host of Healthz
healthzPort string // The port of Healthz to listen on
certFile string // path to the x509 certificate for https
keyFile string // path to the x509 private key matching `CertFile`
)
func init() {
flag.StringVar(&healthzHost, "healthz-host", HealthzHost, "The host of Healthz.")
flag.StringVar(&healthzPort, "healthz-port", HealthzPort, "The port of Healthz to listen on.")
flag.StringVar(&certFile, "cert-file", CertFile, "File containing the x509 Certificate for HTTPS.")
flag.StringVar(&keyFile, "key-file", KeyFile, "File containing the x509 private key to --tlsCertFile.")
}
创建一个 pkg 目录,定义一个 helper.go,用作健康检查服务
package pkg
import (
"k8s.io/klog"
"net/http"
)
// 启动健康检测
func StartHealthzServer(healthzHost string, healthzPort string) {
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
w.Write([]byte("ok"))
})
klog.Infof("Starting Healthz Server...")
klog.Fatal(http.ListenAndServe(healthzHost+":"+healthzPort, nil))
}
创建一个 pkg 目录,定义一个 webhook.go,用作核心程序
package pkg
import (
"encoding/json"
"fmt"
"io/ioutil"
v1 "k8s.io/api/apps/v1"
"net/http"
"strings"
"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/klog"
)
var (
universalDeserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
)
// 定义注解名,它的 key 是 yes、no
const (
admissionWebhookAnnotationValidateKey = "checkImage"
admissionWebhookAnnotationMutateKey = "changeImage"
)
// 定义那些命名空间不走准入控制器
var (
ignoredNamespaces = []string{
metav1.NamespaceSystem,
metav1.NamespacePublic,
}
)
// 定义 api 路由格式
type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}
// 创建路由
func createPatch(deploymentSpec *v1.DeploymentSpec) ([]byte, error) {
var patches []patchOperation
containers := deploymentSpec.Template.Spec.Containers
for i := range containers {
path := fmt.Sprintf("/spec/template/spec/containers/%d/image", i)
op := patchOperation{
Op: "replace",
Path: path,
Value: "nginx:1.7.9",
}
patches = append(patches, op)
}
return json.Marshal(patches)
}
// 检查注解的值是不是 yes 或 no
func admissionRequired(ignoredList []string, admissionAnnotationKey string, metadata *metav1.ObjectMeta) bool {
// skip special kubernetes system namespaces
for _, namespace := range ignoredList {
if metadata.Namespace == namespace {
klog.Infof("Skip validation for %v for it's in special namespace:%v", metadata.Name, metadata.Namespace)
return false
}
}
annotations := metadata.GetAnnotations()
switch strings.ToLower(annotations[admissionAnnotationKey]) {
case "n", "no", "false", "off":
return false
case "y", "yes", "true", "on":
return true
}
return false
}
// 检查忽略哪些命名空间
func mutationRequired(ignoredList []string, metadata *metav1.ObjectMeta) bool {
required := admissionRequired(ignoredList, admissionWebhookAnnotationMutateKey, metadata)
klog.Infof("Mutation policy for %v/%v: required:%v", metadata.Namespace, metadata.Name, required)
return required
}
// 检查忽略哪些命名空间
func validationRequired(ignoredList []string, metadata *metav1.ObjectMeta) bool {
required := admissionRequired(ignoredList, admissionWebhookAnnotationValidateKey, metadata)
klog.Infof("Validation policy for %v/%v: required:%v", metadata.Namespace, metadata.Name, required)
return required
}
// main mutation process
// 准入函数
func mutate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
req := ar.Request
var (
deploymentSpec *v1.DeploymentSpec
objectMeta *metav1.ObjectMeta
resourceNamespace, resourceName string
)
klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v patchOperation=%v UserInfo=%v",
req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)
// 判断是不是 deployment,不是则忽略准入修改
switch req.Kind.Kind {
case "Deployment":
var dp v1.Deployment
if err := json.Unmarshal(req.Object.Raw, &dp); err != nil {
klog.Errorf("Could not unmarshal raw object: %v", err)
return &v1beta1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
resourceName, resourceNamespace = dp.Name, dp.Namespace
objectMeta, deploymentSpec = &dp.ObjectMeta, &dp.Spec
}
if !mutationRequired(ignoredNamespaces, objectMeta) {
klog.Infof("Skipping validation for %s/%s due to policy check", resourceNamespace, resourceName)
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
patchBytes, err := createPatch(deploymentSpec)
if err != nil {
return &v1beta1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
klog.Infof("AdmissionResponse: patch=%v\n", string(patchBytes))
// 返回了修改后的 api
return &v1beta1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: func() *v1beta1.PatchType {
pt := v1beta1.PatchTypeJSONPatch
return &pt
}(),
}
}
// 准入校验函数
// validate deployments and services
func validate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
req := ar.Request
var (
deploymentSpec *v1.DeploymentSpec
objectMeta *metav1.ObjectMeta
resourceNamespace, resourceName string
)
klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v patchOperation=%v UserInfo=%v",
req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)
switch req.Kind.Kind {
case "Deployment":
var dp v1.Deployment
if err := json.Unmarshal(req.Object.Raw, &dp); err != nil {
klog.Errorf("Could not unmarshal raw object: %v", err)
return &v1beta1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
resourceName, resourceNamespace = dp.Name, dp.Namespace
objectMeta, deploymentSpec = &dp.ObjectMeta, &dp.Spec
}
if !validationRequired(ignoredNamespaces, objectMeta) {
klog.Infof("Skipping validation for %s/%s due to policy check", resourceNamespace, resourceName)
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
allowed := true
var result *metav1.Status
for _, v := range deploymentSpec.Template.Spec.Containers {
if !(v.Image == "nginx:latest") {
allowed = false
result = &metav1.Status{
Reason: "this image is not latest",
}
break
}
}
return &v1beta1.AdmissionResponse{
Allowed: allowed,
Result: result,
}
}
// Serve method for webhook server
// 核心程序,固定写法
func Serve(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}
if len(body) == 0 {
klog.Error("empty body")
http.Error(w, "empty body", http.StatusBadRequest)
return
}
// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
klog.Errorf("Content-Type=%s, expect application/json", contentType)
http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
return
}
var admissionResponse *v1beta1.AdmissionResponse
ar := v1beta1.AdmissionReview{}
if _, _, err := universalDeserializer.Decode(body, nil, &ar); err != nil {
klog.Errorf("Can't decode body: %v", err)
admissionResponse = &v1beta1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
} else {
if r.URL.Path == "/mutate" {
admissionResponse = mutate(&ar)
} else if r.URL.Path == "/validate" {
admissionResponse = validate(&ar)
}
}
admissionReview := v1beta1.AdmissionReview{}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
admissionReview.Response.UID = ar.Request.UID
}
}
resp, err := json.Marshal(admissionReview)
if err != nil {
klog.Errorf("Can't encode response: %v", err)
http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
}
klog.Infof("Ready to write reponse ...")
if _, err := w.Write(resp); err != nil {
klog.Errorf("Can't write response: %v", err)
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}
}
代码写完后还需要有 TLS 证书以及准入控制器的资源,需要现有证书然后创建 secret 给服务使用,通过 deployment 部署此服务,下面是我整理的创建证书、secret、deployment 的脚本
# cat generate-keys.sh
#!/usr/bin/env bash
: ${1?'missing key directory'}
key_dir="$1"
chmod 0700 "$key_dir"
cd "$key_dir"
# Generate the CA cert and private key
openssl req -nodes -new -x509 -keyout ca.key -out ca.crt -subj "/CN= Admission Controller Webhook CA"
# Generate the private key for the webhook server
openssl genrsa -out webhook-server-tls.key 2048
# Generate a Certificate Signing Request (CSR) for the private key, and sign it with the private key of the CA.
openssl req -new -key webhook-server-tls.key -subj "/CN=image-admission-webhook.kube-system.svc" \
| openssl x509 -req -CA ca.crt -CAkey ca.key -CAcreateserial \
-extfile <(printf "subjectAltName=DNS:image-admission-webhook,DNS:image-admission-webhook.kube-system,DNS:image-admission-webhook.kube-system.svc") \
-out webhook-server-tls.crt
创建部署准入控制器的资源
# cat image-admission-webhook.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: image-admission-webhook
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: image-admission-webhook
template:
metadata:
labels:
app: image-admission-webhook
spec:
containers:
- name: image-admission-webhook
image: image:v0.0.1
command:
- image-admission-webhook
- --cert-file=/run/secrets/tls/tls.crt
- --key-file=/run/secrets/tls/tls.key
imagePullPolicy: IfNotPresent
volumeMounts:
- name: webhook-tls-certs
mountPath: /run/secrets/tls
readOnly: true
volumes:
- name: webhook-tls-certs
secret:
secretName: webhook-server-tls
---
apiVersion: v1
kind: Service
metadata:
name: image-admission-webhook
namespace: kube-system
spec:
selector:
app: image-admission-webhook
ports:
- port: 443
targetPort: 8443
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: image-webhook-mutate
webhooks:
- name: image-admission-webhook.kube-system.svc
clientConfig:
service:
name: image-admission-webhook
namespace: kube-system
path: "/mutate"
caBundle: ${CA_PEM_B64}
rules:
- operations: [ "CREATE","UPDATE" ]
apiGroups: ["apps"]
apiVersions: ["v1"]
resources: ["deployments"]
sideEffects: None
admissionReviewVersions: [ "v1beta1" ]
timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: image-webhook-validata
webhooks:
- name: image-admission-webhook.kube-system.svc
clientConfig:
service:
name: image-admission-webhook
namespace: kube-system
path: "/validate"
caBundle: ${CA_PEM_B64}
rules:
- operations: [ "CREATE","UPDATE" ]
apiGroups: ["apps"]
apiVersions: ["v1"]
resources: ["deployments"]
sideEffects: None
admissionReviewVersions: [ "v1beta1" ]
timeoutSeconds: 5
创建一键部署的脚本
# cat deploy.sh
#!/usr/bin/env bash
set -euo pipefail
keydir="$(mktemp -d)"
# Generate keys into a temporary directory.
echo "Generating TLS keys ..."
"./generate-keys.sh" "$keydir"
# Create the TLS secret for the generated keys.
kubectl -n kube-system create secret tls webhook-server-tls \
--cert "${keydir}/webhook-server-tls.crt" \
--key "${keydir}/webhook-server-tls.key" \
--dry-run=client -o yaml | kubectl apply -f -
# Read the PEM-encoded CA certificate, base64 encode it, and replace the `${CA_PEM_B64}` placeholder in the YAML
# template with it. Then, create the Kubernetes resources.
ca_pem_b64="$(openssl base64 -A <"${keydir}/ca.crt")"
sed -e 's@${CA_PEM_B64}@'"$ca_pem_b64"'@g' <"image-admission-webhook.yaml" | kubectl apply -f -
# Delete the key directory to prevent abuse (DO NOT USE THESE KEYS ANYWHERE ELSE).
rm -rf "$keydir"
echo "The webhook server has been deployed and configured!"
现在有三个文件,分别为 generate-keys.sh、deploy.sh、 image-admission-webhook.yaml,把他们放到同一个目录下面执行 deploy.sh 命令,在执行之前,需要用代码构建一个为 image:v0.0.1 的镜像
测试
部署完成项目后,部署一个 deployment 服务查看效果
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-nginx
annotations:
changeImage: "yes"
spec:
selector:
matchLabels:
apps: test-nginx
template:
metadata:
labels:
apps: test-nginx
spec:
containers:
- name: test-nginx
image: nginx:latest
部署服务发现部署完成,查看服务,发现镜像版本已经被修改了。
[root@kube01 hpa]# kubectl get pod -l test-nginx -o yaml
apiVersion: v1
items: []
kind: List
metadata:
resourceVersion: ""
selfLink: ""
[root@kube01 hpa]# kubectl apply -f deployment.yaml ^C
[root@kube01 hpa]# kubectl get pod -l apps: test-nginx -o yaml
error: name cannot be provided when a selector is specified
[root@kube01 hpa]# kubectl get pod -l apps=test-nginx -o yaml
apiVersion: v1
items:
- apiVersion: v1
kind: Pod
metadata:
creationTimestamp: "2023-06-15T08:03:25Z"
generateName: test-nginx-6c6fb7db4b-
labels:
apps: test-nginx
pod-template-hash: 6c6fb7db4b
name: test-nginx-6c6fb7db4b-nkkht
namespace: default
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: ReplicaSet
name: test-nginx-6c6fb7db4b
uid: 8354a36d-3acd-4ca7-897b-eaf429aaedaa
resourceVersion: "1581971"
uid: 01367ac9-5457-486f-bdb7-da55e11eebc4
spec:
containers:
- image: nginx:1.7.9
imagePullPolicy: Always