EMATIX(R) DATA TERMINAL — ROBCO INDUSTRIES UNIFIED OPERATING SYSTEM
COPYRIGHT 2026 EMATIX SYSTEMS — ALL RIGHTS RESERVED
USER: GUEST   SESSION: 2026-05-20 21:38:22Z   HOST: ematix.dev/guide
// USER GUIDE

Stream processing

SQL transforms, scalar + aggregate UDFs, tumbling/hopping/session windows over streaming sources.


Streaming sources can carry SQL transforms, Python UDFs, and windowed aggregations — the same primitives as the batch side, running over a continuous stream of Arrow batches.

SQL transforms

@ematix.streaming_pipeline(
    source_connection="kafka_prod",
    source_topic="events.v1",
    target=DailyByTenant,
    target_connection="warehouse",
    transform="""
        SELECT
          tenant_id,
          DATE_TRUNC('day', received_at) AS day,
          COUNT(*)                       AS event_count
        FROM source
        GROUP BY tenant_id, day
    """,
)
def rollup(batch): return batch

source is the streaming batch, registered as a virtual table per batch. The SQL runs in-process against Arrow — no JVM, no shuffle to a separate cluster.

Python UDFs

Scalar:

import ematix_flow as ef

@ef.udf(return_type=ef.types.Float64)
def fahrenheit_to_celsius(f: float) -> float:
    return (f - 32) * 5 / 9

Aggregate (UDAF):

@ef.udaf(return_type=ef.types.Float64, state_type=ef.types.Struct(...))
class RunningAverage:
    def init(self):           ...
    def update(self, state, x): ...
    def merge(self, a, b):    ...
    def finalize(self, state): ...

Both are usable from transform="..." SQL or directly from Python.

Windows

Tumbling, hopping, session — declared on the pipeline:

@ematix.streaming_pipeline(
    source_connection="kafka_prod",
    source_topic="events.v1",
    target=PerMinute,
    target_connection="warehouse",
    window={"kind": "tumbling", "size": "1 minute", "time_column": "received_at"},
    transform="""
        SELECT
          window_start,
          tenant_id,
          COUNT(*) AS n
        FROM source
        GROUP BY window_start, tenant_id
    """,
)
def per_minute(batch): return batch

Session windows accept gap= instead of size=. Hopping windows accept both size= and slide=.

That’s the full surface. Head to Specs for the “why bother” half.


◀ BACK TO USER GUIDE ▲ HOME