Streaming pipelines

Long-running consumers for Kafka / RabbitMQ / Pub/Sub / Kinesis with at-least-once delivery and DLQ support.

Streaming pipelines are the long-running cousin of batch pipelines. They consume from a streaming source, write to a target, and commit offsets only after the target write is durable, which gives at-least-once delivery end to end.
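
The commit-after-write ordering can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions, not ematix-flow's implementation: InMemorySource, run_once, and the crash_before_commit flag are hypothetical names invented for the example, and the sink is a plain Python list.

```python
from dataclasses import dataclass


@dataclass
class InMemorySource:
    """Hypothetical stand-in for a broker consumer (not an ematix-flow class)."""
    records: list
    committed: int = 0  # offset a restarted consumer would resume from

    def poll(self, max_records):
        # Always read from the last committed offset.
        return self.records[self.committed:self.committed + max_records]

    def commit_offsets(self, count):
        self.committed += count


def run_once(source, sink, batch_size=2, crash_before_commit=False):
    """Process one batch: write first, commit offsets only afterwards."""
    batch = source.poll(batch_size)
    if not batch:
        return False
    sink.extend(batch)  # the "durable write" in this toy model
    if crash_before_commit:
        raise RuntimeError("simulated crash after write, before commit")
    source.commit_offsets(len(batch))
    return True


source = InMemorySource(records=["a", "b", "c"])
sink = []

# First attempt: the write lands, but the process dies before committing.
try:
    run_once(source, sink, crash_before_commit=True)
except RuntimeError:
    pass

# After the "restart", the uncommitted batch is polled and written again.
while run_once(source, sink):
    pass

print(sink)              # every record is present; "a" and "b" appear twice
print(source.committed)  # all three offsets are committed
```

Nothing is lost, but the batch that was written and never committed is delivered a second time. That is the trade-off of at-least-once delivery, and the reason target writes are usually made idempotent.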

Decorator form

from ematix_flow import ematix

@ematix.connection
class kafka_prod:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    group_id = "ematix-flow"

@ematix.streaming_pipeline(
    source_connection="kafka_prod",
    source_topic="events.v1",
    target=Events,                  # target defined elsewhere (not shown here)
    target_connection="warehouse",  # connection declared elsewhere (not shown here)
    dead_letter_topic="events.v1.dlq",
)
def consume_events(batch):
    return batch    # pass-through; transform in Python if needed

Run it as a foreground process:

flow consume --module my_pipelines consume_events

Or wrap with systemd, supervisord, or a Kubernetes Deployment.

TOML form

For ops teams that want pipelines out of Python entirely:

[streaming_pipelines.consume_events]
source = { connection = "kafka_prod", topic = "events.v1" }
target = { connection = "warehouse", table = "analytics.events" }
dead_letter_topic = "events.v1.dlq"
batch_size = 1000

Then point the CLI at the config file:

flow consume --config streaming.toml consume_events

Guarantees

  • Manual offset commit / ack. ematix-flow calls commit_offsets() on the source only after a durable target write.
  • At-least-once end to end is the default.
  • Exactly-once. Kafka producer-side via transactions; consumer-coordinated via KafkaToKafkaEosPipeline.
  • DLQ. App-level (dead_letter_topic routes failed rows to a separate target) and broker-level (RabbitMQ x-dead-letter-exchange, Pub/Sub dead_letter_policy).
  • Schema Registry. Avro / Protobuf decode-encode via Confluent SR or Apicurio — declare it as a connection and reference it from the Kafka connection.
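
The app-level DLQ idea from the list above (failed rows go to a separate destination instead of blocking the batch) can be sketched in plain Python. This is an illustration under assumptions, not the library's code: split_batch is a hypothetical helper, messages are assumed to be JSON strings, and the dead-letter "topic" is just a list.

```python
import json


def split_batch(raw_messages, decode):
    """Decode each message; collect undecodable ones as dead letters."""
    good, dead = [], []
    for raw in raw_messages:
        try:
            good.append(decode(raw))
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            # Keep the original payload and the reason so it can be replayed.
            dead.append({"payload": raw, "error": str(exc)})
    return good, dead


messages = ['{"id": 1}', "not json", '{"id": 2}']
rows, dead_letters = split_batch(messages, json.loads)

print(rows)               # the two decodable rows
print(len(dead_letters))  # one message routed to the dead-letter list
```

With this shape, one bad record does not stall the pipeline: the good rows continue to the target, and offsets can still be committed once both the target write and the dead-letter write are durable.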