Training Operators — distributed training on Kubernetes
Run multi-worker PyTorch, TensorFlow, and MPI jobs on Kubernetes with the unified Kubeflow Training Operator — CRDs, rendezvous, gang scheduling, elastic training.
Module 05 covered notebooks and the pipeline runtime. Both run a single Python process per pod. That works fine for prototyping; it stops working the moment your model needs more GPU memory than one node has, or the dataset needs more cores than one node has.
This module is about the next step: declaring a distributed training job and letting Kubernetes execute it. The Kubeflow Training Operator is the piece that turns “five pods that need to find each other and exchange gradients” into a single CR you kubectl apply.
The distributed-training problem on Kubernetes
Single-node training is python train.py. Multi-node training is several processes that need four things from the platform:
- Rendezvous. Worker 0 must publish an address; workers 1..N must find it.
- Shared storage. Datasets and checkpoints have to be reachable from every worker.
- Gradient exchange. Each step, workers all-reduce or send gradients to a parameter server.
- Fault tolerance. A node dies; the job has to either survive or restart cleanly from a checkpoint.
Hand-rolling this on Kubernetes is painful. You’d write a StatefulSet for the workers, a headless Service so pods can resolve each other, an init container that waits for rank-0 to come up, a ConfigMap with MASTER_ADDR, and a livenessProbe that doesn’t false-fire during a multi-minute training step. Then you’d do all of it again for TensorFlow’s coordinator/worker shape. Then again for MPI’s launcher/worker shape.
The Training Operator wraps those conventions in CRDs. You declare what the job is; the controller handles the Service, the env injection, the lifecycle, the status conditions. The CRDs are framework-flavoured — PyTorchJob, TFJob, MPIJob — because the conventions differ by framework, but you only ever look at one operator pod’s logs.
The unified Training Operator
Pre-v1.5 Kubeflow shipped a separate operator per framework: tf-operator, pytorch-operator, mpi-operator, mxnet-operator, and so on. Each was its own deployment, its own webhook, its own metrics endpoint, and its own bug surface. Cluster admins maintained five things.
Since v1.5, a single Training Operator pod multiplexes all of them. The controller watches every kind: PyTorchJob, TFJob, MPIJob, MXJob, XGBoostJob, PaddleJob, JAXJob. The reconciliation logic per kind is still distinct — a PyTorch job is not a TF job — but they share lifecycle, status conditions, gang-scheduling integration, and the runPolicy block.
What this means in practice: when a job hangs, you tail one controller’s logs, not five. When you upgrade Kubeflow, one operator moves. When you write RBAC for a tenant team, one ClusterRole covers every job type. The original per-framework operator repos still exist on GitHub, but the unified training-operator repo (github.com/kubeflow/training-operator) is the live one.
Reading the diagram: the user applies a PyTorchJob; the Training Operator watches the CR, creates the master + worker pods with rendezvous env injected, the workers establish their c10d rendezvous, and every pod reads/writes to shared storage for the dataset and checkpoints.
A PyTorchJob walkthrough
The canonical multi-worker PyTorch job: one master, two workers, GPUs on each, DDP for gradient exchange.
```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: resnet50-ddp
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.lab/ml/resnet50-train:v3
              command: ["python", "train.py", "--epochs=20"]
              resources:
                limits: { nvidia.com/gpu: 1 }
    Worker:
      replicas: 2
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.lab/ml/resnet50-train:v3
              command: ["python", "train.py", "--epochs=20"]
              resources:
                limits: { nvidia.com/gpu: 1 }
```
Three things to note. First, pytorchReplicaSpecs.Master is one replica by convention — DDP needs exactly one rank-0 coordinator. Second, the worker template is identical to the master apart from the role; PyTorch DDP runs the same code in every process. Third, you don’t set MASTER_ADDR / MASTER_PORT / RANK / WORLD_SIZE in the spec — the operator injects them into each pod’s environment based on the replica’s role and index, and PyTorch’s torch.distributed.init_process_group() picks them up.
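To make the third point concrete, here is a minimal sketch of how a train.py might read and sanity-check the injected variables before handing over to torch.distributed. The Rendezvous dataclass and read_rendezvous helper are hypothetical names, and the values in example_env (including the port) are illustrative rather than taken from a real pod.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")


@dataclass(frozen=True)
class Rendezvous:
    master_addr: str
    master_port: int
    rank: int
    world_size: int


def read_rendezvous(env: Mapping[str, str] = os.environ) -> Rendezvous:
    """Parse the four variables the operator injects into each pod."""
    missing = [key for key in REQUIRED if key not in env]
    if missing:
        raise RuntimeError(f"rendezvous variables missing: {missing}")
    rdzv = Rendezvous(
        master_addr=env["MASTER_ADDR"],
        master_port=int(env["MASTER_PORT"]),
        rank=int(env["RANK"]),
        world_size=int(env["WORLD_SIZE"]),
    )
    if not 0 <= rdzv.rank < rdzv.world_size:
        raise ValueError(f"rank {rdzv.rank} is outside a world of size {rdzv.world_size}")
    return rdzv


# Illustrative values for the first worker of a 1 master + 2 worker job.
example_env = {
    "MASTER_ADDR": "resnet50-ddp-master-0",
    "MASTER_PORT": "23456",
    "RANK": "1",
    "WORLD_SIZE": "3",
}
print(read_rendezvous(example_env))
```

Failing fast here is a design choice: a pod that starts without these variables would otherwise hang inside the rendezvous rather than exit with a readable error.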
The operator creates a headless Service per replica, named <jobname>-master-0 and <jobname>-worker-<i>; those DNS names are what the rendezvous handshake uses. If you kubectl get svc -n <ns> after applying, you’ll see them.
PyTorch elastic training
The job above assumes every worker stays alive for the duration. On spot/preemptible GPUs that assumption breaks the first time a node disappears. PyTorch’s answer is elastic training: workers can join and leave; the rendezvous backend (c10d or etcd) handles membership; the training loop restarts from the last checkpoint when membership changes.
The Training Operator surfaces this via spec.elasticPolicy:
```yaml
spec:
  elasticPolicy:
    minReplicas: 2
    maxReplicas: 4
    rdzvBackend: c10d
    maxRestarts: 100
  pytorchReplicaSpecs:
    Worker:
      replicas: 4
      template: { ... }
```
There is no Master block in elastic mode: every pod is symmetric, and rank assignment happens through rendezvous at startup. The operator launches up to maxReplicas workers; as long as at least minReplicas are alive, the job keeps training. If a node dies and only two workers remain, the surviving pair rendezvouses afresh, picks up the last checkpoint, and continues at half the throughput. When the operator schedules a replacement, that is another membership change: the workers rendezvous again with the newcomer and resume from the last checkpoint.
The piece you still have to write: rank-0 checkpointing every N steps to durable storage. Without that, elasticity buys you nothing, because every membership change throws away all progress since the last checkpoint, and with no checkpoint that means starting again from step 0.
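A minimal sketch of that checkpointing piece, using only the standard library. JSON stands in for the real model and optimizer state (in practice you would serialize with your framework's own save call), and save_checkpoint, load_latest, and the step-NNNNNNNN.json naming are hypothetical choices. The temp-file-then-rename trick assumes a POSIX-style filesystem mount.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_checkpoint(ckpt_dir: Path, step: int, state: dict, rank: int, every: int) -> Optional[Path]:
    """Write a checkpoint from rank 0 only, every `every` steps."""
    if rank != 0 or step % every != 0:
        return None
    ckpt_dir.mkdir(parents=True, exist_ok=True)
    final = ckpt_dir / f"step-{step:08d}.json"
    # Write to a temp file first so a preempted pod never leaves a
    # half-written file under the final name.
    fd, tmp = tempfile.mkstemp(dir=ckpt_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump({"step": step, "state": state}, handle)
        handle.flush()
        os.fsync(handle.fileno())
    os.replace(tmp, final)  # atomic rename within one filesystem
    return final


def load_latest(ckpt_dir: Path) -> Optional[dict]:
    """Return the newest complete checkpoint, or None on a cold start."""
    if not ckpt_dir.is_dir():
        return None
    done = sorted(ckpt_dir.glob("step-*.json"))  # zero-padded names sort by step
    if not done:
        return None
    return json.loads(done[-1].read_text())
```

On every (re)start the training loop would call load_latest first and resume from the returned step, which is what lets a fresh rendezvous continue instead of starting over.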
TFJob
TensorFlow’s distribution conventions are older and have more shapes. TFJob exposes them via spec.tfReplicaSpecs:
| Role | What it does |
|---|---|
| Chief | The coordinator. One replica. Runs the training loop and writes checkpoints. |
| Worker | Additional replicas that participate in gradient computation. |
| PS | Parameter server. Holds variables, receives gradients, applies updates. Legacy. |
| Evaluator | Reads checkpoints, runs eval, writes summaries. Optional. |
The big shift in modern TensorFlow is away from PS. tf.distribute.experimental.ParameterServerStrategy still works, but MultiWorkerMirroredStrategy (all-reduce, no PS) is now the standard choice for synchronous data-parallel training and matches PyTorch DDP in shape. A modern TFJob has Chief + Workers and no PS spec at all; the all-reduce happens between chief and workers directly.
The chief is special only in that it owns checkpoint writes and summary writes. If you have three replicas with PS gone, that’s one chief plus two workers, all doing identical compute, all participating in the same MultiWorkerMirroredStrategy.run().
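TensorFlow's multi-worker strategies learn the cluster layout from a TF_CONFIG environment variable. The sketch below assumes the operator populates it for each TFJob replica (the text above does not say so, so treat that as an assumption), and shows how a process could tell whether it is the chief. The hostnames, the port, and the helper names tf_config_for and describe_task are illustrative.

```python
import json
import os
from typing import Mapping


def tf_config_for(job: str, workers: int, task_type: str, index: int, port: int = 2222) -> str:
    """Build a TF_CONFIG value for one chief plus `workers` workers."""
    cluster = {
        "chief": [f"{job}-chief-0:{port}"],
        "worker": [f"{job}-worker-{i}:{port}" for i in range(workers)],
    }
    return json.dumps({"cluster": cluster, "task": {"type": task_type, "index": index}})


def describe_task(env: Mapping[str, str] = os.environ) -> dict:
    """Summarize this replica's role from TF_CONFIG."""
    cfg = json.loads(env["TF_CONFIG"])
    task = cfg["task"]
    replicas = sum(len(hosts) for hosts in cfg["cluster"].values())
    return {
        "role": task["type"],
        "index": task["index"],
        "replicas": replicas,
        # Only the chief owns checkpoint and summary writes.
        "writes_checkpoints": task["type"] == "chief",
    }


# One chief plus two workers, seen from worker 1.
env = {"TF_CONFIG": tf_config_for("mnist", workers=2, task_type="worker", index=1)}
print(describe_task(env))
```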
MPIJob
For workloads that use MPI primitives directly — Horovod, NCCL allreduce called from C++, classical HPC libraries — the rendezvous shape is different. There’s a single launcher process that SSHes (or mpiruns) into a fixed list of worker hosts and starts the per-host processes.
MPIJob matches that. The operator creates one Launcher pod and N Worker pods, generates an MPI_HOSTFILE ConfigMap listing the worker DNS names, and mounts it into the launcher. The launcher runs mpirun -np N --hostfile /etc/mpi/hostfile python train.py. The workers are mostly passive — they sit and wait for SSH connections from the launcher.
Two operational notes. First, how the launcher reaches the workers depends on the API version. MPIJob v2 (under kubeflow.org/v2beta1) uses SSH: the operator generates the keypair and injects it, the Launcher image needs openssh-client, and the Worker image needs openssh-server. Public Horovod images already ship both; if you build your own, install them. Second, the older v1 MPIJob has the launcher start worker processes through the Kubernetes API (kubectl exec) instead of SSH, which avoids the openssh dependency but needs kubectl in the launcher and exec permission on the worker pods. New jobs should target v2.
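To show what the launcher ends up with, here is a small sketch that renders an Open MPI style hostfile and the matching mpirun argument list. The worker hostnames are illustrative (the real names come from the operator), and render_hostfile and launcher_command are hypothetical helpers, not part of any operator API.

```python
from typing import List


def render_hostfile(job: str, workers: int, slots_per_worker: int = 1) -> str:
    """One line per worker host; `slots` is how many ranks may run there."""
    lines = [f"{job}-worker-{i} slots={slots_per_worker}" for i in range(workers)]
    return "\n".join(lines) + "\n"


def launcher_command(workers: int, slots_per_worker: int, hostfile: str, train_cmd: List[str]) -> List[str]:
    """Build the mpirun invocation the launcher pod would execute."""
    total_ranks = workers * slots_per_worker
    return ["mpirun", "-np", str(total_ranks), "--hostfile", hostfile, *train_cmd]


print(render_hostfile("horovod-demo", workers=2, slots_per_worker=4), end="")
print(" ".join(launcher_command(2, 4, "/etc/mpi/hostfile", ["python", "train.py"])))
```

With one GPU per worker pod you would keep slots_per_worker at 1, so the -np value equals the number of workers.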
Resource scheduling — Volcano vs Coscheduling vs default
The default Kubernetes scheduler is not gang-scheduling aware. It schedules pods one at a time, in priority order. Apply a PyTorchJob asking for five GPUs across five workers in a cluster with four GPUs free, and the scheduler will happily place four pods and leave the fifth pending. The four placed pods sit at the rendezvous step waiting for rank 4 to join — forever. The job appears Running from the operator’s point of view; the workload deadlocks silently.
Two production fixes ship with Kubeflow’s manifests:
- Volcano. A separate scheduler that supports PodGroup semantics: “schedule these N pods all-or-nothing.” Training Operator integrates via spec.runPolicy.schedulingPolicy (including its queue field); the operator creates a PodGroup with minMember=N and Volcano refuses to start any pod until all N can land. Kubeflow’s installation manifests recommend Volcano for production.
- Coscheduling plugin. The upstream kube-scheduler scheduler-plugins repo ships a coscheduling plugin that does the same thing with less surface area. Lighter footprint, fewer features.
Either works. The trap is running neither — the operator does not gang-schedule on your behalf with the default scheduler. If your jobs ever wedge in Created or Running with one pod Pending and a quiet rendezvous, install Volcano.
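The difference between the two behaviours fits in a few lines. This is a toy model, not scheduler code: it ignores nodes, priorities, and everything except a single pool of free GPUs, and both function names are made up for the illustration.

```python
from typing import List


def schedule_one_by_one(gpu_requests: List[int], free_gpus: int) -> List[bool]:
    """Default behaviour: place each pod if it fits, independently of the rest."""
    placed = []
    for need in gpu_requests:
        fits = need <= free_gpus
        if fits:
            free_gpus -= need
        placed.append(fits)
    return placed


def schedule_gang(gpu_requests: List[int], free_gpus: int, min_member: int) -> List[bool]:
    """Gang behaviour: place pods only if at least `min_member` of them fit."""
    trial = schedule_one_by_one(gpu_requests, free_gpus)
    if sum(trial) >= min_member:
        return trial
    return [False] * len(gpu_requests)


job = [1, 1, 1, 1, 1]  # five workers, one GPU each
print(schedule_one_by_one(job, free_gpus=4))      # four placed, one pending
print(schedule_gang(job, free_gpus=4, min_member=5))  # nothing placed, no GPUs held
```

The gang result looks worse at first glance, but it leaves the four GPUs free for a job that can actually run instead of parking them behind a rendezvous that never completes.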
Storage for distributed training
Two patterns dominate.
Pattern A — shared PVC (ReadWriteMany). A single PVC backed by NFS, CephFS, EFS, or GPFS; every worker mounts the same volume read-write. Datasets live there; rank-0 writes checkpoints there. Simplest model and matches what people did on bare-metal HPC for decades. Downside: ReadWriteMany requires a storage class that supports it — block storage doesn’t, and not every cluster ships an NFS-backed one. The other downside is checkpoint contention: every rank writing simultaneously to the same FS amplifies write load and can hit filesystem-level locking.
Pattern B — S3-backed. Datasets and checkpoints live in object storage (S3, MinIO, GCS). Workers read shards directly with a client such as boto3 or s5cmd, or through a FUSE mount (goofys, mountpoint-s3) that exposes a bucket as a filesystem. With the FUSE route the training code is unchanged because the mount looks like a directory. Faster than NFS for many-reader workloads, no contention because every worker pulls independently, and air-gap-friendly with MinIO.
The pattern most modern setups land on is hybrid: dataset shards in S3 (read-only, sharded by rank to avoid all workers reading the same bytes), checkpoints written by rank 0 only to S3, the optimiser state pickle written to the local node SSD for hot-resume and only periodically uploaded.
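Sharding by rank can be as simple as a strided slice over a sorted list of object keys. A minimal sketch, assuming the shard names are known up front and every worker sees the same list; shards_for_rank and the key names are hypothetical.

```python
from typing import List, Sequence


def shards_for_rank(shard_keys: Sequence[str], rank: int, world_size: int) -> List[str]:
    """Give each rank a disjoint, near-equal slice of the shard list."""
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside a world of size {world_size}")
    # Sort first so every worker derives the same assignment from the same listing.
    return sorted(shard_keys)[rank::world_size]


keys = [f"train/shard-{i:04d}.tar" for i in range(10)]
for rank in range(3):
    print(rank, shards_for_rank(keys, rank, world_size=3))
```

Under elastic training the world size can change between rendezvous rounds, so the assignment would need to be recomputed after each restart rather than cached.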
Spot instances and fault tolerance
Cheap GPU capacity is preemptible. AWS spot, GCP preemptible, on-prem reclaim — same idea, hours of training cost cut by 60–80% as long as you can survive the eviction.
Elastic PyTorch (above) survives single-worker preemption out of the box, provided you checkpoint every N steps to durable storage. The numbers that matter: checkpoint frequency vs preemption frequency. If spot evictions average every 90 minutes and a checkpoint takes 60 seconds to write, checkpointing every 10 minutes loses at most 10 minutes of work per eviction. Checkpointing every step is overkill and serialises GPU work behind storage I/O.
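The arithmetic behind that trade-off, as a small sketch. It assumes checkpoint writes block training and evictions land uniformly within a checkpoint interval. The last figure uses Young's first-order approximation for the optimal interval, which is an addition here and not something the numbers above depend on.

```python
import math


def checkpoint_tradeoff(interval_s: float, write_s: float, mean_time_between_evictions_s: float) -> dict:
    """Rough cost of a checkpoint schedule under periodic preemption."""
    return {
        # Work lost if the eviction lands just before the next checkpoint.
        "worst_case_loss_s": interval_s,
        # Average loss if evictions are uniform within an interval.
        "expected_loss_s": interval_s / 2,
        # Share of wall-clock time spent stalled on checkpoint writes.
        "stall_fraction": write_s / (interval_s + write_s),
        # Young's approximation: sqrt(2 * write cost * mean time between failures).
        "young_interval_s": math.sqrt(2 * write_s * mean_time_between_evictions_s),
    }


# 10-minute interval, 60-second writes, evictions every 90 minutes on average.
for name, value in checkpoint_tradeoff(600, 60, 90 * 60).items():
    print(f"{name}: {value:.2f}")
```

For these inputs the approximation suggests an interval of roughly 13 minutes, so the 10-minute schedule is in the right range; checkpointing every step is far outside it.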
spec.runPolicy.cleanPodPolicy controls what happens to pods after job termination. None keeps everything, Running deletes only the pods that are still running when the job ends and keeps the completed ones (useful: you can still kubectl logs the failed ones), All cleans up everything. For debugging, Running is the right default. Production reporting tools want All so completed jobs don’t pile up.
Job lifecycle and monitoring
The CR’s status block tells you what’s happening. Conditions march through Created → Running → Succeeded (or Failed/Restarting). replicaStatuses shows per-role active/succeeded/failed counts. kubectl get pytorchjob -A -o wide is the operational query.
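If you script against the status block (for example from kubectl get pytorchjob <name> -o json), something like the following is enough to extract the current phase and the per-role pod counts. It assumes conditions are appended in the order they occur; the sample status is hand-written for illustration and the helper names are hypothetical.

```python
from typing import Dict, Optional


def current_phase(status: dict) -> Optional[str]:
    """Return the type of the most recent condition whose status is "True"."""
    for cond in reversed(status.get("conditions", [])):
        if cond.get("status") == "True":
            return cond.get("type")
    return None


def active_pods(status: dict) -> Dict[str, int]:
    """Per-role count of active pods from replicaStatuses."""
    return {
        role: counts.get("active", 0)
        for role, counts in status.get("replicaStatuses", {}).items()
    }


# Hand-written sample: job created, now running with 1 master and 2 workers.
example_status = {
    "conditions": [
        {"type": "Created", "status": "True"},
        {"type": "Running", "status": "True"},
    ],
    "replicaStatuses": {"Master": {"active": 1}, "Worker": {"active": 2}},
}
print(current_phase(example_status), active_pods(example_status))
```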
spec.runPolicy.backoffLimit is how many times the operator restarts the whole job on failure. Set it to a small number, 3 to 5: if your job is failing at step 0 every time, retrying 50 times won’t help; it will just waste GPU hours.
The Central Dashboard surfaces a Jobs view that reads the CRs and renders progress. For tenants who don’t have console access, kubectl describe pytorchjob <name> is enough; the events section captures pod scheduling failures and operator decisions.
GPU sharing patterns
A single A100 or H100 has 40–80 GB of HBM, which is more than many training jobs need. Three patterns let multiple jobs share one card:
| Pattern | Isolation | Density | When |
|---|---|---|---|
| MIG | Hardware-isolated partitions | Up to 7 instances per A100, 7 per H100 | Production multi-tenant; predictable QoS. |
| Time-slicing | None (cooperative) | Unlimited “virtual GPUs” | Dev/research; jobs polite to each other. |
| MPS | Soft (process-level) | Up to ~48 contexts | One trusted workload group; latency-sensitive. |
MIG (Multi-Instance GPU) is the right answer for production with untrusted tenants: hardware partitions mean one tenant’s OOM cannot affect another. Time-slicing gives the highest density at the cost of zero isolation; a hostile or buggy job can exhaust GPU memory and take down everyone else on the card. MPS sits in the middle. The NVIDIA GPU Operator configures all three via the ClusterPolicy CR. Pick MIG unless you have a specific reason.
Try this
- Write a PyTorchJob spec for 1 master + 4 workers, each requesting 1 GPU, running a stock DDP image. Apply it; run kubectl get pytorchjob -n <ns> -w and watch the conditions march.
- Add runPolicy.backoffLimit: 3 and runPolicy.cleanPodPolicy: Running to the spec. Force a failure (exit 1 in the command). Verify the operator retries up to 3 times and leaves the failed pods behind for log inspection.
- Convert the job to elastic with minReplicas: 2, maxReplicas: 4. Run kubectl delete pod <one-worker> mid-training; the surviving workers should rendezvous afresh and continue, and the operator should schedule a replacement.
Common failure modes
Job stuck Created forever, one pod Pending. Gang scheduling is not active. Install Volcano (or coscheduling) and recreate the job. Default kube-scheduler will keep half-scheduling forever.
Workers cannot reach the master. MASTER_ADDR resolves to the master’s headless Service DNS name; if that doesn’t work, kube-dns / CoreDNS is broken in the namespace, or a NetworkPolicy is blocking pod-to-pod traffic within the job. kubectl exec into a worker and nslookup <jobname>-master-0.
NCCL deadlock at step 0. The classic cause is CUDA/NCCL version mismatch across workers — workers are running different driver versions because the image was built against one CUDA and deployed to nodes with a different driver. Pin one image, set imagePullPolicy: Always, and verify every worker pod is using the same digest.
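Checking digests by eye across many pods is error-prone. A small sketch that takes the parsed output of kubectl get pods -o json for the job's pods and reports any container name that resolved to more than one image digest; the function names and the sample pod list are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Set


def image_ids_by_container(pod_list: dict) -> Dict[str, Set[str]]:
    """Collect the resolved image IDs seen for each container name."""
    seen: Dict[str, Set[str]] = defaultdict(set)
    for pod in pod_list.get("items", []):
        for cs in pod.get("status", {}).get("containerStatuses", []):
            seen[cs["name"]].add(cs.get("imageID", ""))
    return dict(seen)


def mismatched_containers(pod_list: dict) -> Dict[str, Set[str]]:
    """Containers whose pods are not all running the same digest."""
    return {name: ids for name, ids in image_ids_by_container(pod_list).items() if len(ids) > 1}


# Hand-written sample: worker-1 pulled a different digest for the same tag.
sample = {"items": [
    {"status": {"containerStatuses": [{"name": "pytorch", "imageID": "registry.lab/ml/train@sha256:aaa"}]}},
    {"status": {"containerStatuses": [{"name": "pytorch", "imageID": "registry.lab/ml/train@sha256:aaa"}]}},
    {"status": {"containerStatuses": [{"name": "pytorch", "imageID": "registry.lab/ml/train@sha256:bbb"}]}},
]}
print(mismatched_containers(sample))
```

Grouping by container name keeps sidecars from producing false alarms, since a sidecar legitimately runs a different image from the training container.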
Checkpoint write fails on shared PVC. Filesystem-level locking on NFS bites hard when every rank writes the same path. Switch to rank-0-only checkpointing (the standard PyTorch DDP idiom) or move checkpoints to S3.
Job runs but throughput is half of expected. A pod landed on a node with no NVLink or InfiniBand path and is doing all-reduce over TCP. Look at nvidia-smi topo -m from a worker pod; if you see SYS or NODE links between GPUs, the scheduler placed your pods badly. Use Volcano’s topology-aware scheduling or a nodeSelector to pin pods to NVLink-connected GPUs.
References
- Kubeflow Training docs: https://www.kubeflow.org/docs/components/training/
- Training Operator on GitHub: https://github.com/kubeflow/training-operator
- PyTorch Distributed (DDP): https://pytorch.org/docs/stable/notes/ddp.html
- PyTorch elastic / torchrun: https://pytorch.org/docs/stable/elastic/run.html
- TensorFlow MultiWorkerMirroredStrategy: https://www.tensorflow.org/api_docs/python/tf/distribute/MultiWorkerMirroredStrategy
- Horovod: https://horovod.ai/
- Volcano scheduler: https://volcano.sh/
- NVIDIA GPU Operator (MIG, time-slicing, MPS): https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/
Next: Module 07 — Katib covers the next problem once your training job runs: which hyperparameters to use.