Flux GitOps Toolkit: Extending Functionality for Platform Engineering
Flux is an open-source GitOps toolkit designed for managing Kubernetes clusters. It enables developers and platform engineers to automate the deployment, update, and management of applications and infrastructure using a Git repository as the single source of truth. In this blog post, we will explore how to extend the functionality of Flux using its custom resources and controllers.
Custom Resources and Controllers
Flux uses custom resources and controllers to extend its functionality. Custom resources are Kubernetes API resources that are defined by the user and are not part of the core Kubernetes API. Controllers are Kubernetes controllers that watch for changes to custom resources and take appropriate actions based on those changes.
Flux provides several built-in custom resources, including:
HelmRelease
: Represents a Helm release and its associated configuration.Kustomization
: Represents a Kustomize configuration and its associated resources.Source
: Represents a Git repository and its associated branches and paths.
Flux also provides several built-in controllers, including:
HelmController
: Watches for changes toHelmRelease
custom resources and manages Helm releases.KustomizeController
: Watches for changes toKustomization
custom resources and manages Kubernetes resources.SourceController
: Watches for changes toSource
custom resources and manages Git repositories.
Extending Flux with Custom Resources and Controllers
To extend the functionality of Flux, you can define your own custom resources and controllers. This enables you to integrate Flux with other tools and services, and to automate custom workflows.
Defining Custom Resources
To define a custom resource, you need to create a CustomResourceDefinition (CRD) that defines the schema for the resource. The schema specifies the fields and their types, as well as any validation rules. Here is an example of a CRD for a custom resource called MyApp
:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: myapps.example.com
spec:
group: example.com
names:
kind: MyApp
listKind: MyAppList
plural: myapps
singular: myapp
scope: Namespaced
subresources:
status: {}
version: v1alpha1
This CRD defines a custom resource called MyApp
that belongs to the example.com
group and has a plural name of myapps
. It also defines a status
subresource that can be used to store the current state of the resource.
Once you have created the CRD, you can create instances of the custom resource using a YAML manifest. Here is an example of a MyApp
custom resource:
apiVersion: example.com/v1alpha1
kind: MyApp
metadata:
name: my-app
spec:
image: my-registry/my-app:latest
replicas: 3
This custom resource specifies the image and replica count for the MyApp
deployment.
Defining Custom Controllers
To define a custom controller, you need to create a Kubernetes controller that watches for changes to your custom resource and takes appropriate actions. You can use any programming language to write your controller, but Go is the most common choice due to its strong support for Kubernetes APIs.
Flux provides a library called flux-sdk
that makes it easy to write custom controllers in Go. The flux-sdk
library provides a set of interfaces and utilities for interacting with Flux and its custom resources.
Here is an example of a custom controller written using the flux-sdk
library:
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
examplev1alpha1 "github.com/example/myapp/api/v1alpha1"
fluxv1 "github.com/weaveworks/flux/api/v1"
)
const (
appLabel = "app.example.com/name"
)
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
panic(err)
}
err = examplev1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
panic(err)
}
err = fluxv1.AddToScheme(mgr.GetScheme())
if err != nil {
panic(err)
}
err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, appLabel, func(rawObj client.Object) []string {
pod := rawObj.(*corev1.Pod)
return []string{pod.Labels[appLabel]}
})
if err != nil {
panic(err)
}
ctrl.NewControllerManagedBy(mgr).
For(&examplev1alpha1.MyApp{}).
Watches(&corev1.Pod{}, &handler{}).
Complete(myAppReconciler{client: mgr.GetClient()})
go func() {
for {
time.Sleep(10 * time.Minute)
fmt.Println("Checking for new MyApp resources...")
list := &examplev1alpha1.MyAppList{}
err := mgr.GetClient().List(context.Background(), list)
if err != nil {
panic(err)
}
for _, myApp := range list.Items {
fmt.Printf("Found MyApp resource: %s\n", myApp.Name)
}
}
}()
mgr.Start(ctrl.SetupSignalHandler())
}
type handler struct{}
func (h *handler) Create(ctx context.Context, event ctrl.Event, r controllerRuntime.Reconciler) error {
// Handle creation of Pods for new MyApp resources
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: event.Meta.GetName(),
Namespace: event.Meta.GetNamespace(),
Labels: map[string]string{
appLabel: event.Meta.GetName(),
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: event.Meta.GetName(),
Image: event.Meta.GetLabels()["image"],
},
},
},
}
err := r.GetClient().Create(ctx, pod)
if err != nil {
return err
}
return nil
}
func (h *handler) Update(ctx context.Context, event ctrl.Event, r controllerRuntime.Reconciler) error {
// Handle updates to Pods for existing MyApp resources
pod := &corev1.Pod{}
err := r.GetClient().Get(ctx, client.ObjectKey{Namespace: event.Meta.GetNamespace(), Name: event.Meta.GetName()}, pod)
if err != nil {
return err
}
pod.Spec.Containers.Image = event.Meta.GetLabels()["image"]
err = r.GetClient().Update(ctx, pod)
if err != nil {
return err
}
return nil
}
func (h *handler) Delete(ctx context.Context, event ctrl.Event, r controllerRuntime.Reconciler) error {
// Handle deletion of Pods for deleted MyApp resources
pod := &corev1.Pod{}
err := r.GetClient().Get(ctx, client.ObjectKey{Namespace: event.Meta.GetNamespace(), Name: event.Meta.GetName()}, pod)
if err != nil {
return err
}
err = r.GetClient().Delete(ctx, pod)
if err != nil {
return err
}
return nil
}
type myAppReconciler struct {
client client.Client
}
func (r myAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Get the MyApp resource
myApp := &examplev1alpha1.MyApp{}
err := r.client.Get(ctx, req.NamespacedName, myApp)
if err != nil {
return ctrl.Result{}, err
}
// Check if the MyApp resource is marked for deletion
if myApp.DeletionTimestamp != nil {
return ctrl.Result{}, nil
}
// Get the Pod for the MyApp resource
podList := &corev1.PodList{}
err = r.client.List(ctx, podList, client.MatchingLabels{appLabel: myApp.Name})
if err != nil {
return ctrl.Result{}, err
}
// Create a new Pod if none exist
if len(podList.Items) == 0 {
err = r.handlePodCreation(ctx, myApp)
if err != nil {
return ctrl.Result{}, err
}
} else {
// Update the existing Pod if necessary
err = r.handlePodUpdate(ctx, myApp, &podList.Items)
if err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
func (r myAppReconciler) handlePodCreation(ctx context.Context, myApp *examplev1alpha1.MyApp) error {
// Create the Pod for the MyApp resource
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: myApp.Name,
Namespace: myApp.Namespace,
Labels: map[string]string{
appLabel: myApp.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: myApp.Name,
Image: myApp.Spec.Image,
},
},
},
}
err := r.client.Create(ctx, pod)
if err != nil {
return err
}
return nil
}
func (r myAppReconciler) handlePodUpdate(ctx context.Context, myApp *examplev1alpha1.MyApp, pod *corev1.Pod) error {
// Update the Pod for the MyApp resource
pod.Spec.Containers.Image = myApp.Spec.Image
err := r.client.Update(ctx, pod)
if err != nil {
return err
}
return nil
}
This custom controller watches for changes to MyApp
custom resources and manages the associated Pods. It uses the flux-sdk
library to interact with Flux and its custom resources, and the controller-runtime
library to implement the controller.
The controller defines three handlers for creating, updating, and deleting Pods. These handlers are called by the controller-runtime
library when a MyApp
custom resource is created, updated, or deleted.
The Reconcile
method is the main entry point for the controller. It retrieves the MyApp
custom resource and checks if it is marked for deletion. If it is not marked for deletion, the controller checks if a Pod exists for the resource. If a Pod does not exist, the controller creates a new Pod. If a Pod does exist, the controller updates the Pod if necessary.
Conclusion
Flux is a powerful GitOps toolkit for managing Kubernetes clusters. By defining custom resources and controllers, you can extend the functionality of Flux to integrate with other tools and services, and to automate custom workflows. The flux-sdk
library makes it easy to write custom controllers in Go, and the controller-runtime
library provides a robust framework for implementing controllers. With these tools, you can build a highly automated and scalable platform engineering infrastructure.