Server Sent Events¶
Standard: https://html.spec.whatwg.org/multipage/server-sent-events.html
Our SSE implementation allows users to follow the standard above
or fully customize the experience for custom needs.
Using SSE¶
You can use SSE with both validate()
and modify() style endpoints:
1import dataclasses
2from collections.abc import AsyncIterator
3
4from dmr.plugins.msgspec import MsgspecSerializer
5from dmr.streaming.sse import SSEController, SSEvent
6
7
8@dataclasses.dataclass(frozen=True, slots=True)
9class _User:
10 email: str
11
12
13class UserEventsController(SSEController[MsgspecSerializer]):
14 async def get(self) -> AsyncIterator[SSEvent[_User]]:
15 return self.produce_user_events()
16
17 async def produce_user_events(self) -> AsyncIterator[SSEvent[_User]]:
18 # You can send any complex data that can be serialized
19 # by the controller's serializer,
20 # all SSEvent fields can be customized:
21 yield SSEvent(
22 _User(email='first@example.com'),
23 event='user',
24 )
25 yield SSEvent(
26 _User(email='second@example.com'),
27 event='user',
28 )
29
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
event: user
data: {"email":"first@example.com"}
event: user
data: {"email":"second@example.com"}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"SSEvent__User_": {
"description": "Default implementation for the Server Sent Event.\n\nAll parameters are optional, but at least one is required.\n\nAttributes:\n data: Event payload.\n event: Event type.\n id: Unique event's identification.\n retry: The reconnection time.\n comment: Comment about the event.\n\n.. note::\n\n It is recommended for end users to define their own types\n that will be type-safe and will have the correct schema.\n\nSee also:\n https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields",
"properties": {
"comment": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"data": {
"$ref": "#/components/schemas/_User"
},
"event": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
},
{
"type": "null"
}
]
},
"retry": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
},
"required": [
"data"
],
"title": "SSEvent[_User]",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"text/event-stream": {
"itemSchema": {
"$ref": "#/components/schemas/SSEvent__User_"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
1import dataclasses
2from collections.abc import AsyncIterator
3
4from dmr import validate
5from dmr.negotiation import ContentType
6from dmr.plugins.msgspec import MsgspecSerializer
7from dmr.streaming import StreamingResponse, streaming_response_spec
8from dmr.streaming.sse import SSEController, SSEvent
9
10
11@dataclasses.dataclass(frozen=True, slots=True)
12class _User:
13 email: str
14
15
16class UserEventsController(SSEController[MsgspecSerializer]):
17 @validate(
18 streaming_response_spec(
19 SSEvent[_User],
20 content_type=ContentType.event_stream,
21 ),
22 )
23 async def get(self) -> StreamingResponse:
24 return self.to_stream(self.produce_user_events())
25
26 async def produce_user_events(self) -> AsyncIterator[SSEvent[_User]]:
27 # You can send any complex data that can be serialized
28 # by the controller's serializer,
29 # all SSEvent fields can be customized:
30 yield SSEvent(
31 _User(email='first@example.com'),
32 event='user',
33 )
34 yield SSEvent(
35 _User(email='second@example.com'),
36 event='user',
37 )
38
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
event: user
data: {"email":"first@example.com"}
event: user
data: {"email":"second@example.com"}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"SSEvent__User_": {
"description": "Default implementation for the Server Sent Event.\n\nAll parameters are optional, but at least one is required.\n\nAttributes:\n data: Event payload.\n event: Event type.\n id: Unique event's identification.\n retry: The reconnection time.\n comment: Comment about the event.\n\n.. note::\n\n It is recommended for end users to define their own types\n that will be type-safe and will have the correct schema.\n\nSee also:\n https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields",
"properties": {
"comment": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"data": {
"$ref": "#/components/schemas/_User"
},
"event": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
},
{
"type": "null"
}
]
},
"retry": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
},
"required": [
"data"
],
"title": "SSEvent[_User]",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"text/event-stream": {
"itemSchema": {
"$ref": "#/components/schemas/SSEvent__User_"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
What happens in these examples?
- We define an event producing method produce_user_events yielding events one by one. It returns a collections.abc.AsyncIterator instance
- It must produce instances of dmr.streaming.sse.metadata.SSEvent, which will be rendered into the stream
- We define a special SSEController class that has a regular get HTTP endpoint. In the @modify example it returns the async generator directly, while in the @validate example it returns the dmr.streaming.stream.StreamingResponse instance
- Next, ASGI will take the returned data and stream events to your users
See also
Async Generators in Python: https://peps.python.org/pep-0525
Streaming in ASGI: https://asgi.readthedocs.io/en/latest/specs/main.html
Important
Our streaming implementation will not work with a WSGI handler in production.
Why? Because streaming is a long-living connection by design.
WSGI handlers have a very limited number of connections:
basically number_of_workers * number_of_threads.
So even a very small number of streaming clients will completely
block all other work on the server.
Use ASGI for all streaming endpoints. This will give you the best of both worlds: simple sync Django for the major part of your code base and some async endpoints where you need them. See our guide.
However, we allow running streaming with WSGI
in a very limited compatibility mode
if settings.DEBUG is True, for local development and testing.
Using components¶
If you want to parse any incoming data, you can do it the same way as in any other controller.
Note
Note that the default EventSource JavaScript API only supports
headers, cookies, query, and path parameters with the GET HTTP method.
Custom implementations might use any HTTP methods and any type of parameters.
For example, if you need to parse Last-Event-ID header
(which is a part of the default EventSource spec and API):
1from collections.abc import AsyncIterator
2
3import msgspec
4
5from dmr.components import Headers
6from dmr.plugins.msgspec import MsgspecSerializer
7from dmr.streaming.sse import SSEController, SSEvent
8
9
10class HeaderModel(msgspec.Struct):
11 last_event_id: int | None = msgspec.field(
12 default=None,
13 name='Last-Event-ID',
14 )
15
16
17class UserEventsController(SSEController[MsgspecSerializer]):
18 def get(
19 self,
20 parsed_headers: Headers[HeaderModel],
21 ) -> AsyncIterator[SSEvent[str]]:
22 return self.produce_user_events(parsed_headers)
23
24 async def produce_user_events(
25 self,
26 parsed_headers: HeaderModel,
27 ) -> AsyncIterator[SSEvent[str]]:
28 if parsed_headers.last_event_id is None:
29 yield SSEvent('starting from scratch')
30 else:
31 yield SSEvent(f'starting from {parsed_headers.last_event_id}')
32
Run result
$ curl http://127.0.0.1:8000/api/usereventscontroller/ -X GET
data: "starting from scratch"
$ curl http://127.0.0.1:8000/api/usereventscontroller/ -X GET -H 'Last-Event-ID: 5'
data: "starting from 5"
$ curl http://127.0.0.1:8000/api/usereventscontroller/ -D - -X GET -H 'Last-Event-ID: abc'
HTTP/1.1 400 Bad Request
date: Sun, 29 Mar 2026 18:53:19 GMT
server: uvicorn
Content-Type: application/json
X-Frame-Options: DENY
Vary: Accept-Language
Content-Language: en
Content-Length: 114
X-Content-Type-Options: nosniff
Referrer-Policy: same-origin
Cross-Origin-Opener-Policy: same-origin
{"detail":[{"msg":"Expected `int | null`, got `str` - at `$.parsed_headers.last_event_id`","type":"value_error"}]}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"SSEvent_str_": {
"description": "Default implementation for the Server Sent Event.\n\nAll parameters are optional, but at least one is required.\n\nAttributes:\n data: Event payload.\n event: Event type.\n id: Unique event's identification.\n retry: The reconnection time.\n comment: Comment about the event.\n\n.. note::\n\n It is recommended for end users to define their own types\n that will be type-safe and will have the correct schema.\n\nSee also:\n https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields",
"properties": {
"comment": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"data": {
"type": "string"
},
"event": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
},
{
"type": "null"
}
]
},
"retry": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
},
"required": [
"data"
],
"title": "SSEvent[str]",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"parameters": [
{
"deprecated": false,
"in": "header",
"name": "Last-Event-ID",
"schema": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
}
],
"responses": {
"200": {
"content": {
"text/event-stream": {
"itemSchema": {
"$ref": "#/components/schemas/SSEvent_str_"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"400": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when request components cannot be parsed"
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
We are using a regular approach
with the Headers component.
Note
Use Last-Event-ID header to handle reconnects to start sending
events to the client from the last consumed one.
See also
Read our Components guide.
Auth¶
SSE endpoints can also be protected by any instance of the async auth.
However, note that the default EventSource JavaScript API
does not support passing explicit headers. There are several options:
- Cookie based auth, because EventSource passes all the cookies on the request
- Query string based auth, but it might be exposed in logs / etc, so make sure tokens have a really short expiration time
- Using your own EventSource implementation
Here’s an example with
DjangoSessionAsyncAuth class:
1import dataclasses
2from collections.abc import AsyncIterator
3
4from dmr.plugins.msgspec import MsgspecSerializer
5from dmr.security.django_session import DjangoSessionAsyncAuth
6from dmr.streaming.sse import SSEController, SSEvent
7
8
9@dataclasses.dataclass(frozen=True, slots=True)
10class _User:
11 email: str
12
13
14class UserEventsController(SSEController[MsgspecSerializer]):
15 auth = (DjangoSessionAsyncAuth(),)
16
17 async def get(self) -> AsyncIterator[SSEvent[_User]]:
18 return self.produce_user_events()
19
20 async def produce_user_events(self) -> AsyncIterator[SSEvent[_User]]:
21 yield SSEvent(
22 _User(email='first@example.com'),
23 event='user',
24 )
25
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
{"detail":[{"msg":"Not authenticated","type":"security"}]}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"SSEvent__User_": {
"description": "Default implementation for the Server Sent Event.\n\nAll parameters are optional, but at least one is required.\n\nAttributes:\n data: Event payload.\n event: Event type.\n id: Unique event's identification.\n retry: The reconnection time.\n comment: Comment about the event.\n\n.. note::\n\n It is recommended for end users to define their own types\n that will be type-safe and will have the correct schema.\n\nSee also:\n https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields",
"properties": {
"comment": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"data": {
"$ref": "#/components/schemas/_User"
},
"event": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
},
{
"type": "null"
}
]
},
"retry": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
},
"required": [
"data"
],
"title": "SSEvent[_User]",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {
"csrf": {
"description": "CSRF protection",
"in": "cookie",
"name": "csrftoken",
"type": "apiKey"
},
"django_session": {
"description": "Reusing standard Django auth flow for API",
"in": "cookie",
"name": "sessionid",
"type": "apiKey"
}
}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"text/event-stream": {
"itemSchema": {
"$ref": "#/components/schemas/SSEvent__User_"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"401": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when auth was not successful"
},
"403": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when CSRF check failed"
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
},
"security": [
{
"csrf": [],
"django_session": []
}
]
}
}
}
}
If you don’t use EventSource API, you can use any other auth
of your choice, all of them will just work.
See also
Read our How authentication works guide.
Serializing events¶
Our default class SSEvent
supports two modes:
- Passing serialize=True (default) for all event bodies, so they will be serialized with the serializer from the controller. In this mode you can pass any objects that are supported by your serializer
- Or passing serialize=False alongside an existing bytes object. It won’t trigger any extra serialization. It might be useful if you already have some existing binary data
Modeling business events¶
We provide our default implementation for sending events:
SSEvent
But, users are not required to use it directly.
They can create their own models, as long as they respect
SSE protocol fields.
It is quite common in SSE to model events as different ADTs. Because events can be of different types, they might carry different data depending on the event type. They might also contain different sets of other fields depending on that type.
For example, let’s model three different events:
- If any new users are registered, send us an event with type user, id with the user’s id, and a username as the data
- If any new payment is made, send us payment event type with {"amount": int, "currency": str} json string as the data
- Sometimes we send purely technical ping events with : ping as a comment and retry: 100 instruction
Let’s model this with perfect type-safety and state-of-the-art OpenAPI schema.
1from collections.abc import AsyncIterator
2from typing import Literal, TypeAlias
3
4import pydantic
5from pydantic.json_schema import SkipJsonSchema
6
7from dmr.errors import ErrorModel
8from dmr.plugins.pydantic import PydanticSerializer
9from dmr.streaming.sse import SSEController
10
11
12class _BaseEvent(pydantic.BaseModel):
13 comment: SkipJsonSchema[str | None] = None
14 retry: SkipJsonSchema[int | None] = None
15
16 @property
17 def should_serialize_data(self) -> bool:
18 return True
19
20
21class UserEvent(_BaseEvent):
22 event: Literal['user'] = 'user'
23 id: int
24 data: str # username
25
26
27class _Payment(pydantic.BaseModel):
28 amount: int
29 currency: str
30
31
32class PaymentEvent(_BaseEvent):
33 id: SkipJsonSchema[None] = None
34 event: Literal['payment'] = 'payment'
35 data: pydantic.Json[_Payment]
36
37
38class PingEvent(pydantic.BaseModel):
39 retry: int = 100
40 comment: Literal['ping'] = 'ping'
41 id: SkipJsonSchema[None] = None
42 data: SkipJsonSchema[None] = None
43 event: SkipJsonSchema[None] = None
44
45 @property
46 def should_serialize_data(self) -> bool:
47 return False
48
49
50class ErrorEvent(_BaseEvent):
51 # Can happen if event validation will fail:
52 id: SkipJsonSchema[None] = None
53 event: Literal['error'] = 'error'
54 data: pydantic.Json[ErrorModel]
55
56
57_PossibleEvents: TypeAlias = UserEvent | PaymentEvent | PingEvent | ErrorEvent
58
59
60class ComplexEventsController(SSEController[PydanticSerializer]):
61 def get(self) -> AsyncIterator[_PossibleEvents]:
62 return self.complex_events()
63
64 async def complex_events(self) -> AsyncIterator[_PossibleEvents]:
65 yield UserEvent(id=1, data='sobolevn')
66 yield PaymentEvent(
67 data=_Payment(
68 amount=10,
69 currency='$',
70 ).model_dump_json(),
71 )
72 yield PingEvent()
73
Run result
$ curl http://127.0.0.1:8000/api/complexeventscontroller/ -X GET
id: 1
event: user
data: "sobolevn"
event: payment
data: {"amount":10,"currency":"$"}
: ping
retry: 100
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"title": "Loc",
"type": "array"
},
"msg": {
"title": "Msg",
"type": "string"
},
"type": {
"title": "Type",
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorEvent": {
"properties": {
"data": {
"contentMediaType": "application/json",
"contentSchema": {
"$ref": "#/components/schemas/ErrorModel"
},
"title": "Data",
"type": "string"
},
"event": {
"const": "error",
"default": "error",
"title": "Event",
"type": "string"
}
},
"required": [
"data"
],
"title": "ErrorEvent",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"title": "Detail",
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"PaymentEvent": {
"properties": {
"data": {
"contentMediaType": "application/json",
"contentSchema": {
"$ref": "#/components/schemas/_Payment"
},
"title": "Data",
"type": "string"
},
"event": {
"const": "payment",
"default": "payment",
"title": "Event",
"type": "string"
}
},
"required": [
"data"
],
"title": "PaymentEvent",
"type": "object"
},
"PingEvent": {
"properties": {
"comment": {
"const": "ping",
"default": "ping",
"title": "Comment",
"type": "string"
},
"retry": {
"default": 100,
"title": "Retry",
"type": "integer"
}
},
"title": "PingEvent",
"type": "object"
},
"UserEvent": {
"properties": {
"data": {
"title": "Data",
"type": "string"
},
"event": {
"const": "user",
"default": "user",
"title": "Event",
"type": "string"
},
"id": {
"title": "Id",
"type": "integer"
}
},
"required": [
"id",
"data"
],
"title": "UserEvent",
"type": "object"
},
"_Payment": {
"properties": {
"amount": {
"title": "Amount",
"type": "integer"
},
"currency": {
"title": "Currency",
"type": "string"
}
},
"required": [
"amount",
"currency"
],
"title": "_Payment",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/complexeventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getComplexeventscontrollerApiComplexeventscontroller",
"responses": {
"200": {
"content": {
"text/event-stream": {
"itemSchema": {
"anyOf": [
{
"$ref": "#/components/schemas/UserEvent"
},
{
"$ref": "#/components/schemas/PaymentEvent"
},
{
"$ref": "#/components/schemas/PingEvent"
},
{
"$ref": "#/components/schemas/ErrorEvent"
}
]
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
This will also generate a correct OpenAPI spec with all the logical cases covered.
If you are still not happy with the resulting OpenAPI schema,
you can fully customize it using your serializer’s official docs.
For example, pydantic uses __get_pydantic_json_schema__ method
for this purpose.
Note
When creating custom event types, don’t forget to validate
that id and event fields do not contain: '\x00',
'\n', and '\r' chars.
Use dmr.streaming.sse.validation.check_event_field() to do that.
Best practices¶
django-modern-rest implements a bunch of best practices for streaming SSE:
- Connection: keep-alive header keeps the connection open
- Cache-Control: no-cache header prevents caching the stream response
- X-Accel-Buffering: no header prevents proxy response buffering in some proxy servers like Nginx
- Every 15 seconds we send : ping keep-alive events, when there hasn’t been any message, to prevent some servers from closing the connection as inactive. This is a direct recommendation from the SSE spec
Everything just works out of the box, you don’t have to do anything.
API Reference¶
Controller¶
- class dmr.streaming.sse.controller.SSEController(**kwargs)[source]¶
Bases: StreamingController[_SerializerT_co]
Controller for streaming Server Sent Events (SSE).
Danger
WSGI handlers do not support streaming responses, including SSE, by default. You would need to use an ASGI handler for streaming endpoints.
We allow running SSE during settings.DEBUG builds for debugging. But, in production we will raise RuntimeError when a WSGI handler is detected being used together with streaming.
- async handle_event_error(exc: Exception) Any[source]¶
Handles errors that can happen while sending events.
Return alternative event that will indicate what error has happened. By default does nothing and just reraises the exception.
- streaming_default_renderer: ClassVar[Renderer] = <dmr.plugins.msgspec.json.MsgspecJsonRenderer object>¶
Default renderer for event body field.
- streaming_ping_seconds: ClassVar[float | None] = 15.0¶
Optional ping keep alive event support.
Some servers might close long living connections with no activity. Specify the number of seconds we should wait between events. If we wait longer, we will send a ping event. The payload of the ping event is defined in ping_event().
By default it is disabled. It is only enabled in the SSE streaming.
- classmethod streaming_renderers(serializer: type[_SerializerT_co]) list[StreamingRenderer][source]¶
Returns the streaming renderer.
- streaming_validator_cls¶
Validator for events, only active when validate_events is set.
alias of SSEStreamingValidator
Metadata¶
- class dmr.streaming.sse.metadata.SSE(*args, **kwargs)[source]¶
Basic interface for all possible SSE implementations.
We don’t force users to use our default implementation, moreover, we encourage them to create their own event ADT and models.
- final class dmr.streaming.sse.metadata.SSEvent(data: None = None, *, event: str, id: int | str | None = None, retry: int | None = None, comment: str | None = None, serialize: bool = True)[source]¶
- final class dmr.streaming.sse.metadata.SSEvent(data: None = None, *, event: str | None = None, id: int | str, retry: int | None = None, comment: str | None = None, serialize: bool = True)
- final class dmr.streaming.sse.metadata.SSEvent(data: None = None, *, event: str | None = None, id: int | str | None = None, retry: int, comment: str | None = None, serialize: bool = True)
- final class dmr.streaming.sse.metadata.SSEvent(data: None = None, *, event: str | None = None, id: int | str | None = None, retry: int | None = None, comment: str, serialize: bool = True)
- final class dmr.streaming.sse.metadata.SSEvent(data: bytes, *, event: str | None = None, id: int | str | None = None, retry: int | None = None, comment: str | None = None, serialize: bool = True)
- final class dmr.streaming.sse.metadata.SSEvent(data: _DataT_co, *, event: str | None = None, id: int | str | None = None, retry: int | None = None, comment: str | None = None, serialize: Literal[True] = True)
Default implementation for the Server Sent Event.
All parameters are optional, but at least one is required.
- data¶
Event payload.
- Type:
dmr.streaming.sse.metadata._DataT_co
Note
It is recommended for end users to define their own types that will be type-safe and will have the correct schema.
Renderer¶
- class dmr.streaming.sse.renderer.SSERenderer(serializer: type[BaseSerializer], regular_renderer: Renderer, streaming_validator_cls: type[StreamingValidator], *, sep: bytes = b'\r\n', encoding: str = 'utf-8', linebreak: Pattern[bytes] = re.compile(b'\\r\\n|\\r|\\n'))[source]¶
Renders response as a stream of SSE.
Uses sub-renderer to render events’ data into the correct format.
Validation¶
- class dmr.streaming.sse.validation.SSEStreamingValidator(event_model: Any, serializer: type[BaseSerializer], *, validate_events: bool)[source]¶
Bases: StreamingValidator
Injects itself into the stream of SSE to validate the events.