Scheduling & triggers
Cron, event-driven, message-driven, and composite triggers. AND-conjunction semantics, real-world scenarios, and what ematix-flow's trigger surface offers that other schedulers don't.
The flow in ematix-flow is the workflow. Workflows are the central organizing concept of the project — every operational concern (when something runs, what runs in what order, who depends on what, what fires when an upstream succeeds, what the operator sees in the UI) is anchored to the workflow.
This page is the canonical reference for the trigger surface. Every workflow (and every standalone job) declares its trigger conditions, which are evaluated as an AND-conjunction against that workflow’s own last successful run. We cover every trigger kind, how composite triggers compose, real operational scenarios, and what this surface offers that isn’t available in other off-the-shelf orchestrators.
If you haven’t yet, skim Workflows first for the high-level mental model. This page goes deep on the trigger surface.
The four trigger kinds
Triggers live on the workflow declaration (or on a standalone job). There are four kinds:
| Kwarg | Meaning |
|---|---|
| schedule="cron" + optional timezone="IANA" | Time trigger. The next cron tick after the workflow’s last successful run must have been reached. |
| triggered_by=[name, ...] | Event trigger. Each named upstream (job or workflow) must have succeeded since this workflow last succeeded. |
| on_message=<source> | Message trigger. Per-message firing: one workflow run per inbound message from the source. |
| (implicit) any streaming pipeline in jobs= | Streaming trigger. The streaming consumer drives execution; no other trigger is required or allowed. |
You must declare at least one trigger (unless the workflow is
implicitly streaming). on_message is mutually exclusive with schedule /
triggered_by; the other two can be combined freely.
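The combination rules above can be sketched as a small validator. This is only an illustration of the stated rules: `validate_triggers` and its error messages are hypothetical and are not part of ematix-flow's API.

```python
from typing import Optional, Sequence


def validate_triggers(
    schedule: Optional[str] = None,
    triggered_by: Optional[Sequence[str]] = None,
    on_message: Optional[object] = None,
    is_streaming: bool = False,
) -> None:
    """Raise ValueError if the trigger kwargs break the rules described above.

    Hypothetical helper; the real validation and messages may differ.
    """
    has_schedule = schedule is not None
    has_events = bool(triggered_by)
    has_message = on_message is not None

    if is_streaming:
        # A streaming pipeline is its own trigger; nothing else is allowed.
        if has_schedule or has_events or has_message:
            raise ValueError("streaming workflows cannot declare other triggers")
        return
    if has_message and (has_schedule or has_events):
        raise ValueError("on_message cannot be combined with schedule or triggered_by")
    if not (has_schedule or has_events or has_message):
        raise ValueError("declare at least one trigger")


# schedule + triggered_by may be combined freely.
validate_triggers(schedule="0 21 * * *", triggered_by=["workflow_A"])
```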
Composite triggers: AND, OR, and nested expressions
The simple case — multiple kwargs conjoin with AND
Setting more than one trigger kwarg conjoins them with AND. The workflow
fires only when all declared conditions are satisfied relative to
last_successful_run_of_self.
ematix.workflow(
    name="evening_combined_report",
    triggered_by=["workflow_A", "workflow_B"],
    schedule="0 21 * * *",
    timezone="America/New_York",
    jobs=[...],
)
Reading: fires when workflow_A has succeeded AND workflow_B has succeeded AND 21:00 New York time has been reached, all measured against this workflow’s own last successful run. If the conditions are satisfied at different times, the workflow fires the instant the last condition becomes true; it does not wait for the next cron tick.
Boolean expressions: AllOf / AnyOf for OR + nesting
triggered_by= accepts a boolean expression tree built from two
combinators imported from the top-level package:
from ematix_flow import ematix, AllOf, AnyOf

ematix.workflow(
    name="combined_report",
    # Fires when workflow_A succeeded AND (workflow_B OR workflow_C succeeded)
    triggered_by=AllOf("workflow_A", AnyOf("workflow_B", "workflow_C")),
    schedule="0 21 * * *",
    timezone="America/New_York",
    jobs=[...],
)
The flat-list shape (triggered_by=["A", "B"]) is sugar for
AllOf("A", "B"); a bare string is sugar for AllOf(<name>). Anything
more elaborate uses the combinators directly.
Trees nest arbitrarily. AllOf("A", AnyOf("B", AllOf("C", "D")))
expresses A AND (B OR (C AND D)). There’s no depth limit and no
restriction on mixing operators.
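To make the sugar rules concrete, here is a minimal sketch of how a flat list or bare string could be normalised into a tree, and how a tree reads back as a boolean expression. The `AllOf` / `AnyOf` classes below are stand-ins written for this example, not the ones exported by `ematix_flow`, and `normalize` and `describe` are hypothetical helper names.

```python
class _Combinator:
    """Stand-in base class for this sketch only."""

    def __init__(self, *children):
        self.children = children

    def __eq__(self, other):
        return type(self) is type(other) and self.children == other.children

    def __repr__(self):
        return f"{type(self).__name__}{self.children!r}"


class AllOf(_Combinator):
    pass


class AnyOf(_Combinator):
    pass


def normalize(triggered_by):
    """Expand the sugar forms: bare string and flat list both become AllOf."""
    if isinstance(triggered_by, str):
        return AllOf(triggered_by)
    if isinstance(triggered_by, (list, tuple)):
        return AllOf(*triggered_by)
    return triggered_by  # already a combinator tree


def describe(node) -> str:
    """Render a tree as text, with parentheses around nested groups."""
    if isinstance(node, str):
        return node
    joiner = " AND " if isinstance(node, AllOf) else " OR "
    parts = [c if isinstance(c, str) else f"({describe(c)})" for c in node.children]
    return joiner.join(parts)


print(describe(AllOf("A", AnyOf("B", AllOf("C", "D")))))
# prints: A AND (B OR (C AND D))
```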
Evaluation semantics
Per node, given the workflow’s own last_successful_run:
- leaf (an upstream name): ready if that upstream has a successful run after the workflow’s last self-success; failed if the upstream’s most recent terminal run since then was a failure; pending otherwise.
- AllOf: ready iff every child is ready; failed iff any child failed and none are pending; pending otherwise.
- AnyOf: ready iff any child is ready; failed iff every child failed; pending otherwise.
schedule= is evaluated separately and AND-ed with the root of the
expression tree — i.e., the schedule kwarg always behaves as if it were
wrapped in an outer AllOf with the events tree.
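The per-node rules above translate fairly directly into a recursive three-state evaluator. The sketch below takes the leaf states as given (a dict from upstream name to state) rather than deriving them from run history, and uses stand-in `AllOf` / `AnyOf` classes; `State`, `evaluate`, and `gates_open` are names invented for this example.

```python
from enum import Enum


class State(Enum):
    READY = "ready"
    PENDING = "pending"
    FAILED = "failed"


class AllOf:
    def __init__(self, *children):
        self.children = children


class AnyOf:
    def __init__(self, *children):
        self.children = children


def evaluate(node, leaf_states) -> State:
    """Apply the AllOf / AnyOf rules to a tree whose leaves are upstream names."""
    if isinstance(node, str):
        return leaf_states[node]
    states = [evaluate(child, leaf_states) for child in node.children]
    if isinstance(node, AllOf):
        if all(s is State.READY for s in states):
            return State.READY
        if State.FAILED in states and State.PENDING not in states:
            return State.FAILED
        return State.PENDING
    # AnyOf
    if State.READY in states:
        return State.READY
    if all(s is State.FAILED for s in states):
        return State.FAILED
    return State.PENDING


def gates_open(tree, leaf_states, tick_reached: bool) -> bool:
    """schedule= is AND-ed with the root of the events tree."""
    return tick_reached and evaluate(tree, leaf_states) is State.READY
```

Note that a failed child inside an AnyOf does not block firing as long as a sibling is ready, while a failed child inside an AllOf keeps the group from ever becoming ready until that upstream succeeds.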
In the UI
The Workflows tab renders the full expression tree with AND/OR joiners,
parentheses around nested groups, and a coloured dot on every leaf —
🟢 ready · 🟡 pending · 🔴 failed. The composite-trigger card on the
home-page screenshot is a real worked example: an outer AllOf over a
cron tick, a workflow upstream, and an inner AnyOf over two more
upstreams.
For the evening_combined_report example above (workflow_A AND workflow_B AND the 21:00 tick), a few evenings play out like this:

| State | workflow_A | workflow_B | 21:00 tick | Fires? |
|---|---|---|---|---|
| Normal evening | 18:00 ✓ | 19:30 ✓ | reached | yes, at 21:00 |
| B late | 18:00 ✓ | 22:00 ✓ | reached | yes, immediately at 22:00 |
| B failed | 18:00 ✓ | failed | reached | no, waits for B to succeed |
| Both done early | 12:00 ✓ | 14:00 ✓ | not yet | no, waits for 21:00 |
That last column matters. A workflow announced as “around 9 PM” can keep that contract even when upstreams are late — it fires the moment the final condition slips into place, instead of skipping the cycle and waiting until tomorrow.
Trigger state, live
The Workflows tab renders each trigger as a coloured-dot pill:
- 🟢 ready — the condition is satisfied right now
- 🟡 pending — waiting for the condition (cron tick still in the future, or upstream hasn’t completed since the last self-run)
- 🔴 failed — upstream’s most recent terminal run since the last self-run was a failure, blocking firing
Once every pill on a workflow card is green, the workflow is about to fire. That makes “why hasn’t this fired yet?” a one-glance question instead of a log-grep exercise.
Scenarios
Scenario 1 — Daily report with composite dependency
A regional sales report runs every weekday at 21:00 NY. It pulls from two upstream workflows: a customer-sync workflow (runs hourly) and an orders-extract workflow (runs every 5 minutes). The report must use fresh data from both. If either upstream is failing, the report waits rather than running with stale data.
ematix.workflow(
    name="regional_sales_report",
    triggered_by=["customer_sync", "orders_extract"],
    schedule="0 21 * * 1-5",
    timezone="America/New_York",
    jobs=["build_sales_facts", "build_geo_rollups", "publish_report"],
)
Scenario 2 — Hourly rollup that backfills when upstream slips
Hourly aggregations run at :00 of every hour, but only after the ingestion has actually delivered the data for the last hour. If ingestion is late by 12 minutes, the rollup fires at :12 instead of :00 — once the data is actually there.
ematix.workflow(
    name="hourly_rollup",
    triggered_by=["ingest_events"],
    schedule="0 * * * *",
    jobs=["compute_rollup", "publish_to_warehouse"],
)
Scenario 3 — Reactive workflow on Kafka messages
An audit-log publisher fires one workflow run per message arriving on
the audit-events Kafka topic. No cron; the firing rate is whatever
the message rate is.
ematix.workflow(
    name="audit_log_publisher",
    on_message=KAFKA.topic("audit-events"),
    jobs=["enrich_audit_event", "write_to_compliance_db", "notify_slack"],
)
Scenario 4 — Workflow chain (A → B → C)
A linear data pipeline: extract → transform → publish. Each stage is its own workflow, triggered by the previous one’s completion. The cron lives only on the head; downstreams react.
ematix.workflow(name="extract_orders", schedule="*/5 * * * *", jobs=[...])
ematix.workflow(name="transform_orders", triggered_by=["extract_orders"], jobs=[...])
ematix.workflow(name="publish_orders", triggered_by=["transform_orders"], jobs=[...])
This is the right model when the stages need their own DAGs, their own
retry policies, or their own teams. For tight DAGs that always run
together, prefer one workflow with member jobs + per-job depends_on.
Scenario 5 — Manual override with job-subset selection
Every workflow card on the Workflows tab has a ▶ Run now button.
Click it on regional_sales_report and you get a modal listing each
member job with a checkbox. Want to re-run only publish_report while
the upstream facts are still good? Uncheck the other two and submit.
POST /api/workflows/regional_sales_report/run-now
Content-Type: application/json
{"jobs": ["publish_report"]}
The trigger gates are bypassed for the manual run — you’re explicitly saying “run this now.”
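For scripting the same call, a minimal sketch using only the standard library might look like this. The endpoint path and JSON body come from the request above; the base URL `http://localhost:8080` and the helper name `build_run_now_request` are assumptions for illustration, so substitute wherever your Web UI is actually served.

```python
import json
import urllib.request
from typing import Iterable


def build_run_now_request(
    base_url: str, workflow: str, jobs: Iterable[str]
) -> urllib.request.Request:
    """Build (but do not send) the run-now POST for a subset of member jobs."""
    body = json.dumps({"jobs": list(jobs)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/workflows/{workflow}/run-now",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed base URL; adjust to your deployment.
req = build_run_now_request(
    "http://localhost:8080", "regional_sales_report", ["publish_report"]
)
# urllib.request.urlopen(req) would send it; omitted so the sketch has no side effects.
```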
Scenario 6 — Single job with optional cascade
Sometimes you want to fire one job ad-hoc but also want everything downstream to react. The ▶ Run now button on a Jobs tab card opens a modal with a “Trigger downstream dependents” checkbox.
- Off (default): runs just this job. Downstream jobs that depend on it stay where they were.
- On: when this job succeeds, every job that has this one in its depends_on is also enqueued.
This is the explicit version of Scenario 5 from the job angle.
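The cascade option can be pictured as a lookup over each job's depends_on list. The sketch below covers only the first hop (the jobs that directly name the triggered job); whether the cascade continues further downstream is not something this example assumes. `direct_dependents`, `jobs_to_enqueue`, and the dependency map are hypothetical.

```python
from typing import Dict, List


def direct_dependents(job: str, depends_on: Dict[str, List[str]]) -> List[str]:
    """Jobs that list `job` in their own depends_on, in a stable order."""
    return sorted(name for name, deps in depends_on.items() if job in deps)


def jobs_to_enqueue(
    job: str, depends_on: Dict[str, List[str]], cascade: bool = False
) -> List[str]:
    """The triggered job, plus its direct dependents when cascade is on."""
    return [job] + (direct_dependents(job, depends_on) if cascade else [])


# Illustrative dependency map; the real depends_on values are not given here.
depends_on = {
    "build_sales_facts": [],
    "build_geo_rollups": ["build_sales_facts"],
    "publish_report": ["build_sales_facts", "build_geo_rollups"],
}
```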
How firing is evaluated
The scheduler tick evaluates each workflow’s trigger conjunction with this rough logic:
def should_fire(workflow):
    last = last_successful_run(workflow.name)  # None if never run
    if workflow.is_streaming:
        return False  # streaming consumer handles execution itself
    if workflow.on_message:
        return False  # per-message dispatcher handles it
    for upstream in workflow.triggered_by:
        ok = last_successful_run(upstream)
        if ok is None or (last is not None and ok <= last):
            return False
    if workflow.schedule is not None:
        next_tick = next_cron_after(
            workflow.schedule, last or epoch_zero, tz=workflow.timezone,
        )
        if now() < next_tick:
            return False
    return True
Once should_fire returns true, the worker topo-sorts the workflow’s
member jobs using each job’s depends_on=[...] and runs the DAG.
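As an illustration of the topo-sort step, the standard library's `graphlib` (Python 3.9+) can order jobs from a depends_on map. This is a sketch of the idea, not the worker's actual implementation; the dependency map and `execution_order` are made up for the example.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def execution_order(depends_on: Dict[str, List[str]]) -> List[str]:
    """Return the jobs in an order where every job follows its dependencies.

    TopologicalSorter expects {node: predecessors}, which is the same shape
    as a depends_on map. It raises graphlib.CycleError on circular dependencies.
    """
    return list(TopologicalSorter(depends_on).static_order())


# Illustrative depends_on values for the Scenario 1 member jobs.
depends_on = {
    "build_sales_facts": [],
    "build_geo_rollups": ["build_sales_facts"],
    "publish_report": ["build_sales_facts", "build_geo_rollups"],
}
order = execution_order(depends_on)
# order == ["build_sales_facts", "build_geo_rollups", "publish_report"]
```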
Trigger precedence: workflow vs. job
A job that is a member of a workflow inherits the workflow’s trigger.
Per-job schedule= / triggered_by= on a workflow member are ignored,
and a DeprecationWarning is emitted at registration to flag the dead
setting.
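A registration-time check of this kind could look roughly like the following. It is a sketch of the behaviour described above using the standard `warnings` module; the function name and message text are hypothetical and the real registration code may differ.

```python
import warnings
from typing import List, Optional, Sequence


def warn_on_ignored_triggers(
    job_name: str,
    workflow_name: str,
    schedule: Optional[str] = None,
    triggered_by: Optional[Sequence[str]] = None,
) -> List[str]:
    """Warn about per-job trigger kwargs that a workflow member will not use.

    Returns the names of the ignored kwargs (empty if there are none).
    """
    ignored = [
        kwarg
        for kwarg, value in (("schedule", schedule), ("triggered_by", triggered_by))
        if value is not None
    ]
    if ignored:
        warnings.warn(
            f"job {job_name!r} is a member of workflow {workflow_name!r}; "
            f"its own {', '.join(ignored)} is ignored",
            DeprecationWarning,
            stacklevel=2,
        )
    return ignored
```

Python hides DeprecationWarning by default outside of `__main__` and test runners, so running with `-W default` (or an equivalent warnings filter) is one way to surface such messages.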
A standalone job (one not listed in any workflow’s jobs=) keeps its
own schedule= / triggered_by= / on_message=. The Workflows tab
renders these as kind: "single" workflow-of-one cards.
How this compares to other schedulers
Honest assessment of where the trigger surface sits versus the schedulers most people choose between.
vs. dbt (Core or Cloud)
dbt Core has no scheduler — you wire it into a cron / orchestrator and
invoke dbt run. dbt Cloud has job triggers (cron, manual, GitHub
webhook) but no boolean composition of trigger conditions and no native
event-driven cross-job dependencies. The “wait for upstream X AND cron
tick Y” pattern requires bringing in a separate orchestrator (Airflow,
Prefect, Dagster) or writing the gating yourself. So vs. dbt this is a
real gap we close — but it’s not really a like-for-like comparison;
dbt’s scope is the SQL transformation surface, not the trigger surface.
vs. Apache Airflow
Airflow 2.4 introduced Datasets, and 2.9 added the DatasetAny / DatasetAll
combinators, which can be nested into trees, plus DatasetOrTimeSchedule,
which runs a DAG on either a cron timetable or a dataset update. Boolean
composition of trigger conditions exists. The differences are around
indirection and ergonomics:
- Airflow’s model goes through the Dataset abstraction: tasks have to declare outlets=[dataset_x] and explicitly update them; downstream DAGs reference the Dataset name. ematix-flow references workflow / job names directly.
- ematix-flow shows live per-condition state as coloured dots in the UI. Airflow’s UI surfaces the next scheduled run and queued/running states but doesn’t render each Dataset’s individual readiness next to the DAG declaration. (A Datasets view exists, but it’s separate from the DAG card.)
- Manual runs with selective jobs: Airflow’s manual trigger fires the whole DAG. The closest equivalent for “just re-run this subset” is backfill, which is heavier-weight and dated. ematix-flow’s Run-now modal lets you check off member jobs and submit.
Functionally close on composition; ergonomically different.
vs. Prefect
Prefect 2/3 has Automations — event-driven rules (“when X happens,
do Y”) that can chain to express OR-style dependencies. But they’re
multiple separate rules, not a single declarative expression on the
workflow itself. Conceptually parallel to our triggered_by tree;
syntactically more spread out.
vs. Dagster
Dagster’s sensors are arbitrary Python that polls and decides when to fire. You can express anything because it’s code, but the price is that the trigger logic is imperative and lives outside the asset/job declaration. Asset freshness policies are closer to our model but operate on assets, not workflows.
What’s distinctive about this combination
No single piece is unprecedented — the combination is what we think makes ematix-flow worth choosing for workflow-heavy data work:
- Single-declaration AND/OR composite triggers that reference workflow / job names directly, with no intermediate Dataset abstraction or imperative sensor.
- Live per-condition coloured-dot pills on every workflow card. “Why hasn’t this fired?” becomes a one-glance question instead of a log query.
- Run-now with job-subset selection on a workflow + Run-now with optional cascade on a job. Both bypass trigger gates for the ad-hoc run only.
- First-class workflow concept: named, explicit jobs=[...] membership; the workflow’s own trigger supersedes any per-job schedule.
- Single-process scheduler. flow scheduler is one Python process embedding the Rust engine: no separate metadata DB, no webserver/scheduler split, no schema migrations between releases. Run it from cron, systemd, a K8s CronJob, or the bundled long-running loop: same code, same semantics.
If you’re already heavily invested in Airflow + Datasets, the composition piece alone may not be worth migrating for. The combination of all of the above in a single declarative Python decorator surface plus the Rust execution engine underneath is the bet.
Operational basics
The trigger surface above is what you declare. These are the verbs that evaluate and act on those declarations.
flow run-due
Single-tick scheduler — looks at every workflow / standalone job whose triggers are satisfied right now and runs them. Designed to be invoked from host cron at one-minute granularity.
flow run-due --module my_pipelines
flow scheduler
Long-running loop — same evaluation as run-due, but keeps a lease per
workflow so multiple workers can share work. Useful in Kubernetes /
systemd / Docker-Compose setups where you don’t want host cron.
flow scheduler --module my_pipelines --executor subprocess+python:// --poll-interval 10
flow run — one-shot
Runs a single workflow or job once, ignoring triggers. Useful for CI, backfills, and manual replays.
flow run --module my_pipelines regional_sales_report
Programmatic
Every decorated function exposes .sync() for orchestrator integration:
from my_pipelines import regional_sales_report
regional_sales_report.sync() # run now, blocking
regional_sales_report.sync(force=True) # ignore watermark for one run
This is the integration point if you want to drive ematix-flow from Airflow, Dagster, Prefect, or any other orchestrator while still benefiting from the ematix-flow execution + run-history surface.
Run history
Every fire — scheduled or manual — writes a row to the configured
run-history store. The Web UI’s Runs tab and the flow status CLI both
read from this store.
flow runs list --pipeline regional_sales_report --since 24h
flow runs show <run-id>