Workflows
A workflow groups jobs into a named DAG and declares when it fires — by cron, by upstream completion, by message arrival, or any composite of those.
The flow in ematix-flow is the workflow, the central organising concept of the project: a named DAG of jobs plus a trigger that says when the whole thing fires.
The workflow itself declares when it fires (its trigger). Each member job declares where it sits inside the DAG (its depends_on).
The minimum surface
from ematix_flow import ematix

@ematix.job(name="extract", target=OrdersExtracted, mode="merge", keys=("order_id",))
def extract(conn): return "SELECT ..."

@ematix.job(name="enrich",
            target=OrdersEnriched, mode="merge", keys=("order_id",),
            depends_on=["extract"])
def enrich(conn): return "SELECT ..."

@ematix.job(name="aggregate",
            target=OrdersByCustomer, mode="merge", keys=("customer_id",),
            depends_on=["extract"])
def aggregate(conn): return "SELECT ..."

@ematix.job(name="report",
            target=OrdersReport, mode="merge", keys=("customer_id",),
            depends_on=["enrich", "aggregate"])
def report(conn): return "SELECT ..."

ematix.workflow(
    name="orders_etl",
    schedule="*/5 * * * *",
    jobs=["extract", "enrich", "aggregate", "report"],
)
Reading the DAG: extract is the root, enrich and aggregate run in
parallel after it, report waits for both. The workflow fires every
five minutes. When it fires, the framework topo-sorts member jobs by
their depends_on and runs the DAG.
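The ordering described above can be reproduced with the standard library. This is a minimal sketch, not the framework's own scheduler: it uses `graphlib.TopologicalSorter` (Python 3.9+), and the helper name `execution_waves` is hypothetical. It groups the `orders_etl` jobs into waves, where every job in a wave has all of its `depends_on` entries already finished.

```python
from graphlib import TopologicalSorter

# depends_on edges copied from the orders_etl example: job -> prerequisites.
depends_on = {
    "extract": [],
    "enrich": ["extract"],
    "aggregate": ["extract"],
    "report": ["enrich", "aggregate"],
}

def execution_waves(depends_on):
    """Group jobs into waves; each wave only needs jobs from earlier waves."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished so dependents unlock
    return waves

print(execution_waves(depends_on))
# [['extract'], ['aggregate', 'enrich'], ['report']]
```

Jobs inside one wave have no dependency on each other, which is why enrich and aggregate can run in parallel.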
The trigger surface
A workflow can declare any combination of trigger conditions. All
declared conditions are AND-ed together, and each one is evaluated
relative to last_successful_run_of_self:
| Kwarg | Type | Meaning |
|---|---|---|
| triggered_by | list[str] | Workflow/job names that must have succeeded since this workflow last succeeded. |
| schedule | str (cron) | The next cron tick after the last self-run must have been reached. |
| timezone | str (IANA) | Timezone in which the cron expression is interpreted. Requires schedule. |
| on_message | source | Per-message firing. Exclusive with triggered_by / schedule. |
At least one trigger is required, unless the workflow contains a streaming pipeline — in which case it’s implicitly streaming and the consumer drives execution.
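The rules in the table can be written down as a small check. The sketch below is illustrative only: `validate_triggers` and the `has_streaming_pipeline` flag are hypothetical names, and the messages are not the framework's actual errors. It encodes the constraints stated on this page: timezone requires schedule, on_message excludes the other two, and a trigger is required unless the workflow is streaming, in which case none is allowed.

```python
def validate_triggers(*, triggered_by=None, schedule=None, timezone=None,
                      on_message=None, has_streaming_pipeline=False):
    """Return a list of rule violations; an empty list means valid."""
    errors = []
    declared = [
        name
        for name, value in (("triggered_by", triggered_by),
                            ("schedule", schedule),
                            ("on_message", on_message))
        if value
    ]
    if timezone and not schedule:
        errors.append("timezone requires schedule")
    if on_message and (triggered_by or schedule):
        errors.append("on_message is exclusive with triggered_by / schedule")
    if has_streaming_pipeline:
        # The consumer drives execution, so no trigger may be declared.
        if declared:
            errors.append("streaming workflows take no trigger")
    elif not declared:
        errors.append("at least one trigger is required")
    return errors

print(validate_triggers(schedule="*/5 * * * *"))       # []
print(validate_triggers(timezone="America/New_York"))
```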
Composite triggers
The interesting case is composite: a workflow that fires only once the other workflows have completed AND the cron tick has been reached.
ematix.workflow(
    name="evening_combined_report",
    triggered_by=["workflow_A", "workflow_B"],
    schedule="0 21 * * *",
    timezone="America/New_York",
    jobs=["extract", "enrich", "aggregate", "report"],
)
What that means in practice:
| State | workflow_A | workflow_B | 21:00 tick | Fires? |
|---|---|---|---|---|
| Normal evening | done at 18:00 | done at 19:30 | reached | yes, at 21:00 |
| B is late | done at 18:00 | done at 22:00 | reached | yes, at 22:00 (immediately) |
| B failed | done at 18:00 | failed | reached | no, waits for B to succeed |
| Both done early | done at 12:00 | done at 14:00 | not yet | no — waits for 21:00 |
The workflow announces “around 9 PM” externally, and the trigger model matches that promise.
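The table can be replayed with a short decision function. This is a sketch of the AND rule under stated assumptions, not the scheduler's implementation: `should_fire` is a hypothetical name, the next cron tick is passed in rather than computed, and all datetimes are naive values in the workflow's timezone.

```python
from datetime import datetime

def should_fire(now, last_self_success, next_tick, upstream_last_success):
    """AND all conditions, each measured against the last successful self-run.

    next_tick: first cron tick after last_self_success (computed elsewhere).
    upstream_last_success: name -> datetime of the latest success, or None.
    """
    tick_reached = now >= next_tick
    upstreams_done = all(
        done is not None
        and done <= now
        and (last_self_success is None or done > last_self_success)
        for done in upstream_last_success.values()
    )
    return tick_reached and upstreams_done

last_self = datetime(2026, 5, 20, 21, 0)   # previous evening's run
tick = datetime(2026, 5, 21, 21, 0)        # tonight's 21:00 tick
stale = datetime(2026, 5, 20, 19, 30)      # a success older than last_self

# "B is late": at 21:00 workflow_B has not succeeded since the last self-run.
late_b = {"workflow_A": datetime(2026, 5, 21, 18, 0), "workflow_B": stale}
print(should_fire(datetime(2026, 5, 21, 21, 0), last_self, tick, late_b))  # False

# workflow_B completes at 22:00, so every condition now holds.
late_b["workflow_B"] = datetime(2026, 5, 21, 22, 0)
print(should_fire(datetime(2026, 5, 21, 22, 0), last_self, tick, late_b))  # True
```

A failed upstream run leaves its last success unchanged, so the "failed" row reduces to the same stale-success case as the first call.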
Trigger state + Run now
The Workflows tab renders each declared trigger as a coloured-dot pill so you can see which conditions are currently satisfied without reading the run log:
- 🟢 ready — the condition is satisfied right now
- 🟡 pending — waiting for the condition (cron tick in the future, or upstream hasn’t completed since the last self-run)
- 🔴 failed — upstream’s most recent terminal run since the last self-run was a failure, blocking firing
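For an upstream condition, the three states above could be derived from run history roughly as follows. This is a sketch with hypothetical names: `upstream_pill`, the `(finished_at, status)` tuple shape, and the `"success"` / `"failure"` status strings are assumptions made for illustration.

```python
from datetime import datetime

def upstream_pill(last_self_success, terminal_runs):
    """Classify one upstream as 'ready', 'pending', or 'failed'.

    terminal_runs: (finished_at, status) tuples for the upstream, oldest first.
    """
    since = [
        run for run in terminal_runs
        if last_self_success is None or run[0] > last_self_success
    ]
    if not since:
        return "pending"      # nothing has finished since the last self-run
    latest_status = since[-1][1]
    return "ready" if latest_status == "success" else "failed"

last_self = datetime(2026, 5, 20, 21, 0)
runs = [
    (datetime(2026, 5, 20, 19, 30), "success"),  # before the last self-run
    (datetime(2026, 5, 21, 19, 30), "failure"),  # most recent terminal run
]
print(upstream_pill(last_self, runs))  # failed
```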
Schedule pills show the actual tick time the next firing is anchored to — “21:00 EDT” if today, “2026-05-22 21:00 EDT” otherwise — in the workflow’s configured timezone.
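The label format can be approximated with `strftime`. The sketch below assumes "today" is judged in the workflow's timezone and uses a fixed-offset `EDT` stand-in so it runs without timezone data; with real configuration, the tick would presumably carry `zoneinfo.ZoneInfo("America/New_York")` instead. The name `schedule_label` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def schedule_label(tick, now):
    """Format an aware tick time; show the date only when it is not today."""
    today = now.astimezone(tick.tzinfo).date()  # "today" in the tick's zone
    fmt = "%H:%M %Z" if tick.date() == today else "%Y-%m-%d %H:%M %Z"
    return tick.strftime(fmt)

# Fixed-offset stand-in for America/New_York during daylight saving time.
EDT = timezone(timedelta(hours=-4), "EDT")
tick = datetime(2026, 5, 22, 21, 0, tzinfo=EDT)

print(schedule_label(tick, datetime(2026, 5, 22, 9, 0, tzinfo=EDT)))  # 21:00 EDT
print(schedule_label(tick, datetime(2026, 5, 21, 9, 0, tzinfo=EDT)))  # 2026-05-22 21:00 EDT
```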
Every workflow and standalone job has a ▶ Run now button:
- On a workflow card the button opens a modal with checkboxes for each member job. By default all are selected; uncheck to scope the immediate run to a subset. Trigger gates (cron, upstream events) are bypassed for the manual run.
- On a job card (Jobs tab) the button opens a smaller modal with a “Trigger downstream dependents” checkbox. When off (the default), only this one job runs. When on, every job that has it in its depends_on is also enqueued once this job succeeds.
Both paths write a status="requested" rich-history record that the
scheduler picks up on its next tick.
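The job-card path can be pictured as a request record plus a lookup of direct dependents. Everything here is a sketch: only `status="requested"` comes from the text above, while the other record fields and all three function names are hypothetical.

```python
# depends_on edges from the orders_etl example: job -> prerequisites.
depends_on = {
    "extract": [],
    "enrich": ["extract"],
    "aggregate": ["extract"],
    "report": ["enrich", "aggregate"],
}

def direct_dependents(job, depends_on):
    """Jobs that list `job` in their own depends_on."""
    return sorted(name for name, deps in depends_on.items() if job in deps)

def make_request(job, trigger_downstream=False):
    """Build the record a manual run would leave for the scheduler."""
    return {
        "job": job,
        "status": "requested",
        "trigger_downstream": trigger_downstream,  # hypothetical field
    }

def jobs_to_enqueue_on_success(request, depends_on):
    """Dependents to enqueue once the requested job has succeeded."""
    if not request["trigger_downstream"]:
        return []
    return direct_dependents(request["job"], depends_on)

request = make_request("extract", trigger_downstream=True)
print(jobs_to_enqueue_on_success(request, depends_on))  # ['aggregate', 'enrich']
```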
Standalone jobs
A job that isn’t a member of any workflow can declare its own trigger:
@ematix.job(
    name="ingest_events",
    target=Events, target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn): return "SELECT ..."
These show up on the Workflows tab as kind: "single" cards. Inside a
declared workflow, per-job schedule is ignored (the workflow’s
trigger supersedes); a warning is emitted at registration if both are
set.
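The registration-time warning could look something like the following. This is a sketch of the rule only: `check_member_schedules`, its arguments, and the message wording are hypothetical, and the framework's actual warning may differ.

```python
import warnings

def check_member_schedules(workflow_name, member_jobs, job_schedules):
    """Warn for each member job that also declares its own schedule.

    job_schedules: job name -> cron string, or None when the job has none.
    Returns the jobs whose schedule is ignored.
    """
    ignored = [job for job in member_jobs if job_schedules.get(job)]
    for job in ignored:
        warnings.warn(
            f"job {job!r} declares a schedule, but the trigger of "
            f"workflow {workflow_name!r} supersedes it",
            UserWarning,
            stacklevel=2,
        )
    return ignored

ignored = check_member_schedules(
    "orders_etl",
    ["extract", "enrich"],
    {"extract": "*/5 * * * *", "enrich": None},
)
print(ignored)  # ['extract']
```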
Streaming workflows
A workflow that contains an @ematix.streaming_pipeline is implicitly
streaming — the consumer drives execution, so no schedule /
triggered_by / on_message is needed (or allowed) on the workflow.
The UI renders these with the ▶ LIVE STREAMING pill instead of a
trigger summary.