Scheduling & triggers

Cron, event-driven, message-driven, and composite triggers. AND-conjunction semantics, real-world scenarios, and what ematix-flow's trigger surface offers that other schedulers don't.

The flow in ematix-flow is the workflow. Workflows are the central organizing concept of the project — every operational concern (when something runs, what runs in what order, who depends on what, what fires when an upstream succeeds, what the operator sees in the UI) is anchored to the workflow.

This page is the canonical reference for the trigger surface. Every workflow (and every standalone job) declares its trigger conditions, which are evaluated as an AND-conjunction against that workflow’s own last successful run. We cover every trigger kind, how composite triggers compose, real operational scenarios, and what this surface offers that isn’t available in other off-the-shelf orchestrators.

If you haven’t yet, skim Workflows first for the high-level mental model. This page goes deep on the trigger surface.

The four trigger kinds

Triggers live on the workflow declaration (or on a standalone job). There are four kinds:

Kwarg | Meaning
schedule="cron" + optional timezone="IANA" | Time trigger. The next cron tick after the workflow’s last successful run must have been reached.
triggered_by=[name, ...] | Event trigger. Each named upstream (job or workflow) must have succeeded since this workflow last succeeded.
on_message=<source> | Message trigger. Per-message firing: one workflow run per inbound message from the source.
(implicit) any streaming pipeline in jobs= | Streaming trigger. The streaming consumer drives execution; no other trigger required or allowed.

You must declare at least one trigger (unless the workflow is implicitly streaming). on_message is mutually exclusive with schedule / triggered_by; the other two can be combined freely.
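The combination rules above can be sketched as a small validation function. This is an illustrative stand-in, not ematix-flow’s actual registration code: `validate_triggers` and its `has_streaming_job` parameter are hypothetical names chosen for the example.

```python
from typing import Optional


def validate_triggers(
    schedule: Optional[str] = None,
    triggered_by: Optional[object] = None,
    on_message: Optional[object] = None,
    has_streaming_job: bool = False,
) -> str:
    """Return the trigger kind implied by the kwargs, or raise ValueError.

    Hypothetical helper that mirrors the rules stated above.
    """
    declared = [
        name
        for name, value in (
            ("schedule", schedule),
            ("triggered_by", triggered_by),
            ("on_message", on_message),
        )
        if value is not None
    ]

    if has_streaming_job:
        # A streaming workflow takes no other trigger.
        if declared:
            raise ValueError(f"streaming workflow cannot also declare {declared}")
        return "streaming"

    if not declared:
        raise ValueError("declare at least one trigger")

    if on_message is not None:
        # on_message excludes both schedule and triggered_by.
        if len(declared) > 1:
            raise ValueError("on_message cannot be combined with other triggers")
        return "message"

    # schedule and triggered_by may appear alone or together (AND-ed).
    return "+".join(declared)
```

Under these assumptions, `validate_triggers(schedule="0 21 * * *", triggered_by=["workflow_A"])` is accepted, while passing `on_message` together with `schedule` raises.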

Composite triggers: AND, OR, and nested expressions

The simple case — multiple kwargs conjoin with AND

Setting more than one trigger kwarg conjoins them with AND. The workflow fires only when all declared conditions are satisfied relative to last_successful_run_of_self.

ematix.workflow(
    name="evening_combined_report",
    triggered_by=["workflow_A", "workflow_B"],
    schedule="0 21 * * *",
    timezone="America/New_York",
    jobs=[...],
)

Reading: fires when workflow_A has succeeded AND workflow_B has succeeded AND the 21:00 New York tick has been reached, all measured against this workflow’s own last successful run. If the conditions are satisfied at different times, the workflow fires the instant the last one becomes true rather than waiting for the next cron tick.

Boolean expressions: AllOf / AnyOf for OR + nesting

triggered_by= accepts a boolean expression tree built from two combinators imported from the top-level package:

from ematix_flow import ematix, AllOf, AnyOf

ematix.workflow(
    name="combined_report",
    # Fires when workflow_A succeeded AND (workflow_B OR workflow_C succeeded)
    triggered_by=AllOf("workflow_A", AnyOf("workflow_B", "workflow_C")),
    schedule="0 21 * * *",
    timezone="America/New_York",
    jobs=[...],
)

The flat-list shape (triggered_by=["A", "B"]) is sugar for AllOf("A", "B"); a bare string is sugar for AllOf(<name>). Anything more elaborate uses the combinators directly.

Trees nest arbitrarily. AllOf("A", AnyOf("B", AllOf("C", "D"))) expresses A AND (B OR (C AND D)). There’s no depth limit and no restriction on mixing operators.
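To make the sugar and the nesting concrete, here is a minimal sketch that normalizes the three accepted shapes into a tree and prints it in infix form. The `AllOf` / `AnyOf` classes below are simplified stand-ins written for this example, and `normalize` and `render` are hypothetical helpers; the real combinators in ematix_flow may be shaped differently.

```python
from typing import List, Union


class _Combinator:
    joiner = ""

    def __init__(self, *children: "Node") -> None:
        self.children = children


class AllOf(_Combinator):
    joiner = " AND "


class AnyOf(_Combinator):
    joiner = " OR "


Node = Union[str, _Combinator]


def normalize(triggered_by: Union[str, List[str], _Combinator]) -> _Combinator:
    """Expand the sugar: a bare string or flat list becomes an AllOf."""
    if isinstance(triggered_by, _Combinator):
        return triggered_by
    if isinstance(triggered_by, str):
        return AllOf(triggered_by)
    return AllOf(*triggered_by)


def render(node: Node, top: bool = True) -> str:
    """Render a tree as an infix expression, parenthesizing nested groups."""
    if isinstance(node, str):
        return node
    if len(node.children) == 1:
        return render(node.children[0], top)
    text = node.joiner.join(render(child, top=False) for child in node.children)
    return text if top else f"({text})"


print(render(normalize(["A", "B"])))
print(render(AllOf("A", AnyOf("B", AllOf("C", "D")))))
```

The second call prints the same expression as the prose above: A AND (B OR (C AND D)).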

Evaluation semantics

Per node, given the workflow’s own last_successful_run:

  • leaf (an upstream name): ready if that upstream has a successful run after the workflow’s last self-success; failed if the upstream’s most recent terminal run since then was a failure; pending otherwise.
  • AllOf: ready iff every child is ready; failed iff any child failed and none are pending; pending otherwise.
  • AnyOf: ready iff any child is ready; failed iff every child failed; pending otherwise.

schedule= is evaluated separately and AND-ed with the root of the expression tree — i.e., the schedule kwarg always behaves as if it were wrapped in an outer AllOf with the events tree.
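The rules above amount to a small three-valued evaluator. The sketch below is one way to write it, assuming leaf states have already been computed and are passed in as a dict; the classes and the `evaluate` / `overall` functions are illustrative stand-ins, not the engine’s actual code.

```python
from typing import Dict, List, Union

READY, PENDING, FAILED = "ready", "pending", "failed"


class AllOf:
    def __init__(self, *children: "Node") -> None:
        self.children = children


class AnyOf:
    def __init__(self, *children: "Node") -> None:
        self.children = children


Node = Union[str, AllOf, AnyOf]


def all_of(states: List[str]) -> str:
    if all(s == READY for s in states):
        return READY
    # Failed only once nothing is still pending.
    if FAILED in states and PENDING not in states:
        return FAILED
    return PENDING


def any_of(states: List[str]) -> str:
    if READY in states:
        return READY
    if states and all(s == FAILED for s in states):
        return FAILED
    return PENDING


def evaluate(node: Node, leaf_states: Dict[str, str]) -> str:
    """Evaluate a tree bottom-up from precomputed leaf states."""
    if isinstance(node, str):
        return leaf_states[node]
    child_states = [evaluate(child, leaf_states) for child in node.children]
    return all_of(child_states) if isinstance(node, AllOf) else any_of(child_states)


def overall(tree: Node, leaf_states: Dict[str, str], tick_reached: bool) -> str:
    """AND the schedule with the tree root, as if wrapped in an outer AllOf."""
    tick = READY if tick_reached else PENDING
    return all_of([evaluate(tree, leaf_states), tick])


tree = AllOf("workflow_A", AnyOf("workflow_B", "workflow_C"))
states = {"workflow_A": READY, "workflow_B": FAILED, "workflow_C": READY}
print(overall(tree, states, tick_reached=True))   # ready
print(overall(tree, states, tick_reached=False))  # pending
```

Note that a failed workflow_B does not block the tree here, because the AnyOf is satisfied by workflow_C.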

In the UI

The Workflows tab renders the full expression tree with AND/OR joiners, parentheses around nested groups, and a coloured dot on every leaf — 🟢 ready · 🟡 pending · 🔴 failed. The composite-trigger card on the home-page screenshot is a real worked example: an outer AllOf over a cron tick, a workflow upstream, and an inner AnyOf over two more upstreams.

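For the evening_combined_report workflow declared earlier, four representative evenings play out as follows:

State | workflow_A | workflow_B | 21:00 tick | Fires?
Normal evening | 18:00 ✓ | 19:30 ✓ | reached | yes, at 21:00
B late | 18:00 ✓ | 22:00 ✓ | reached | yes, immediately at 22:00
A failed | failed | 18:00 ✓ | reached | no, waits for A to succeed
Both done early | 12:00 ✓ | 14:00 ✓ | not yet | no, waits for 21:00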

That last column matters. A workflow announced as “around 9 PM” can keep that contract even when upstreams are late — it fires the moment the final condition slips into place, instead of skipping the cycle and waiting until tomorrow.
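Put differently, for a plain AND of upstreams and a cron tick, the firing instant is simply the latest of the individual condition times. A minimal sketch that replays the four evenings, assuming a hypothetical `fire_time` helper and an arbitrary calendar date:

```python
from datetime import datetime
from typing import Dict, Optional


def fire_time(
    upstream_success: Dict[str, Optional[datetime]],
    cron_tick: datetime,
) -> Optional[datetime]:
    """Earliest instant at which every condition holds, or None if blocked.

    A value of None means that upstream has no successful run since this
    workflow's own last success (for example, because it failed).
    """
    if any(t is None for t in upstream_success.values()):
        return None
    return max([cron_tick, *upstream_success.values()])


def at(hour: int, minute: int = 0) -> datetime:
    return datetime(2024, 1, 15, hour, minute)  # the date itself is arbitrary


tick = at(21)
normal = fire_time({"workflow_A": at(18), "workflow_B": at(19, 30)}, tick)
b_late = fire_time({"workflow_A": at(18), "workflow_B": at(22)}, tick)
a_failed = fire_time({"workflow_A": None, "workflow_B": at(18)}, tick)
early = fire_time({"workflow_A": at(12), "workflow_B": at(14)}, tick)

print(normal, b_late, a_failed, early, sep="\n")
```

This only covers the all-AND case; trees containing AnyOf need the three-state evaluation described under Evaluation semantics.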

Trigger state, live

The Workflows tab renders each trigger as a coloured-dot pill:

  • 🟢 ready — the condition is satisfied right now
  • 🟡 pending — waiting for the condition (cron tick still in the future, or upstream hasn’t completed since the last self-run)
  • 🔴 failed — upstream’s most recent terminal run since the last self-run was a failure, blocking firing

Once every pill on a workflow card is green, the workflow is about to fire. That makes “why hasn’t this fired yet?” a one-glance question instead of a log-grep exercise.

Scenarios

Scenario 1 — Daily report with composite dependency

A regional sales report runs every weekday at 21:00 NY. It pulls from two upstream workflows: a customer-sync workflow (runs hourly) and an orders-extract workflow (runs every 5 minutes). The report must use fresh data from both. If either upstream is failing, the report waits rather than running with stale data.

ematix.workflow(
    name="regional_sales_report",
    triggered_by=["customer_sync", "orders_extract"],
    schedule="0 21 * * 1-5",
    timezone="America/New_York",
    jobs=["build_sales_facts", "build_geo_rollups", "publish_report"],
)

Scenario 2 — Hourly rollup that backfills when upstream slips

Hourly aggregations run at :00 of every hour, but only after the ingestion has actually delivered the data for the last hour. If ingestion is late by 12 minutes, the rollup fires at :12 instead of :00 — once the data is actually there.

ematix.workflow(
    name="hourly_rollup",
    triggered_by=["ingest_events"],
    schedule="0 * * * *",
    jobs=["compute_rollup", "publish_to_warehouse"],
)
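The timing in this scenario can be traced by hand. The sketch below hard-codes the hourly schedule instead of parsing cron, works in UTC for simplicity, and uses hypothetical helper names (`next_hourly_tick`, `rollup_should_fire`); it illustrates the rule, not the scheduler’s implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def next_hourly_tick(after: datetime) -> datetime:
    """First top-of-hour strictly after `after` (the "0 * * * *" schedule)."""
    return after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def rollup_should_fire(
    now: datetime,
    last_self_success: datetime,
    last_ingest_success: Optional[datetime],
) -> bool:
    # Event condition: ingest_events succeeded since the last rollup.
    ingest_fresh = (
        last_ingest_success is not None and last_ingest_success > last_self_success
    )
    # Time condition: the next tick after the last rollup has been reached.
    tick_reached = now >= next_hourly_tick(last_self_success)
    return ingest_fresh and tick_reached


utc = timezone.utc
last_rollup = datetime(2024, 1, 15, 9, 0, 5, tzinfo=utc)   # 09:00 rollup finished
stale_ingest = datetime(2024, 1, 15, 8, 58, tzinfo=utc)    # nothing new since then
late_ingest = datetime(2024, 1, 15, 10, 12, tzinfo=utc)    # lands 12 minutes late

at_10_00 = rollup_should_fire(
    datetime(2024, 1, 15, 10, 0, tzinfo=utc), last_rollup, stale_ingest
)
at_10_12 = rollup_should_fire(
    datetime(2024, 1, 15, 10, 12, 30, tzinfo=utc), last_rollup, late_ingest
)
print(at_10_00, at_10_12)  # False True
```

At 10:00 the tick has been reached but ingestion has not delivered, so nothing fires; once ingestion succeeds at 10:12, the next evaluation fires the rollup.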

Scenario 3 — Reactive workflow on Kafka messages

An audit-log publisher fires one workflow run per message arriving on the audit-events Kafka topic. No cron; the firing rate is whatever the message rate is.

ematix.workflow(
    name="audit_log_publisher",
    on_message=KAFKA.topic("audit-events"),
    jobs=["enrich_audit_event", "write_to_compliance_db", "notify_slack"],
)

Scenario 4 — Workflow chain (A → B → C)

A linear data pipeline: extract → transform → publish. Each stage is its own workflow, triggered by the previous one’s completion. The cron lives only on the head; downstreams react.

ematix.workflow(name="extract_orders",   schedule="*/5 * * * *", jobs=[...])
ematix.workflow(name="transform_orders", triggered_by=["extract_orders"],   jobs=[...])
ematix.workflow(name="publish_orders",   triggered_by=["transform_orders"], jobs=[...])

This is the right model when the stages need their own DAGs, their own retry policies, or their own teams. For tight DAGs that always run together, prefer one workflow with member jobs + per-job depends_on.

Scenario 5 — Manual override with job-subset selection

Every workflow card on the Workflows tab has a ▶ Run now button. Click it on regional_sales_report and you get a modal listing each member job with a checkbox. Want to re-run only publish_report while the upstream facts are still good? Uncheck the other two and submit.

POST /api/workflows/regional_sales_report/run-now
Content-Type: application/json

{"jobs": ["publish_report"]}

The trigger gates are bypassed for the manual run — you’re explicitly saying “run this now.”
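The same request can be built from a script with the standard library. In this sketch the base URL is an assumption (use wherever your ematix-flow web server listens), `build_run_now_request` is a hypothetical helper, and the request is constructed but deliberately not sent; the response format is not covered here.

```python
import json
import urllib.request
from typing import List

# Assumption: address of the ematix-flow web server in your deployment.
BASE_URL = "http://localhost:8080"


def build_run_now_request(workflow: str, jobs: List[str]) -> urllib.request.Request:
    """Build the run-now POST for a subset of a workflow's member jobs."""
    body = json.dumps({"jobs": jobs}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/api/workflows/{workflow}/run-now",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_run_now_request("regional_sales_report", ["publish_report"])
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```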

Scenario 6 — Single job with optional cascade

Sometimes you want to fire one job ad-hoc but also want everything downstream to react. The ▶ Run now button on a Jobs tab card opens a modal with a “Trigger downstream dependents” checkbox.

  • Off (default): runs just this job. Downstream jobs that depend on it stay where they were.
  • On: when this job succeeds, every job that has this one in its depends_on is also enqueued.

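This is the job-level counterpart of Scenario 5: instead of picking a subset of a workflow’s jobs, you start from one job and decide explicitly whether its dependents follow.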
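The cascade rule can be sketched as a lookup over depends_on declarations. The job graph below is hypothetical (this page does not spell out the depends_on edges of these jobs), and `direct_dependents` / `enqueue_after_success` are illustrative names rather than ematix-flow functions.

```python
from typing import Dict, List


def direct_dependents(job: str, depends_on: Dict[str, List[str]]) -> List[str]:
    """Jobs that list `job` in their depends_on, in declaration order."""
    return [name for name, deps in depends_on.items() if job in deps]


def enqueue_after_success(
    job: str, depends_on: Dict[str, List[str]], cascade: bool = False
) -> List[str]:
    """What gets enqueued once `job` succeeds, given the checkbox state."""
    return direct_dependents(job, depends_on) if cascade else []


# Hypothetical dependency declarations, for illustration only.
graph = {
    "build_sales_facts": [],
    "build_geo_rollups": ["build_sales_facts"],
    "publish_report": ["build_sales_facts", "build_geo_rollups"],
}

print(enqueue_after_success("build_sales_facts", graph))                # []
print(enqueue_after_success("build_sales_facts", graph, cascade=True))  # ['build_geo_rollups', 'publish_report']
```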

How firing is evaluated

The scheduler tick evaluates each workflow’s trigger conjunction with this rough logic:

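def should_fire(workflow):
    # Pseudocode: last_successful_run, next_cron_after, now and epoch_zero
    # stand for scheduler internals.
    last = last_successful_run(workflow.name)  # None if never run

    if workflow.is_streaming:
        return False  # streaming consumer handles execution itself
    if workflow.on_message:
        return False  # per-message dispatcher handles it

    # Flat-list form shown; AllOf / AnyOf trees follow "Evaluation semantics".
    for upstream in workflow.triggered_by or []:
        ok = last_successful_run(upstream)
        # Each upstream must have succeeded since this workflow last did.
        if ok is None or (last is not None and ok <= last):
            return False

    if workflow.schedule is not None:
        # The next cron tick after our own last success must have been reached.
        next_tick = next_cron_after(
            workflow.schedule, last or epoch_zero, tz=workflow.timezone,
        )
        if now() < next_tick:
            return False

    return True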

Once should_fire returns true, the worker topo-sorts the workflow’s member jobs using each job’s depends_on=[...] and runs the DAG.
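The ordering step can be illustrated with Python’s standard-library graphlib. This is a sketch of the idea only: the engine’s own sort is not shown on this page, and the depends_on edges below are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def execution_order(depends_on: Dict[str, List[str]]) -> List[str]:
    """Order member jobs so that each one runs after its dependencies."""
    # TopologicalSorter expects {node: predecessors}, which is the same
    # shape as a depends_on mapping. Circular dependencies raise CycleError.
    return list(TopologicalSorter(depends_on).static_order())


# Hypothetical depends_on declarations for the Scenario 1 jobs.
order = execution_order(
    {
        "publish_report": ["build_geo_rollups"],
        "build_geo_rollups": ["build_sales_facts"],
        "build_sales_facts": [],
    }
)
print(order)  # ['build_sales_facts', 'build_geo_rollups', 'publish_report']
```

Jobs with no path between them can be emitted in either order, so only the relative position of dependent jobs should be relied on.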

Trigger precedence: workflow vs. job

A job that is a member of a workflow inherits the workflow’s trigger. Per-job schedule= / triggered_by= on a workflow member are ignored, and a DeprecationWarning is emitted at registration to flag the dead setting.

A standalone job (one not listed in any workflow’s jobs=) keeps its own schedule= / triggered_by= / on_message=. The Workflows tab renders these as kind: "single" workflow-of-one cards.
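As a rough illustration of the precedence rule, the sketch below resolves which trigger governs a job and emits the warning for a dead per-job setting. `effective_trigger` and the dict-shaped triggers are assumptions made for the example; the actual registration code and warning text may differ.

```python
import warnings
from typing import Optional


def effective_trigger(
    job_name: str,
    job_trigger: Optional[dict],
    workflow_trigger: Optional[dict],
) -> Optional[dict]:
    """Pick the trigger that governs a job.

    workflow_trigger is None when the job is standalone.
    """
    if workflow_trigger is None:
        return job_trigger  # standalone: the job's own trigger applies
    if job_trigger:
        # The per-job setting is dead once the job joins a workflow.
        warnings.warn(
            f"trigger declared on workflow member {job_name!r} is ignored",
            DeprecationWarning,
            stacklevel=2,
        )
    return workflow_trigger


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    member = effective_trigger(
        "publish_report",
        {"schedule": "0 * * * *"},
        {"schedule": "0 21 * * 1-5"},
    )

standalone = effective_trigger("nightly_cleanup", {"schedule": "0 3 * * *"}, None)
print(member, standalone, len(caught))
```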

How this compares to other schedulers

Honest assessment of where the trigger surface sits versus the schedulers most people choose between.

vs. dbt (Core or Cloud)

dbt Core has no scheduler — you wire it into a cron / orchestrator and invoke dbt run. dbt Cloud has job triggers (cron, manual, GitHub webhook) but no boolean composition of trigger conditions and no native event-driven cross-job dependencies. The “wait for upstream X AND cron tick Y” pattern requires bringing in a separate orchestrator (Airflow, Prefect, Dagster) or writing the gating yourself. So vs. dbt this is a real gap we close — but it’s not really a like-for-like comparison; dbt’s scope is the SQL transformation surface, not the trigger surface.

vs. Apache Airflow

Airflow ≥ 2.4 has Datasets, and 2.9 added DatasetAny / DatasetAll combinators that can be nested into trees, plus the DatasetOrTimeSchedule timetable to combine cron with datasets. Boolean composition of trigger conditions exists. The differences are around indirection and ergonomics:

  • Airflow’s model goes through the Dataset abstraction — tasks have to declare outlets=[dataset_x] and explicitly update them; downstream DAGs reference the Dataset name. ematix-flow references workflow / job names directly.
  • ematix-flow shows live per-condition state as coloured dots in the UI. Airflow’s UI surfaces the next scheduled run and queued/running states but doesn’t render each Dataset’s individual readiness next to the DAG declaration. (Datasets view exists, but it’s separate from the DAG card.)
  • Manual runs with selective jobs: Airflow’s manual trigger fires the whole DAG. The closest equivalent for “just re-run this subset” is backfill, which is heavier-weight and dated. ematix-flow’s Run-now modal lets you check off member jobs and submit.

Functionally close on composition; ergonomically different.

vs. Prefect

Prefect 2/3 has Automations — event-driven rules (“when X happens, do Y”) that can chain to express OR-style dependencies. But they’re multiple separate rules, not a single declarative expression on the workflow itself. Conceptually parallel to our triggered_by tree; syntactically more spread out.

vs. Dagster

Dagster’s sensors are arbitrary Python that polls and decides when to fire. You can express anything because it’s code, but the price is that the trigger logic is imperative and lives outside the asset/job declaration. Asset freshness policies are closer to our model but operate on assets, not workflows.

What’s distinctive about this combination

No single piece is unprecedented — the combination is what we think makes ematix-flow worth choosing for workflow-heavy data work:

  • Single-declaration AND/OR composite triggers that reference workflow / job names directly, with no intermediate Dataset abstraction or imperative sensor.
  • Live per-condition coloured-dot pills on every workflow card. “Why hasn’t this fired?” becomes a one-glance question instead of a log query.
  • Run-now with job-subset selection on a workflow + Run-now with optional cascade on a job. Both bypass trigger gates for the ad-hoc run only.
  • First-class workflow concept: named, explicit jobs=[...] membership, the workflow’s own trigger supersedes any per-job schedule.
  • Single-process scheduler. flow scheduler is one Python process embedding the Rust engine — no separate metadata DB, no webserver/scheduler split, no schema migrations between releases. Run it from cron, systemd, K8s CronJob, or the bundled long-running loop — same code, same semantics.

If you’re already heavily invested in Airflow + Datasets, the composition piece alone may not be worth migrating for. The combination of all of the above in a single declarative Python decorator surface plus the Rust execution engine underneath is the bet.

Operational basics

The trigger surface above is what you declare. These are the verbs that evaluate and act on those declarations.

flow run-due

Single-tick scheduler — looks at every workflow / standalone job whose triggers are satisfied right now and runs them. Designed to be invoked from host cron at one-minute granularity.

flow run-due --module my_pipelines

flow scheduler

Long-running loop — same evaluation as run-due, but keeps a lease per workflow so multiple workers can share work. Useful in Kubernetes / systemd / Docker-Compose setups where you don’t want host cron.

flow scheduler --module my_pipelines --executor subprocess+python:// --poll-interval 10

flow run — one-shot

Runs a single workflow or job once, ignoring triggers. Useful for CI, backfills, and manual replays.

flow run --module my_pipelines regional_sales_report

Programmatic

Every decorated function exposes .sync() for orchestrator integration:

from my_pipelines import regional_sales_report
regional_sales_report.sync()                  # run now, blocking
regional_sales_report.sync(force=True)        # ignore watermark for one run

This is the integration point if you want to drive ematix-flow from Airflow, Dagster, Prefect, or any other orchestrator while still benefiting from the ematix-flow execution + run-history surface.

Run history

Every fire — scheduled or manual — writes a row to the configured run-history store. The Web UI’s Runs tab and the flow status CLI both read from this store.

flow runs list --pipeline regional_sales_report --since 24h
flow runs show <run-id>