Source code for dmr.security.jwt.views

import datetime as dt
import uuid
from abc import abstractmethod
from collections.abc import Mapping, Sequence
from http import HTTPStatus
from typing import Any, ClassVar, Generic, Literal, TypeAlias, TypeVar

from django.conf import settings
from django.contrib.auth import aauthenticate, authenticate
from typing_extensions import TypedDict

from dmr import Body, Controller, ResponseSpec, modify
from dmr.errors import ErrorModel
from dmr.exceptions import NotAuthenticatedError
from dmr.security.jwt.token import JWToken
from dmr.serializer import BaseSerializer

# Tag written into a token's extra claims under the ``'type'`` key,
# it tells access tokens apart from refresh tokens.
_TokenType: TypeAlias = Literal['access', 'refresh']

# Serializer type used to parametrize the base controller.
_SerializerT = TypeVar('_SerializerT', bound=BaseSerializer)

# Parsed request body accepted by the obtain-tokens controllers.
_ObtainTokensT = TypeVar('_ObtainTokensT', bound=Mapping[str, Any])

# Response payload returned by the obtain-tokens controllers.
_TokensResponseT = TypeVar('_TokensResponseT')


class ObtainTokensPayload(TypedDict):
    """
    Default shape of the request body used to obtain jwt tokens.

    The very same mapping is unpacked as keyword arguments
    for :func:`django.contrib.auth.authenticate`.
    """

    username: str
    password: str
class ObtainTokensResponse(TypedDict):
    """
    Default response type for refresh token endpoint.

    Holds both the access token and the refresh token as strings.
    """

    access_token: str
    refresh_token: str
class _BaseTokenSettings:
    """Jwt settings that are shared by every jwt controller."""

    jwt_audiences: ClassVar[str | Sequence[str] | None] = None
    jwt_issuer: ClassVar[str | None] = None
    jwt_algorithm: ClassVar[str] = 'HS256'
    jwt_expiration: ClassVar[dt.timedelta] = dt.timedelta(days=1)
    jwt_secret: ClassVar[str | None] = None
    jwt_token_cls: ClassVar[type[JWToken]] = JWToken


class _BaseObtainTokensSettings(_BaseTokenSettings):
    """Extra settings for controllers that work with refresh tokens."""

    jwt_refresh_expiration: ClassVar[dt.timedelta] = dt.timedelta(days=10)


# Shared base for token controllers: combines the jwt settings above
# with the token building helpers below.
class _BaseTokenController(
    _BaseObtainTokensSettings,
    Controller[_SerializerT],
):
    def create_jwt_token(  # noqa: WPS211
        self,
        *,
        subject: str | None = None,
        issuer: str | None = None,
        audiences: str | Sequence[str] | None = None,
        expiration: dt.datetime | None = None,
        jwt_id: str | None = None,
        token_type: _TokenType | None = None,
        secret: str | None = None,
        algorithm: str | None = None,
        token_headers: dict[str, Any] | None = None,
    ) -> str:
        """
        Create an encoded jwt token of a given *expiration* and *type*.

        Every falsy argument is replaced with its default.

        Args:
            subject: Token subject, defaults to ``str(request.user.pk)``.
            issuer: Token issuer, defaults to ``jwt_issuer``.
            audiences: Token audiences, defaults to ``jwt_audiences``.
            expiration: Moment when the token expires, defaults
                to the current UTC time plus ``jwt_expiration``.
            jwt_id: Token id, defaults to the :meth:`make_jwt_id` result.
            token_type: When truthy, it is stored under the ``'type'``
                key of token extras. Otherwise extras are empty.
            secret: Signing secret, defaults to ``jwt_secret``
                and then to ``settings.SECRET_KEY``.
            algorithm: Signing algorithm, defaults to ``jwt_algorithm``.
            token_headers: Passed as ``headers`` to the token's ``encode``.

        Returns:
            Whatever ``encode`` of the ``jwt_token_cls`` instance returns.
        """
        token_subject = subject or str(self.request.user.pk)
        expires_at = expiration or (
            dt.datetime.now(dt.UTC) + self.jwt_expiration
        )
        token_issuer = issuer or self.jwt_issuer
        token_audiences = audiences or self.jwt_audiences
        token_id = jwt_id or self.make_jwt_id()

        # Only typed tokens carry the extra ``type`` claim.
        extras: dict[str, Any] = {}
        if token_type:
            extras['type'] = token_type

        token = self.jwt_token_cls(
            sub=token_subject,
            exp=expires_at,
            iss=token_issuer,
            aud=token_audiences,
            jti=token_id,
            extras=extras,
        )
        return token.encode(
            secret=secret or self.jwt_secret or settings.SECRET_KEY,
            algorithm=algorithm or self.jwt_algorithm,
            headers=token_headers,
        )

    def make_jwt_id(self) -> str | None:
        """Create a unique jwt id: hex form of a random uuid4."""
        random_id = uuid.uuid4()
        return random_id.hex
class ObtainTokensSyncController(
    _BaseTokenController[_SerializerT],
    Generic[_SerializerT, _ObtainTokensT, _TokensResponseT],
):
    """
    Sync controller to get access and refresh tokens.

    Attributes:
        jwt_audiences: String or sequence of strings
            of audiences for JWT token.
        jwt_issuer: String of who issued this JWT token.
        jwt_algorithm: Default algorithm to use for token signing.
        jwt_expiration: Default access token expiration timedelta.
        jwt_refresh_expiration: Default refresh token
            expiration timedelta.
        jwt_secret: Alternative token secret for signing.
            By default uses ``settings.SECRET_KEY``
        jwt_token_cls: Possible custom JWT token class.

    See also:
        https://pyjwt.readthedocs.io/en/stable
        for all the JWT terms and options explanation.

    """

    # Failed logins are reported with an ``ErrorModel`` body and 401 status.
    responses = (
        ResponseSpec(
            return_type=ErrorModel,
            status_code=HTTPStatus.UNAUTHORIZED,
        ),
    )

    @modify(status_code=HTTPStatus.OK)
    def post(self, parsed_body: Body[_ObtainTokensT]) -> _TokensResponseT:
        """By default tokens are acquired on post."""
        return self.login(parsed_body)

    def login(self, parsed_body: _ObtainTokensT) -> _TokensResponseT:
        """
        Run the sync login routine for a user.

        Converts the parsed body to auth kwargs, passes them
        to :func:`django.contrib.auth.authenticate`, stores the found user
        on the request, and builds the response payload.

        Raises:
            NotAuthenticatedError: When ``authenticate`` returns ``None``.
        """
        credentials = self.convert_auth_payload(parsed_body)
        user = authenticate(self.request, **credentials)
        if user is not None:
            self.request.user = user
            return self.make_api_response()
        raise NotAuthenticatedError

    @abstractmethod
    def convert_auth_payload(
        self,
        payload: _ObtainTokensT,
    ) -> ObtainTokensPayload:
        """
        Turn a custom payload into kwargs that django supports.

        See :func:`django.contrib.auth.authenticate` docs
        on which kwargs it supports.

        Basically it needs ``username`` and ``password`` strings.
        """
        raise NotImplementedError

    @abstractmethod
    def make_api_response(self) -> _TokensResponseT:
        """Abstract method that builds the response payload."""
        raise NotImplementedError
class ObtainTokensAsyncController(
    _BaseTokenController[_SerializerT],
    Generic[_SerializerT, _ObtainTokensT, _TokensResponseT],
):
    """
    Async controller to get access and refresh tokens.

    Attributes:
        jwt_audiences: String or sequence of strings
            of audiences for JWT token.
        jwt_issuer: String of who issued this JWT token.
        jwt_algorithm: Default algorithm to use for token signing.
        jwt_expiration: Default access token expiration timedelta.
        jwt_refresh_expiration: Default refresh token
            expiration timedelta.
        jwt_secret: Alternative token secret for signing.
            By default uses ``settings.SECRET_KEY``
        jwt_token_cls: Possible custom JWT token class.

    See also:
        https://pyjwt.readthedocs.io/en/stable
        for all the JWT terms and options explanation.

    """

    # Failed logins are reported with an ``ErrorModel`` body and 401 status.
    responses = (
        ResponseSpec(
            return_type=ErrorModel,
            status_code=HTTPStatus.UNAUTHORIZED,
        ),
    )

    @modify(status_code=HTTPStatus.OK)
    async def post(
        self,
        parsed_body: Body[_ObtainTokensT],
    ) -> _TokensResponseT:
        """By default tokens are acquired on post."""
        return await self.login(parsed_body)

    async def login(self, parsed_body: _ObtainTokensT) -> _TokensResponseT:
        """
        Run the async login routine for a user.

        Awaits the conversion of the parsed body to auth kwargs, awaits
        :func:`django.contrib.auth.aauthenticate` with them, stores
        the found user on the request, and awaits the response payload.

        Raises:
            NotAuthenticatedError: When ``aauthenticate`` returns ``None``.
        """
        credentials = await self.convert_auth_payload(parsed_body)
        user = await aauthenticate(self.request, **credentials)
        if user is not None:
            self.request.user = user
            return await self.make_api_response()
        raise NotAuthenticatedError

    @abstractmethod
    async def convert_auth_payload(
        self,
        payload: _ObtainTokensT,
    ) -> ObtainTokensPayload:
        """
        Turn a custom payload into kwargs that django supports.

        See :func:`django.contrib.auth.authenticate` docs
        on which kwargs it supports.

        Basically it needs ``username`` and ``password`` strings.
        """
        raise NotImplementedError

    @abstractmethod
    async def make_api_response(self) -> _TokensResponseT:
        """Abstract method that builds the response payload."""
        raise NotImplementedError