Write and run a pipeline¶
A pipeline is a small YAML file describing a deterministic, multi-step control flow. This guide walks through writing one, dropping it into your project, and invoking it — plus the ad-hoc, no-registration alternative for a one-off procedure an agent generates on the fly. For the full grammar and invocation-tool reference, see the Pipeline DSL reference; for the why/architecture, see Pipelines.
1. Write the pipeline¶
Create a pipelines/ directory at your project root (the default scan
directory) and drop in a *.yaml file. This one takes a name, greets it,
and writes the greeting to a file:
# pipelines/greet.yaml
pipeline: greet
description: Greet a name and write the greeting to a file.
steps:
- transform: {value: "'Hello, ' + ctx.name + '!'", output: greeting}
- tool: {name: file__write, args: {path: "greeting.txt", content: !expr greeting}, output: written}
A few things worth noting about this file:
- The pipeline registers under the name in its
pipeline:key (greet), not the file name —pipelines/greet.yamlcould be renamed to anything and still register asgreet. - Each step is a single-key mapping naming its kind (
transform,tool,agent, or one of the compositional primitives). ctx.nameis the seed input this pipeline expects;greeting, once written by the first step'soutput, becomes available asctx.greetingin later steps — here referenced bare asgreetinginside the!exprstring-concat, since the second step's context still exposes it as a named store.!exprmarkscontentas an expression to evaluate, not a literal string — see Literals vs!expr.
shell is not yet functional at runtime
This guide uses file__write rather than the shell step-kind sugar
deliberately — shell currently parses but fails at every invocation
(no "shell" tool is registered yet). See the
reference doc's warning
for details; use a tool step naming a real registered tool/action
instead until this closes.
2. Start (or restart) the session¶
Pipelines are registered from disk at session start — there's no separate
"install" step and no reyn.yaml entry required for the default pipelines/
directory. Restart your session (or start a fresh one) and greet is
registered.
If a file fails to parse, or two files declare the same pipeline: name,
session start fails loudly, naming the offending file — a typo never
silently drops a pipeline you meant to ship. See
Pipeline registration § Failure behavior
for the full table.
3. Invoke it¶
An agent can launch greet either through the plain tool call:
or the qualified catalog verb the action catalog surfaces for every registered pipeline:
Both block until the pipeline finishes and return its final output — here,
the file__write result confirming greeting.txt was written. Live
step-progress is visible in the TUI for the
duration of the run, and Ctrl-C stops it cleanly at the next step boundary
rather than killing it mid-step.
Sync vs async¶
If the procedure is long-running and you don't want to block on it, use the async form instead:
This returns {status: "started", run_id: "..."} immediately; the result
arrives later as a [pipeline] message in your conversation. Use run_pipeline
when you want the result inline and are fine waiting; use run_pipeline_async
for a fire-and-forget launch. Both are equally crash-recoverable — a process
restart mid-run resumes exactly where it left off rather than re-running
completed steps (see Pipelines § Crash recovery).
4. Ad-hoc, no-registration alternative¶
Sometimes a procedure is one-off — worth writing as a pipeline for its
crash-recovery and structural safety properties, but not worth registering as
a file. run_pipeline_inline (and its async counterpart
run_pipeline_inline_async) take the same DSL as a pipeline: document, but
as a string an agent generates at call time:
run_pipeline_inline(
definition="""
pipeline: adhoc_greet
steps:
- transform: {value: "'Hi, ' + ctx.name", output: greeting}
""",
input={name: "Reyn"},
)
The definition is parsed and run through a static-analysis gate — schema
references resolve, tool names resolve, no step launches another pipeline or
delegates, and any agent step runs only under the invoker's own identity —
before anything is spawned. A bad definition fails clearly and spawns
nothing; a good one is exactly as crash-recoverable as a registered pipeline,
since its full definition travels with the run's own recovery state. See
Ad-hoc inline launch
for the complete gate checklist.
A worked end-to-end example: fan out then merge¶
A slightly larger example putting for_each and match to use — review a
document with several reviewers in parallel, then branch on whether they all
agreed:
# pipelines/review.yaml
pipeline: review
description: Fan a document out to reviewers, then branch on the verdict.
steps:
- for_each:
over: ctx.reviewers
max_parallel: 4
on_error: "retry(1)"
do:
agent:
prompt: "Review this document as {item}: {ctx.doc}. Reply with passed (bool) and notes (string)."
schema: Review
collect: {transform: {value: "pipe"}}
output: reviews
- transform: {value: "all(reviews, r -> r.passed)", output: all_passed}
- match:
on: all_passed
cases:
"True": {pipeline: report_pass, pass: [reviews]}
"False": {pipeline: report_fail, pass: [reviews]}
output: report
---
schema: Review
fields:
passed: {type: bool}
notes: {type: string}
Launch it with a list of reviewer identities and a document:
Each reviewer runs as an isolated, concurrent agent step (up to 4 at once,
each retried once on failure); once all have landed, all_passed folds them
into one boolean via the R1 all() combinator, and match routes to a
report_pass or report_fail sub-pipeline accordingly (both would need to
be registered separately, or replaced with plain transform/tool steps for
a self-contained single-file version).