A pipeline is just a Python function returning a SQL string (or an Arrow iterator). The decorator binds it to a target table, a connection, a load mode, and a cron schedule.
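A decorator like this one usually attaches metadata to the function and registers it for a scheduler. The stdlib-only sketch below models that idea. `pipeline`, `PipelineSpec` and `REGISTRY` are hypothetical names and are not ematix_flow's internals. Strings stand in for the table class and connection that the real API takes.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class PipelineSpec:
    """What a pipeline decorator might record about one pipeline (hypothetical)."""
    func: Callable
    target: str
    target_connection: str
    mode: str
    schedule: str


# Hypothetical registry that a scheduler could scan at startup.
REGISTRY: Dict[str, PipelineSpec] = {}


def pipeline(*, target: str, target_connection: str, schedule: str, mode: str):
    """Build a decorator that registers the function and returns it unchanged."""
    def decorate(func: Callable) -> Callable:
        REGISTRY[func.__name__] = PipelineSpec(func, target, target_connection, mode, schedule)
        return func
    return decorate


@pipeline(
    target="analytics.events",
    target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn):
    return "SELECT event_id, name, received_at FROM raw.events"
```

The function comes back unchanged, so tests can still call it directly. A scheduler would read the target, connection, mode and cron expression from `REGISTRY`.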
Minimum surface
```python
from typing import Annotated

from ematix_flow import ematix, pk
from ematix_flow.types import BigInt, Text, TimestampTZ


# Named connection that pipelines refer to as "warehouse".
@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"


# Target table analytics.events, keyed on event_id.
@ematix.table(schema="analytics")
class Events:
    event_id: Annotated[BigInt, pk()]
    name: Text | None
    received_at: TimestampTZ


# Runs every 5 minutes and appends the query's rows to analytics.events.
@ematix.pipeline(
    target=Events,
    target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn):
    return "SELECT event_id, name, received_at FROM raw.events"
```
That’s it. The framework creates or migrates `analytics.events` on
first run, runs the `SELECT` every 5 minutes, and appends the rows. Restart
safety, watermark tracking, and at-least-once delivery come for free.
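Watermark tracking is commonly implemented by storing the highest value of an ordering column seen so far and filtering the next read on it. The SQLite sketch below shows that general technique. It is not ematix_flow's mechanism, which this page does not describe. `run_incremental`, the `flow_state` table and the choice of `received_at` as the watermark column are all hypothetical. `raw.events` becomes `raw_events` because everything lives in one SQLite database.

```python
import sqlite3

# Hypothetical schema: the source, the target, and one watermark row per pipeline.
# Timestamps are ISO-8601 strings, so text order matches time order.
SETUP = """
CREATE TABLE raw_events (event_id INTEGER, name TEXT, received_at TEXT);
CREATE TABLE events (event_id INTEGER PRIMARY KEY, name TEXT, received_at TEXT);
CREATE TABLE flow_state (pipeline TEXT PRIMARY KEY, watermark TEXT NOT NULL);
INSERT INTO flow_state VALUES ('ingest_events', '');
"""


def run_incremental(conn: sqlite3.Connection, pipeline: str = "ingest_events") -> int:
    """One scheduled run: copy rows newer than the watermark, then advance it."""
    (watermark,) = conn.execute(
        "SELECT watermark FROM flow_state WHERE pipeline = ?", (pipeline,)
    ).fetchone()
    rows = conn.execute(
        "SELECT event_id, name, received_at FROM raw_events "
        "WHERE received_at > ? ORDER BY received_at",
        (watermark,),
    ).fetchall()
    if rows:
        conn.executemany("INSERT INTO events VALUES (?, ?, ?)", rows)
        # The newest row written becomes the new watermark. In this sketch the
        # target write and the watermark update commit in one transaction.
        conn.execute(
            "UPDATE flow_state SET watermark = ? WHERE pipeline = ?",
            (rows[-1][2], pipeline),
        )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.executescript(SETUP)
conn.executemany(
    "INSERT INTO raw_events VALUES (?, ?, ?)",
    [(1, "signup", "2024-01-01T00:00:00Z"), (2, "login", "2024-01-01T00:01:00Z")],
)
conn.commit()
first = run_incremental(conn)   # copies both rows
second = run_incremental(conn)  # nothing is newer than the watermark
conn.execute("INSERT INTO raw_events VALUES (3, 'logout', '2024-01-01T00:06:00Z')")
conn.commit()
third = run_incremental(conn)   # picks up only the new row
```

The target and the state table sometimes live in different systems. In that case the watermark can only be saved after the target write succeeds, so a crash between the two replays the last batch on restart. That is at-least-once delivery, and it means downstream consumers must tolerate or deduplicate replayed rows. Note also that the strict `>` skips late rows whose timestamp equals the stored watermark. Real implementations typically need a tiebreaker or an overlap window.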
Function signatures
A pipeline body is either:

- `def f(conn) -> str`: return a single SQL string. The framework executes it against `conn` and streams the result rows into the target.
- `def f(conn) -> Iterator[RecordBatch]`: yield Arrow batches. These take the same target-write path, which is useful for non-SQL sources or for pre-aggregation in Python.
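Both shapes can share one write path once they are normalized into a single stream of batches. The sketch below shows one way to do that with the standard library. `batches_from_body` is a hypothetical helper and SQLite stands in for the source connection. Each batch is a plain list of row tuples, not an Arrow `RecordBatch`.

```python
import sqlite3
from typing import Callable, Iterable, Iterator, List, Tuple, Union

Batch = List[Tuple]  # stand-in for an Arrow RecordBatch in this sketch


def batches_from_body(
    body: Callable[[sqlite3.Connection], Union[str, Iterable[Batch]]],
    conn: sqlite3.Connection,
    batch_size: int = 1000,
) -> Iterator[Batch]:
    """Normalize either pipeline body shape into one stream of batches."""
    result = body(conn)
    if isinstance(result, str):
        # SQL body: execute it on conn and stream the result set in chunks.
        cursor = conn.execute(result)
        while True:
            chunk = cursor.fetchmany(batch_size)
            if not chunk:
                break
            yield chunk
    else:
        # Generator body: pass its batches through unchanged.
        yield from result


def sql_body(conn):
    return "SELECT id, label FROM src ORDER BY id"


def python_body(conn):
    # e.g. a non-SQL source, or rows pre-aggregated in Python
    yield [(1, "a"), (2, "b")]
    yield [(3, "c")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (id INTEGER, label TEXT)")
conn.executemany("INSERT INTO src VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

from_sql = list(batches_from_body(sql_body, conn, batch_size=2))
from_python = list(batches_from_body(python_body, conn))
```

Both calls produce `[[(1, 'a'), (2, 'b')], [(3, 'c')]]`. The code that writes to the target therefore never needs to know which body shape produced the batches.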
Multi-target fan-out
A single source query can feed multiple targets — same Arrow batches, no re-read.
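```python
# EventsArchive and the s3_archive connection are assumed to be declared
# the same way as Events and warehouse above.
@ematix.pipeline(
    targets=[Events, EventsArchive],
    target_connection={"events": "warehouse", "events_archive": "s3_archive"},
    schedule="*/5 * * * *",
)
def ingest_events(conn):
    # One query; its batches are written to both targets.
    return "SELECT event_id, name, received_at FROM raw.events"
```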
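Fan-out without a re-read comes down to consuming the source once and handing each batch to every target writer. The sketch below shows this under a few assumptions. Batches are plain row tuples instead of Arrow. `fan_out` and `read_source` are hypothetical. One SQLite database plays the warehouse, and a Python list stands in for the archive backend.

```python
import sqlite3
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Batch = List[Tuple]
Writer = Callable[[Batch], object]


def fan_out(batches: Iterable[Batch], writers: Dict[str, Writer]) -> int:
    """Give every batch to every writer; the source is iterated exactly once."""
    total = 0
    for batch in batches:
        for write in writers.values():
            write(batch)
        total += len(batch)
    return total


src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE raw_events (event_id INTEGER, name TEXT)")
src.executemany(
    "INSERT INTO raw_events VALUES (?, ?)",
    [(1, "signup"), (2, "login"), (3, "logout")],
)


def read_source(batch_size: int = 2) -> Iterator[Batch]:
    # A generator can be consumed only once, so both targets must share its batches.
    cursor = src.execute("SELECT event_id, name FROM raw_events ORDER BY event_id")
    while batch := cursor.fetchmany(batch_size):
        yield batch


warehouse = sqlite3.connect(":memory:")
warehouse.execute("CREATE TABLE events (event_id INTEGER, name TEXT)")
archive: List[Tuple] = []  # stand-in for a second backend

written = fan_out(
    read_source(),
    {
        "events": lambda b: warehouse.executemany("INSERT INTO events VALUES (?, ?)", b),
        "events_archive": archive.extend,
    },
)
warehouse.commit()
```

The source is a single-pass generator. Both targets end up with all three rows, so they must have been fed from the same read.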
Cross-backend
Switching from Postgres → Postgres to Postgres → Delta Lake is a
one-line change: swap `target_connection`. Same-database pairs take the
`INSERT … SELECT` fast path automatically. Cross-backend moves stream
Apache Arrow batches end to end, with no row-by-row serialization and no
intermediate file round trip.
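The routing decision described above can be sketched as a single branch. When source and target are the same database, issue one `INSERT … SELECT` and let the engine copy the rows. Otherwise, pull batches from the source and write them to the target. This is an illustrative stdlib sketch, not ematix_flow's code, and it rests on several assumptions. `move` is hypothetical, and SQLite connections stand in for both backends. Object identity stands in for "same database". The streamed path moves row tuples rather than Arrow batches.

```python
import sqlite3


def move(src: sqlite3.Connection, dst: sqlite3.Connection,
         select_sql: str, target: str, batch_size: int = 1000) -> str:
    """Copy select_sql's result into target and return which path was taken."""
    # target and select_sql come from trusted pipeline definitions, not user input.
    if src is dst:
        # Same database: one INSERT ... SELECT, so rows never leave the engine.
        dst.execute(f"INSERT INTO {target} {select_sql}")
        dst.commit()
        return "insert-select"
    # Different databases: stream the result set across in batches.
    cursor = src.execute(select_sql)
    while batch := cursor.fetchmany(batch_size):
        placeholders = ", ".join(["?"] * len(batch[0]))
        dst.executemany(f"INSERT INTO {target} VALUES ({placeholders})", batch)
    dst.commit()
    return "streamed"


a = sqlite3.connect(":memory:")
a.executescript("""
CREATE TABLE raw_events (event_id INTEGER, name TEXT);
CREATE TABLE events (event_id INTEGER, name TEXT);
INSERT INTO raw_events VALUES (1, 'signup'), (2, 'login');
""")
b = sqlite3.connect(":memory:")
b.execute("CREATE TABLE events (event_id INTEGER, name TEXT)")

query = "SELECT event_id, name FROM raw_events"
same_path = move(a, a, query, "events")   # same connection: fast path
cross_path = move(a, b, query, "events")  # different connections: streamed
```

Both calls leave two rows in `events`. Only the second one moves data through Python.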