Json Lines¶
Standard: https://jsonlines.org
Our jsonl implementation allows users to follow the standard above.
Using JsonLines¶
You can use JsonLines format with both validate()
and modify() style endpoints:
1import dataclasses
2from collections.abc import AsyncIterator
3
4from dmr.plugins.msgspec import MsgspecSerializer
5from dmr.streaming.jsonl import JsonLinesController
6
7
8@dataclasses.dataclass(frozen=True, slots=True)
9class _User:
10 email: str
11
12
13class UserEventsController(JsonLinesController[MsgspecSerializer]):
14 async def get(self) -> AsyncIterator[_User]:
15 return self.produce_user_events()
16
17 async def produce_user_events(self) -> AsyncIterator[_User]:
18 # You can send any complex data that can be serialized
19 # by the controller's serializer:
20 yield _User(email='first@example.com')
21 yield _User(email='second@example.com')
22
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
{"email":"first@example.com"}
{"email":"second@example.com"}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"application/jsonl": {
"itemSchema": {
"$ref": "#/components/schemas/_User"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
1import dataclasses
2from collections.abc import AsyncIterator
3
4from dmr import validate
5from dmr.negotiation import ContentType
6from dmr.plugins.msgspec import MsgspecSerializer
7from dmr.streaming import StreamingResponse, streaming_response_spec
8from dmr.streaming.jsonl import JsonLinesController
9
10
11@dataclasses.dataclass(frozen=True, slots=True)
12class _User:
13 email: str
14
15
16class UserEventsController(JsonLinesController[MsgspecSerializer]):
17 @validate(
18 streaming_response_spec(
19 _User,
20 content_type=ContentType.jsonl,
21 ),
22 )
23 async def get(self) -> StreamingResponse:
24 return self.to_stream(self.produce_user_events())
25
26 async def produce_user_events(self) -> AsyncIterator[_User]:
27 # You can send any complex data that can be serialized
28 # by the controller's serializer:
29 yield _User(email='first@example.com')
30 yield _User(email='second@example.com')
31
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
{"email":"first@example.com"}
{"email":"second@example.com"}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"application/jsonl": {
"itemSchema": {
"$ref": "#/components/schemas/_User"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
What happens in these examples?
We define an event producing method
produce_user_eventsyielding events one by one. It returns ancollections.abc.AsyncIteratorinstanceIt must produces instances of objects that can be serialized to
jsonwith a serializer of your choice. These events will be renderer into a streamWe define a special
JsonLinesControllerclass that has regulargetHTTP endpoint. In@modifyexample it returns the async generator directly, while in@validateexample it returns thedmr.streaming.stream.StreamingResponseinstanceNext, ASGI will take the returned data and stream events to your users
See also
Async Generators in Python: https://peps.python.org/pep-0525
Streaming in ASGI: https://asgi.readthedocs.io/en/latest/specs/main.html
Important
Our streaming implementation will not work with a WSGI handler in production.
Why? Because streaming is a long-living connection by design.
WSGI handlers have very limited number of connections.
Basically number_of_workers * number_of_threads,
just a very small number of streaming clients will completely
block all other work on the server.
Use ASGI for all streaming endpoints. This will give you the best of two worlds: simple sync Django for the major part of your code base and some async endpoints where you need them. See our guide.
However, we allow running streaming with WSGI
if settings.DEBUG is True for local development and testing.
In a very limited compatibility mode.
Using components¶
If you want to parse any incoming data, you can do it the same way as in any other controller.
JsonL supports passing any type of data to the endpoint.
1from collections.abc import AsyncIterator
2
3import msgspec
4
5from dmr.components import Headers
6from dmr.plugins.msgspec import MsgspecSerializer
7from dmr.streaming.jsonl import Json, JsonLinesController
8
9
10class HeaderModel(msgspec.Struct):
11 last_event_id: int | None = msgspec.field(
12 default=None,
13 name='Last-Event-ID',
14 )
15
16
17class UserEventsController(JsonLinesController[MsgspecSerializer]):
18 def get(
19 self,
20 parsed_headers: Headers[HeaderModel],
21 ) -> AsyncIterator[Json]:
22 return self.produce_user_events(parsed_headers)
23
24 async def produce_user_events(
25 self,
26 parsed_headers: HeaderModel,
27 ) -> AsyncIterator[Json]:
28 if parsed_headers.last_event_id is None:
29 yield 'starting from scratch'
30 else:
31 yield f'starting from {parsed_headers.last_event_id}'
32
Run result
$ curl http://127.0.0.1:8000/api/usereventscontroller/ -X GET
"starting from scratch"
$ curl http://127.0.0.1:8000/api/usereventscontroller/ -X GET -H 'Last-Event-ID: 5'
"starting from 5"
$ curl http://127.0.0.1:8000/api/usereventscontroller/ -D - -X GET -H 'Last-Event-ID: abc'
HTTP/1.1 400 Bad Request
date: Sun, 29 Mar 2026 18:53:16 GMT
server: uvicorn
Content-Type: application/json
X-Frame-Options: DENY
Vary: Accept-Language
Content-Language: en
Content-Length: 114
X-Content-Type-Options: nosniff
Referrer-Policy: same-origin
Cross-Origin-Opener-Policy: same-origin
{"detail":[{"msg":"Expected `int | null`, got `str` - at `$.parsed_headers.last_event_id`","type":"value_error"}]}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
}
},
"securitySchemes": {}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"parameters": [
{
"deprecated": false,
"in": "header",
"name": "Last-Event-ID",
"schema": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
}
}
],
"responses": {
"200": {
"content": {
"application/jsonl": {
"itemSchema": {}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"400": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when request components cannot be parsed"
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
}
}
}
}
}
We are using a regular approach
with the Headers component.
Note
Use Last-Event-ID header to handle reconnects to start sending
events to the client from the last consumed one.
See also
Read our Components guide.
Auth¶
JsonL endpoints fully support any style of auth that you might need.
Here’s an example with
JWTAsyncAuth class:
1import dataclasses
2from collections.abc import AsyncIterator
3
4from dmr.plugins.msgspec import MsgspecSerializer
5from dmr.security.jwt import JWTAsyncAuth
6from dmr.streaming.jsonl import JsonLinesController
7
8
9@dataclasses.dataclass(frozen=True, slots=True)
10class _User:
11 email: str
12
13
14class UserEventsController(JsonLinesController[MsgspecSerializer]):
15 auth = (JWTAsyncAuth(),)
16
17 async def get(self) -> AsyncIterator[_User]:
18 return self.produce_user_events()
19
20 async def produce_user_events(self) -> AsyncIterator[_User]:
21 yield _User(email='first@example.com')
22
Run result
$ curl http://127.0.0.1:8000/api/user/events/ -X GET
{"detail":[{"msg":"Not authenticated","type":"security"}]}
OpenAPI Schema
Preview openapi.json
{
"components": {
"schemas": {
"ErrorDetail": {
"description": "Base schema for error details description.",
"properties": {
"loc": {
"items": {
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"type": "array"
},
"msg": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"msg"
],
"title": "ErrorDetail",
"type": "object"
},
"ErrorModel": {
"description": "Default error response schema.\n\nCan be customized.\nSee :ref:`customizing-error-messages` for more details.",
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ErrorDetail"
},
"type": "array"
}
},
"required": [
"detail"
],
"title": "ErrorModel",
"type": "object"
},
"_User": {
"properties": {
"email": {
"type": "string"
}
},
"required": [
"email"
],
"title": "_User",
"type": "object"
}
},
"securitySchemes": {
"jwt": {
"bearerFormat": "JWT",
"description": "JWT token auth",
"scheme": "Bearer",
"type": "http"
}
}
},
"info": {
"title": "Django Modern Rest",
"version": "0.1.0"
},
"openapi": "3.2.0",
"paths": {
"/api/usereventscontroller/": {
"get": {
"deprecated": false,
"operationId": "getUsereventscontrollerApiUsereventscontroller",
"responses": {
"200": {
"content": {
"application/jsonl": {
"itemSchema": {
"$ref": "#/components/schemas/_User"
}
}
},
"description": "OK",
"headers": {
"Cache-Control": {
"required": true,
"schema": {
"type": "string"
}
},
"Connection": {
"required": true,
"schema": {
"type": "string"
}
},
"X-Accel-Buffering": {
"required": true,
"schema": {
"type": "string"
}
}
}
},
"401": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when auth was not successful"
},
"406": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when provided `Accept` header cannot be satisfied"
},
"422": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorModel"
}
}
},
"description": "Raised when returned response does not match the response schema"
}
},
"security": [
{
"jwt": []
}
]
}
}
}
}
See also
Read our How authentication works guide.
Best practices¶
django-modern-rest implements a bunch of best practices for streaming SSE:
Connection: keep-aliveheader keeps the connection openCache-Control: no-cacheheader prevents caching the stream responseX-Accel-Buffering: noheader prevents proxy response buffering in some proxy servers like Nginx
Everything just works out of the box, you don’t have to do anything.
However, we don’t send ping events by default, because the format
for them is not well defined in jsonl.
You can enable them by changing
streaming_ping_seconds
to the maximum number of second before the ping event happens.
And ping_event()
for the event payload.
API Reference¶
Controller¶
- class dmr.streaming.jsonl.controller.JsonLinesController(**kwargs)[source]¶
Bases:
StreamingController[_SerializerT_co]Controller for streaming json lines (JsonL).
See also
Json Lines standard: https://jsonlines.org
Danger
WSGI handers do not support streaming responses, including JsonLines, by default. You would need to use ASGI handler for streaming endpoints.
We allow running streaming during
settings.DEBUGbuilds for debugging. But, in production we will raiseRuntimeErrorwhen WSGI handler will be detected used together with JsonLines.- async handle_event_error(exc: Exception) Any[source]¶
Handles errors that can happen while sending events.
Return alternative event that will indicate what error has happened. By default does nothing and just reraises the exception.
- classmethod streaming_renderers(serializer: type[_SerializerT_co]) list[StreamingRenderer][source]¶
Returns the streaming renderer.
- streaming_validator_cls¶
alias of
JsonLinesStreamingValidator
Renderer¶
- class dmr.streaming.jsonl.renderer.JsonLinesRenderer(serializer: type[BaseSerializer], regular_renderer: Renderer, streaming_validator_cls: type[StreamingValidator], *, sep: bytes = b'\n')[source]¶
Renders response as a stream of json liens.
Uses sub-renderer to render events’ data into the correct format.
Validation¶
- class dmr.streaming.jsonl.validation.JsonLinesStreamingValidator(event_model: Any, serializer: type[BaseSerializer], *, validate_events: bool)[source]¶
Bases:
StreamingValidatorInjects itself into the stream of json lines to validate the events.