Go Client 深度实战
Go Client 深度实战
client-go 介绍
client-go 是 Kubernetes 官方的 Go 语言客户端库,提供了与 Kubernetes API Server 交互的完整功能。
项目结构
k8s.io/client-go/
├── kubernetes/ # Clientset(类型化客户端)
├── dynamic/ # Dynamic Client(动态客户端)
├── rest/ # REST Client(低级 HTTP 客户端)
├── tools/clientcmd/ # kubeconfig 解析
├── tools/cache/ # Informer 框架
├── util/workqueue/ # 工作队列
└── informers/ # Shared Informer Factory
环境准备
安装依赖
# 创建项目
mkdir k8s-client-demo
cd k8s-client-demo
go mod init k8s-client-demo
# 安装 client-go
go get k8s.io/client-go@latest
go get k8s.io/apimachinery@latest
go get k8s.io/api@latest
版本对应关系
| Kubernetes | client-go |
|---|---|
| 1.28 | v0.28.x |
| 1.27 | v0.27.x |
| 1.26 | v0.26.x |
| 1.25 | v0.25.x |
初始化客户端
方式 1:从 kubeconfig 初始化(集群外)
package main
import (
"flag"
"fmt"
"path/filepath"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// main connects to a cluster from outside of it, using a kubeconfig file
// selected by the -kubeconfig flag, and prints the API server version.
func main() {
	// Default to ~/.kube/config when a home directory can be determined;
	// otherwise the flag has no default and must be supplied explicitly.
	defaultPath := ""
	usage := "absolute path to the kubeconfig file"
	if home := homedir.HomeDir(); home != "" {
		defaultPath = filepath.Join(home, ".kube", "config")
		usage = "(optional) absolute path to the kubeconfig file"
	}
	kubeconfig := flag.String("kubeconfig", defaultPath, usage)
	flag.Parse()

	// Build the REST configuration from the kubeconfig file.
	restConfig, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// Create the typed Clientset.
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err.Error())
	}
	fmt.Println("Connected to Kubernetes cluster!")

	// Verify the connection by asking the API server for its version.
	info, err := clientset.Discovery().ServerVersion()
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("Kubernetes version: %s\n", info.String())
}
方式 2:从集群内初始化(Pod 内)
package main
import (
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// main connects to the cluster from inside a Pod and prints the API server
// version. Any failure aborts the program with a panic.
func main() {
	// must panics with the error text when a step fails.
	must := func(err error) {
		if err != nil {
			panic(err.Error())
		}
	}

	// Obtain the configuration from the Pod's ServiceAccount.
	restConfig, err := rest.InClusterConfig()
	must(err)

	// Create the typed Clientset.
	clientset, err := kubernetes.NewForConfig(restConfig)
	must(err)
	fmt.Println("Running inside Kubernetes cluster!")

	info, err := clientset.Discovery().ServerVersion()
	must(err)
	fmt.Printf("Kubernetes version: %s\n", info.String())
}
方式 3:自适应初始化(推荐)
package main
import (
	"flag"
	"fmt"
	"os"
	"path/filepath"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)
// GetKubernetesClient returns a Clientset that works both inside and outside
// a cluster. It tries the in-cluster configuration first; if that fails it
// falls back to a kubeconfig file taken from the KUBECONFIG environment
// variable or, when that is unset, from ~/.kube/config.
func GetKubernetesClient() (*kubernetes.Clientset, error) {
	// Inside a Pod the in-cluster configuration succeeds and is used directly.
	if inCluster, err := rest.InClusterConfig(); err == nil {
		return kubernetes.NewForConfig(inCluster)
	}

	// Outside a cluster: resolve the kubeconfig path.
	path := os.Getenv("KUBECONFIG")
	if path == "" {
		if home := homedir.HomeDir(); home != "" {
			path = filepath.Join(home, ".kube", "config")
		}
	}

	cfg, err := clientcmd.BuildConfigFromFlags("", path)
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(cfg)
}
// main obtains a client via GetKubernetesClient and prints the API server
// version. Any failure aborts the program with a panic.
func main() {
	clientset, err := GetKubernetesClient()
	if err != nil {
		panic(err.Error())
	}

	// The version pointer is nil when the request fails, so the error must be
	// checked before calling version.String().
	version, err := clientset.Discovery().ServerVersion()
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("Connected! Kubernetes version: %s\n", version.String())
}
Pod 操作
列出 Pods
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// ListPods prints every Pod in namespace together with its phase, IP and
// containers. It returns the error from the List call, if any.
func ListPods(clientset *kubernetes.Clientset, namespace string) error {
	// Fetch the Pod list.
	podClient := clientset.CoreV1().Pods(namespace)
	list, err := podClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return err
	}

	fmt.Printf("Found %d pods in namespace %s:\n", len(list.Items), namespace)
	for i := range list.Items {
		p := &list.Items[i]
		fmt.Printf("- %s (Phase: %s, IP: %s)\n", p.Name, p.Status.Phase, p.Status.PodIP)

		// List the containers of this Pod.
		for j := range p.Spec.Containers {
			c := &p.Spec.Containers[j]
			fmt.Printf(" Container: %s (Image: %s)\n", c.Name, c.Image)
		}
	}
	return nil
}
// ListPodsWithSelector prints the names of the Pods in namespace that match
// the given label selector (for example "app=nginx,env=production").
func ListPodsWithSelector(clientset *kubernetes.Clientset, namespace string, selector string) error {
	opts := metav1.ListOptions{LabelSelector: selector}
	list, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), opts)
	if err != nil {
		return err
	}

	fmt.Printf("Found %d pods matching selector '%s':\n", len(list.Items), selector)
	for i := range list.Items {
		fmt.Printf("- %s\n", list.Items[i].Name)
	}
	return nil
}
// ListRunningPods prints the names of the Pods in namespace whose phase is
// Running, using a field selector to filter on the server side.
func ListRunningPods(clientset *kubernetes.Clientset, namespace string) error {
	opts := metav1.ListOptions{FieldSelector: "status.phase=Running"}
	list, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), opts)
	if err != nil {
		return err
	}

	fmt.Printf("Found %d running pods:\n", len(list.Items))
	for i := range list.Items {
		fmt.Printf("- %s\n", list.Items[i].Name)
	}
	return nil
}
获取单个 Pod
// GetPod fetches a single Pod and prints its basic metadata, labels,
// containers and status conditions. It returns the error from the Get call.
func GetPod(clientset *kubernetes.Clientset, namespace, name string) error {
	pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// Basic metadata and status.
	fmt.Printf("Pod Name: %s\n", pod.Name)
	fmt.Printf("Namespace: %s\n", pod.Namespace)
	fmt.Printf("Status: %s\n", pod.Status.Phase)
	fmt.Printf("Node: %s\n", pod.Spec.NodeName)
	fmt.Printf("IP: %s\n", pod.Status.PodIP)
	fmt.Printf("Created: %s\n", pod.CreationTimestamp)

	// Labels (map iteration order is not deterministic).
	fmt.Println("Labels:")
	for key, value := range pod.Labels {
		fmt.Printf(" %s: %s\n", key, value)
	}

	// Containers.
	fmt.Println("Containers:")
	for i := range pod.Spec.Containers {
		c := &pod.Spec.Containers[i]
		fmt.Printf(" Name: %s\n", c.Name)
		fmt.Printf(" Image: %s\n", c.Image)
		fmt.Printf(" Ports: %v\n", c.Ports)
	}

	// Conditions.
	fmt.Println("Conditions:")
	for i := range pod.Status.Conditions {
		cond := &pod.Status.Conditions[i]
		fmt.Printf(" %s: %s (Reason: %s)\n", cond.Type, cond.Status, cond.Reason)
	}
	return nil
}
创建 Pod
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// CreatePod creates a single-container nginx Pod named "nginx-pod" in
// namespace and prints the name of the created object.
func CreatePod(clientset *kubernetes.Clientset, namespace string) error {
	// The only container: nginx serving HTTP on port 80, with resource
	// requests/limits and one environment variable.
	nginx := corev1.Container{
		Name:  "nginx",
		Image: "nginx:1.21",
		Ports: []corev1.ContainerPort{
			{Name: "http", ContainerPort: 80, Protocol: corev1.ProtocolTCP},
		},
		Resources: corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("100m"),
				corev1.ResourceMemory: resource.MustParse("128Mi"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("200m"),
				corev1.ResourceMemory: resource.MustParse("256Mi"),
			},
		},
		Env: []corev1.EnvVar{
			{Name: "ENV", Value: "production"},
		},
	}

	// Pod definition.
	desired := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx-pod",
			Namespace: namespace,
			Labels:    map[string]string{"app": "nginx", "env": "test"},
		},
		Spec: corev1.PodSpec{
			Containers:    []corev1.Container{nginx},
			RestartPolicy: corev1.RestartPolicyAlways,
		},
	}

	// Submit the Pod to the API server.
	created, err := clientset.CoreV1().Pods(namespace).Create(context.TODO(), desired, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("Pod created: %s\n", created.Name)
	return nil
}
更新 Pod
// UpdatePod marks an existing Pod as updated by setting the "updated" label
// and an "updated-at" annotation holding the current time in RFC 3339 format.
// Only some Pod fields (such as labels and annotations) can be changed after
// creation.
func UpdatePod(clientset *kubernetes.Clientset, namespace, name string) error {
	podClient := clientset.CoreV1().Pods(namespace)

	// 1. Fetch the current Pod.
	current, err := podClient.Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// 2. Modify labels and annotations, allocating the maps when absent.
	if current.Labels == nil {
		current.Labels = map[string]string{}
	}
	if current.Annotations == nil {
		current.Annotations = map[string]string{}
	}
	current.Labels["updated"] = "true"
	current.Annotations["updated-at"] = time.Now().Format(time.RFC3339)

	// 3. Write the modified Pod back.
	if _, err := podClient.Update(context.TODO(), current, metav1.UpdateOptions{}); err != nil {
		return err
	}
	fmt.Printf("Pod updated: %s\n", name)
	return nil
}
删除 Pod
// DeletePod deletes a single Pod with a 30-second grace period and
// foreground propagation, then prints the name of the deleted Pod.
func DeletePod(clientset *kubernetes.Clientset, namespace, name string) error {
	// Deletion policy.
	var graceSeconds int64 = 30
	propagation := metav1.DeletePropagationForeground
	opts := metav1.DeleteOptions{
		GracePeriodSeconds: &graceSeconds,
		PropagationPolicy:  &propagation,
	}

	if err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), name, opts); err != nil {
		return err
	}
	fmt.Printf("Pod deleted: %s\n", name)
	return nil
}
// DeletePodsWithSelector deletes, in one request, every Pod in namespace
// that matches the given label selector.
func DeletePodsWithSelector(clientset *kubernetes.Clientset, namespace, selector string) error {
	matching := metav1.ListOptions{LabelSelector: selector}
	podClient := clientset.CoreV1().Pods(namespace)
	if err := podClient.DeleteCollection(context.TODO(), metav1.DeleteOptions{}, matching); err != nil {
		return err
	}
	fmt.Printf("Deleted pods matching selector: %s\n", selector)
	return nil
}
获取 Pod 日志
import (
"io"
)
// GetPodLogs reads the last 100 log lines (with timestamps) of one container
// and prints them to standard output. The whole response is buffered in
// memory before it is printed.
func GetPodLogs(clientset *kubernetes.Clientset, namespace, podName, containerName string) error {
	// Log options.
	tail := int64(100) // last 100 lines
	opts := &corev1.PodLogOptions{
		Container:  containerName,
		Follow:     false, // do not keep the stream open
		Timestamps: true,  // prefix each line with its timestamp
		TailLines:  &tail,
	}

	// Open the log stream.
	stream, err := clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(context.TODO())
	if err != nil {
		return err
	}
	defer stream.Close()

	// Read the entire stream.
	var collected bytes.Buffer
	if _, err := io.Copy(&collected, stream); err != nil {
		return err
	}
	fmt.Println("Pod Logs:")
	fmt.Println(collected.String())
	return nil
}
// FollowPodLogs streams the logs of one container in real time, printing
// each line (with timestamps) to standard output until the stream ends.
// It returns the error from opening the stream or from reading it.
func FollowPodLogs(clientset *kubernetes.Clientset, namespace, podName, containerName string) error {
	logOptions := &corev1.PodLogOptions{
		Container:  containerName,
		Follow:     true, // keep the stream open and follow new output
		Timestamps: true,
	}
	req := clientset.CoreV1().Pods(namespace).GetLogs(podName, logOptions)
	stream, err := req.Stream(context.TODO())
	if err != nil {
		return err
	}
	defer stream.Close()

	// Read the stream line by line. bufio.Scanner rejects lines longer than
	// 64 KiB by default (bufio.ErrTooLong), which would abort the follow loop
	// on a single long log line, so raise the per-line limit to 1 MiB.
	const maxLineBytes = 1024 * 1024
	scanner := bufio.NewScanner(stream)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	return scanner.Err()
}
// int64Ptr returns a pointer to a fresh int64 holding i. It is a convenience
// for populating optional *int64 API fields from a literal.
func int64Ptr(i int64) *int64 {
	p := new(int64)
	*p = i
	return p
}
Pod Exec(执行命令)
import (
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
)
// ExecInPod runs command inside the given container over an SPDY connection
// and prints the captured stdout (and stderr, when non-empty). Stdin and TTY
// are disabled. It returns the error from creating the executor or from
// streaming the command.
func ExecInPod(clientset *kubernetes.Clientset, config *rest.Config,
	namespace, podName, containerName string, command []string) error {
	// Parameters of the exec call.
	execOptions := &corev1.PodExecOptions{
		Container: containerName,
		Command:   command,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}

	// Build the request against the Pod's "exec" subresource.
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec").
		VersionedParams(execOptions, scheme.ParameterCodec)

	// Run the command and capture both output streams.
	executor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return err
	}
	var outBuf, errBuf bytes.Buffer
	streams := remotecommand.StreamOptions{Stdout: &outBuf, Stderr: &errBuf}
	if err := executor.Stream(streams); err != nil {
		return err
	}

	fmt.Println("STDOUT:")
	fmt.Println(outBuf.String())
	if errBuf.Len() > 0 {
		fmt.Println("STDERR:")
		fmt.Println(errBuf.String())
	}
	return nil
}
// 使用示例
func main() {
clientset, config := GetKubernetesClient()
// 执行 ls 命令
err := ExecInPod(clientset, config, "default", "nginx-pod", "nginx",
[]string{"ls", "-la", "/etc/nginx"})
if err != nil {
panic(err)
}
}
Deployment 操作
创建 Deployment
import (
appsv1 "k8s.io/api/apps/v1"
)
// CreateDeployment creates a 3-replica nginx Deployment named
// "nginx-deployment" in namespace and prints the name of the created object.
func CreateDeployment(clientset *kubernetes.Clientset, namespace string) error {
	var replicaCount int32 = 3

	// Pod template; its labels must match the Deployment selector below.
	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"app": "nginx"},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "nginx",
					Image: "nginx:1.21",
					Ports: []corev1.ContainerPort{{ContainerPort: 80}},
				},
			},
		},
	}

	desired := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx-deployment",
			Namespace: namespace,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicaCount,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": "nginx"},
			},
			Template: podTemplate,
		},
	}

	created, err := clientset.AppsV1().Deployments(namespace).Create(context.TODO(), desired, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("Deployment created: %s\n", created.Name)
	return nil
}
更新 Deployment(滚动更新)
// UpdateDeploymentImage sets the image of the Deployment's first container
// to newImage and writes the Deployment back, which triggers a rolling
// update.
func UpdateDeploymentImage(clientset *kubernetes.Clientset, namespace, name, newImage string) error {
	deployClient := clientset.AppsV1().Deployments(namespace)

	// 1. Fetch the current Deployment.
	current, err := deployClient.Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// 2. Change the image of the first container in the pod template.
	containers := current.Spec.Template.Spec.Containers
	containers[0].Image = newImage

	// 3. Apply the change.
	if _, err := deployClient.Update(context.TODO(), current, metav1.UpdateOptions{}); err != nil {
		return err
	}
	fmt.Printf("Deployment updated with new image: %s\n", newImage)
	return nil
}
扩缩容 Deployment
// ScaleDeployment changes the replica count of a Deployment through its
// Scale subresource (the recommended way) and prints the new count.
func ScaleDeployment(clientset *kubernetes.Clientset, namespace, name string, replicas int32) error {
	deployClient := clientset.AppsV1().Deployments(namespace)

	// Read the current Scale object.
	current, err := deployClient.GetScale(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// Write it back with the desired replica count.
	current.Spec.Replicas = replicas
	if _, err := deployClient.UpdateScale(context.TODO(), name, current, metav1.UpdateOptions{}); err != nil {
		return err
	}
	fmt.Printf("Deployment scaled to %d replicas\n", replicas)
	return nil
}
查看 Deployment 状态
// GetDeploymentStatus fetches a Deployment and prints its desired, current,
// ready, available and updated replica counts followed by its conditions.
// It returns the error from the Get call, if any.
func GetDeploymentStatus(clientset *kubernetes.Clientset, namespace, name string) error {
	deployment, err := clientset.AppsV1().Deployments(namespace).Get(
		context.TODO(),
		name,
		metav1.GetOptions{},
	)
	if err != nil {
		return err
	}

	// Spec.Replicas is an optional pointer; guard against nil instead of
	// dereferencing it blindly. When unset, Kubernetes defaults it to 1.
	desired := int32(1)
	if deployment.Spec.Replicas != nil {
		desired = *deployment.Spec.Replicas
	}

	fmt.Printf("Deployment: %s\n", deployment.Name)
	fmt.Printf("Desired Replicas: %d\n", desired)
	fmt.Printf("Current Replicas: %d\n", deployment.Status.Replicas)
	fmt.Printf("Ready Replicas: %d\n", deployment.Status.ReadyReplicas)
	fmt.Printf("Available Replicas: %d\n", deployment.Status.AvailableReplicas)
	fmt.Printf("Updated Replicas: %d\n", deployment.Status.UpdatedReplicas)

	// Conditions.
	fmt.Println("Conditions:")
	for _, condition := range deployment.Status.Conditions {
		fmt.Printf(" Type: %s, Status: %s, Reason: %s\n",
			condition.Type,
			condition.Status,
			condition.Reason,
		)
	}
	return nil
}
Deployment 回滚
import (
"k8s.io/apimachinery/pkg/util/intstr"
)
// RollbackDeployment rolls a Deployment back to an earlier revision.
//
// A revision of 0 means "the previous revision", i.e. the highest revision
// below the Deployment's current one. A positive revision selects exactly
// that revision.
//
// The apps/v1 API has no DeploymentRollback type and no "rollback"
// subresource (both existed only in the removed beta API groups), so the
// rollback is done by locating the ReplicaSet that recorded the target
// revision and copying its pod template back into the Deployment.
//
// It returns an error when revision is negative, when any API call fails,
// or when no ReplicaSet owned by the Deployment carries the target revision.
func RollbackDeployment(clientset *kubernetes.Clientset, namespace, name string, revision int64) error {
	// Annotation the Deployment controller writes on the Deployment and on
	// each of its ReplicaSets to record the revision number.
	const revisionAnnotation = "deployment.kubernetes.io/revision"
	// Label the controller adds to every ReplicaSet's pod template; it must
	// not be copied back into the Deployment's own template.
	const podTemplateHashLabel = "pod-template-hash"

	if revision < 0 {
		return fmt.Errorf("invalid revision %d: must be >= 0", revision)
	}

	ctx := context.TODO()
	deployments := clientset.AppsV1().Deployments(namespace)

	deployment, err := deployments.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// The current revision is only needed to resolve "previous revision".
	var current int64
	if revision == 0 {
		if _, err := fmt.Sscan(deployment.Annotations[revisionAnnotation], &current); err != nil {
			return fmt.Errorf("reading current revision of deployment %q: %w", name, err)
		}
	}

	// List the ReplicaSets selected by the Deployment.
	selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
	if err != nil {
		return fmt.Errorf("parsing selector of deployment %q: %w", name, err)
	}
	replicaSets, err := clientset.AppsV1().ReplicaSets(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: selector.String(),
	})
	if err != nil {
		return err
	}

	// Pick the ReplicaSet that holds the target revision.
	var target *appsv1.ReplicaSet
	var targetRevision int64
	for i := range replicaSets.Items {
		rs := &replicaSets.Items[i]
		// The selector may also match ReplicaSets owned by something else.
		if !metav1.IsControlledBy(rs, deployment) {
			continue
		}
		var rev int64
		if _, err := fmt.Sscan(rs.Annotations[revisionAnnotation], &rev); err != nil {
			continue // no usable revision recorded on this ReplicaSet
		}
		switch {
		case revision > 0:
			if rev == revision {
				target, targetRevision = rs, rev
			}
		case rev < current && (target == nil || rev > targetRevision):
			// Previous revision: highest one strictly below the current.
			target, targetRevision = rs, rev
		}
	}
	if target == nil {
		return fmt.Errorf("no rollback target found for deployment %q (requested revision %d)", name, revision)
	}

	// Restore the pod template; the controller then performs a normal
	// rolling update towards it.
	deployment.Spec.Template = *target.Spec.Template.DeepCopy()
	delete(deployment.Spec.Template.Labels, podTemplateHashLabel)

	if _, err := deployments.Update(ctx, deployment, metav1.UpdateOptions{}); err != nil {
		return err
	}
	fmt.Printf("Deployment rolled back to revision %d\n", targetRevision)
	return nil
}
Service 操作
创建 Service
// CreateService creates a ClusterIP Service named "nginx-service" that
// forwards TCP port 80 to port 80 of the Pods labelled app=nginx, and prints
// the assigned cluster IP.
func CreateService(clientset *kubernetes.Clientset, namespace string) error {
	httpPort := corev1.ServicePort{
		Name:       "http",
		Protocol:   corev1.ProtocolTCP,
		Port:       80,
		TargetPort: intstr.FromInt(80),
	}
	desired := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx-service",
			Namespace: namespace,
		},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{"app": "nginx"},
			Type:     corev1.ServiceTypeClusterIP,
			Ports:    []corev1.ServicePort{httpPort},
		},
	}

	created, err := clientset.CoreV1().Services(namespace).Create(context.TODO(), desired, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("Service created: %s (ClusterIP: %s)\n", created.Name, created.Spec.ClusterIP)
	return nil
}
// CreateNodePortService creates a NodePort Service named "nginx-nodeport"
// that exposes port 80 of the Pods labelled app=nginx on node port 30080,
// and prints the node port reported by the API server.
func CreateNodePortService(clientset *kubernetes.Clientset, namespace string) error {
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx-nodeport",
			// Set the namespace explicitly, consistent with the other
			// create examples, instead of relying on the request path.
			Namespace: namespace,
		},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{
				"app": "nginx",
			},
			Type: corev1.ServiceTypeNodePort,
			Ports: []corev1.ServicePort{
				{
					Port:       80,
					TargetPort: intstr.FromInt(80),
					NodePort:   30080, // fixed node port (optional)
				},
			},
		},
	}

	result, err := clientset.CoreV1().Services(namespace).Create(
		context.TODO(),
		service,
		metav1.CreateOptions{},
	)
	if err != nil {
		return err
	}
	fmt.Printf("NodePort Service created: %s (NodePort: %d)\n",
		result.Name,
		result.Spec.Ports[0].NodePort,
	)
	return nil
}
ConfigMap & Secret 操作
ConfigMap
// CreateConfigMap creates a ConfigMap named "app-config" holding two text
// entries and one binary entry, and prints the name of the created object.
func CreateConfigMap(clientset *kubernetes.Clientset, namespace string) error {
	textEntries := map[string]string{
		"app.properties": "key1=value1\nkey2=value2",
		"config.json":    `{"setting": "value"}`,
	}
	binaryEntries := map[string][]byte{
		"binary-file": []byte("Hello"), // bytes 0x48 0x65 0x6c 0x6c 0x6f
	}
	desired := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "app-config",
			Namespace: namespace,
		},
		Data:       textEntries,
		BinaryData: binaryEntries,
	}

	created, err := clientset.CoreV1().ConfigMaps(namespace).Create(context.TODO(), desired, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("ConfigMap created: %s\n", created.Name)
	return nil
}
Secret
// CreateSecret creates an Opaque Secret named "app-secret" with a username
// and a password supplied as plain strings, and prints the name of the
// created object.
func CreateSecret(clientset *kubernetes.Clientset, namespace string) error {
	credentials := map[string]string{
		"username": "admin",
		"password": "secret123",
	}
	desired := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "app-secret",
			Namespace: namespace,
		},
		Type:       corev1.SecretTypeOpaque,
		StringData: credentials,
		// Alternatively use Data with the raw bytes. Do NOT base64-encode
		// the values yourself: a []byte field is base64-encoded when the
		// object is serialized to JSON, so encoding by hand would store a
		// doubly encoded value.
		// Data: map[string][]byte{
		// 	"username": []byte("admin"),
		// },
	}

	created, err := clientset.CoreV1().Secrets(namespace).Create(context.TODO(), desired, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("Secret created: %s\n", created.Name)
	return nil
}
错误处理
判断错误类型
import (
"k8s.io/apimachinery/pkg/api/errors"
)
// HandleError prints a human-readable description of err. A nil error
// prints nothing. For a Kubernetes API status error it prints the server
// message followed by a line describing the error category; any other error
// is printed as is.
func HandleError(err error) {
	if err == nil {
		return
	}

	// Anything that is not a Kubernetes API status error is printed verbatim.
	statusErr, ok := err.(*errors.StatusError)
	if !ok {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Error from server: %s\n", statusErr.ErrStatus.Message)

	// Categories are checked in order; the first match wins.
	categories := []struct {
		matches func(error) bool
		message string
	}{
		{errors.IsNotFound, "Resource not found"},
		{errors.IsAlreadyExists, "Resource already exists"},
		{errors.IsForbidden, "Forbidden: insufficient permissions"},
		{errors.IsUnauthorized, "Unauthorized: authentication required"},
		{errors.IsConflict, "Conflict: resource version mismatch"},
		{errors.IsInvalid, "Invalid: validation failed"},
		{errors.IsTimeout, "Timeout"},
		{errors.IsServerTimeout, "Server timeout"},
		{errors.IsTooManyRequests, "Too many requests: rate limited"},
	}
	for _, category := range categories {
		if category.matches(err) {
			fmt.Println(category.message)
			return
		}
	}
	fmt.Printf("Unknown error: %v\n", err)
}
// GetPodSafe is a usage example for the error helpers: it fetches a Pod and
// treats "not found" as a non-error, printing a message and returning
// (nil, nil). Any other failure is returned to the caller unchanged.
func GetPodSafe(clientset *kubernetes.Clientset, namespace, name string) (*corev1.Pod, error) {
	found, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	switch {
	case err == nil:
		return found, nil
	case errors.IsNotFound(err):
		fmt.Printf("Pod %s not found in namespace %s\n", name, namespace)
		return nil, nil
	default:
		return nil, err
	}
}
总结
Clientset 核心对象
clientset.CoreV1() // Pod, Service, ConfigMap, Secret, Namespace...
clientset.AppsV1() // Deployment, StatefulSet, DaemonSet, ReplicaSet
clientset.BatchV1() // Job, CronJob
clientset.NetworkingV1() // Ingress, NetworkPolicy
clientset.RbacV1() // Role, RoleBinding, ClusterRole, ClusterRoleBinding
clientset.StorageV1() // StorageClass, VolumeAttachment
最佳实践
✅ 使用 context.Context: 支持超时和取消
✅ 错误处理: 使用 k8s.io/apimachinery/pkg/api/errors 提供的 IsNotFound()、IsConflict() 等函数判断错误类型
✅ ResourceVersion: 实现乐观锁防止并发冲突
✅ Namespace: 明确指定 namespace(避免使用 "")
✅ Label Selector: 高效过滤资源
✅ Retry: 对临时错误实现重试机制
下一节将深入讲解 Python Client 的使用。