Jobs
One decorator binds a source query to a target table, a load strategy, and a cron schedule.
A job is the atomic unit of work in ematix-flow: one Python function, one target table, one schedule. Group jobs into a workflow to express DAG dependencies between them.
Minimum surface
```python
from typing import Annotated

from ematix_flow import ematix, pk
from ematix_flow.types import BigInt, Text, TimestampTZ


@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"


@ematix.table(schema="analytics")
class Events:
    event_id: Annotated[BigInt, pk()]
    name: Text | None
    received_at: TimestampTZ


@ematix.job(
    name="ingest_events",
    target=Events,
    target_connection="warehouse",
    schedule="*/5 * * * *",  # every 5 minutes
    mode="append",
)
def ingest_events(conn):
    return "SELECT event_id, name, received_at FROM raw.events"
```
That’s it. On the first run the framework creates or migrates `analytics.events`; after that it runs the SELECT every 5 minutes and appends the resulting rows. Restart safety, watermark tracking, and at-least-once delivery are handled by the framework without extra code.
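Restart safety and at-least-once delivery typically come from one ordering rule: make the target write durable first, and advance the watermark only afterwards. The sketch below illustrates that general technique with `sqlite3`; it is not ematix-flow's implementation, and the `_watermarks` table, the `run_once` helper, and the use of `event_id` as the watermark column are all hypothetical.

```python
import sqlite3


def run_once(conn: sqlite3.Connection, job: str, crash_before_watermark: bool = False) -> int:
    """Append rows newer than the stored watermark, then advance the watermark."""
    # Hypothetical bookkeeping table holding one watermark per job.
    conn.execute("CREATE TABLE IF NOT EXISTS _watermarks (job TEXT PRIMARY KEY, last_id INTEGER)")
    row = conn.execute("SELECT last_id FROM _watermarks WHERE job = ?", (job,)).fetchone()
    last_id = row[0] if row else 0

    rows = conn.execute(
        "SELECT event_id, name FROM raw_events WHERE event_id > ? ORDER BY event_id",
        (last_id,),
    ).fetchall()
    if not rows:
        return 0

    # 1. Make the target write durable first.
    conn.executemany("INSERT INTO events (event_id, name) VALUES (?, ?)", rows)
    conn.commit()

    # Simulated crash between the target write and the watermark update.
    if crash_before_watermark:
        raise RuntimeError("crashed before the watermark was saved")

    # 2. Only then advance the watermark: a crash can replay rows but never skip them.
    conn.execute(
        "INSERT OR REPLACE INTO _watermarks (job, last_id) VALUES (?, ?)",
        (job, rows[-1][0]),
    )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_events (event_id INTEGER, name TEXT)")
conn.execute("CREATE TABLE events (event_id INTEGER, name TEXT)")
conn.executemany("INSERT INTO raw_events VALUES (?, ?)", [(1, "a"), (2, "b")])
conn.commit()

try:
    run_once(conn, "ingest_events", crash_before_watermark=True)
except RuntimeError:
    pass  # the "restart" below resumes from the old watermark
run_once(conn, "ingest_events")  # replays rows 1 and 2
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 4: duplicates, nothing lost
```

The duplicate rows after the simulated crash are the price of "at least once": nothing is lost, but a replay can write a row twice, which is why a primary key such as `event_id` is useful downstream.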
`@ematix.job` vs `@ematix.pipeline`: both names resolve to the same decorator. `.pipeline` is the historical name and remains supported; new code should prefer `.job` so the Python surface matches the Workflows/Jobs terminology used in the Web UI.
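An alias like this usually amounts to binding a second attribute name to the same function object. The toy `Registry` class below is hypothetical and only illustrates that pattern; it is not ematix-flow's source code.

```python
from typing import Callable, Dict


class Registry:
    """Toy decorator registry in which `pipeline` is an alias of `job`."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Callable] = {}

    def job(self, *, name: str, **options):
        def register(fn: Callable) -> Callable:
            # Both spellings register into the same dict under `name`.
            self.jobs[name] = fn
            return fn

        return register

    # Historical name kept for backward compatibility: the same function object.
    pipeline = job


registry = Registry()


@registry.pipeline(name="legacy_job", schedule="0 * * * *")
def legacy_job(conn):
    return "SELECT 1"


@registry.job(name="new_job", schedule="0 * * * *")
def new_job(conn):
    return "SELECT 1"


print(Registry.pipeline is Registry.job)  # True
```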
Function signatures
A job body is either:

- `def f(conn) -> str`: return a single SQL string. The framework executes it against `conn` and streams the result rows into the target.
- `def f(conn) -> Iterator[RecordBatch]`: yield Arrow batches. These go through the same target-write path; this form is useful for non-SQL sources or for pre-aggregation in Python.
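One way to picture the two body shapes is a runner that dispatches on the return value: a `str` is executed against the connection and fetched in batches, while an iterator is consumed as-is, and both feed the same writer. The sketch below assumes `sqlite3` as the connection and plain lists of tuples as stand-ins for Arrow `RecordBatch` objects; `run_body` and `write_batch` are hypothetical names, not part of ematix-flow.

```python
import sqlite3
from typing import Callable, Iterator, List, Tuple

Batch = List[Tuple]  # stand-in for an Arrow RecordBatch


def run_body(body: Callable, conn: sqlite3.Connection,
             write_batch: Callable[[Batch], None], batch_size: int = 2) -> None:
    result = body(conn)
    if isinstance(result, str):
        # SQL body: execute it and stream rows out in fixed-size batches.
        cursor = conn.execute(result)
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            write_batch(batch)
    else:
        # Iterator body: every yielded batch goes down the same write path.
        for batch in result:
            write_batch(batch)


def sql_body(conn) -> str:
    return "SELECT event_id, name FROM raw_events ORDER BY event_id"


def batch_body(conn) -> Iterator[Batch]:
    # e.g. rows from a non-SQL source, already shaped in Python
    yield [(1, "a"), (2, "b")]
    yield [(3, "c")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_events (event_id INTEGER, name TEXT)")
conn.executemany("INSERT INTO raw_events VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

from_sql: List[Batch] = []
from_iter: List[Batch] = []
run_body(sql_body, conn, from_sql.append)
run_body(batch_body, conn, from_iter.append)
print(from_sql == from_iter)  # True
```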
Multi-target fan-out
A single source query can feed multiple targets: the same Arrow batches are written to every target, so the source is read only once.
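```python
# EventsArchive and the "s3_archive" connection are assumed to be declared
# the same way as Events and warehouse above (not shown).
@ematix.job(
    name="ingest_events_fanout",
    targets=[Events, EventsArchive],
    # One connection per target, keyed by target table name.
    target_connection={"events": "warehouse", "events_archive": "s3_archive"},
    schedule="*/5 * * * *",
)
def ingest_events_fanout(conn):
    return "SELECT event_id, name, received_at FROM raw.events"
```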
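The "no re-read" property can be pictured as a single loop over the source batches that hands each batch to every target writer. The sketch below is a hypothetical illustration of that pattern: `fan_out`, the counting `source` generator, and the in-memory sinks are assumptions, not the framework's internals.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Batch = List[Tuple]


def fan_out(batches: Iterable[Batch], writers: Dict[str, Callable[[Batch], None]]) -> int:
    """Read each source batch once and pass it to every target writer."""
    count = 0
    for batch in batches:
        count += 1
        for write in writers.values():
            write(batch)
    return count


reads = 0


def source() -> Iterable[Batch]:
    global reads
    reads += 1  # counts how many times the "source query" runs
    yield [(1, "a"), (2, "b")]
    yield [(3, "c")]


sinks: Dict[str, List[Batch]] = {"events": [], "events_archive": []}
batch_count = fan_out(source(), {name: sink.append for name, sink in sinks.items()})
print(reads, batch_count, sinks["events"] == sinks["events_archive"])  # 1 2 True
```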
Cross-backend
Switching a job from Postgres → Postgres to Postgres → Delta Lake is a one-line change: swap `target_connection`. Same-database pairs take the `INSERT … SELECT` fast path automatically. Cross-backend moves stream Apache Arrow batches end to end, with no row-by-row serialization and no intermediate file round trip.
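The choice between the two paths can be sketched as a check on whether source and target are the same database. In the sketch below, `sqlite3` connections stand in for the backends. When source and target are the same connection, the copy is pushed down as one `INSERT ... SELECT`. Otherwise, rows are fetched in batches and written to the other connection. `copy_rows` and its batch size are hypothetical, and ematix-flow's real cross-backend path moves Arrow batches rather than Python tuples.

```python
import sqlite3


def copy_rows(src: sqlite3.Connection, dst: sqlite3.Connection,
              query: str, target: str, batch_size: int = 1000) -> str:
    # `target` is interpolated into SQL, so it must be a trusted table name.
    if src is dst:
        # Same-database fast path: the database moves the rows itself.
        dst.execute(f"INSERT INTO {target} {query}")
        dst.commit()
        return "insert_select"
    # Cross-backend path: stream batches from the source into the target.
    cursor = src.execute(query)
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        placeholders = ", ".join("?" * len(batch[0]))
        dst.executemany(f"INSERT INTO {target} VALUES ({placeholders})", batch)
    dst.commit()
    return "stream"


def make_db() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE raw_events (event_id INTEGER, name TEXT)")
    db.execute("CREATE TABLE events (event_id INTEGER, name TEXT)")
    db.executemany("INSERT INTO raw_events VALUES (?, ?)", [(1, "a"), (2, "b")])
    db.commit()
    return db


warehouse = make_db()
lake = make_db()  # stands in for a different backend
query = "SELECT event_id, name FROM raw_events"
fast = copy_rows(warehouse, warehouse, query, "events")
streamed = copy_rows(warehouse, lake, query, "events")
print(fast, streamed)  # insert_select stream
```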