Flux GitOps Toolkit: Extending Functionality for Platform Engineering

Flux is an open-source GitOps toolkit designed for managing Kubernetes clusters. It enables developers and platform engineers to automate the deployment, update, and management of applications and infrastructure using a Git repository as the single source of truth. In this blog post, we will explore how to extend the functionality of Flux using its custom resources and controllers.

Custom Resources and Controllers

Flux uses custom resources and controllers to extend its functionality. Custom resources are Kubernetes API resources that are defined by the user and are not part of the core Kubernetes API. Controllers are Kubernetes controllers that watch for changes to custom resources and take appropriate actions based on those changes.

Flux provides several built-in custom resources, including:

  • HelmRelease: Represents a Helm release and its associated configuration.

  • Kustomization: Represents a Kustomize configuration and its associated resources.

  • Source: Represents a Git repository and its associated branches and paths.

Flux also provides several built-in controllers, including:

  • HelmController: Watches for changes to HelmRelease custom resources and manages Helm releases.

  • KustomizeController: Watches for changes to Kustomization custom resources and manages Kubernetes resources.

  • SourceController: Watches for changes to Source custom resources and manages Git repositories.

Extending Flux with Custom Resources and Controllers

To extend the functionality of Flux, you can define your own custom resources and controllers. This enables you to integrate Flux with other tools and services, and to automate custom workflows.

Defining Custom Resources

To define a custom resource, you need to create a CustomResourceDefinition (CRD) that defines the schema for the resource. The schema specifies the fields and their types, as well as any validation rules. Here is an example of a CRD for a custom resource called MyApp:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: myapps.example.com
spec:
  group: example.com
  names:
    kind: MyApp
    listKind: MyAppList
    plural: myapps
    singular: myapp
  scope: Namespaced
  subresources:
    status: {}
  version: v1alpha1

This CRD defines a custom resource called MyApp that belongs to the example.com group and has a plural name of myapps. It also defines a status subresource that can be used to store the current state of the resource.

Once you have created the CRD, you can create instances of the custom resource using a YAML manifest. Here is an example of a MyApp custom resource:

apiVersion: example.com/v1alpha1
kind: MyApp
metadata:
  name: my-app
spec:
  image: my-registry/my-app:latest
  replicas: 3

This custom resource specifies the image and replica count for the MyApp deployment.

Defining Custom Controllers

To define a custom controller, you need to create a Kubernetes controller that watches for changes to your custom resource and takes appropriate actions. You can use any programming language to write your controller, but Go is the most common choice due to its strong support for Kubernetes APIs.

Flux provides a library called flux-sdk that makes it easy to write custom controllers in Go. The flux-sdk library provides a set of interfaces and utilities for interacting with Flux and its custom resources.

Here is an example of a custom controller written using the flux-sdk library:

package main

import (
  "context"
  "fmt"
  "time"

  corev1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/runtime"
  ctrl "sigs.k8s.io/controller-runtime"
  "sigs.k8s.io/controller-runtime/pkg/client"

  examplev1alpha1 "github.com/example/myapp/api/v1alpha1"
  fluxv1 "github.com/weaveworks/flux/api/v1"
)

const (
  appLabel = "app.example.com/name"
)

func main() {
  mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
  if err != nil {
    panic(err)
  }

  err = examplev1alpha1.AddToScheme(mgr.GetScheme())
  if err != nil {
    panic(err)
  }

  err = fluxv1.AddToScheme(mgr.GetScheme())
  if err != nil {
    panic(err)
  }

  err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, appLabel, func(rawObj client.Object) []string {
    pod := rawObj.(*corev1.Pod)
    return []string{pod.Labels[appLabel]}
  })
  if err != nil {
    panic(err)
  }

  ctrl.NewControllerManagedBy(mgr).
    For(&examplev1alpha1.MyApp{}).
    Watches(&corev1.Pod{}, &handler{}).
    Complete(myAppReconciler{client: mgr.GetClient()})

  go func() {
    for {
      time.Sleep(10 * time.Minute)
      fmt.Println("Checking for new MyApp resources...")
      list := &examplev1alpha1.MyAppList{}
      err := mgr.GetClient().List(context.Background(), list)
      if err != nil {
        panic(err)
      }
      for _, myApp := range list.Items {
        fmt.Printf("Found MyApp resource: %s\n", myApp.Name)
      }
    }
  }()

  mgr.Start(ctrl.SetupSignalHandler())
}

type handler struct{}

func (h *handler) Create(ctx context.Context, event ctrl.Event, r controllerRuntime.Reconciler) error {
// Handle creation of Pods for new MyApp resources
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: event.Meta.GetName(),
Namespace: event.Meta.GetNamespace(),
Labels: map[string]string{
appLabel: event.Meta.GetName(),
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: event.Meta.GetName(),
Image: event.Meta.GetLabels()["image"],
},
},
},
}
err := r.GetClient().Create(ctx, pod)
if err != nil {
return err
}
return nil
}
func (h *handler) Update(ctx context.Context, event ctrl.Event, r controllerRuntime.Reconciler) error {
// Handle updates to Pods for existing MyApp resources
pod := &corev1.Pod{}
err := r.GetClient().Get(ctx, client.ObjectKey{Namespace: event.Meta.GetNamespace(), Name: event.Meta.GetName()}, pod)
if err != nil {
return err
}
pod.Spec.Containers.Image = event.Meta.GetLabels()["image"]
err = r.GetClient().Update(ctx, pod)
if err != nil {
return err
}
return nil
}
func (h *handler) Delete(ctx context.Context, event ctrl.Event, r controllerRuntime.Reconciler) error {
// Handle deletion of Pods for deleted MyApp resources
pod := &corev1.Pod{}
err := r.GetClient().Get(ctx, client.ObjectKey{Namespace: event.Meta.GetNamespace(), Name: event.Meta.GetName()}, pod)
if err != nil {
return err
}
err = r.GetClient().Delete(ctx, pod)
if err != nil {
return err
}
return nil
}
type myAppReconciler struct {
client client.Client
}
func (r myAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Get the MyApp resource
myApp := &examplev1alpha1.MyApp{}
err := r.client.Get(ctx, req.NamespacedName, myApp)
if err != nil {
return ctrl.Result{}, err
}
// Check if the MyApp resource is marked for deletion
if myApp.DeletionTimestamp != nil {
return ctrl.Result{}, nil
}
// Get the Pod for the MyApp resource
podList := &corev1.PodList{}
err = r.client.List(ctx, podList, client.MatchingLabels{appLabel: myApp.Name})
if err != nil {
return ctrl.Result{}, err
}
// Create a new Pod if none exist
if len(podList.Items) == 0 {
err = r.handlePodCreation(ctx, myApp)
if err != nil {
return ctrl.Result{}, err
}
} else {
// Update the existing Pod if necessary
err = r.handlePodUpdate(ctx, myApp, &podList.Items)
if err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
func (r myAppReconciler) handlePodCreation(ctx context.Context, myApp *examplev1alpha1.MyApp) error {
// Create the Pod for the MyApp resource
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: myApp.Name,
Namespace: myApp.Namespace,
Labels: map[string]string{
appLabel: myApp.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: myApp.Name,
Image: myApp.Spec.Image,
},
},
},
}
err := r.client.Create(ctx, pod)
if err != nil {
return err
}
return nil
}
func (r myAppReconciler) handlePodUpdate(ctx context.Context, myApp *examplev1alpha1.MyApp, pod *corev1.Pod) error {
// Update the Pod for the MyApp resource
pod.Spec.Containers.Image = myApp.Spec.Image
err := r.client.Update(ctx, pod)
if err != nil {
return err
}
return nil
}

This custom controller watches for changes to MyApp custom resources and manages the associated Pods. It uses the flux-sdk library to interact with Flux and its custom resources, and the controller-runtime library to implement the controller.

The controller defines three handlers for creating, updating, and deleting Pods. These handlers are called by the controller-runtime library when a MyApp custom resource is created, updated, or deleted.

The Reconcile method is the main entry point for the controller. It retrieves the MyApp custom resource and checks if it is marked for deletion. If it is not marked for deletion, the controller checks if a Pod exists for the resource. If a Pod does not exist, the controller creates a new Pod. If a Pod does exist, the controller updates the Pod if necessary.

Conclusion

Flux is a powerful GitOps toolkit for managing Kubernetes clusters. By defining custom resources and controllers, you can extend the functionality of Flux to integrate with other tools and services, and to automate custom workflows. The flux-sdk library makes it easy to write custom controllers in Go, and the controller-runtime library provides a robust framework for implementing controllers. With these tools, you can build a highly automated and scalable platform engineering infrastructure.