~75 min read · updated 2026-05-12

Pipelines (advanced): control flow, parallelism, schedules, lineage

Move beyond linear DAGs — Condition, ParallelFor with Collected, nested pipelines, scheduled retraining, secrets, multi-tenant runs, lineage for compliance, and the bottlenecks you'll hit at scale.

A two-step train-then-eval pipeline is the warm-up. Real pipelines have branches that fire only when a metric passes a threshold, fan out over a hyperparameter grid and aggregate the results, run on a cron, pull secrets without leaking them into the metadata DB, and write enough lineage that an auditor in 18 months can answer “where did this prediction come from?” without calling you on a Sunday.

This module covers the v2 features that get you there, and the bottlenecks that show up the first time you push the pipeline harder than a hello-world.

Control flow — dsl.Condition

Gate a step on a previous step’s output. The common shape: only deploy a model if eval accuracy exceeds a threshold.

from kfp import dsl
from kfp.dsl import Metrics, Output

@dsl.component
def evaluate(model_path: str, metrics: Output[Metrics]) -> float:
    acc = compute_accuracy(model_path)
    metrics.log_metric("accuracy", acc)
    return acc

@dsl.pipeline(name="train-eval-deploy")
def pipeline(threshold: float = 0.85):
    train_task = train_model()
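    eval_task  = evaluate(model_path=train_task.outputs["model_path"])
    # evaluate has two outputs (the metrics artifact and the float return value),
    # so the return value is referenced by its name, "Output", not via .output
    with dsl.If(eval_task.outputs["Output"] >= threshold):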
        deploy_model(model_path=train_task.outputs["model_path"])
    with dsl.Else():
        alert_team(reason="accuracy below threshold")

A few rules worth knowing. One: the v2 idiom is dsl.If / dsl.Elif / dsl.Else. Older pipelines use dsl.Condition, which still works but is deprecated in recent SDK releases; the If/Else form is what the docs recommend. Two: the condition expression has to reduce to a primitive comparable value at run time. You can compare task outputs to constants or to pipeline parameters, but you can’t run arbitrary Python in the condition, because the compiler has to translate it to the IR. Three: a step inside a dsl.If only runs when the condition is true. Its downstream consumers in the outer pipeline have to handle the case where it didn’t produce its output. KFP marks the downstream step as “skipped” instead of “failed” in that case, but only if the dependency is properly typed.

Parallel for-loops — dsl.ParallelFor

The fan-out primitive. Iterate over a list, running N copies of a step concurrently. Useful for HPO-lite, cross-region evaluation, multi-seed training:

@dsl.pipeline(name="multi-seed-train")
def pipeline():
    prep = prepare_data()
    seeds = [1, 2, 3, 4, 5]
    with dsl.ParallelFor(items=seeds, parallelism=3) as seed:
        train_model(
            dataset=prep.outputs["output_dataset"],
            seed=seed,
        )

parallelism=3 caps concurrent execution at 3 — useful when you don’t want 50 simultaneous GPU asks to overwhelm the scheduler. Without it, KFP launches all iterations at once and the cluster decides. For GPU work, always set parallelism to the number of GPUs you’re willing to commit to this run.
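
To make the cap concrete, here is a minimal stdlib sketch of the same behaviour outside KFP: five fake training runs pushed through a pool of three workers, with the peak concurrency recorded. The thread pool and the fake_train function are illustrative stand-ins, not how KFP implements the cap (the workflow engine enforces it on pods).

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_cap(items, parallelism):
    """Run a fake training step per item, never more than `parallelism` at once."""
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def fake_train(seed):
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.05)  # stand-in for the real training work
        with lock:
            state["running"] -= 1
        return seed * 10  # stand-in for a model artifact

    # max_workers plays the role of ParallelFor's parallelism argument.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        results = list(pool.map(fake_train, items))
    return results, state["peak"]


results, peak = run_with_cap([1, 2, 3, 4, 5], parallelism=3)
print(results, peak)
```

All five iterations still complete; the cap only limits how many are in flight at any moment, which is the trade you want when GPUs are the scarce resource.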

The iterated value can be a static list, a pipeline parameter, or — more interestingly — the output of an upstream step. A step that returns List[str] becomes the items the ParallelFor iterates over. That’s how you build a pipeline whose fan-out width is decided at run time.

Loops with Collected — gather outputs

ParallelFor produces N parallel branches; dsl.Collected aggregates their outputs into a single downstream step’s input. The shape is “train N models, pick the best, deploy it”:

from typing import List

from kfp.dsl import Collected, Model, Input

@dsl.component
def pick_best(models: Input[List[Model]]) -> str:
    return max(models, key=lambda m: float(m.metadata["accuracy"])).uri

@dsl.pipeline(name="multi-seed-pick-best")
def pipeline():
    prep = prepare_data()
    with dsl.ParallelFor(items=[1,2,3,4,5], parallelism=3) as seed:
        train = train_model(dataset=prep.outputs["output_dataset"], seed=seed)
    best_uri = pick_best(models=Collected(train.outputs["model"]))
    deploy_model(model_uri=best_uri.output)

Collected(...) is what tells the compiler “this isn’t a single artifact; it’s the list of artifacts produced by each iteration of the enclosing ParallelFor.” The downstream pick_best step receives List[Model] and can decide which one wins. This is the canonical pattern for any “train many, deploy one” loop — much cleaner than rolling your own array-of-runs juggling.
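
The selection logic inside pick_best is easy to test on its own before it goes anywhere near a cluster. The sketch below assumes a hypothetical FakeModel stand-in with the same uri and metadata attributes the component reads, and adds two guards the one-liner lacks: models with no recorded accuracy are ignored, and an empty candidate set fails loudly.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeModel:
    """Hypothetical stand-in for a Model artifact: a URI plus a metadata dict."""
    uri: str
    metadata: Dict[str, object] = field(default_factory=dict)


def pick_best_uri(models: List[FakeModel]) -> str:
    """Return the URI of the model with the highest recorded accuracy."""
    scored = [m for m in models if "accuracy" in m.metadata]
    if not scored:
        raise ValueError("no model carries an 'accuracy' metadata entry")
    # float() tolerates accuracy values that were stored as strings.
    return max(scored, key=lambda m: float(m.metadata["accuracy"])).uri


candidates = [
    FakeModel("s3://models/seed-1", {"accuracy": 0.81}),
    FakeModel("s3://models/seed-2", {"accuracy": "0.88"}),
    FakeModel("s3://models/seed-3", {}),  # training step forgot to record accuracy
]
best = pick_best_uri(candidates)
print(best)
```

The paths are made up for the example. The same function body can be dropped into the real component once it behaves the way you expect locally.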

Reading the diagram: prepare_data emits one dataset; ParallelFor fans out 5 training runs over different seeds, capped at 3 concurrent; Collected[Model] aggregates the 5 models into pick_best, which selects one; a Condition gates the deploy on accuracy and routes failures to an alert step. That single DAG covers the four advanced features stacked.

Nested pipelines

A @dsl.pipeline-decorated function can itself be used as a component inside another pipeline. The shape is pipeline-as-component:

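from typing import List

@dsl.pipeline(name="train-and-eval")
def train_and_eval(input_path: str, threshold: float) -> float:
    prep = prepare_data(input_path=input_path)
    train = train_model(dataset=prep.outputs["output_dataset"])
    eval_ = evaluate(model_path=train.outputs["model_path"])
    # A pipeline that returns a value needs a return annotation; evaluate has two
    # outputs, so its return value is referenced by name.
    return eval_.outputs["Output"]  # accuracy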

@dsl.pipeline(name="orchestrator")
def outer(paths: List[str]):
    with dsl.ParallelFor(items=paths) as p:
        train_and_eval(input_path=p, threshold=0.85)

Why this matters: a team library can export named pipelines (train_and_eval, deploy_to_staging, monitor_drift) and other teams compose them. Reuse stops being copy-paste. The compiled IR includes the nested pipeline expanded inline, so there’s no run-time penalty.

Scheduled pipelines

Cron-style schedules via ScheduledWorkflow (the underlying CR on Argo-backed installs) or the UI’s Recurring Run feature. The common case: retrain weekly from the latest snapshot.

In the UI, you pick a pipeline, give it parameter values, and set a cron expression. KFP creates a ScheduledWorkflow and the scheduledworkflow-controller pod fires runs on the schedule.

From the SDK:

from kfp import Client
client = Client()
client.create_recurring_run(
    experiment_id=client.get_experiment(experiment_name="churn-retrain").experiment_id,
    job_name="churn-retrain-weekly",
    cron_expression="0 0 2 * * 0",  # six fields, seconds first: Sundays 02:00 UTC
    pipeline_id=client.get_pipeline_id("churn-train"),
    params={"input_path": "s3://bucket/churn-latest.csv"},
)

Watch out for: the scheduler is a separate controller. If scheduledworkflow-controller is CrashLoopBackOff (most often a MariaDB connection error or a schema-version mismatch after a KFP upgrade), scheduled runs don’t fire and the UI doesn’t tell you why. Always health-check the controller after any KFP upgrade.
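
One more scheduling trap: KFP's recurring-run cron expressions use six space-separated fields with seconds first, while the crontab syntax most people have memorised uses five. A small helper like the one below (the name to_kfp_cron is an assumption for illustration, not part of the SDK) keeps the conversion in one place.

```python
def to_kfp_cron(expr: str) -> str:
    """Convert a five-field crontab expression to the six-field, seconds-first form."""
    fields = expr.split()
    if len(fields) == 5:
        # Classic crontab: prepend a seconds field of 0.
        return " ".join(["0"] + fields)
    if len(fields) == 6:
        # Already in seconds-first form; just normalise the whitespace.
        return " ".join(fields)
    raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}: {expr!r}")


print(to_kfp_cron("0 2 * * 0"))  # 0 0 2 * * 0
```

The helper only counts fields; it does not validate the values inside them, so a typo such as an hour of 25 still has to be caught by the backend.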

Pipeline secrets and credentials

The trap is putting secrets in pipeline parameters. KFP records parameter values in MLMD plaintext — your DB password becomes a row in the metadata table that any auditor with SELECT access reads in the clear. Don’t.

Three patterns that work:

  • Env var from a Secret in the step’s namespace. Reference an existing Secret and project a key into an env var:

    @dsl.component
    def pull_from_db(...):
        import os
        pw = os.environ["DB_PASSWORD"]
        ...
    
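    from kfp import kubernetes  # provided by the kfp-kubernetes package

    pull = pull_from_db(...)
    # Project the "password" key of the existing Secret into DB_PASSWORD.
    kubernetes.use_secret_as_env(
        pull,
        secret_name="db-creds",
        secret_key_to_env={"password": "DB_PASSWORD"},
    )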
  • ServiceAccount token mount. Run the pipeline under a dedicated ServiceAccount (for example, by passing service_account="my-sa" when you submit the run from the SDK); the SA’s automounted token is at the pod’s standard /var/run/secrets/kubernetes.io/serviceaccount/token, and the step can use it to authenticate to services that trust that identity (S3 via IRSA-style mappings, Vault via Kubernetes auth, etc.).

  • ESO-materialised Secret. The External Secrets Operator pulls from Vault / AWS Secrets Manager / GCP Secret Manager and creates a Secret in the namespace. The pipeline step references it by name. The pipeline definition never touches the secret value; only the namespace’s RBAC has access.

The right default in a regulated environment is the third one — secrets live in Vault, ESO syncs them on a schedule, the pipeline composes them in by reference. The pipeline definition is safe to commit to Git.
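
Code review catches the parameter trap more reliably when a machine looks first. The following is a rough sketch of a pre-submit check that flags run parameters whose name or value looks like a credential; the regular expressions and the sample parameter names are assumptions for illustration and will need tuning to your own naming conventions.

```python
import re
from typing import Dict, List

# Heuristics only: names that suggest a credential, values with well-known key prefixes.
NAME_PATTERN = re.compile(r"(password|passwd|secret|token|api[_-]?key|credential)", re.IGNORECASE)
VALUE_PATTERN = re.compile(r"^(sk-|ghp_|AKIA)")


def secret_like_params(params: Dict[str, object]) -> List[str]:
    """Return the sorted names of parameters that look like they carry a secret."""
    flagged = []
    for name, value in params.items():
        looks_secret_by_name = bool(NAME_PATTERN.search(name))
        looks_secret_by_value = isinstance(value, str) and bool(VALUE_PATTERN.match(value))
        if looks_secret_by_name or looks_secret_by_value:
            flagged.append(name)
    return sorted(flagged)


params = {
    "input_path": "s3://bucket/churn-latest.csv",
    "db_password": "not-a-real-password",
    "model_endpoint_key": "sk-test-123",
}
print(secret_like_params(params))  # ['db_password', 'model_endpoint_key']
```

Run it in CI against the parameter dict before calling the KFP client, and fail the job if the list is non-empty. It will produce false positives, which is the right direction to err in.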

Multi-tenant pipelines

A pipeline run lives in a Kubernetes namespace — by default the Profile namespace of the user who submitted it. Profile Controller pre-provisions a kfp-launcher ServiceAccount and the RoleBindings KFP needs in that namespace, so Alice’s pipeline runs as Alice’s tenant and can only touch what Alice can touch.

Cross-namespace work — a step in team-a reading from team-b/datasets — is not free:

  • The step’s SA needs explicit get/list on the relevant resources in team-b.
  • If you’ve enforced Istio mTLS with strict cross-namespace AuthorizationPolicy, you need an AuthorizationPolicy rule in team-b allowing the source SA.
  • The KFP launcher’s image needs to be reachable from team-a’s pull secrets (it usually is, since it comes from the platform registry).

Don’t paper this over with a global cluster-admin RoleBinding. The cost of getting the cross-namespace RBAC right once is much less than the cost of the audit finding when a single compromised pipeline can read every team’s data.
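
As a sketch of what the narrow grant looks like, the helper below builds a namespaced Role and RoleBinding that give one ServiceAccount in the source namespace get/list on specific resources in the target namespace. The ServiceAccount name pipeline-runner and the choice of persistentvolumeclaims are hypothetical; substitute whatever the step actually reads. The manifests are emitted as JSON, which kubectl apply accepts.

```python
import json
from typing import Dict, List, Sequence


def cross_namespace_read(source_ns: str, source_sa: str,
                         target_ns: str, resources: Sequence[str]) -> List[Dict]:
    """Build a Role + RoleBinding granting get/list in target_ns to one SA."""
    role_name = f"{source_ns}-{source_sa}-read"
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": target_ns},
        # "" is the core API group; read-only verbs, nothing else.
        "rules": [{"apiGroups": [""], "resources": list(resources), "verbs": ["get", "list"]}],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": role_name, "namespace": target_ns},
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": role_name},
        "subjects": [{"kind": "ServiceAccount", "name": source_sa, "namespace": source_ns}],
    }
    return [role, binding]


manifests = cross_namespace_read("team-a", "pipeline-runner", "team-b", ["persistentvolumeclaims"])
print(json.dumps(manifests, indent=2))
```

Both objects live in team-b, so team-b's owners can see and revoke the grant without touching team-a. That visibility is the point of doing it per namespace instead of cluster-wide.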

Metadata and lineage

Every pipeline run writes structured events into ML Metadata: which step produced which artifact, which artifact was consumed by which downstream step, what parameters were in scope. The Central Dashboard’s Runs → Artifacts view renders this as a clickable graph.

This is the property compliance cares about most. A financial-services or healthcare ML deployment has to be able to answer, for any inference the model produced:

  • Which model version generated this prediction?
  • Which training run produced that model?
  • Which training dataset version was used?
  • Which code version (Git SHA, container digest) produced the dataset?
  • Which human approved the deployment of the model?

KFP’s MLMD captures the first four cleanly if you discipline the pipeline to use typed Dataset / Model artifacts and include the Git SHA / image digest as artifact metadata. The fifth (human approval) is outside KFP and lives in your change-management tooling.
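
One way to enforce that discipline is to build the lineage metadata in a single validated helper and refuse to record anything malformed. This is a sketch under assumptions: the key names git_sha, image_digest and dataset_uri are an illustrative convention, not a KFP schema, and inside a component you would merge the result into the artifact with something like model.metadata.update(...).

```python
import re
from typing import Dict

GIT_SHA = re.compile(r"^[0-9a-f]{40}$")             # full commit SHA, not a short one
IMAGE_DIGEST = re.compile(r"^sha256:[0-9a-f]{64}$")  # immutable digest, not a tag


def lineage_metadata(git_sha: str, image_digest: str, dataset_uri: str) -> Dict[str, str]:
    """Return the metadata to attach to an artifact, rejecting ambiguous identifiers."""
    if not GIT_SHA.match(git_sha):
        raise ValueError(f"git_sha must be a full 40-character commit SHA, got {git_sha!r}")
    if not IMAGE_DIGEST.match(image_digest):
        raise ValueError(f"image_digest must look like sha256:<64 hex>, got {image_digest!r}")
    if not dataset_uri:
        raise ValueError("dataset_uri must not be empty")
    return {"git_sha": git_sha, "image_digest": image_digest, "dataset_uri": dataset_uri}


meta = lineage_metadata(
    git_sha="a" * 40,
    image_digest="sha256:" + "b" * 64,
    dataset_uri="s3://bucket/churn-latest.csv",
)
print(meta)
```

Rejecting short SHAs and mutable image tags up front matters because an auditor cannot resolve "main" or ":latest" 18 months later.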

For BFSI readers in particular, this is the spine of model risk management (FFIEC, SR 11-7, BCBS 239 equivalents). The lab’s BFSI readiness page covers the broader compliance shape — see /docs/openshift-platform/foundations/bfsi-readiness-review — but the pipeline-level discipline is what makes the lineage credible in the first place.

CI/CD for pipelines

GitOps the pipeline definition. The flow that holds up:

  • Pipeline source in git — Python files with @dsl.component and @dsl.pipeline decorators.
  • CI compiles on every merge: kfp dsl compile --py kfp_pipeline.py --output pipeline.yaml emits pipeline.yaml, and CI uploads it with kfp pipeline create-version as a new version of a stable named pipeline in the KFP backend.
  • A scheduled run (or an Argo Events trigger on the artifact-bucket landing of new data) submits a run of the latest pipeline version with the right parameters.
  • Pipeline version pinning — every model in production is bound to a specific pipeline version, not “latest.” When the model misbehaves, you have a deterministic re-run path.

Argo CD or Tekton can both drive this flow. The simplest shape is a GitHub Actions / GitLab CI job that calls kfp pipeline create-version on push to main, plus a recurring run pinned to a named pipeline. No special CI tooling required.
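
Version pinning is easier to audit when the version name itself encodes the commit. A minimal sketch of one possible naming convention follows; the function name and the 12-character truncation are assumptions for illustration, not a KFP requirement.

```python
import re


def pipeline_version_name(pipeline_name: str, git_sha: str) -> str:
    """Derive a deterministic pipeline version name from the commit that built it."""
    if not re.fullmatch(r"[0-9a-f]{7,40}", git_sha):
        raise ValueError(f"not a git SHA: {git_sha!r}")
    # 12 hex characters keeps the name readable while staying practically unique.
    return f"{pipeline_name}-{git_sha[:12]}"


print(pipeline_version_name("churn-train", "0123456789abcdef0123456789abcdef01234567"))
# churn-train-0123456789ab
```

Because the name is a pure function of the commit, re-running CI on the same SHA yields the same version name, so a duplicate upload is easy to detect.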

Pipeline observability

Each step is a pod; each pod’s stdout/stderr goes to the cluster’s log aggregator (Loki on the lab, CloudWatch on EKS, Stackdriver on GKE). The KFP UI’s per-step log view is a thin wrapper over Kubernetes’ pod-log API; for retention past pod termination, you need the aggregator.

For metrics, KFP exports run-status counters and a duration histogram from the API server (Prometheus-scrapeable). Per-step custom metrics — “this training step’s epoch loss” — go through the Metrics artifact type:

@dsl.component
def train(epochs: int, metrics: Output[Metrics]):
    for epoch in range(epochs):
        loss = step()  # placeholder: runs one epoch and returns its loss
        metrics.log_metric(f"loss_epoch_{epoch}", loss)

The KFP UI’s Metrics tab renders these; you can also send them to a longer-term store such as MLflow or TensorBoard from inside the step.

Tracing: KFP doesn’t emit OpenTelemetry spans out of the box. If your platform mandates a single tracing pane (OpenTelemetry → Tempo / Jaeger / Honeycomb), wire in spans from your Python code with the OTel SDK and an OTLP exporter pointed at your collector. The pipeline-level span tree is yours to design.

Performance — common bottlenecks

Bottleneck | Symptom | Fix
Object-storage bandwidth | Training step spends minutes downloading dataset before doing useful work | Use a local cache PVC; pre-stage data with a one-time DataLoader step; switch to streaming reads
Image-pull time | Each step’s pod sits in ContainerCreating for a minute on a cold node | Pin to a custom base image with packages_to_install baked in; pre-warm images on training nodes
Single-VM MinIO | At ~500 ops/s artifact writes start queuing; long pipelines slow | Move to a real S3-compatible store or scale MinIO to a multi-node deployment
MLMD DB contention | scheduledworkflow-controller and the API server compete on the same MariaDB | Run MariaDB with sufficient connections and IOPS; for large fleets, split lineage DB from KFP-internal DB
ParallelFor fan-out exhausts scheduler | parallelism=20 runs with 20 GPU asks; nothing schedules; runs sit Pending | Always set parallelism= to capacity you have, not capacity you want

The single most common one in the wild is image pull. Build a custom base image; pin it to a digest; pre-warm it. Caching alone makes most pipelines feel an order of magnitude snappier.

The lab posture

Kubeflow isn’t installed today. The track is portable: pipelines compiled to KFP IR run on open-source Kubeflow and on Google Vertex AI Pipelines without rewriting your @dsl.component decorators; AWS and Azure offer their own integrations, so verify portability to those platforms before you rely on it. The integration points that vary (object storage, OIDC IdP, secret manager) are the ones you’d customise per cluster anyway.

Try this

1. Add a dsl.If to your three-step pipeline. Skip deploy if eval accuracy is < 0.80. Submit two runs — one with synthetic data that produces a good model and one with random data — and confirm the deploy step runs in the first case and is skipped in the second.

2. Fan out training over 5 seeds. Use dsl.ParallelFor(items=[1,2,3,4,5], parallelism=3) and aggregate the resulting models with Collected[Model] into a pick_best step. Watch the KFP UI render the fan-out tree. Tune parallelism to your GPU capacity.

3. Schedule the pipeline to run nightly at 02:00 UTC. From the SDK, call client.create_recurring_run(...) with cron_expression="0 0 2 * * *" (KFP cron expressions have six fields, seconds first). Confirm in the UI that the recurring run appears with the right next-run timestamp, and that runs created by it tag back to the same pipeline version.

Common failure modes

ParallelFor with parallelism=20 stalls the run. The GPU scheduler can’t satisfy 20 simultaneous asks, but the pods don’t fail: they sit Pending and you wait. Reduce parallelism to a number your cluster can actually run; if you want true fan-out, expand the GPU pool first.
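
The sizing arithmetic is simple enough to put in a helper and call before you compile. This sketch assumes every iteration asks for the same number of GPUs; the function name and arguments are illustrative.

```python
import math
from typing import Tuple


def plan_fan_out(n_items: int, requested_parallelism: int,
                 gpus_available: int, gpus_per_task: int = 1) -> Tuple[int, int]:
    """Return (parallelism to set, number of sequential waves that implies)."""
    if gpus_per_task < 1 or gpus_available < gpus_per_task:
        raise ValueError("not enough GPUs to run even one iteration")
    runnable = gpus_available // gpus_per_task  # iterations that fit at once
    parallelism = max(1, min(requested_parallelism, runnable, n_items))
    waves = math.ceil(n_items / parallelism)
    return parallelism, waves


# 20 iterations, 2 GPUs each, 8 GPUs free: 4 at a time, 5 waves.
print(plan_fan_out(n_items=20, requested_parallelism=20, gpus_available=8, gpus_per_task=2))
# (4, 5)
```

The wave count is the number to show whoever asked for parallelism=20: it turns "nothing schedules" into "this takes five training-lengths on the hardware we have".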

Conditional steps don’t fire as expected. KFP v2’s expression evaluation is strict. A condition like with dsl.If(eval_task.output == "True") will not match the boolean True returned by the upstream — strings are not booleans. Print the value of the conditional expression as the eval step’s stdout and compare it byte-for-byte to what you wrote in the condition.
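
When you print the value for that comparison, print its type as well, because a bare print makes True and "True" look identical. A tiny helper (the name describe is just for illustration) shows the difference in plain Python:

```python
def describe(value) -> str:
    """Render a value together with its type so str and bool cannot be confused."""
    return f"{type(value).__name__}:{value!r}"


print(describe(True))    # bool:True
print(describe("True"))  # str:'True'
print(True == "True")    # False
```

Log describe(acc_passed) (or the equivalent for your own output) from the upstream step, and the mismatch with the literal in the condition becomes obvious.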

Scheduled run doesn’t trigger. Check kubectl get pod -n kubeflow -l app=ml-pipeline-scheduledworkflow (the label on upstream manifests; adjust it if your distribution names the controller differently). If the controller is in CrashLoopBackOff, scheduled runs are dead. The most common cause is a MariaDB schema-version mismatch after a KFP upgrade (the controller’s expected schema is one version ahead of what’s deployed). Fix: run the KFP DB migration job; the operator’s release notes name the right one.

Collected[Model] is empty in the downstream step. Almost always: one of the ParallelFor iterations failed (an OOM, a flaky external service) and KFP excluded its output from the Collected list. The downstream step gets len(models) < expected. Either make the training step retry on transient failures, or check the list length in the downstream step and degrade gracefully.
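
The length check can be a few lines at the top of the downstream step. A minimal sketch, assuming you are willing to proceed with a reduced set as long as a minimum number of iterations survived (the function name and thresholds are illustrative):

```python
from typing import Sequence


def require_enough(models: Sequence[object], expected: int, min_required: int) -> int:
    """Fail fast if too few iterations produced a model; otherwise return the count."""
    got = len(models)
    if got < min_required:
        raise RuntimeError(
            f"only {got} of {expected} models arrived; need at least {min_required}"
        )
    if got < expected:
        # Degrade gracefully, but leave a trace in the step log.
        print(f"warning: proceeding with {got} of {expected} models")
    return got


count = require_enough(["m1", "m2", "m3"], expected=5, min_required=3)
print(count)  # 3
```

Failing with an explicit message beats letting max() raise on an empty list, and the warning line gives whoever reads the logs later a reason for the smaller candidate set.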

Nested pipeline compiles but the parent run errors with “no such input”. The parent passed an output that the nested pipeline doesn’t declare as an input. KFP’s typed IR is strict — the nested pipeline’s @dsl.pipeline-decorated function signature is the contract. Add the missing parameter to the nested pipeline’s signature and recompile.

A pipeline run leaks an API key into the metadata DB. You passed it as a pipeline parameter instead of as an env var from a Secret. Rotate the key first, then audit the stored run parameters and the MLMD execution properties for values matching the key’s prefix (for example, values starting with sk-) and purge what you find. Wire the secret through kubernetes.use_secret_as_env(...) from the kfp-kubernetes package and never accept the parameter shape for credentials in code review again.

Where this is heading

You now have the full pipeline toolkit: linear DAGs, conditionals, parallel fan-out with aggregation, nested reuse, schedules, secrets done properly, and the lineage discipline an auditor will accept. The next module shifts from “how do I orchestrate?” to “how do I actually train a model that doesn’t fit on one node?” — the Training Operators that wrap distributed PyTorch / TensorFlow / MPI as Kubernetes CRDs.

Next: Module 06 — Training Operators — distributed training via PyTorchJob, TFJob, MPIJob, gang scheduling, elastic training, and the patterns for surviving spot-node preemption.
