Jobs

One decorator binds a source query to a target table, a load strategy, and a cron schedule.

A job is the atomic unit of work in ematix-flow. One Python function + one target table + one schedule. Group jobs into a workflow to express DAG dependencies between them.

Minimum surface

from typing import Annotated
from ematix_flow import ematix, pk
from ematix_flow.types import BigInt, Text, TimestampTZ

@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"

@ematix.table(schema="analytics")
class Events:
    event_id: Annotated[BigInt, pk()]
    name: Text | None
    received_at: TimestampTZ

@ematix.job(
    name="ingest_events",
    target=Events,
    target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn):
    return "SELECT event_id, name, received_at FROM raw.events"

That’s it. On first run the framework creates or migrates analytics.events; it then runs the SELECT every 5 minutes and appends the resulting rows. Restart safety, watermark tracking, and at-least-once delivery are handled by the framework, with no extra code in the job.
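For readers less familiar with cron syntax, the schedule "*/5 * * * *" fires at every minute of the hour that is divisible by 5. The sketch below shows how a step expression in the minute field expands. The helper expand_minute_field is hypothetical and not part of ematix-flow; it handles only "*", "*/n", and a single number rather than the full cron grammar.

```python
def expand_minute_field(field: str) -> list[int]:
    """Expand a cron minute field into the minutes (0-59) it matches.

    Illustrative only: supports "*", "*/n", and a single number.
    Ranges ("0-30") and lists ("0,15,30") are deliberately left out.
    """
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be a positive integer")
        return list(range(0, 60, step))
    minute = int(field)
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    return [minute]


# The minute field is the first of the five space-separated cron fields.
minutes = expand_minute_field("*/5 * * * *".split()[0])
print(minutes)
```

The remaining four fields (hour, day of month, month, day of week) are all "*" in the example, so the job is eligible to run twelve times per hour, every hour of every day.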

@ematix.job vs @ematix.pipeline — both names resolve to the same decorator. .pipeline is the historical name and remains supported; new code should prefer .job so the Python surface matches the Workflows/Jobs terminology the Web UI uses.

Function signatures

A job body is either:

  • def f(conn) -> str — return a single SQL string. The framework executes it against conn and streams the result rows into the target.
  • def f(conn) -> Iterator[RecordBatch] — yield Arrow batches. Same target-write path; useful for non-SQL sources or pre-aggregation in Python.
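The two shapes can be pictured as feeding one common stream of batches. The sketch below is an assumption about how a runner might tell them apart, not the framework's actual code: resolve_batches and fake_execute are hypothetical names, and plain lists of dicts stand in for Arrow RecordBatch objects so the example runs without extra dependencies.

```python
from typing import Any, Callable, Iterable, Iterator

Batch = list[dict[str, Any]]  # stand-in for an Arrow RecordBatch


def resolve_batches(
    body: Callable[[Any], Any],
    conn: Any,
    execute_sql: Callable[[Any, str], Iterator[Batch]],
) -> Iterator[Batch]:
    """Turn either job-body shape into a single stream of batches."""
    result = body(conn)
    if isinstance(result, str):
        # Shape 1: the body returned SQL; run it and stream the result.
        yield from execute_sql(conn, result)
    elif isinstance(result, Iterable):
        # Shape 2: the body already yields batches; pass them through.
        yield from result
    else:
        raise TypeError("job body must return a SQL string or an iterator of batches")


def fake_execute(conn: Any, sql: str) -> Iterator[Batch]:
    """Hypothetical executor that echoes the SQL instead of querying a database."""
    yield [{"sql": sql}]


def sql_body(conn):
    return "SELECT 1"


def batch_body(conn):
    yield [{"event_id": 1}]
    yield [{"event_id": 2}]


sql_batches = list(resolve_batches(sql_body, None, fake_execute))
py_batches = list(resolve_batches(batch_body, None, fake_execute))
print(sql_batches, py_batches)
```

The string check comes first because a str is itself iterable; testing for Iterable first would treat the SQL text as a sequence of characters.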

Multi-target fan-out

A single source query can feed multiple targets — same Arrow batches, no re-read.

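# EventsArchive and the "s3_archive" connection are assumed to be declared
# elsewhere, in the same way as Events and warehouse above.
@ematix.job(
    name="ingest_events_fanout",
    targets=[Events, EventsArchive],
    target_connection={"events": "warehouse", "events_archive": "s3_archive"},
    schedule="*/5 * * * *",
)
def ingest_events_fanout(conn):
    return "SELECT event_id, name, received_at FROM raw.events"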
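To make "same Arrow batches, no re-read" concrete, the sketch below reads a source once and hands each batch to every target. It is a simplified illustration under stated assumptions: fan_out, source, and the in-memory sinks are hypothetical, and lists of dicts stand in for Arrow batches.

```python
from typing import Any, Iterable, Iterator

Batch = list[dict[str, Any]]  # stand-in for an Arrow RecordBatch

stats = {"source_reads": 0}


def source() -> Iterator[Batch]:
    """Hypothetical source; counts how many times it is opened."""
    stats["source_reads"] += 1
    yield [{"event_id": 1}]
    yield [{"event_id": 2}]


def fan_out(batches: Iterable[Batch], sinks: dict[str, list[Batch]]) -> int:
    """Write every batch to every sink while iterating the source only once."""
    count = 0
    for batch in batches:
        for sink in sinks.values():
            sink.append(batch)  # the same batch object goes to each target
        count += 1
    return count


sinks: dict[str, list[Batch]] = {"events": [], "events_archive": []}
written = fan_out(source(), sinks)
print(written, stats["source_reads"])
```

Because the loop over targets sits inside the loop over batches, the source query runs a single time regardless of how many targets are attached.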

Cross-backend

Moving a job from Postgres → Postgres to Postgres → Delta Lake is a one-line change: swap target_connection. Same-DB pairs automatically take the INSERT … SELECT fast path. Cross-backend moves stream Apache Arrow batches end-to-end, with no row-by-row serialization and no intermediate file round trip.
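The choice between the two write paths can be pictured as a small decision function. The sketch below is an assumption for illustration only: Conn and choose_write_path are hypothetical names, and "same DB" is approximated by comparing connection names, which may not match how ematix-flow actually detects it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Conn:
    """Hypothetical connection descriptor (not an ematix-flow type)."""
    name: str
    kind: str


def choose_write_path(source: Conn, target: Conn) -> str:
    """Pick a write strategy for a source/target pair.

    Assumption: two connections point at the same database exactly when
    they share a name. The real detection rule is not described here.
    """
    if source.name == target.name:
        return "insert_select"  # data never leaves the database
    return "arrow_stream"       # batches are streamed between backends


warehouse = Conn(name="warehouse", kind="postgres")
lake = Conn(name="lake", kind="delta")

print(choose_write_path(warehouse, warehouse))
print(choose_write_path(warehouse, lake))
```

Under this assumption, swapping the target connection is enough to move a job from the first path to the second, which matches the one-line change described above.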