Streaming¶
What is a stream?¶
Streaming is a bit different from regular REST APIs. A regular REST endpoint always returns the whole payload in a single response.
A streaming endpoint establishes a persistent connection,
sends response headers with the content type
of application/jsonl (stands for “JSON Lines”) or text/event-stream
and then sends each individual event, one by one, as soon as it is ready.
---
config:
theme: forest
---
sequenceDiagram
participant App
participant Client
Client->>App: Make the initial request
App->>Client: Establish connection and send headers
App->>App: Produce Event 1
App->>Client: Send Event 1
Client->>Client: Process Event 1
App->>App: Produce Event 2
App->>Client: Send Event 2
Client->>Client: Process Event 2
Note over App: Continue producing events...
Note over Client: Continue consuming events...
Client->>App: Closes the connection
App->>App: Cleans everything up
How streaming works¶
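The exchange in the diagram can be mimicked with plain asyncio, without any HTTP involved. This is only a minimal sketch: produce_events and consume are hypothetical names that are not part of dmr. The async generator plays the role of the App, and the loop plays the role of the Client.

```python
import asyncio
from collections.abc import AsyncIterator


async def produce_events() -> AsyncIterator[str]:
    # The "App" side: each event is sent as soon as it is ready.
    for number in (1, 2):
        await asyncio.sleep(0)  # stands in for real work
        yield f'event {number}'


async def consume() -> list[str]:
    # The "Client" side: each event is processed as it arrives.
    processed = []
    async for event in produce_events():
        processed.append(event.upper())
    return processed


processed_events = asyncio.run(consume())
```

The consumer never waits for the full list of events: it handles the first event before the second one is produced.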
When to use streaming? When you have a single-directional stream of events.
For example:
LLM responses
Logs
Telemetry
Financial services
Sporting events
Live locations
Do not use streaming for regular responses with regular data. Only use it when needed.
Just a regular controller¶
We base our API around dmr.streaming.controller.StreamingController
type which is a slightly modified subclass
of a regular Controller.
Streaming controllers support all the same features:
Different HTTP async or sync methods
Error handling (including special error handling for events)
Optional Response validation (including events validation)
etc
We utilize collections.abc.AsyncIterator
protocol to model async event sources.
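As an illustration of that protocol, here is a stdlib-only sketch. Countdown and countdown_generator are hypothetical names, not part of dmr. Both a class with __aiter__ and __anext__ and an async generator satisfy collections.abc.AsyncIterator, so either should work as an event source.

```python
import asyncio
from collections.abc import AsyncIterator


class Countdown:
    """Hand-written async iterator (hypothetical example)."""

    def __init__(self, start: int) -> None:
        self._current = start

    def __aiter__(self) -> 'Countdown':
        return self

    async def __anext__(self) -> int:
        if self._current <= 0:
            raise StopAsyncIteration
        self._current -= 1
        return self._current + 1


async def countdown_generator(start: int) -> AsyncIterator[int]:
    # The same source, written as an async generator.
    for number in range(start, 0, -1):
        yield number


async def drain(source: AsyncIterator[int]) -> list[int]:
    return [event async for event in source]


from_class = asyncio.run(drain(Countdown(3)))
from_generator = asyncio.run(drain(countdown_generator(3)))
is_async_iterator = isinstance(Countdown(1), AsyncIterator)
```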
Existing streaming formats¶
We support:
JsonLines is a simple format, where events are streamed line by line, and all lines are valid JSON objects.
Read more: Json Lines
import dataclasses
from collections.abc import AsyncIterator

from dmr.plugins.msgspec import MsgspecSerializer
from dmr.streaming.jsonl import JsonLinesController


@dataclasses.dataclass(frozen=True, slots=True)
class _User:
    email: str


class UserEventsController(JsonLinesController[MsgspecSerializer]):
    async def get(self) -> AsyncIterator[_User]:
        return self.produce_user_events()

    async def produce_user_events(self) -> AsyncIterator[_User]:
        # You can send any complex data that can be serialized
        # by the controller's serializer:
        yield _User(email='first@example.com')
        yield _User(email='second@example.com')
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
{"email":"first@example.com"}
{"email":"second@example.com"}
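On the consuming side, a JSON Lines body can be decoded one line at a time. The helper below is a minimal sketch using only the standard library. parse_json_lines is a hypothetical name, and a real client would read lines from the open connection instead of from a complete string.

```python
import json


def parse_json_lines(body: str) -> list[object]:
    # Every non-empty line is an independent JSON document.
    return [json.loads(line) for line in body.splitlines() if line.strip()]


raw_body = (
    '{"email":"first@example.com"}\n'
    '{"email":"second@example.com"}\n'
)
users = parse_json_lines(raw_body)
```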
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"application/jsonl": {
"itemSchema": {
"$ref": "#/components/schemas/_User"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
Server Sent Events is very similar: you can put any data in the data:
field, but it has more semantics and is more customizable.
Read more: Server Sent Events
import dataclasses
from collections.abc import AsyncIterator

from dmr.plugins.msgspec import MsgspecSerializer
from dmr.streaming.sse import SSEController, SSEvent


@dataclasses.dataclass(frozen=True, slots=True)
class _User:
    email: str


class UserEventsController(SSEController[MsgspecSerializer]):
    async def get(self) -> AsyncIterator[SSEvent[_User]]:
        return self.produce_user_events()

    async def produce_user_events(self) -> AsyncIterator[SSEvent[_User]]:
        # You can send any complex data that can be serialized
        # by the controller's serializer,
        # all SSEvent fields can be customized:
        yield SSEvent(
            _User(email='first@example.com'),
            event='user',
        )
        yield SSEvent(
            _User(email='second@example.com'),
            event='user',
        )
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
event: user
data: {"email":"first@example.com"}
event: user
data: {"email":"second@example.com"}
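A client can split such a stream back into events. The sketch below is a simplified, stdlib-only parser. parse_sse is a hypothetical name, and it assumes the standard text/event-stream framing where a blank line terminates each event. It does not join multi-line data fields, which a complete parser would have to do.

```python
import json


def parse_sse(body: str) -> list[dict[str, str]]:
    events = []
    for block in body.split('\n\n'):
        fields: dict[str, str] = {}
        for line in block.splitlines():
            if not line or line.startswith(':'):
                continue  # lines starting with a colon are comments
            name, _, value = line.partition(':')
            fields[name] = value.removeprefix(' ')
        if fields:
            events.append(fields)
    return events


raw_stream = (
    'event: user\n'
    'data: {"email":"first@example.com"}\n'
    '\n'
    'event: user\n'
    'data: {"email":"second@example.com"}\n'
    '\n'
)
parsed = parse_sse(raw_stream)
emails = [json.loads(event['data'])['email'] for event in parsed]
```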
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"SSEvent__User_": {
"description": "Server sent event payload.",
"properties": {
"comment": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"data": {
"$ref": "#/components/schemas/_User"
},
"event": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
},
{
"type": "null"
}
]
},
"retry": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
},
"required": [
"data"
],
"title": "SSEvent[_User]",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"text/event-stream": {
"itemSchema": {
"$ref": "#/components/schemas/SSEvent__User_"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
What happens in these examples?
We create our controller as usual, except we are using dmr.streaming.controller.StreamingController subtypes instead of a regular Controller class
We return typing.AsyncIterator instance instead of a single data item
Choose the one that fits your needs, or create your own format!
Important
All streaming modes require Django to run in ASGI mode in production.
Handling errors¶
Streaming controllers have two layers of error handling:
Default error handling works the same way for request / response phase, see Error handling for more information
Per-event error handling, which starts when the streaming connection is established and the events are produced. Such errors are handled in the handle_event_error() method
The second layer of event error handling is unique for streaming controllers.
It works for several cases:
When event validation fails (mostly useful in development)
When there’s an error in the event producer
Let’s see how it can be customized.
from collections.abc import AsyncIterator
from typing import Any

from typing_extensions import override

from dmr.plugins.pydantic import PydanticSerializer
from dmr.streaming.jsonl import Json, JsonLinesController


class NumberEventsController(JsonLinesController[PydanticSerializer]):
    async def get(self) -> AsyncIterator[Json]:
        return self._valid_events()

    @override
    async def handle_event_error(self, exc: Exception) -> Any:
        if isinstance(exc, ZeroDivisionError):
            return {'error': 'zero division'}
        return await super().handle_event_error(exc)

    async def _valid_events(self) -> AsyncIterator[Json]:
        yield 1
        # Error here:
        yield 1 / 0  # noqa: WPS344
        # Won't be sent:
        yield 2

Run result
$ curl http://127.0.0.1:8000/api/numbereventscontroller/ -X GET
1
{"error":"zero division"}
from collections.abc import AsyncIterator
from typing import Any

from typing_extensions import override

from dmr.plugins.pydantic import PydanticSerializer
from dmr.streaming.sse import SSEController, SSEvent


class NumberEventsController(SSEController[PydanticSerializer]):
    async def get(self) -> AsyncIterator[SSEvent[float]]:
        return self._valid_events()

    @override
    async def handle_event_error(self, exc: Exception) -> Any:
        if isinstance(exc, ZeroDivisionError):
            return SSEvent(b'zero division', event='error', serialize=False)
        return await super().handle_event_error(exc)

    async def _valid_events(self) -> AsyncIterator[SSEvent[float]]:
        yield SSEvent(1)
        # Error here:
        yield SSEvent(1 / 0)  # noqa: WPS344
        # Won't be sent:
        yield SSEvent(2)

Run result
$ curl http://127.0.0.1:8000/api/numbereventscontroller/ -X GET
data: 1
event: error
data: zero division
Warning
Please note that new events won’t be produced if the error happens
in the async generator itself. If you want to handle errors there as well,
use try/except right inside the event producing async generator.
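A minimal sketch of that advice, using only the standard library. produce_numbers is a hypothetical producer and the error payload is made up for illustration. The risky computation is wrapped in try/except inside the generator, so the generator survives the failure and keeps producing events.

```python
import asyncio
from collections.abc import AsyncIterator


async def produce_numbers() -> AsyncIterator[object]:
    for divisor in (1, 0, 2):
        try:
            result = 2 / divisor
        except ZeroDivisionError:
            # Handled inside the generator: the stream keeps going.
            yield {'error': 'division by zero'}
        else:
            yield result


async def collect() -> list[object]:
    return [event async for event in produce_numbers()]


events = asyncio.run(collect())
```

Only the computation is guarded here, not the yield itself, so exceptions thrown into the generator from the outside are not swallowed by accident.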
Handling disconnects¶
If you need to immediately close the response stream, you can raise
StreamingCloseError
or asyncio.CancelledError
inside the events producing async iterator.
Async clients can disconnect at any time. We always handle this error gracefully.
See also
Response disconnect docs: https://docs.djangoproject.com/en/stable/ref/request-response/#request-response-streaming-disconnect
Validation¶
All our regular response validation rules are applied to the streaming controllers as well. We strictly validate that all headers / cookies / etc are listed in the endpoint’s metadata.
We follow the regular rules for response validation and it can be disabled
by setting validate_responses=False on the needed level.
We also validate the structure of events when streaming them to end users. We recommend turning this on in development and off in production.
Rules:
If endpoint specifies validate_events boolean value, we use it
If endpoint does not specify this flag, but controller does, we use it
If controller does not specify this flag, but settings do, we use it
If no explicit validate_events boolean value is specified, we fall back to validate_responses value
Both validate() and modify()
support this flag:
from collections.abc import AsyncIterator
from typing import Any

from dmr import modify
from dmr.plugins.msgspec import MsgspecSerializer
from dmr.streaming.sse import SSEController, SSEvent


class UserEventsController(SSEController[MsgspecSerializer]):
    @modify(validate_events=False)
    async def get(self) -> AsyncIterator[SSEvent[int]]:
        return self.produce_user_events()

    async def produce_user_events(self) -> AsyncIterator[SSEvent[Any]]:
        yield SSEvent('not-an-int')
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
data: "not-an-int"
from collections.abc import AsyncIterator
from typing import Any

from dmr.plugins.msgspec import MsgspecSerializer
from dmr.streaming.sse import SSEController, SSEvent


class UserEventsController(SSEController[MsgspecSerializer]):
    validate_events = False

    async def get(self) -> AsyncIterator[SSEvent[int]]:
        return self.produce_user_events()

    async def produce_user_events(self) -> AsyncIterator[SSEvent[Any]]:
        yield SSEvent('not-an-int')
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
data: "not-an-int"
See validate_events setting.
>>> from dmr.settings import Settings
>>> DMR_SETTINGS = {Settings.validate_events: False}
How do we know the model for events to be validated against?
It might be specified as the return_type in the ResponseSpec of @validate for the given status code
It might be specified as the type argument to generic collections.abc.AsyncIterator return type in @modify styled endpoint.
We fallback to typing.Any if we can’t infer the event model.
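To illustrate the second rule and the fallback, here is a sketch of how a type argument can be read from a return annotation with the typing module. infer_event_model is a hypothetical helper written for this example. It is not how dmr does it internally.

```python
from collections.abc import AsyncIterator
from typing import Any, get_args, get_origin, get_type_hints


def infer_event_model(func: Any) -> Any:
    return_type = get_type_hints(func).get('return', Any)
    if get_origin(return_type) is AsyncIterator:
        args = get_args(return_type)
        if args:
            return args[0]
    # No usable annotation: fall back to Any.
    return Any


async def typed_events() -> AsyncIterator[int]:
    yield 1


def untyped_events():
    return None


typed_model = infer_event_model(typed_events)
untyped_model = infer_event_model(untyped_events)
```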
Further reading¶
See also
Best practices for working with async iterators and generators in Python: https://docs.python.org/3.15/library/asyncio-dev.html#asynchronous-generators-best-practices
Streaming individual lines of json.
Streaming Server Sent Events.
Advanced topics¶
Negotiation for different formats¶
If you need different event formats for different users / use cases, then you can use the regular Content negotiation process.
Send different Accept headers to get different event streams back:
from collections.abc import AsyncGenerator, AsyncIterator
from contextlib import aclosing
from typing import Annotated

from typing_extensions import override

from dmr.negotiation import ContentType, conditional_type
from dmr.plugins.pydantic import PydanticSerializer
from dmr.settings import default_renderer
from dmr.streaming import StreamingController
from dmr.streaming.jsonl import Json
from dmr.streaming.jsonl.renderer import JsonLinesRenderer
from dmr.streaming.jsonl.validation import JsonLinesStreamingValidator
from dmr.streaming.renderer import StreamingRenderer
from dmr.streaming.sse import SSEvent
from dmr.streaming.sse.renderer import SSERenderer
from dmr.streaming.sse.validation import SSEStreamingValidator


class EventStreaming(StreamingController[PydanticSerializer]):
    @classmethod
    @override
    def streaming_renderers(
        cls,
        serializer: type[PydanticSerializer],
    ) -> list[StreamingRenderer]:
        return [
            SSERenderer(
                serializer,
                default_renderer,
                SSEStreamingValidator,
            ),
            JsonLinesRenderer(
                serializer,
                default_renderer,
                JsonLinesStreamingValidator,
            ),
        ]

    async def get(
        self,
    ) -> AsyncIterator[
        Annotated[
            SSEvent[Json] | Json,
            conditional_type({
                ContentType.jsonl: Json,
                ContentType.event_stream: SSEvent[Json],
            }),
        ],
    ]:
        return self._valid_events()

    async def _valid_events(self) -> AsyncIterator[SSEvent[Json] | Json]:
        async with aclosing(self._source()) as source:
            async for event in source:
                if self.request.accepts(ContentType.event_stream):
                    yield SSEvent(event)
                else:
                    yield event

    async def _source(self) -> AsyncGenerator[Json]:
        yield {'a': 1}
        yield ['b', 2]
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET -H 'Accept: application/jsonl, application/json'
{"a":1}
["b",2]
$ curl http://127.0.0.1:8000/api/user/events/ -X GET -H 'Accept: text/event-stream, application/json'
data: {"a":1}
data: ["b",2]
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"title": "Loc",
"type": "array"
},
"msg": {
"title": "Msg",
"type": "string"
},
"type": {
"title": "Type",
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"title": "Detail",
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"SSEvent": {
"description": "Server sent event payload.",
"properties": {
"comment": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Comment"
},
"data": {
"title": "Data"
},
"event": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Event"
},
"id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Id"
},
"retry": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Retry"
}
},
"required": [
"data"
],
"title": "SSEvent",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/eventstreaming/": {
"get": {
"deprecated": false,
"operationId": "getEventstreamingApiEventstreaming",
"responses": {
"200": {
"content": {
"application/jsonl": {
"itemSchema": {}
},
"text/event-stream": {
"itemSchema": {
"$ref": "#/components/schemas/SSEvent"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
This is a really advanced feature that is not required in 99% of cases.
Here we define our own StreamingController
subclass and override streaming_renderers(),
a special method that instantiates the streaming renderers.
Next we define an API endpoint for the GET method.
We use the conditional_type() function to specify which
type will be returned in which case for the OpenAPI metadata.
We also define the _source() method, which provides events for both formats.
The last thing we do is check which Accept header
we are working with and yield events in the appropriate format for each case.
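Stripped of the framework, the final step boils down to framing one payload in two different ways. The function below is a simplified stdlib-only sketch. render_event is a hypothetical name, and the Accept header parsing ignores quality values and wildcards, which real content negotiation has to honor.

```python
import json


def render_event(event: object, accept: str) -> bytes:
    payload = json.dumps(event, separators=(',', ':'))
    accepted = {part.split(';')[0].strip() for part in accept.split(',')}
    if 'text/event-stream' in accepted:
        # SSE framing: a data field followed by a blank line.
        return f'data: {payload}\n\n'.encode()
    # JSON Lines framing: one JSON document per line.
    return f'{payload}\n'.encode()


as_sse = render_event({'a': 1}, 'text/event-stream, application/json')
as_jsonl = render_event(['b', 2], 'application/jsonl, application/json')
```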
API Reference¶
Controllers¶
- class dmr.streaming.controller.StreamingController(**kwargs)[source]¶
Bases: Controller[_SerializerT_co]
Base class for all streaming controllers.
It can be used directly, but most use cases will be fine with just using the specific streaming protocol.
- endpoint_cls¶
alias of
_StreamingEndpoint
- async handle_event_error(exc: Exception) Any[source]¶
Error handler for the events.
Is called when the StreamingResponse is iterated in the ASGI handler.
- ping_event() Any | None[source]¶
Return a ping event to be generated if this streaming needs it.
By default pings are disabled for StreamingController types. Pings must be explicitly enabled in subclasses.
If streaming_ping_seconds is set, this method will be called.
- streaming_ping_seconds: ClassVar[float | None] = None¶
Optional ping keep alive event support.
Some servers might close long-living connections with no activity. Specify the number of seconds we should wait between events. If we wait longer, we will send a ping event. The payload of the ping event is defined in ping_event().
By default it is disabled. It is only enabled in the SSE streaming.
- abstractmethod classmethod streaming_renderers(serializer: type[_SerializerT_co]) Iterable[StreamingRenderer][source]¶
Returns the iterable of streaming renderers for this controller.
- streaming_response_cls¶
Streaming response type to customize.
alias of
StreamingResponse
- to_stream(streaming_content: AsyncIterator[Any], *, status_code: HTTPStatus = HTTPStatus.OK, headers: Mapping[str, str] | None = None, cookies: Mapping[str, NewCookie] | None = None, regular_renderer: Renderer | None = None, streaming_renderer: StreamingRenderer | None = None, streaming_validator: StreamingValidator | None = None) StreamingResponse[source]¶
Convert streaming content to a streaming response.
- Parameters:
streaming_content – Async iterator which will produce the streaming events.
status_code – Response status code. Defaults to 200.
headers – Response extra headers.
cookies – Response cookies.
regular_renderer – Renderer instance to render event payloads.
streaming_renderer – StreamingRenderer instance to render the event stream.
streaming_validator – StreamingValidator instance to validate events one by one if validate_events is set to True.
- Returns:
StreamingResponse instance with all the required properties set.
Responses¶
- class dmr.streaming.stream.StreamingResponse(streaming_content: AsyncIterator[Any], controller: StreamingController[BaseSerializer], *, regular_renderer: Renderer, streaming_renderer: StreamingRenderer, streaming_validator: StreamingValidator, headers: Mapping[str, str] | None = None, status_code: HTTPStatus = HTTPStatus.OK)[source]¶
Bases: HttpResponseBase
Our own response subclass to mark that we explicitly return SSE.
Converts events to bytes with the help of a passed serializer and renderer types.
We don’t inherit from django.http.StreamingHttpResponse here, because it has a strict API for streaming that we can’t use or customize.
- __aiter__() AsyncIterator[bytes][source]¶
ASGI handler protocol for streaming responses.
When streaming is True, ASGI handler will async iterate over the response object.
When doing so, we will be inside the ASGI handler already. No DMR error handling will work.
- __iter__() Iterator[bytes][source]¶
In development it is useful to have sync interface for streaming.
This is a part of the WSGI handler protocol.
Danger
Do not use this in production! We even added a special error to catch this. In production you must use ASGI servers like uvicorn with streaming.
This implementation has a lot of limitations. Be careful even in development.
Renderers¶
- class dmr.streaming.renderer.StreamingRenderer(serializer: type[BaseSerializer], regular_renderer: Renderer, streaming_validator_cls: type[StreamingValidator])[source]¶
Bases: Renderer
Base class for all streaming responses.
It is different from the regular Renderer in several ways:
We need to initialize this renderer with a subrenderer, which will render the individual events itself
Serializer is needed to serialize events
Validator is needed to optionally validate events
- streaming: ClassVar[Literal[True]] = True¶
Whether or not this renderer is used for streaming responses.
- property validation_parser: _NoOpParser¶
Returns a parser that can parse what this renderer rendered.
Why? Because when validate_responses is True, we parse the response body once again to see if it fits the schema.
That’s why all renderers must know how to unparse their results.
Validation¶
- class dmr.streaming.validation.StreamingValidator(event_model: Any, serializer: type[BaseSerializer], *, validate_events: bool)[source]¶
Injects itself into the stream of SSE to validate the events.
This is very different from any other validator. Why?
Because we send just one response. No events can be produced at all for a long period of time. Some events can be correct, while others can be wrong
We can’t close the connection when finding wrong events, it will be a big problem for our users and it would be hard to debug
But, we can modify events to be error events instead!
When validation is active and the event is either not the model we expect or has the wrong payload type, we send the error event
- classmethod from_controller(controller: StreamingController[BaseSerializer], status_code: HTTPStatus) Self[source]¶
Construct validator from a controller instance.
Infers the event type model from the endpoint metadata. Also knows whether events validation is turned on.