Module 1.3: Building Controllers with client-go
Complexity: Complex (full controller implementation from scratch)
Time to Complete: 5 hours
Prerequisites: Module 1.1 (API Deep Dive), Module 1.2 (CRDs Advanced), intermediate Go programming
What You’ll Be Able to Do
After completing this module, you will be able to:
- Build a complete Kubernetes controller from scratch using client-go Informers, Listers, and Workqueues
- Implement a reconciliation loop that creates, updates, and deletes child resources based on a custom resource spec
- Apply owner references and garbage collection so child resources are cleaned up automatically when the parent is deleted
- Debug controller issues using event recording, structured logging, and workqueue retry metrics
Why This Module Matters
A Kubernetes controller is the engine that turns declarative intent into running reality. When you create a Deployment, it is a controller (the Deployment controller) that creates the ReplicaSet. When you create a Service, it is a controller (the Endpoints controller) that populates the Endpoints. Without controllers, Kubernetes is just a database of YAML documents.
In this module you will build a complete controller from scratch using only client-go — no frameworks, no scaffolding, no magic. You will implement every piece yourself: the Informer that watches resources, the Workqueue that buffers events, the reconciliation loop that creates child resources, and the error handling that makes it production-ready. This foundational knowledge is what separates someone who uses operators from someone who builds them.
The Thermostat Analogy
A Kubernetes controller works exactly like a thermostat. You set the desired temperature (spec). The thermostat continuously observes the current temperature (status). If there is a difference, it acts — turning on the heater or the AC. It does not remember what it did last time; it just compares desired vs. actual and takes the minimum action to converge. This is the Observe-Analyze-Act loop, and every controller follows it.
What You’ll Learn
By the end of this module, you will be able to:
- Implement the full controller pattern (Observe, Analyze, Act)
- Use SharedIndexInformer with DeltaFIFO
- Build a rate-limited Workqueue with retries
- Write idempotent reconciliation logic
- Handle controller shutdown gracefully
- Implement leader election for HA deployments
- Create child Kubernetes resources (Deployments, Services) from a CRD
Did You Know?
- The Kubernetes controller-manager runs 37 controllers in a single binary. Each one follows the exact same pattern you will learn here. The Deployment controller, the Job controller, the Namespace controller: all use SharedInformers, Workqueues, and the reconcile loop.
- "Level-triggered" beats "edge-triggered": Kubernetes controllers do not react to individual events. They react to the current state. If your controller crashes and misses 50 events, it does not matter; on restart, it sees the current state and reconciles. This is why controllers are so resilient.
- The average production controller handles 10,000+ resources with a single Informer cache consuming roughly 100MB of memory. The Watch protocol is remarkably efficient: the API Server only sends deltas, and the Informer deduplicates them.
Part 1: The Controller Pattern
1.1 Observe-Analyze-Act
Every Kubernetes controller follows this three-step loop:

```
OBSERVE
  Informer watches API Server for resource changes
  Lister reads current state from local cache
  Event handlers enqueue changed object keys
      │
      ▼
ANALYZE
  Dequeue object key from Workqueue
  Read desired state (spec) from cache
  Read actual state (owned resources) from cache
  Compare desired vs actual: what needs to change?
      │
      ▼
ACT
  Create / Update / Delete child resources
  Update status subresource
  Emit Kubernetes Events
  Re-enqueue on failure (with backoff)
      │
      └──▶ back to OBSERVE: the loop never ends
```

1.2 Level-Triggered vs Edge-Triggered
This distinction is fundamental:
| Approach | Reacts To | Consequence |
|---|---|---|
| Edge-triggered | Individual events (ADDED, MODIFIED, DELETED) | If you miss an event, state diverges forever |
| Level-triggered | Current state difference (desired vs actual) | Self-healing: always converges regardless of missed events |
Kubernetes controllers are level-triggered. Your reconcile function should never ask “what event happened?” It should ask “what is the current desired state, what is the current actual state, and what do I need to do to make them match?”
Pause and predict: Suppose your controller process is killed right after the API server emits an ADDED event for a new `WebApp` resource, but before the controller processes it. When the controller restarts 5 minutes later, the ADDED event is gone from the API server's watch stream. How does the controller know to create the associated Deployment?
1.3 Idempotency
Every reconciliation must be idempotent: running it once or 100 times produces the same result. This means:
- Use `Create` with conflict detection, not blind creates
- Use `Update` with resource version checks
- Check if a resource already exists before creating it
- Make decisions based on current state, not event history
Stop and think: If your `syncHandler` function blindly calls `Create` on a Deployment without checking if it exists, what happens on the second reconciliation loop for the same `WebApp`?
Part 2: Controller Architecture
2.1 Component Overview

```
API Server
  │  WATCH (primary resource: WebApp)
  ▼
Informer (WebApp) ──▶ DeltaFIFO ──▶ Indexer/Cache
  │ Event handlers                    │ Lister
  ▼                                   ▼
Workqueue (rate-limited)        Read desired state from cache
  │  Dequeue keys
  ▼
syncHandler
  1. Get WebApp from Lister
  2. Get/Create owned Deployment
  3. Get/Create owned Service
  4. Update WebApp status
  5. Emit Events
  On error   → re-enqueue with backoff
  On success → forget (reset backoff)

API Server
  │  WATCH (secondary resources: Deployment, Service)
  ▼
Informers (Deployment, Service) ── event handlers look up the ownerRef
                                   and enqueue the parent WebApp key
```

2.2 Watching Owned Resources
When your controller creates a Deployment, you also need to know when that Deployment changes (e.g., it becomes ready, or someone deletes it). You watch Deployments too, but when an event fires, you look up the owner reference to find the parent WebApp, and enqueue that key.
Pause and predict: You manually delete a Deployment that is owned by a `WebApp` custom resource. Walk through the exact chain of events in the controller architecture diagram that leads to the Deployment being recreated.
```go
// When a Deployment changes, enqueue the owning WebApp
deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.handleOwnedResource(obj) },
	UpdateFunc: func(old, new interface{}) { controller.handleOwnedResource(new) },
	DeleteFunc: func(obj interface{}) { controller.handleOwnedResource(obj) },
})

func (c *Controller) handleOwnedResource(obj interface{}) {
	object, ok := obj.(metav1.Object)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			return
		}
	}

	// Look for an owner reference pointing to our CRD
	ownerRef := metav1.GetControllerOf(object)
	if ownerRef == nil || ownerRef.Kind != "WebApp" {
		return
	}

	// Enqueue the parent WebApp
	webapp, err := c.webappLister.WebApps(object.GetNamespace()).Get(ownerRef.Name)
	if err != nil {
		return
	}
	c.enqueue(webapp)
}
```

Part 3: The Complete Controller
3.1 Project Structure

```
webapp-controller/
├── go.mod
├── go.sum
├── main.go          # Entry point, signal handling, leader election
├── controller.go    # Controller struct and reconcile logic
├── crd/
│   └── webapp-crd.yaml   # CRD definition from Module 1.2
└── deploy/
    └── rbac.yaml         # RBAC for the controller ServiceAccount
```

3.2 CRD Types (Simplified)
Since we are not using code generation, we will work with unstructured objects. But first, let us define our Go types for clarity:

```go
package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

// WebApp represents our custom resource.
type WebApp struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   WebAppSpec   `json:"spec"`
	Status WebAppStatus `json:"status,omitempty"`
}

type WebAppSpec struct {
	Image    string `json:"image"`
	Replicas *int32 `json:"replicas,omitempty"`
	Port     int32  `json:"port,omitempty"`
}

type WebAppStatus struct {
	ReadyReplicas int32  `json:"readyReplicas,omitempty"`
	Phase         string `json:"phase,omitempty"`
}

// DeepCopyObject implements runtime.Object, which the event recorder
// requires when we pass a *WebApp to it. Code generation would normally
// produce this for us; without it we write a small copy by hand.
func (w *WebApp) DeepCopyObject() runtime.Object {
	out := *w
	out.ObjectMeta = *w.ObjectMeta.DeepCopy()
	if w.Spec.Replicas != nil {
		r := *w.Spec.Replicas
		out.Spec.Replicas = &r
	}
	return &out
}
```

3.3 Controller Implementation
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

var webappGVR = schema.GroupVersionResource{
	Group:    "apps.kubedojo.io",
	Version:  "v1beta1",
	Resource: "webapps",
}

const (
	controllerName = "webapp-controller"
	maxRetries     = 5
)

// Controller manages WebApp resources.
type Controller struct {
	kubeClient    kubernetes.Interface
	dynamicClient dynamic.Interface

	// Listers for reading from cache
	deploymentLister appslisters.DeploymentLister
	serviceLister    corelisters.ServiceLister

	// Informer synced functions
	deploymentSynced cache.InformerSynced
	serviceSynced    cache.InformerSynced
	webappSynced     cache.InformerSynced

	// Dynamic informer for our CRD
	webappInformer cache.SharedIndexInformer

	// Workqueue
	queue workqueue.TypedRateLimitingInterface[string]

	// Event recorder
	recorder record.EventRecorder
}

// NewController creates a new WebApp controller.
func NewController(
	kubeClient kubernetes.Interface,
	dynamicClient dynamic.Interface,
	kubeInformerFactory informers.SharedInformerFactory,
	dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	recorder record.EventRecorder,
) *Controller {

	// Get informers for owned resources
	deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
	serviceInformer := kubeInformerFactory.Core().V1().Services()

	// Get dynamic informer for our CRD
	webappInformer := dynamicInformerFactory.ForResource(webappGVR).Informer()

	c := &Controller{
		kubeClient:       kubeClient,
		dynamicClient:    dynamicClient,
		deploymentLister: deploymentInformer.Lister(),
		serviceLister:    serviceInformer.Lister(),
		deploymentSynced: deploymentInformer.Informer().HasSynced,
		serviceSynced:    serviceInformer.Informer().HasSynced,
		webappSynced:     webappInformer.HasSynced,
		webappInformer:   webappInformer,
		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
			workqueue.DefaultTypedControllerRateLimiter[string](),
			workqueue.TypedRateLimitingQueueConfig[string]{
				Name: controllerName,
			},
		),
		recorder: recorder,
	}

	// Set up event handlers for WebApp (primary resource)
	webappInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { c.enqueueWebApp(obj) },
		UpdateFunc: func(old, new interface{}) { c.enqueueWebApp(new) },
		DeleteFunc: func(obj interface{}) { c.enqueueWebApp(obj) },
	})

	// Set up event handlers for owned Deployments
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { c.handleOwnedObject(obj) },
		UpdateFunc: func(old, new interface{}) { c.handleOwnedObject(new) },
		DeleteFunc: func(obj interface{}) { c.handleOwnedObject(obj) },
	})

	// Set up event handlers for owned Services
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { c.handleOwnedObject(obj) },
		UpdateFunc: func(old, new interface{}) { c.handleOwnedObject(new) },
		DeleteFunc: func(obj interface{}) { c.handleOwnedObject(obj) },
	})

	return c
}

func (c *Controller) enqueueWebApp(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("getting key for object: %v", err))
		return
	}
	c.queue.Add(key)
}

func (c *Controller) handleOwnedObject(obj interface{}) {
	var object metav1.Object
	var ok bool

	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding tombstone, invalid type"))
			return
		}
	}

	ownerRef := metav1.GetControllerOf(object)
	if ownerRef == nil {
		return
	}

	if ownerRef.Kind != "WebApp" {
		return
	}

	// Enqueue the parent WebApp
	key := object.GetNamespace() + "/" + ownerRef.Name
	c.queue.Add(key)
}

// Run starts the controller.
func (c *Controller) Run(ctx context.Context, workers int) error {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	klog.Infof("Starting %s", controllerName)

	// Wait for all caches to sync
	klog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(ctx.Done(),
		c.deploymentSynced,
		c.serviceSynced,
		c.webappSynced,
	); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Infof("Starting %d workers", workers)
	for i := 0; i < workers; i++ {
		go c.runWorker(ctx)
	}

	klog.Info("Controller started")
	<-ctx.Done()
	klog.Info("Shutting down controller")
	return nil
}

func (c *Controller) runWorker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(key)

	err := c.syncHandler(ctx, key)
	if err == nil {
		// Success: reset the rate limiter for this key
		c.queue.Forget(key)
		return true
	}

	// Failure: re-enqueue with rate limiting
	if c.queue.NumRequeues(key) < maxRetries {
		klog.Warningf("Error syncing %q (retry %d/%d): %v",
			key, c.queue.NumRequeues(key)+1, maxRetries, err)
		c.queue.AddRateLimited(key)
		return true
	}

	// Too many retries: give up on this key
	klog.Errorf("Dropping %q after %d retries: %v", key, maxRetries, err)
	c.queue.Forget(key)
	utilruntime.HandleError(err)
	return true
}

// syncHandler is the core reconciliation logic.
func (c *Controller) syncHandler(ctx context.Context, key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}

	// OBSERVE: Get the WebApp from the cache
	items, err := c.webappInformer.GetIndexer().ByIndex(
		cache.NamespaceIndex, namespace)
	if err != nil {
		return err
	}

	// Find the specific WebApp
	var webapp *WebApp
	for _, item := range items {
		u := item.(*unstructured.Unstructured)
		if u.GetName() == name && u.GetNamespace() == namespace {
			webapp, err = unstructuredToWebApp(u)
			if err != nil {
				return fmt.Errorf("converting unstructured to WebApp: %v", err)
			}
			break
		}
	}

	if webapp == nil {
		// WebApp was deleted; owned resources will be garbage collected
		// via OwnerReferences
		klog.Infof("WebApp %s deleted, owned resources will be GC'd", key)
		return nil
	}

	// Set defaults
	replicas := int32(2)
	if webapp.Spec.Replicas != nil {
		replicas = *webapp.Spec.Replicas
	}
	port := int32(8080)
	if webapp.Spec.Port > 0 {
		port = webapp.Spec.Port
	}

	// ANALYZE + ACT: Ensure Deployment exists and matches spec
	deploymentName := webapp.Name
	deployment, err := c.deploymentLister.Deployments(namespace).Get(deploymentName)
	if errors.IsNotFound(err) {
		// Create the Deployment
		deployment, err = c.kubeClient.AppsV1().Deployments(namespace).Create(
			ctx,
			c.newDeployment(webapp, deploymentName, replicas, port),
			metav1.CreateOptions{},
		)
		if err != nil {
			return fmt.Errorf("creating deployment: %v", err)
		}
		klog.Infof("Created Deployment %s/%s", namespace, deploymentName)
		c.recorder.Eventf(webapp, corev1.EventTypeNormal, "DeploymentCreated",
			"Created Deployment %s", deploymentName)
	} else if err != nil {
		return fmt.Errorf("getting deployment: %v", err)
	} else {
		// Deployment exists; check if it needs updating
		if *deployment.Spec.Replicas != replicas ||
			deployment.Spec.Template.Spec.Containers[0].Image != webapp.Spec.Image {
			deploymentCopy := deployment.DeepCopy()
			deploymentCopy.Spec.Replicas = &replicas
			deploymentCopy.Spec.Template.Spec.Containers[0].Image = webapp.Spec.Image
			_, err = c.kubeClient.AppsV1().Deployments(namespace).Update(
				ctx, deploymentCopy, metav1.UpdateOptions{})
			if err != nil {
				return fmt.Errorf("updating deployment: %v", err)
			}
			klog.Infof("Updated Deployment %s/%s", namespace, deploymentName)
			c.recorder.Eventf(webapp, corev1.EventTypeNormal, "DeploymentUpdated",
				"Updated Deployment %s (replicas=%d, image=%s)",
				deploymentName, replicas, webapp.Spec.Image)
		}
	}

	// Ensure Service exists
	serviceName := webapp.Name
	_, err = c.serviceLister.Services(namespace).Get(serviceName)
	if errors.IsNotFound(err) {
		_, err = c.kubeClient.CoreV1().Services(namespace).Create(
			ctx,
			c.newService(webapp, serviceName, port),
			metav1.CreateOptions{},
		)
		if err != nil {
			return fmt.Errorf("creating service: %v", err)
		}
		klog.Infof("Created Service %s/%s", namespace, serviceName)
		c.recorder.Eventf(webapp, corev1.EventTypeNormal, "ServiceCreated",
			"Created Service %s", serviceName)
	} else if err != nil {
		return fmt.Errorf("getting service: %v", err)
	}

	// Update status
	err = c.updateStatus(ctx, webapp, deployment)
	if err != nil {
		return fmt.Errorf("updating status: %v", err)
	}

	return nil
}

func (c *Controller) newDeployment(webapp *WebApp, name string, replicas int32, port int32) *appsv1.Deployment {
	labels := map[string]string{
		"app":                          name,
		"app.kubernetes.io/managed-by": controllerName,
	}

	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: webapp.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(webapp, schema.GroupVersionKind{
					Group:   "apps.kubedojo.io",
					Version: "v1beta1",
					Kind:    "WebApp",
				}),
			},
			Labels: labels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "app",
							Image: webapp.Spec.Image,
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: port,
									Protocol:      corev1.ProtocolTCP,
								},
							},
						},
					},
				},
			},
		},
	}
}

func (c *Controller) newService(webapp *WebApp, name string, port int32) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: webapp.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(webapp, schema.GroupVersionKind{
					Group:   "apps.kubedojo.io",
					Version: "v1beta1",
					Kind:    "WebApp",
				}),
			},
			Labels: map[string]string{
				"app":                          name,
				"app.kubernetes.io/managed-by": controllerName,
			},
		},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{
				"app": name,
			},
			Ports: []corev1.ServicePort{
				{
					Port:       port,
					TargetPort: intstr.FromInt32(port),
					Protocol:   corev1.ProtocolTCP,
				},
			},
			Type: corev1.ServiceTypeClusterIP,
		},
	}
}

func (c *Controller) updateStatus(ctx context.Context, webapp *WebApp, deployment *appsv1.Deployment) error {
	readyReplicas := int32(0)
	phase := "Pending"

	if deployment != nil {
		readyReplicas = deployment.Status.ReadyReplicas
		if deployment.Status.ReadyReplicas == *deployment.Spec.Replicas {
			phase = "Running"
		} else if deployment.Status.ReadyReplicas > 0 {
			phase = "Deploying"
		}
	}

	// Build the status patch
	patch := map[string]interface{}{
		"status": map[string]interface{}{
			"readyReplicas": readyReplicas,
			"phase":         phase,
		},
	}

	patchBytes, err := json.Marshal(patch)
	if err != nil {
		return err
	}

	_, err = c.dynamicClient.Resource(webappGVR).Namespace(webapp.Namespace).Patch(
		ctx,
		webapp.Name,
		"application/merge-patch+json",
		patchBytes,
		metav1.PatchOptions{},
		"status",
	)
	return err
}

// unstructuredToWebApp converts an unstructured object to a WebApp.
func unstructuredToWebApp(u *unstructured.Unstructured) (*WebApp, error) {
	data, err := json.Marshal(u.Object)
	if err != nil {
		return nil, err
	}
	var webapp WebApp
	if err := json.Unmarshal(data, &webapp); err != nil {
		return nil, err
	}
	// Copy ObjectMeta fields that are needed
	webapp.Name = u.GetName()
	webapp.Namespace = u.GetNamespace()
	webapp.UID = u.GetUID()
	return &webapp, nil
}
```

3.4 Main Entry Point
```go
package main

import (
	"context"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)

	// Build config (supports both in-cluster and kubeconfig)
	config, err := buildConfig()
	if err != nil {
		klog.Fatalf("Error building config: %v", err)
	}

	// Create clients
	kubeClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Error creating kubernetes client: %v", err)
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Error creating dynamic client: %v", err)
	}

	// Create informer factories
	kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
	dynamicInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(
		dynamicClient, 30*time.Second)

	// Create event recorder
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: kubeClient.CoreV1().Events(""),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
		Component: controllerName,
	})

	// Create controller
	controller := NewController(
		kubeClient,
		dynamicClient,
		kubeInformerFactory,
		dynamicInformerFactory,
		recorder,
	)

	// Set up shutdown context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		sig := <-sigCh
		klog.Infof("Received signal %v, initiating shutdown", sig)
		cancel()
	}()

	// Start informer factories
	kubeInformerFactory.Start(ctx.Done())
	dynamicInformerFactory.Start(ctx.Done())

	// Run controller with 2 workers
	if err := controller.Run(ctx, 2); err != nil {
		klog.Fatalf("Error running controller: %v", err)
	}
}

func buildConfig() (*rest.Config, error) {
	// Try in-cluster config first
	config, err := rest.InClusterConfig()
	if err == nil {
		return config, nil
	}

	// Fall back to kubeconfig
	home, _ := os.UserHomeDir()
	kubeconfig := filepath.Join(home, ".kube", "config")
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
```
Part 4: Rate Limiting and Retry Strategies
4.1 Built-in Rate Limiters
client-go provides several rate limiters:

```go
// Default: combines exponential backoff with a token bucket rate limiter
queue := workqueue.NewTypedRateLimitingQueue(
	workqueue.DefaultTypedControllerRateLimiter[string](),
)

// Custom: exponential backoff (5ms base, 1000s max)
queue = workqueue.NewTypedRateLimitingQueue(
	workqueue.NewTypedItemExponentialFailureRateLimiter[string](
		5*time.Millisecond, // base delay
		1000*time.Second,   // max delay
	),
)

// Custom: fixed rate (10 items/sec, burst of 100)
queue = workqueue.NewTypedRateLimitingQueue(
	&workqueue.TypedBucketRateLimiter[string]{
		Limiter: rate.NewLimiter(rate.Limit(10), 100),
	},
)

// Combine multiple limiters (the longest delay wins)
queue = workqueue.NewTypedRateLimitingQueue(
	workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[string](
			5*time.Millisecond, 60*time.Second),
		&workqueue.TypedBucketRateLimiter[string]{
			Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	),
)
```

4.2 Retry Best Practices
Section titled “4.2 Retry Best Practices”| Practice | Why |
|---|---|
| Cap max retries (e.g., 5-15) | Prevents infinite retry loops |
| Use exponential backoff | Prevents thundering herd on transient failures |
| Log retries with count | Enables monitoring and alerting |
| Forget on success | Resets backoff for next failure |
| Distinguish retryable vs fatal errors | Do not retry validation errors |
Stop and think: A user creates a `WebApp` with a spec that contains a syntax error, causing the Deployment creation to fail API validation. Should your controller retry this operation with exponential backoff? Why or why not?
```go
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(key)

	err := c.syncHandler(ctx, key)

	switch {
	case err == nil:
		c.queue.Forget(key)
	case errors.IsConflict(err):
		// Resource version conflict: retry, subject to rate limiting
		klog.V(4).Infof("Conflict on %s, retrying", key)
		c.queue.AddRateLimited(key)
	case errors.IsNotFound(err):
		// Resource gone: no point retrying
		klog.V(4).Infof("Resource %s not found, skipping", key)
		c.queue.Forget(key)
	case c.queue.NumRequeues(key) < maxRetries:
		klog.Warningf("Error syncing %s (attempt %d): %v",
			key, c.queue.NumRequeues(key)+1, err)
		c.queue.AddRateLimited(key)
	default:
		klog.Errorf("Dropping %s after %d attempts: %v", key, maxRetries, err)
		c.queue.Forget(key)
	}

	return true
}
```

Part 5: Graceful Shutdown
5.1 Shutdown Sequence
A controller must shut down cleanly to avoid data loss and duplicate processing:
```
Signal received (SIGTERM/SIGINT)
  │
  ├── 1. Cancel context → informers stop watching
  ├── 2. queue.ShutDown() → workers drain remaining items
  ├── 3. Workers finish current item → return false
  ├── 4. Event broadcaster stops
  └── 5. Process exits
```

5.2 Implementation
The graceful shutdown is already built into our controller via the context cancellation pattern. The key points are:
- `ctx.Done()` stops the informers
- `defer c.queue.ShutDown()` in `Run()` drains the queue
- Workers check `shutdown` from `queue.Get()` and exit
- `defer cancel()` in `main()` ensures cleanup on any exit path
Part 6: Leader Election
6.1 Why Leader Election?
When you run multiple replicas of your controller for high availability, only one should be actively reconciling at a time. Leader election uses a Kubernetes Lease resource to coordinate:
```go
// main.go: add leader election
import (
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runWithLeaderElection(ctx context.Context, kubeClient kubernetes.Interface,
	startFunc func(ctx context.Context)) {

	id, _ := os.Hostname()

	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "webapp-controller-leader",
			Namespace: "webapp-system",
		},
		Client: kubeClient.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Info("Became leader, starting controller")
				startFunc(ctx)
			},
			OnStoppedLeading: func() {
				klog.Info("Lost leadership, shutting down")
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					return
				}
				klog.Infof("New leader elected: %s", identity)
			},
		},
		ReleaseOnCancel: true,
	})
}
```

6.2 Leader Election Parameters
| Parameter | Typical Value | Description |
|---|---|---|
| LeaseDuration | 15s | How long a non-leader waits before trying to acquire |
| RenewDeadline | 10s | How long the leader has to renew before losing the lease |
| RetryPeriod | 2s | How often to retry acquiring the lease |
| ReleaseOnCancel | true | Release lease on graceful shutdown |
Pause and predict: You have two replicas of your controller running. Replica A is the leader. Replica A experiences a network partition and cannot reach the API server, but its process is still running. What happens to Replica B, and how long does it take?
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| Processing events directly in handlers | Blocks the informer, dropped events | Always enqueue keys; process in workers |
| Not setting OwnerReferences | Orphaned resources on CRD deletion | Always set controller owner reference |
| Comparing events instead of states | Misses events on restart | Compare desired vs actual state only |
| No rate limiting on queue | Overwhelms API Server on failure loops | Use NewRateLimitingQueue |
| Single worker thread | Slow reconciliation under load | Use 2-4 workers for production |
| Not handling tombstones | Panic on DeletedFinalStateUnknown | Type-check and unwrap tombstones |
| Hardcoded namespace | Controller only works in one namespace | Use namespace from object key |
| No graceful shutdown | Lost in-flight work, duplicate processing | Handle SIGTERM, drain queue |
| Ignoring IsNotFound errors | Retrying forever for deleted resources | Check error type, skip not-found |
| Direct API calls in hot paths | High API Server load | Use Listers from informer cache |
- Scenario: Your controller has been down for 10 minutes due to a node failure. During this time, users created 50 `WebApp` resources and deleted 20. When your controller restarts, it does not receive the historical stream of ADDED and DELETED events. How does it still manage to converge the cluster to the correct state?

  Answer: Kubernetes controllers use level-triggered reconciliation rather than edge-triggered logic, meaning they react to the current state difference rather than individual change events. When the controller restarts, its Informers perform a LIST operation to populate the local cache with the exact current state of all `WebApp` resources. The controller then compares this desired state against the actual state of the cluster (existing Deployments and Services). Because it does not rely on historical event replays, it inherently self-heals and processes only the net result of all changes that occurred during its downtime.
- Scenario: A user runs a script that patches the same `WebApp` resource 100 times in 5 seconds to update various annotations. Your controller's Informer receives 100 MODIFIED events. Why doesn't your controller attempt to reconcile the object 100 times?

  Answer: Controllers enqueue string keys (like `namespace/name`) rather than passing full resource objects directly to the workqueue. The workqueue inherently deduplicates identical keys that are added before they are processed by a worker. By the time a worker dequeues the key and fetches the object from the Informer's local cache, it reads the absolute latest version of the resource. This ensures the controller only performs the computationally expensive Observe-Analyze-Act loop on the final state, effectively ignoring the intermediate noise.
- Scenario: You decide to decommission a `WebApp` named `frontend-app` and run `kubectl delete webapp frontend-app`. Your controller's `syncHandler` logs show that it noticed the deletion, but its code does not contain any explicit API calls to delete the associated Deployment and Service. How do the child resources get cleaned up?

  Answer: The child resources are automatically cleaned up by the Kubernetes Garbage Collector, not by your custom controller. When your controller initially created the Deployment and Service, it attached an `OwnerReference` pointing back to the parent `WebApp` resource. When the API Server processes the deletion of the `WebApp`, the Garbage Collector detects these references and automatically initiates a cascading deletion of all dependent resources. This mechanism ensures reliable cleanup without requiring complex deletion logic or finalizers in your controller code.
- Scenario: You remove the `cache.WaitForCacheSync` call from your controller's `Run` method to speed up startup time. Upon restarting the controller, it immediately begins processing `WebApp` items from the workqueue. Suddenly, the controller starts creating duplicate Deployments for `WebApp` resources that already have running Deployments. Why did this happen?

  Answer: Without waiting for cache synchronization, the controller's workers begin executing the Analyze phase while the local Informer caches are still empty or partially populated. When the `syncHandler` asks the `deploymentLister` if a Deployment exists for a given `WebApp`, the cache incorrectly returns a "Not Found" error because it hasn't finished pulling state from the API server. The controller interprets this as a missing resource and erroneously issues a Create call, leading to duplicates or conflicts. `WaitForCacheSync` guarantees the Observe phase has an accurate worldview before any Actions are taken.
- Scenario: Your controller attempts to create a Deployment for a `WebApp`, but the API server rejects the request due to a transient webhook timeout. The `syncHandler` returns an error. How does the controller ensure this `WebApp` is eventually processed without overwhelming the API server?

  Answer: When the `syncHandler` returns an error, the controller's worker calls `AddRateLimited` to place the key back into the workqueue. The workqueue applies an exponential backoff algorithm, delaying the key's availability for reprocessing (e.g., waiting 5ms, then 10ms, then 20ms). This prevents a thundering herd scenario where a failing controller endlessly hammers the API server in a tight loop. If the reconciliation continues to fail and exceeds the configured `maxRetries` (e.g., 5 attempts), the key is dropped from the queue via `Forget` to prevent infinite poison-pill processing.
- Scenario: A junior admin accidentally runs `kubectl scale deployment my-webapp --replicas=0`, overriding the `WebApp` custom resource which specifies 3 replicas. Within milliseconds, the Deployment scales back up to 3 replicas automatically. How did your controller detect and fix this drift so quickly?

  Answer: The controller maintains Informers not just on the primary `WebApp` resource, but also on secondary resources like Deployments and Services. When the admin scaled the Deployment, the API server emitted a MODIFIED event for that Deployment, which the controller's Deployment Informer intercepted. The controller's event handler examined the Deployment's `OwnerReference`, identified the parent `WebApp`, and enqueued the parent's key. The subsequent reconciliation loop compared the desired state (3 replicas) against the new actual state (0 replicas) and immediately issued an Update to correct the drift.
- Scenario: A worker successfully reconciles a `WebApp` and calls `queue.Done(key)` but forgets to call `queue.Forget(key)`. Later, the same `WebApp` is modified, enqueued, and fails reconciliation once due to a minor conflict. Instead of a quick 5ms retry, the queue delays the retry for 10 seconds. What caused this unexpected delay?

  Answer: `queue.Done(key)` only signals that the worker has finished processing the item, allowing the queue to make that key available for processing again if it was re-added. `queue.Forget(key)`, however, is responsible for clearing the item's failure history and resetting its exponential backoff counter. Because `Forget` was omitted after the previous success, the rate limiter remembered the old failure count and applied a much larger, compounded backoff penalty when the new transient failure occurred. You must always call `Forget` upon success to ensure future failures start with a fresh, minimal backoff delay.
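The downtime and drift scenarios above share one mechanism: the reconciler diffs listed state; it never replays events. A minimal stdlib sketch of that Analyze step, where maps of name to replica count stand in for the Lister caches (all names here are illustrative):

```go
package main

import "fmt"

// reconcile computes the net actions needed to converge the actual child
// Deployments onto the desired set of WebApps. No event history is
// consulted: only the current state difference matters.
func reconcile(desired, actual map[string]int) (create, update, del []string) {
	for name, replicas := range desired {
		got, exists := actual[name]
		switch {
		case !exists:
			create = append(create, name) // missing child: create it
		case got != replicas:
			update = append(update, name) // drifted child: fix it
		}
	}
	for name := range actual {
		if _, ok := desired[name]; !ok {
			del = append(del, name) // orphaned child: remove it
		}
	}
	return
}

func main() {
	// After 10 minutes of downtime the controller just LISTs current state:
	desired := map[string]int{"a": 3, "b": 2}    // WebApp specs
	actual := map[string]int{"b": 1, "stale": 2} // existing Deployments
	c, u, d := reconcile(desired, actual)
	fmt.Println(c, u, d) // net result: create "a", fix "b", delete "stale"
}
```

However many creates, deletes, and scale operations happened while the controller was down, this diff processes only their net effect.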
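The deduplication, backoff, and `Forget` behavior in the answers above can be modeled in a few lines. This is a toy stand-in for client-go's rate-limiting workqueue (`workqueue.NewRateLimitingQueue`), kept deliberately simple (it skips the `Get`/`Done` bookkeeping) to show why 100 adds yield one reconcile and why skipping `Forget` inflates later retries:

```go
package main

import "fmt"

// rlQueue is a toy model: it deduplicates pending keys and tracks per-key
// failure counts that drive exponential backoff, like the real workqueue.
type rlQueue struct {
	pending  []string
	inSet    map[string]bool
	failures map[string]int
}

func newRLQueue() *rlQueue {
	return &rlQueue{inSet: map[string]bool{}, failures: map[string]int{}}
}

// Add enqueues a key unless it is already pending (deduplication).
func (q *rlQueue) Add(key string) {
	if q.inSet[key] {
		return
	}
	q.inSet[key] = true
	q.pending = append(q.pending, key)
}

// AddRateLimited records a failure and returns the backoff delay in ms,
// doubling with each consecutive failure.
func (q *rlQueue) AddRateLimited(key string) int {
	q.failures[key]++
	q.Add(key)
	return 5 << (q.failures[key] - 1) // 5ms, 10ms, 20ms, ...
}

// Forget clears the failure history so the next error starts at 5ms again.
func (q *rlQueue) Forget(key string) { delete(q.failures, key) }

func (q *rlQueue) Len() int { return len(q.pending) }

func main() {
	q := newRLQueue()
	for i := 0; i < 100; i++ {
		q.Add("default/demo-app") // 100 MODIFIED events...
	}
	fmt.Println(q.Len()) // ...one pending reconcile

	fmt.Println(q.AddRateLimited("default/demo-app")) // 5
	fmt.Println(q.AddRateLimited("default/demo-app")) // 10
	q.Forget("default/demo-app")
	fmt.Println(q.AddRateLimited("default/demo-app")) // back to 5
}
```

Omit the `Forget` call and the last retry would have started from the compounded failure count instead, which is exactly the 10-second-delay symptom in the final scenario.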
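The garbage-collection answer can be sketched as well. The model below propagates deletion along owner references until no new dependents are found; the real GC matches owners by UID rather than name and honors deletion propagation policies, so treat this only as the shape of cascading deletion:

```go
package main

import "fmt"

// obj is a bare-bones stand-in for an object's metadata: a name plus the
// names of its owners (its OwnerReferences).
type obj struct {
	name   string
	owners []string
}

// cascade models the garbage collector: deleting root also deletes every
// object whose owner chain leads back to it.
func cascade(objs []obj, root string) []string {
	dead := map[string]bool{root: true}
	changed := true
	for changed { // propagate until no new dependents are found
		changed = false
		for _, o := range objs {
			if dead[o.name] {
				continue
			}
			for _, owner := range o.owners {
				if dead[owner] {
					dead[o.name] = true
					changed = true
				}
			}
		}
	}
	var deleted []string
	for _, o := range objs {
		if dead[o.name] {
			deleted = append(deleted, o.name)
		}
	}
	return deleted
}

func main() {
	objs := []obj{
		{name: "webapp/frontend-app"},
		{name: "deploy/frontend-app", owners: []string{"webapp/frontend-app"}},
		{name: "svc/frontend-app", owners: []string{"webapp/frontend-app"}},
		{name: "rs/frontend-app-abc", owners: []string{"deploy/frontend-app"}},
	}
	// Deleting the WebApp takes the Deployment, Service, and ReplicaSet with it.
	fmt.Println(cascade(objs, "webapp/frontend-app"))
}
```

Your controller only has to set the references at creation time; everything after `kubectl delete webapp` is the GC's job.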
Hands-On Exercise
Task: Build, deploy, and test a complete custom controller that watches WebApp CRs and creates Deployments and Services.
Setup:
```shell
# Create a cluster
kind create cluster --name controller-lab
```
```shell
# Apply the WebApp CRD from Module 1.2
# (use the simplified version below)
cat << 'EOF' | k apply -f -
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: webapps.apps.kubedojo.io
spec:
  group: apps.kubedojo.io
  names:
    kind: WebApp
    listKind: WebAppList
    plural: webapps
    singular: webapp
    shortNames: ["wa"]
  scope: Namespaced
  versions:
    - name: v1beta1
      served: true
      storage: true
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: Image
          type: string
          jsonPath: .spec.image
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Ready
          type: integer
          jsonPath: .status.readyReplicas
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["image"]
              properties:
                image:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 50
                  default: 2
                port:
                  type: integer
                  minimum: 1
                  maximum: 65535
                  default: 8080
            status:
              type: object
              properties:
                readyReplicas:
                  type: integer
                phase:
                  type: string
EOF
```

Steps:
- Create the project:

  ```shell
  mkdir -p ~/extending-k8s/webapp-controller && cd ~/extending-k8s/webapp-controller
  go mod init github.com/example/webapp-controller
  go get k8s.io/client-go@latest k8s.io/apimachinery@latest k8s.io/api@latest k8s.io/klog/v2@latest
  ```

- Create the source files from the code in Parts 3.2, 3.3, and 3.4
- Build and run:

  ```shell
  go build -o webapp-controller .
  ./webapp-controller -v=2
  ```

- In another terminal, create a WebApp:

  ```shell
  cat << 'EOF' | k apply -f -
  apiVersion: apps.kubedojo.io/v1beta1
  kind: WebApp
  metadata:
    name: demo-app
  spec:
    image: nginx:1.27
    replicas: 3
    port: 80
  EOF
  ```

- Verify the controller creates resources:
  ```shell
  # Check WebApp status
  k get webapp demo-app

  # Check created Deployment
  k get deployment demo-app
  k describe deployment demo-app | grep "Controlled By"

  # Check created Service
  k get svc demo-app

  # Check events
  k get events --sort-by=.lastTimestamp | grep webapp
  ```

- Test self-healing:
  ```shell
  # Delete the Deployment — controller should recreate it
  k delete deployment demo-app
  sleep 5
  k get deployment demo-app

  # Scale the WebApp
  k patch webapp demo-app --type=merge -p '{"spec":{"replicas":5}}'
  sleep 5
  k get deployment demo-app
  ```

- Test deletion cascade:
  ```shell
  k delete webapp demo-app
  sleep 5
  k get deployment demo-app   # Should be gone (GC'd via OwnerRef)
  k get svc demo-app          # Should be gone
  ```

- Cleanup:
  ```shell
  kind delete cluster --name controller-lab
  ```

Success Criteria:
- Controller compiles and starts without errors
- Cache sync completes (check logs)
- Creating a WebApp triggers Deployment + Service creation
- Deployment has correct OwnerReference pointing to WebApp
- Deleting the Deployment triggers controller to recreate it
- Updating WebApp replicas updates the Deployment
- Deleting the WebApp cascades deletion to Deployment + Service
- Kubernetes Events are recorded for create/update actions
- Ctrl+C triggers graceful shutdown
Next Module
Module 1.4: The Operator Pattern & Kubebuilder - Use the Kubebuilder framework to build operators with less boilerplate and more structure.