ISO-27001-Risk-Management/.venv/lib/python3.11/site-packages/mozilla_django_oidc/auth.py

381 lines
14 KiB
Python
Raw Permalink Normal View History

2025-09-24 18:13:40 +02:00
import base64
import hashlib
import json
import logging
import inspect
import requests
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
from django.urls import reverse
from django.utils.encoding import force_bytes, smart_bytes, smart_str
from django.utils.module_loading import import_string
from josepy.b64 import b64decode
from josepy.jwk import JWK
from josepy.jws import JWS, Header
from requests.auth import HTTPBasicAuth
from requests.exceptions import HTTPError
from mozilla_django_oidc.utils import absolutify, import_from_settings
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
def default_username_algo(email, claims=None):
    """Derive a Django username from an email address.

    The username is the URL-safe base64 encoding, with trailing ``=``
    padding removed, of the SHA-1 digest of the email address.

    :arg str/unicode email: the email address to use to generate a username
    :arg dict claims: the claims from your OIDC provider, currently unused
    :returns: str/unicode
    """
    # Approach taken from django-browserid: usernames are often treated as
    # public identifiers, so the email address itself is not used; a hash
    # of it is stored instead to avoid leaking the address.
    digest = hashlib.sha1(force_bytes(email)).digest()
    encoded = base64.urlsafe_b64encode(digest)
    return smart_str(encoded.rstrip(b"="))
class OIDCAuthenticationBackend(ModelBackend):
    """Django authentication backend for the OIDC authorization code flow.

    ``authenticate`` exchanges the ``code`` query parameter for tokens at the
    token endpoint, verifies the ID token, fetches the user info and then
    looks up (or optionally creates) the matching Django user.
    """

    def __init__(self, *args, **kwargs):
        """Load the OIDC settings and validate the signing configuration.

        :raises ImproperlyConfigured: if ``OIDC_RP_SIGN_ALGO`` starts with
            ``RS`` or ``ES`` while neither ``OIDC_RP_IDP_SIGN_KEY`` nor
            ``OIDC_OP_JWKS_ENDPOINT`` is configured.
        """
        self.OIDC_OP_TOKEN_ENDPOINT = self.get_settings("OIDC_OP_TOKEN_ENDPOINT")
        self.OIDC_OP_USER_ENDPOINT = self.get_settings("OIDC_OP_USER_ENDPOINT")
        self.OIDC_OP_JWKS_ENDPOINT = self.get_settings("OIDC_OP_JWKS_ENDPOINT", None)
        self.OIDC_RP_CLIENT_ID = self.get_settings("OIDC_RP_CLIENT_ID")
        self.OIDC_RP_CLIENT_SECRET = self.get_settings("OIDC_RP_CLIENT_SECRET")
        self.OIDC_RP_SIGN_ALGO = self.get_settings("OIDC_RP_SIGN_ALGO", "HS256")
        self.OIDC_RP_IDP_SIGN_KEY = self.get_settings("OIDC_RP_IDP_SIGN_KEY", None)

        # RS*/ES* tokens are verified with a provider key, so one of the two
        # key sources must be available.
        if self.OIDC_RP_SIGN_ALGO.startswith(("RS", "ES")) and (
            self.OIDC_RP_IDP_SIGN_KEY is None and self.OIDC_OP_JWKS_ENDPOINT is None
        ):
            msg = "{} alg requires OIDC_RP_IDP_SIGN_KEY or OIDC_OP_JWKS_ENDPOINT to be configured."
            raise ImproperlyConfigured(msg.format(self.OIDC_RP_SIGN_ALGO))

        self.UserModel = get_user_model()

    @staticmethod
    def get_settings(attr, *args):
        """Return the setting ``attr`` via ``import_from_settings``.

        Any extra positional arguments are forwarded unchanged; callers in
        this class use them to pass a default value.
        """
        return import_from_settings(attr, *args)

    def describe_user_by_claims(self, claims):
        """Return a short description of the user, based on the email claim."""
        email = claims.get("email")
        return "email {}".format(email)

    def filter_users_by_claims(self, claims):
        """Return all users matching the specified email.

        An empty queryset is returned when the claims carry no email.
        """
        email = claims.get("email")
        if not email:
            return self.UserModel.objects.none()
        return self.UserModel.objects.filter(email__iexact=email)

    def verify_claims(self, claims):
        """Verify the provided claims to decide if authentication should be allowed.

        When the ``email`` scope is configured, the result is whether an
        ``email`` claim is present. Otherwise a warning is logged and
        ``True`` is returned.
        """
        # Verify claims required by default configuration
        scopes = self.get_settings("OIDC_RP_SCOPES", "openid email")
        if "email" in scopes.split():
            return "email" in claims

        LOGGER.warning(
            "Custom OIDC_RP_SCOPES defined. "
            "You need to override `verify_claims` for custom claims verification."
        )
        return True

    def create_user(self, claims):
        """Return object for a newly created user account."""
        email = claims.get("email")
        username = self.get_username(claims)
        return self.UserModel.objects.create_user(username, email=email)

    def get_username(self, claims):
        """Generate username based on claims.

        Uses the ``OIDC_USERNAME_ALGO`` setting (a callable or a dotted path
        to one) when it is set, otherwise ``default_username_algo``.
        """
        # Approach taken from django-browserid
        # https://github.com/mozilla/django-browserid/blob/master/django_browserid/auth.py
        username_algo = self.get_settings("OIDC_USERNAME_ALGO", None)
        email = claims.get("email")

        if not username_algo:
            return default_username_algo(email, claims)

        if isinstance(username_algo, str):
            username_algo = import_string(username_algo)
        if len(inspect.getfullargspec(username_algo).args) == 1:
            # this is for backwards compatibility only
            return username_algo(email)
        # also pass the claims to the custom user name algo
        return username_algo(email, claims)

    def update_user(self, user, claims):
        """Update existing user with new claims, if necessary save, and return user.

        The default implementation returns ``user`` unchanged.
        """
        return user

    def _verify_jws(self, payload, key):
        """Verify the given JWS payload with the given key and return the payload.

        :raises SuspiciousOperation: if the header has no ``alg``, if the
            algorithm differs from ``OIDC_RP_SIGN_ALGO``, or if the signature
            does not verify.
        """
        jws = JWS.from_compact(payload)

        try:
            alg = jws.signature.combined.alg.name
        except KeyError:
            msg = "No alg value found in header"
            raise SuspiciousOperation(msg)

        if alg != self.OIDC_RP_SIGN_ALGO:
            msg = (
                "The provider algorithm {!r} does not match the client's "
                "OIDC_RP_SIGN_ALGO.".format(alg)
            )
            raise SuspiciousOperation(msg)

        if isinstance(key, str):
            # Use smart_bytes here since the key string comes from settings.
            jwk = JWK.load(smart_bytes(key))
        else:
            # The key is a json returned from the IDP JWKS endpoint.
            jwk = JWK.from_json(key)

        if not jws.verify(jwk):
            msg = "JWS token verification failed."
            raise SuspiciousOperation(msg)

        return jws.payload

    def retrieve_matching_jwk(self, token):
        """Get the signing key by exploring the JWKS endpoint of the OP.

        A key matches when its ``kid`` equals the token header's ``kid``
        (only checked while ``OIDC_VERIFY_KID`` is true) and, if the key
        declares an ``alg``, that equals the token header's ``alg``. When
        several keys match, the last one in the document is returned.

        :raises SuspiciousOperation: if no key matches.
        """
        response_jwks = requests.get(
            self.OIDC_OP_JWKS_ENDPOINT,
            verify=self.get_settings("OIDC_VERIFY_SSL", True),
            timeout=self.get_settings("OIDC_TIMEOUT", None),
            proxies=self.get_settings("OIDC_PROXY", None),
        )
        response_jwks.raise_for_status()
        jwks = response_jwks.json()

        # Compute the current header from the given token to find a match
        jws = JWS.from_compact(token)
        json_header = jws.signature.protected
        header = Header.json_loads(json_header)

        # Loop-invariant values are resolved once, through get_settings like
        # the rest of the class.
        verify_kid = self.get_settings("OIDC_VERIFY_KID", True)
        token_kid = smart_str(header.kid) if verify_kid else None
        token_alg = smart_str(header.alg)

        key = None
        for jwk in jwks["keys"]:
            # "kid" is optional in a JWK; a key without one can never match
            # when kid verification is on, so skip it instead of failing.
            if verify_kid and jwk.get("kid") != token_kid:
                continue
            if "alg" in jwk and jwk["alg"] != token_alg:
                continue
            key = jwk

        if key is None:
            raise SuspiciousOperation("Could not find a valid JWKS.")
        return key

    def get_payload_data(self, token, key):
        """Helper method to get the payload of the JWT token.

        When ``OIDC_ALLOW_UNSECURED_JWT`` is true and the token header
        declares ``alg`` equal to ``none``, the decoded payload is returned
        without signature verification. In every other case the token is
        verified with ``_verify_jws``.
        """
        if self.get_settings("OIDC_ALLOW_UNSECURED_JWT", False):
            header, payload_data, _signature = token.split(b".")
            header = json.loads(smart_str(b64decode(header)))

            # If config allows unsecured JWTs check the header and return the decoded payload
            if "alg" in header and header["alg"] == "none":
                return b64decode(payload_data)

        # By default fallback to verify JWT signatures
        return self._verify_jws(token, key)

    def verify_token(self, token, **kwargs):
        """Validate the token signature and nonce, and return the decoded payload.

        :arg token: the ID token in compact form
        :arg nonce: (keyword) the nonce expected in the token
        :raises SuspiciousOperation: if ``token`` is ``None``, or if
            ``OIDC_USE_NONCE`` is true and the nonces differ.
        """
        # force_bytes(None) would yield b"None" and fail later with an
        # obscure parsing error, so reject a missing token up front.
        if token is None:
            raise SuspiciousOperation("No ID token to verify.")

        nonce = kwargs.get("nonce")
        token = force_bytes(token)

        if self.OIDC_RP_SIGN_ALGO.startswith(("RS", "ES")):
            if self.OIDC_RP_IDP_SIGN_KEY is not None:
                key = self.OIDC_RP_IDP_SIGN_KEY
            else:
                key = self.retrieve_matching_jwk(token)
        else:
            key = self.OIDC_RP_CLIENT_SECRET

        payload_data = self.get_payload_data(token, key)

        # The payload is a byte string, so decode it explicitly before
        # deserializing.
        payload = json.loads(payload_data.decode("utf-8"))
        token_nonce = payload.get("nonce")

        if self.get_settings("OIDC_USE_NONCE", True) and nonce != token_nonce:
            msg = "JWT Nonce verification failed."
            raise SuspiciousOperation(msg)
        return payload

    def get_token(self, payload):
        """Return token object as a dictionary.

        Posts ``payload`` to the token endpoint. When
        ``OIDC_TOKEN_USE_BASIC_AUTH`` is true, the client credentials are
        sent as an HTTP Basic auth header and ``client_secret`` is left out
        of the posted form data. The caller's ``payload`` is not modified.

        :raises KeyError: if Basic auth is enabled and ``payload`` has no
            ``client_secret``.
        :raises HTTPError: if the response status is not 200.
        """
        auth = None
        if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False):
            # When Basic auth is defined, create the Auth Header and remove
            # the secret from a copy so the caller's dict stays intact.
            payload = dict(payload)
            user = payload.get("client_id")
            pw = payload.pop("client_secret")
            auth = HTTPBasicAuth(user, pw)

        response = requests.post(
            self.OIDC_OP_TOKEN_ENDPOINT,
            data=payload,
            auth=auth,
            verify=self.get_settings("OIDC_VERIFY_SSL", True),
            timeout=self.get_settings("OIDC_TIMEOUT", None),
            proxies=self.get_settings("OIDC_PROXY", None),
        )
        self.raise_token_response_error(response)
        return response.json()

    def raise_token_response_error(self, response):
        """Raises :class:`HTTPError`, if one occurred.

        Any status code other than 200 is treated as an error.
        as per: https://datatracker.ietf.org/doc/html/rfc6749#section-5.2
        """
        # if there wasn't an error all is good
        if response.status_code == 200:
            return

        # otherwise something is up...
        http_error_msg = (
            f"Get Token Error (url: {response.url}, "
            f"status: {response.status_code}, "
            f"body: {response.text})"
        )
        raise HTTPError(http_error_msg, response=response)

    def get_userinfo(self, access_token, id_token, payload):
        """Return user details dictionary. The id_token and payload are not used in
        the default implementation, but may be used when overriding this method"""
        user_response = requests.get(
            self.OIDC_OP_USER_ENDPOINT,
            headers={"Authorization": "Bearer {0}".format(access_token)},
            verify=self.get_settings("OIDC_VERIFY_SSL", True),
            timeout=self.get_settings("OIDC_TIMEOUT", None),
            proxies=self.get_settings("OIDC_PROXY", None),
        )
        user_response.raise_for_status()
        return user_response.json()

    def authenticate(self, request, **kwargs):
        """Authenticates a user based on the OIDC code flow.

        :arg request: the current request; its ``code`` and ``state`` query
            parameters must both be present
        :arg nonce: (keyword) the nonce expected in the ID token
        :arg code_verifier: (keyword) the PKCE code verifier, if any
        :returns: a user, or ``None`` if there is no request, ``code`` or
            ``state`` is missing, or the user could not be resolved
        """
        self.request = request
        if not self.request:
            return None

        state = self.request.GET.get("state")
        code = self.request.GET.get("code")
        nonce = kwargs.pop("nonce", None)
        code_verifier = kwargs.pop("code_verifier", None)

        if not code or not state:
            return None

        reverse_url = self.get_settings(
            "OIDC_AUTHENTICATION_CALLBACK_URL", "oidc_authentication_callback"
        )

        token_payload = {
            "client_id": self.OIDC_RP_CLIENT_ID,
            "client_secret": self.OIDC_RP_CLIENT_SECRET,
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": absolutify(self.request, reverse(reverse_url)),
        }

        # Send code_verifier with token request if using PKCE
        if code_verifier is not None:
            token_payload.update({"code_verifier": code_verifier})

        # Get the token
        token_info = self.get_token(token_payload)
        id_token = token_info.get("id_token")
        access_token = token_info.get("access_token")

        # Validate the token
        payload = self.verify_token(id_token, nonce=nonce)

        if payload:
            self.store_tokens(access_token, id_token)
            try:
                return self.get_or_create_user(access_token, id_token, payload)
            except SuspiciousOperation as exc:
                LOGGER.warning("failed to get or create user: %s", exc)
                return None

        return None

    def store_tokens(self, access_token, id_token):
        """Store OIDC tokens in the session.

        Each token is stored only when its setting
        (``OIDC_STORE_ACCESS_TOKEN`` / ``OIDC_STORE_ID_TOKEN``) is true.
        """
        session = self.request.session

        if self.get_settings("OIDC_STORE_ACCESS_TOKEN", False):
            session["oidc_access_token"] = access_token

        if self.get_settings("OIDC_STORE_ID_TOKEN", False):
            session["oidc_id_token"] = id_token

    def get_or_create_user(self, access_token, id_token, payload):
        """Returns a User instance if 1 user is found. Creates a user if not found
        and configured to do so.

        :returns: the user, or ``None`` when no user matches and
            ``OIDC_CREATE_USER`` is false
        :raises SuspiciousOperation: if claims verification fails or if more
            than one user matches.
        """
        user_info = self.get_userinfo(access_token, id_token, payload)

        claims_verified = self.verify_claims(user_info)
        if not claims_verified:
            msg = "Claims verification failed"
            raise SuspiciousOperation(msg)

        # email based filtering
        users = self.filter_users_by_claims(user_info)
        match_count = len(users)

        if match_count == 1:
            return self.update_user(users[0], user_info)

        if match_count > 1:
            # In the rare case that two user accounts have the same email address,
            # bail. Randomly selecting one seems really wrong.
            msg = "Multiple users returned"
            raise SuspiciousOperation(msg)

        if self.get_settings("OIDC_CREATE_USER", True):
            return self.create_user(user_info)

        LOGGER.debug(
            "Login failed: No user with %s found, and OIDC_CREATE_USER is False",
            self.describe_user_by_claims(user_info),
        )
        return None

    def get_user(self, user_id):
        """Return a user based on the id, or ``None`` if no such user exists."""
        try:
            return self.UserModel.objects.get(pk=user_id)
        except self.UserModel.DoesNotExist:
            return None