Workflows

A workflow groups jobs into a named DAG and declares when it fires — by cron, by upstream completion, by message arrival, or any composite of those.

The flow in ematix-flow is the workflow. It’s the central organising concept of the project: a named DAG of jobs, plus a trigger that says when the whole thing fires.

The workflow itself declares when it fires (its trigger); each member job declares where it sits inside the DAG (its depends_on).

The minimum surface

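from ematix_flow import ematix

# OrdersExtracted, OrdersEnriched, OrdersByCustomer and OrdersReport are the
# target table models; they are assumed to be defined elsewhere in the project.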

@ematix.job(name="extract", target=OrdersExtracted, mode="merge", keys=("order_id",))
def extract(conn): return "SELECT ..."

@ematix.job(name="enrich",
            target=OrdersEnriched, mode="merge", keys=("order_id",),
            depends_on=["extract"])
def enrich(conn): return "SELECT ..."

@ematix.job(name="aggregate",
            target=OrdersByCustomer, mode="merge", keys=("customer_id",),
            depends_on=["extract"])
def aggregate(conn): return "SELECT ..."

@ematix.job(name="report",
            target=OrdersReport, mode="merge", keys=("customer_id",),
            depends_on=["enrich", "aggregate"])
def report(conn): return "SELECT ..."

ematix.workflow(
    name="orders_etl",
    schedule="*/5 * * * *",
    jobs=["extract", "enrich", "aggregate", "report"],
)

Reading the DAG: extract is the root; enrich and aggregate run in parallel after it; report waits for both. The workflow fires every five minutes. When it fires, the framework topologically sorts the member jobs by their depends_on and runs the DAG.
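The ordering step can be pictured with Python's standard-library graphlib. This is a minimal sketch of a topological sort into parallel stages, not ematix-flow's own scheduler; the parallel_stages helper is hypothetical and assumes every job in a stage succeeds before the next stage starts.

```python
from graphlib import TopologicalSorter

# depends_on declarations copied from the orders_etl example above.
DEPENDS_ON: dict[str, list[str]] = {
    "extract": [],
    "enrich": ["extract"],
    "aggregate": ["extract"],
    "report": ["enrich", "aggregate"],
}


def parallel_stages(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group jobs into stages; all jobs within one stage may run in parallel."""
    sorter = TopologicalSorter(depends_on)  # maps each job -> its predecessors
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    stages: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # jobs whose dependencies are all done
        stages.append(ready)
        sorter.done(*ready)  # assume the whole stage succeeded
    return stages


print(parallel_stages(DEPENDS_ON))
# → [['extract'], ['aggregate', 'enrich'], ['report']]
```

A cycle in depends_on surfaces as graphlib.CycleError from prepare(), which is the point where a registration-time check would naturally reject the workflow.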

The trigger surface

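A workflow can combine the trigger conditions below (subject to the exclusivity noted for on_message). All declared conditions are AND-conjoined, and each is evaluated relative to last_successful_run_of_self, the time this workflow last succeeded: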

Kwarg          Type         Meaning
triggered_by   list[str]    Workflow/job names that must have succeeded since this workflow last succeeded.
schedule       str (cron)   The next cron tick after the last self-run must have been reached.
timezone       str (IANA)   Time zone the cron expression is interpreted in. Requires schedule.
on_message     source       Fires once per message. Mutually exclusive with triggered_by and schedule.

At least one trigger is required, unless the workflow contains a streaming pipeline — in which case it’s implicitly streaming and the consumer drives execution.
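The registration rules above can be expressed as a small validation function. This is a minimal sketch, assuming a hypothetical WorkflowSpec container for the kwargs; it is not ematix-flow's own validator, and the error messages are made up for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class WorkflowSpec:
    """Hypothetical container for the trigger kwargs of ematix.workflow(...)."""
    name: str
    triggered_by: List[str] = field(default_factory=list)
    schedule: Optional[str] = None       # cron expression
    timezone: Optional[str] = None       # IANA time zone name
    on_message: Optional[object] = None  # message source
    has_streaming_pipeline: bool = False


def validate_triggers(spec: WorkflowSpec) -> None:
    """Raise ValueError if the trigger kwargs break the documented rules."""
    has_trigger = (
        bool(spec.triggered_by)
        or spec.schedule is not None
        or spec.on_message is not None
    )
    if spec.timezone is not None and spec.schedule is None:
        raise ValueError(f"{spec.name}: timezone requires schedule")
    if spec.has_streaming_pipeline:
        # The consumer drives execution, so no trigger may be declared.
        if has_trigger:
            raise ValueError(f"{spec.name}: streaming workflows take no trigger")
        return
    if not has_trigger:
        raise ValueError(f"{spec.name}: at least one trigger is required")
    if spec.on_message is not None and (spec.triggered_by or spec.schedule is not None):
        raise ValueError(f"{spec.name}: on_message excludes triggered_by/schedule")


validate_triggers(WorkflowSpec("orders_etl", schedule="*/5 * * * *"))  # passes
```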

Composite triggers

The interesting case is a composite trigger: a workflow that needs other workflows to have completed AND the cron tick to have been reached.

ematix.workflow(
    name="evening_combined_report",
    triggered_by=["workflow_A", "workflow_B"],
    schedule="0 21 * * *",
    timezone="America/New_York",
    jobs=["extract", "enrich", "aggregate", "report"],
)

What that means in practice:

State             workflow_A      workflow_B      21:00 tick   Fires?
Normal evening    done at 18:00   done at 19:30   reached      yes, at 21:00
B is late         done at 18:00   done at 22:00   reached      yes, at 22:00 (immediately)
A failed          failed          done at 18:00   reached      no (waits for A to succeed)
Both done early   done at 12:00   done at 14:00   not yet      no (waits for 21:00)

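The workflow is advertised externally as running “around 9 PM”, and the trigger model keeps that promise: it never fires before 21:00, and once the tick has been reached it fires as soon as both upstream workflows have succeeded.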
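The table reduces to one rule: if no upstream is blocked by a failure, the workflow fires at the latest of its conditions. A minimal sketch of that rule, assuming the next cron tick after the last self-run has already been computed (cron parsing is out of scope); earliest_fire_time and at are hypothetical helpers, and the date is illustrative.

```python
from datetime import datetime
from typing import Dict, Optional


def earliest_fire_time(
    tick: datetime,
    upstream_success: Dict[str, Optional[datetime]],
) -> Optional[datetime]:
    """When does an AND-composite trigger fire? None means it is blocked.

    tick: the next cron tick after this workflow's last successful run.
    upstream_success: per triggered_by name, when it succeeded since that run,
    or None if its most recent terminal run failed.
    """
    if any(done is None for done in upstream_success.values()):
        return None  # blocked until the failed upstream succeeds
    # All conditions must hold, so firing waits for the latest of them.
    return max([tick, *upstream_success.values()])


def at(hour: int, minute: int = 0) -> datetime:
    return datetime(2026, 5, 21, hour, minute)  # illustrative date


tick = at(21)
rows = {
    "Normal evening": {"workflow_A": at(18), "workflow_B": at(19, 30)},
    "B is late": {"workflow_A": at(18), "workflow_B": at(22)},
    "A failed": {"workflow_A": None, "workflow_B": at(18)},
    "Both done early": {"workflow_A": at(12), "workflow_B": at(14)},
}
for state, upstream in rows.items():
    fires = earliest_fire_time(tick, upstream)
    print(f"{state}: {fires:%H:%M}" if fires else f"{state}: blocked")
```

The four printed lines reproduce the Fires? column: 21:00, 22:00, blocked, 21:00.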

Trigger state + Run now

The Workflows tab renders each declared trigger as a coloured-dot pill so you can see which conditions are currently satisfied without reading the run log:

  • 🟢 ready — the condition is satisfied right now
  • 🟡 pending — waiting for the condition (cron tick in the future, or upstream hasn’t completed since the last self-run)
  • 🔴 failed — upstream’s most recent terminal run since the last self-run was a failure, blocking firing
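The three colours map onto a small classification per condition. A minimal sketch, assuming hypothetical status strings "success" and "failed" for an upstream's latest terminal run since this workflow's last successful run; PillState, schedule_pill and upstream_pill are illustrative names, not part of ematix-flow.

```python
from datetime import datetime
from enum import Enum
from typing import Optional


class PillState(Enum):
    READY = "🟢 ready"
    PENDING = "🟡 pending"
    FAILED = "🔴 failed"


def schedule_pill(next_tick: datetime, now: datetime) -> PillState:
    # Ready once the next tick after the last self-run is no longer in the future.
    return PillState.READY if now >= next_tick else PillState.PENDING


def upstream_pill(last_terminal_status: Optional[str]) -> PillState:
    """last_terminal_status: the upstream's most recent terminal run since this
    workflow's last successful run, or None if it has not finished one yet."""
    if last_terminal_status == "success":
        return PillState.READY
    if last_terminal_status == "failed":
        return PillState.FAILED  # blocks firing
    return PillState.PENDING
```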

Schedule pills show the actual tick time the next firing is anchored to, in the workflow’s configured timezone: “21:00 EDT” if the tick falls today, “2026-05-22 21:00 EDT” otherwise.
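The two label formats can be sketched with the standard datetime module. This assumes "today" is judged in the workflow's timezone, and it uses a fixed-offset stand-in for America/New_York in summer so it runs without a tz database; with tzdata available you would pass zoneinfo.ZoneInfo("America/New_York") instead. schedule_label is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Fixed-offset stand-in for America/New_York during daylight saving time.
EDT = timezone(timedelta(hours=-4), "EDT")


def schedule_label(next_tick: datetime, now: datetime, tz=EDT) -> str:
    """Format the tick a schedule pill is anchored to, in the workflow's tz."""
    tick_local = next_tick.astimezone(tz)
    now_local = now.astimezone(tz)
    if tick_local.date() == now_local.date():
        return tick_local.strftime("%H:%M %Z")  # same day: time only
    return tick_local.strftime("%Y-%m-%d %H:%M %Z")  # otherwise: full date
```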

Every workflow and standalone job has a ▶ Run now button:

  • On a workflow card the button opens a modal with checkboxes for each member job. By default all are selected; uncheck to scope the immediate run to a subset. Trigger gates (cron, upstream events) are bypassed for the manual run.
  • On a job card (Jobs tab) the button opens a smaller modal with a “Trigger downstream dependents” checkbox. It is off by default, so only this one job runs. When it is on and this job succeeds, every job that lists it in its depends_on is also enqueued.

Both paths write a status="requested" rich-history record that the scheduler picks up on its next tick.
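A minimal sketch of both Run now paths, assuming a dict-shaped record: only status="requested" comes from the text above, and the other field names are hypothetical. direct_dependents follows the description literally, returning the jobs that name this job in their depends_on; it does not walk further down the DAG.

```python
from datetime import datetime, timezone
from typing import Dict, List

# depends_on declarations from the orders_etl example.
DEPENDS_ON: Dict[str, List[str]] = {
    "extract": [],
    "enrich": ["extract"],
    "aggregate": ["extract"],
    "report": ["enrich", "aggregate"],
}


def direct_dependents(job: str, depends_on: Dict[str, List[str]]) -> List[str]:
    """Jobs that list `job` in their depends_on."""
    return sorted(name for name, deps in depends_on.items() if job in deps)


def run_now_request(jobs: List[str], trigger_downstream: bool = False) -> dict:
    """Build a manual-run record for the scheduler to pick up on its next tick.

    Every key except "status" is a hypothetical field name.
    """
    return {
        "status": "requested",
        "jobs": list(jobs),  # the subset left checked in the modal
        "trigger_downstream": trigger_downstream,
        "requested_at": datetime.now(timezone.utc).isoformat(),
    }


# Job card with the checkbox on: once extract succeeds, enqueue its dependents.
request = run_now_request(["extract"], trigger_downstream=True)
print(direct_dependents("extract", DEPENDS_ON))  # → ['aggregate', 'enrich']
```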

Standalone jobs

A job that isn’t a member of any workflow can declare its own trigger:

@ematix.job(
    name="ingest_events",
    target=Events, target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn): return "SELECT ..."

These show up on the Workflows tab as kind: "single" cards. Inside a declared workflow, per-job schedule is ignored (the workflow’s trigger supersedes); a warning is emitted at registration if both are set.
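The precedence rule can be sketched with Python's warnings module. This is a minimal illustration, not ematix-flow's registration code: resolve_job_trigger and the "member" kind label are hypothetical; only "single" comes from the text above.

```python
import warnings
from typing import Optional, Tuple


def resolve_job_trigger(
    job_name: str,
    job_schedule: Optional[str],
    workflow_name: Optional[str] = None,
) -> Tuple[str, Optional[str]]:
    """Return ("single", schedule) for a standalone job, or ("member", None)
    for a workflow member, whose trigger comes from the workflow instead."""
    if workflow_name is None:
        return "single", job_schedule
    if job_schedule is not None:
        # Both set: keep the workflow's trigger and tell the author at registration.
        warnings.warn(
            f"job {job_name!r} sets schedule but belongs to workflow "
            f"{workflow_name!r}; the workflow's trigger supersedes it",
            UserWarning,
            stacklevel=2,
        )
    return "member", None
```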

Streaming workflows

A workflow that contains an @ematix.streaming_pipeline is implicitly streaming — the consumer drives execution, so no schedule / triggered_by / on_message is needed (or allowed) on the workflow.

The UI renders these with the ▶ LIVE STREAMING pill instead of a trigger summary.
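Why a streaming workflow needs no trigger: the loop that reads messages is what decides when work happens. A minimal sketch of consumer-driven execution, using the standard-library queue module as a stand-in message source; the consume helper and the None shutdown sentinel are hypothetical, and this is not how @ematix.streaming_pipeline is implemented.

```python
import queue
from typing import Callable


def consume(source: "queue.Queue[object]", handle: Callable[[object], None]) -> int:
    """Run `handle` once per message until a None sentinel arrives.

    There is no cron or upstream check: message arrival is the trigger.
    """
    processed = 0
    while True:
        message = source.get()  # blocks until the next message arrives
        if message is None:  # hypothetical shutdown sentinel
            return processed
        handle(message)
        processed += 1
```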