Streaming

What is a stream?

Streaming is a bit different from regular REST APIs. A regular REST API returns all of its data at once, in a single response.

Streaming keeps the connection open: the server sends the response headers with a content type of application/jsonl (which stands for “JSON Lines”) or text/event-stream, and then sends each individual event as soon as it is ready, one by one.

sequenceDiagram
    participant App
    participant Client

    Client->>App: Make the initial request
    App->>Client: Establish connection and send headers

    App->>App: Produce Event 1
    App->>Client: Send Event 1
    Client->>Client: Process Event 1

    App->>App: Produce Event 2
    App->>Client: Send Event 2
    Client->>Client: Process Event 2

    Note over App: Continue producing events...
    Note over Client: Continue consuming events...

    Client->>App: Closes the connection
    App->>App: Cleans everything up
    

How streaming works

When should you use streaming? When you have a one-directional (server to client) stream of events.

For example:

  • LLM responses

  • Logs

  • Telemetry

  • Financial services

  • Sporting events

  • Live locations

Do not use streaming for regular responses with regular data. Only use it when you actually need to deliver a sequence of events over time.

Just a regular controller

Our API is built around the dmr.streaming.controller.StreamingController type, which is a slightly modified subclass of the regular Controller.

Streaming controllers support all the same features as regular controllers.

We use the collections.abc.AsyncIterator protocol to model async event sources.
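As a framework-independent illustration of that protocol, here is a plain async generator that produces events. The `_User` dataclass and the `user_events()` / `collect()` names are hypothetical; the only point is that an async generator function returns an object that satisfies `collections.abc.AsyncIterator`, which is the kind of event source a streaming endpoint returns.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class _User:
    # Hypothetical event payload with a single `email` field.
    email: str


async def user_events() -> AsyncIterator[_User]:
    # An async generator: each `yield` produces one event as soon as it is ready.
    for email in ("first@example.com", "second@example.com"):
        await asyncio.sleep(0)  # stand-in for real async work (DB, queue, ...)
        yield _User(email=email)


async def collect() -> list[_User]:
    # Consume the source with `async for`, one event at a time.
    return [user async for user in user_events()]
```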

Existing streaming formats

We support two formats: JsonLines and SSE (Server-Sent Events).

JsonLines is a simple format where events are streamed line by line, and every line is a valid JSON value.

Read more: Json Lines
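To show what the JSON Lines wire format looks like, here is a minimal sketch using only the standard library `json` module. It is not DMR's renderer; `to_jsonl()` and `from_jsonl()` are hypothetical helpers for illustration.

```python
import json
from collections.abc import Iterable
from typing import Any


def to_jsonl(events: Iterable[Any]) -> bytes:
    # Each event becomes one compact JSON value followed by a newline.
    # `separators` drops the default spaces, giving output like {"email":"..."}.
    return b"".join(
        json.dumps(event, separators=(",", ":")).encode() + b"\n"
        for event in events
    )


def from_jsonl(payload: bytes) -> list[Any]:
    # The inverse: every non-empty line is parsed independently.
    return [json.loads(line) for line in payload.splitlines() if line.strip()]
```

Because each line is parsed on its own, a client can process the first event before the second one has even been produced.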

Run result

$ curl http://127.0.0.1:8000/api/user/events/ -X GET
{"email":"first@example.com"}
{"email":"second@example.com"}

OpenAPI Schema

{
  "components": {
    "schemas": {
      "ErrorDetail": {
        "description": "Base schema for error details description.",
        "properties": {
          "loc": {
            "items": {
              "anyOf": [
                {
                  "type": "integer"
                },
                {
                  "type": "string"
                }
              ]
            },
            "type": "array"
          },
          "msg": {
            "type": "string"
          },
          "type": {
            "type": "string"
          }
        },
        "required": [
          "msg"
        ],
        "title": "ErrorDetail",
        "type": "object"
      },
      "ErrorModel": {
        "description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
        "properties": {
          "detail": {
            "items": {
              "$ref": "#/components/schemas/ErrorDetail"
            },
            "type": "array"
          }
        },
        "required": [
          "detail"
        ],
        "title": "ErrorModel",
        "type": "object"
      },
      "_User": {
        "properties": {
          "email": {
            "type": "string"
          }
        },
        "required": [
          "email"
        ],
        "title": "_User",
        "type": "object"
      }
    },
    "securitySchemes": {}
  },
  "info": {
    "title": "Django Modern Rest",
    "version": "0.1.0"
  },
  "openapi": "3.2.0",
  "paths": {
    "/api/usereventscontroller/": {
      "get": {
        "deprecated": false,
        "operationId": "getUsereventscontrollerApiUsereventscontroller",
        "responses": {
          "200": {
            "content": {
              "application/jsonl": {
                "itemSchema": {
                  "$ref": "#/components/schemas/_User"
                }
              }
            },
            "description": "OK",
            "headers": {
              "Cache-Control": {
                "required": true,
                "schema": {
                  "type": "string"
                }
              },
              "Connection": {
                "required": true,
                "schema": {
                  "type": "string"
                }
              },
              "X-Accel-Buffering": {
                "required": true,
                "schema": {
                  "type": "string"
                }
              }
            }
          },
          "406": {
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/ErrorModel"
                }
              }
            },
            "description": "Raised when provided `Accept` header cannot be satisfied"
          },
          "422": {
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/ErrorModel"
                }
              }
            },
            "description": "Raised when returned response does not match the response schema"
          }
        }
      }
    }
  }
}

What happens in these examples?

  1. We create our controller as usual, except we are using dmr.streaming.controller.StreamingController subtypes instead of a regular Controller class

  2. We return a collections.abc.AsyncIterator instance instead of a single data item

Choose the format that fits your needs, or create your own!

Important

All streaming modes require Django to run in ASGI mode in production.

Handling errors

Streaming controllers have two layers of error handling:

  1. Default error handling works the same way for request / response phase, see Error handling for more information

  2. Per-event error handling, which applies once the streaming connection is established and events are being produced. Such errors are handled in the handle_event_error() method

The second layer of error handling is unique to streaming controllers.

It covers two cases:

  1. When event validation fails (mostly useful in development)

  2. When there’s an error in the event producer

Let’s see how it can be customized.

Run result

$ curl http://127.0.0.1:8000/api/numbereventscontroller/ -X GET
1
{"error":"zero division"}
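As a rough, framework-free model of what this run result shows (one regular event, then one error event), the sketch below wraps an event source and turns an exception into a final error event. `guard_events()` and `on_error()` are hypothetical stand-ins for the streaming machinery and for `handle_event_error()`; they are not DMR's implementation.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Any


async def number_events() -> AsyncIterator[int]:
    for divisor in (1, 0, 2):
        yield 1 // divisor  # the second iteration raises ZeroDivisionError


async def on_error(exc: Exception) -> Any:
    # Hypothetical stand-in for `handle_event_error()`: map the error to a payload.
    if isinstance(exc, ZeroDivisionError):
        return {"error": "zero division"}
    raise exc  # unknown errors are re-raised


async def guard_events(
    events: AsyncIterator[Any],
    handler: Callable[[Exception], Awaitable[Any]],
) -> AsyncIterator[Any]:
    try:
        async for event in events:
            yield event
    except Exception as exc:
        # The producer raised, so it is finished: emit one error event and stop.
        yield await handler(exc)


async def collect() -> list[Any]:
    return [event async for event in guard_events(number_events(), on_error)]
```

Note that the event for divisor `2` is never produced: once the generator raises, it cannot be resumed.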

Warning

Please note that no new events will be produced if the error happens inside the async generator itself: once an async generator raises, it is finished. If you want to handle errors there as well, use try/except right inside the event-producing async generator.
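A minimal sketch of that pattern in plain Python: the error is caught inside the generator and replaced with another event, so the generator keeps running. `resilient_events()` and its error payload are hypothetical.

```python
from collections.abc import AsyncIterator
from typing import Any


async def resilient_events(divisors: list[int]) -> AsyncIterator[Any]:
    for divisor in divisors:
        try:
            event: Any = 10 // divisor
        except ZeroDivisionError:
            # Handled inside the generator, so it stays alive and keeps producing.
            event = {"error": "zero division"}
        yield event
```

The event is computed inside `try` and yielded outside of it, so exceptions thrown into the generator at the `yield` point are not accidentally swallowed.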

Handling disconnects

If you need to close the response stream immediately, you can raise StreamingCloseError or asyncio.CancelledError inside the event-producing async iterator.
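The sketch below models the idea with plain Python: the producer raises a dedicated exception, and the loop that drains it treats that exception as "stop quietly" rather than as an error. `_CloseStream` is a hypothetical local stand-in, not DMR's `StreamingCloseError`, and `drain()` is a simplified model of a response loop. `asyncio.CancelledError` is a `BaseException`, so a real response loop has to handle it separately from `except Exception`.

```python
from collections.abc import AsyncIterator
from typing import Any


class _CloseStream(Exception):
    """Hypothetical stand-in for a 'close the stream now' signal."""


async def producer() -> AsyncIterator[str]:
    yield "first"
    raise _CloseStream  # ask the consumer to stop right now


async def drain(events: AsyncIterator[Any]) -> list[Any]:
    # Hypothetical response loop: a close request ends the stream
    # without being reported as an error.
    sent: list[Any] = []
    try:
        async for event in events:
            sent.append(event)
    except _CloseStream:
        pass
    return sent
```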

Clients can disconnect at any time. We always handle such disconnects gracefully.

Validation

All our regular response validation rules are applied to streaming controllers as well. We strictly validate that all headers, cookies, etc. are listed in the endpoint’s metadata.

Response validation follows the regular rules and can be disabled by setting validate_responses=False at the needed level.

We also validate the structure of each event as it is streamed to end users. We recommend turning this on in development and off in production.

Rules:

  • If the endpoint specifies a validate_events boolean value, we use it

  • If the endpoint does not specify this flag but the controller does, we use the controller’s value

  • If the controller does not specify this flag but the settings do, we use the settings value

  • If no explicit validate_events boolean value is specified, we fall back to the validate_responses value

Both validate() and modify() support this flag:

Run result

$ curl http://127.0.0.1:8000/api/user/events/ -X GET
data: "not-an-int"

How do we know which model to validate events against?

  • It might be specified as the return_type in the ResponseSpec of @validate for the given status code

  • It might be specified as the type argument of the generic collections.abc.AsyncIterator return type in a @modify-style endpoint.

We fall back to typing.Any if we can’t infer the event model.
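A minimal sketch of the second rule and the fallback, using the standard `typing` introspection helpers. `infer_event_model()` is hypothetical, not DMR's implementation, and it ignores the `@validate` / `ResponseSpec` path entirely.

```python
from collections.abc import AsyncIterator
from typing import Any, get_args, get_origin, get_type_hints


def infer_event_model(func: Any) -> Any:
    # Resolve the return annotation; a missing annotation gives `None`.
    return_type = get_type_hints(func).get("return")
    # `AsyncIterator[X]` has origin `collections.abc.AsyncIterator` and args `(X,)`.
    if get_origin(return_type) is AsyncIterator:
        args = get_args(return_type)
        if args:
            return args[0]
    return Any  # fall back when the model can't be inferred
```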

Further reading

See also

Best practices for working with async iterators and generators in Python: https://docs.python.org/3.15/library/asyncio-dev.html#asynchronous-generators-best-practices

JsonLines: streaming individual lines of JSON. See Json Lines.

SSE: streaming Server-Sent Events. See Server Sent Events.

Advanced topics

Negotiation for different formats

If you need different event formats for different users or use cases, you can use the regular Content negotiation process.

Send different Accept headers to get different event streams back:

Run result

$ curl http://127.0.0.1:8000/api/user/events/ -X GET -H 'Accept: application/jsonl, application/json'
{"a":1}
["b",2]

$ curl http://127.0.0.1:8000/api/user/events/ -X GET -H 'Accept: text/event-stream, application/json'
data: {"a":1}

data: ["b",2]

OpenAPI Schema

{
  "components": {
    "schemas": {
      "ErrorDetail": {
        "description": "Base schema for error details description.",
        "properties": {
          "loc": {
            "items": {
              "anyOf": [
                {
                  "type": "integer"
                },
                {
                  "type": "string"
                }
              ]
            },
            "title": "Loc",
            "type": "array"
          },
          "msg": {
            "title": "Msg",
            "type": "string"
          },
          "type": {
            "title": "Type",
            "type": "string"
          }
        },
        "required": [
          "msg"
        ],
        "title": "ErrorDetail",
        "type": "object"
      },
      "ErrorModel": {
        "description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
        "properties": {
          "detail": {
            "items": {
              "$ref": "#/components/schemas/ErrorDetail"
            },
            "title": "Detail",
            "type": "array"
          }
        },
        "required": [
          "detail"
        ],
        "title": "ErrorModel",
        "type": "object"
      },
      "SSEvent": {
        "properties": {
          "comment": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "title": "Comment"
          },
          "data": {
            "title": "Data"
          },
          "event": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "title": "Event"
          },
          "id": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "title": "Id"
          },
          "retry": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "title": "Retry"
          }
        },
        "required": [
          "data"
        ],
        "title": "SSEvent",
        "type": "object"
      }
    },
    "securitySchemes": {}
  },
  "info": {
    "title": "Django Modern Rest",
    "version": "0.1.0"
  },
  "openapi": "3.2.0",
  "paths": {
    "/api/eventstreaming/": {
      "get": {
        "deprecated": false,
        "operationId": "getEventstreamingApiEventstreaming",
        "responses": {
          "200": {
            "content": {
              "application/jsonl": {
                "itemSchema": {}
              },
              "text/event-stream": {
                "itemSchema": {
                  "$ref": "#/components/schemas/SSEvent"
                }
              }
            },
            "description": "OK",
            "headers": {
              "Cache-Control": {
                "required": true,
                "schema": {
                  "type": "string"
                }
              },
              "Connection": {
                "required": true,
                "schema": {
                  "type": "string"
                }
              },
              "X-Accel-Buffering": {
                "required": true,
                "schema": {
                  "type": "string"
                }
              }
            }
          },
          "406": {
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/ErrorModel"
                }
              }
            },
            "description": "Raised when provided `Accept` header cannot be satisfied"
          },
          "422": {
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/ErrorModel"
                }
              }
            },
            "description": "Raised when returned response does not match the response schema"
          }
        }
      }
    }
  }
}

This is an advanced feature that most applications will never need. Here we define our own StreamingController subclass and override the special streaming_renderers() method, which instantiates the streaming renderers.

Next, we define an API endpoint for the GET method. We use the conditional_type() function to specify, for the OpenAPI metadata, which type is returned in which case.

The _event_source() method provides events for both formats.

Finally, we check which Accept header we are working with and produce the appropriate format for each case.
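To make this dispatch concrete without depending on DMR internals, here is a framework-free sketch: it picks a format from the `Accept` header and renders the same events either as JSON Lines or as SSE `data:` lines, matching the run results above. `negotiate()`, `RENDERERS` and the renderer functions are hypothetical, and real content negotiation also honours `q` parameters, which this sketch ignores.

```python
import json
from collections.abc import Callable, Iterable
from typing import Any


def _compact(event: Any) -> str:
    return json.dumps(event, separators=(",", ":"))


def render_jsonl(events: Iterable[Any]) -> str:
    # One JSON value per line.
    return "".join(_compact(event) + "\n" for event in events)


def render_sse(events: Iterable[Any]) -> str:
    # Each SSE event is a `data:` field terminated by a blank line.
    return "".join(f"data: {_compact(event)}\n\n" for event in events)


RENDERERS: dict[str, Callable[[Iterable[Any]], str]] = {
    "application/jsonl": render_jsonl,
    "text/event-stream": render_sse,
}


def negotiate(accept: str) -> str:
    # Naive negotiation: the first listed media type we support wins.
    for part in accept.split(","):
        media_type = part.split(";")[0].strip()
        if media_type in RENDERERS:
            return media_type
    raise LookupError("406: no acceptable streaming format")
```

The event source stays the same for both formats; only the renderer chosen for the negotiated media type changes.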

API Reference

Controllers

class dmr.streaming.controller.StreamingController(**kwargs)

Bases: Controller[_SerializerT_co]

Base class for all streaming controllers.

It can be used directly, but in most cases it is enough to use the controller for a specific streaming protocol.

endpoint_cls

alias of _StreamingEndpoint

async handle_event_error(exc: Exception) → Any

Error handler for the events.

Called when the StreamingResponse is iterated in the ASGI handler.

ping_event() → Any | None

Returns a ping event to send, if this stream needs one.

By default pings are disabled for StreamingController types. Pings must be explicitly enabled in subclasses.

If streaming_ping_seconds is set, this method will be called.

streaming_ping_seconds: ClassVar[float | None] = None

Optional keep-alive ping event support.

Some servers might close long-lived connections with no activity. Specify how many seconds we should wait between events. If we wait longer, we send a ping event. The payload of the ping event is defined in ping_event().

It is disabled by default and only enabled for SSE streaming.

abstractmethod classmethod streaming_renderers(serializer: type[_SerializerT_co]) → Iterable[StreamingRenderer]

Returns the iterable of streaming renderers for this controller.

streaming_response_cls

Streaming response type to customize.

alias of StreamingResponse

to_stream(streaming_content: AsyncIterator[Any], *, status_code: HTTPStatus = HTTPStatus.OK, headers: Mapping[str, str] | None = None, cookies: Mapping[str, NewCookie] | None = None, regular_renderer: Renderer | None = None, streaming_renderer: StreamingRenderer | None = None, streaming_validator: StreamingValidator | None = None) → StreamingResponse

Convert streaming content to a streaming response.

Parameters:
  • streaming_content – Async iterator which will produce the streaming events.

  • status_code – Response status code. Defaults to 200.

  • headers – Response extra headers.

  • cookies – Response cookies.

  • regular_renderer – Renderer instance to render event payloads.

  • streaming_renderer – StreamingRenderer instance to render the event stream.

  • streaming_validator – StreamingValidator instance to validate events one by one if validate_events is set to True.

Returns:

StreamingResponse instance with all the required properties set.

Responses

class dmr.streaming.stream.StreamingResponse(streaming_content: AsyncIterator[Any], controller: StreamingController[BaseSerializer], *, regular_renderer: Renderer, streaming_renderer: StreamingRenderer, streaming_validator: StreamingValidator, headers: Mapping[str, str] | None = None, status_code: HTTPStatus = HTTPStatus.OK)

Bases: HttpResponseBase

Our own response subclass to mark that we explicitly return SSE.

Converts events to bytes with the help of the passed serializer and renderers.

We don’t inherit from django.http.StreamingHttpResponse here, because it has a strict API for streaming that we can’t use or customize.

__aiter__() → AsyncIterator[bytes]

ASGI handler protocol for streaming responses.

When streaming is True, ASGI handler will async iterate over the response object.

At that point we are already inside the ASGI handler, so regular DMR error handling does not apply.

__iter__() → Iterator[bytes]

In development it is useful to have sync interface for streaming.

This is a part of the WSGI handler protocol.

Danger

Do not use this in production! We even added a special error to catch this. In production, you must use an ASGI server, such as uvicorn, for streaming.

This implementation has a lot of limitations. Be careful even in development.

close() → None

Closes the response and cleans up all references.

streaming: Final = True

Part of the ASGI handler protocol. Triggers __aiter__().

Renderers

class dmr.streaming.renderer.StreamingRenderer(serializer: type[BaseSerializer], regular_renderer: Renderer, streaming_validator_cls: type[StreamingValidator])

Bases: Renderer

Base class for all streaming renderers.

It is different from the regular Renderer in several ways:

  1. We need to initialize this renderer with a subrenderer, which renders the individual events

  2. Serializer is needed to serialize events

  3. Validator is needed to optionally validate events

streaming: ClassVar[Literal[True]] = True

Whether or not this renderer is used for streaming responses.

property validation_parser: _NoOpParser

Returns a parser that can parse what this renderer rendered.

Why? Because when validate_responses is True, we parse the response body once again to see if it fits the schema.

That’s why all renderers must know how to unparse their results.

Validation

class dmr.streaming.validation.StreamingValidator(event_model: Any, serializer: type[BaseSerializer], *, validate_events: bool)

Injects itself into the stream of SSE to validate the events.

This is very different from any other validator. Why?

  1. Because we send just one response, and no events may be produced at all for a long period of time. Some events can be correct, while others can be wrong

  2. We can’t close the connection when we find invalid events: that would be a big problem for our users and hard to debug

  3. But, we can modify events to be error events instead!

  4. When validation is active and the event is either not the model we expect or has the wrong payload type, we send an error event instead
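The "replace, don't disconnect" strategy can be sketched in a few lines of plain Python. This is a simplified synchronous model, not DMR's validator: `ErrorEvent` and `validate_stream()` are hypothetical, and an `isinstance` check stands in for real serializer-based validation.

```python
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from typing import Any


@dataclass
class ErrorEvent:
    # Hypothetical error event shape; DMR's real error payload may differ.
    detail: str


def validate_stream(
    events: Iterable[Any], model: type, *, validate_events: bool
) -> Iterator[Any]:
    for event in events:
        if validate_events and not isinstance(event, model):
            # Never drop the connection: replace the bad event with an error event.
            yield ErrorEvent(
                detail=f"expected {model.__name__}, got {type(event).__name__}"
            )
        else:
            yield event
```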

apply_event_pipeline(event: Any) → Any

Runs the pipeline.

classmethod from_controller(controller: StreamingController[BaseSerializer], status_code: HTTPStatus) → Self

Construct validator from a controller instance.

Infers the event model from the endpoint metadata. Also knows whether event validation is turned on.

abstractmethod validation_pipeline() → Iterable[Callable[[Any, Any, type[BaseSerializer]], Any]]

Abstract method to define the event validation pipeline.

dmr.streaming.validation.validate_event_type(event: Any, model: Any, serializer: type[BaseSerializer]) → Any

Validate that the event type matches the model.
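The pipeline signature above, `Callable[[Any, Any, type[BaseSerializer]], Any]`, suggests steps that each take `(event, model, serializer)` and return a possibly transformed event. A minimal sketch of running such steps in order; `check_type()` and `apply_pipeline()` are hypothetical analogues, not DMR's `validate_event_type()` or `apply_event_pipeline()`, and the serializer is passed through unused.

```python
from collections.abc import Callable, Iterable
from functools import reduce
from typing import Any

# Each step: (event, model, serializer) -> event.
Step = Callable[[Any, Any, Any], Any]


def check_type(event: Any, model: Any, serializer: Any) -> Any:
    # Hypothetical type check; `Any` as the model accepts every event.
    if model is not Any and not isinstance(event, model):
        raise TypeError(f"expected {model.__name__}, got {type(event).__name__}")
    return event


def apply_pipeline(
    event: Any, steps: Iterable[Step], model: Any, serializer: Any = None
) -> Any:
    # Feed the output of each step into the next one, in order.
    return reduce(lambda current, step: step(current, model, serializer), steps, event)
```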