ISO-27001-Risk-Management/.venv/lib/python3.11/site-packages/mozilla_django_oidc/views.py
2025-09-24 18:13:40 +02:00

276 lines
10 KiB
Python

import time
from urllib.parse import urlencode
from django.contrib import auth
from django.core.exceptions import SuspiciousOperation
from django.http import HttpResponseNotAllowed, HttpResponseRedirect
from django.shortcuts import resolve_url
from django.urls import reverse
from django.utils.crypto import get_random_string
from django.utils.http import url_has_allowed_host_and_scheme
from django.utils.module_loading import import_string
from django.views.generic import View
from mozilla_django_oidc.utils import (
absolutify,
add_state_and_verifier_and_nonce_to_session,
generate_code_challenge,
import_from_settings,
)
class OIDCAuthenticationCallbackView(View):
"""OIDC client authentication callback HTTP endpoint"""
http_method_names = ["get"]
@staticmethod
def get_settings(attr, *args):
return import_from_settings(attr, *args)
@property
def failure_url(self):
return self.get_settings("LOGIN_REDIRECT_URL_FAILURE", "/")
@property
def success_url(self):
# Pull the next url from the session or settings--we don't need to
# sanitize here because it should already have been sanitized.
next_url = self.request.session.get("oidc_login_next", None)
return next_url or resolve_url(self.get_settings("LOGIN_REDIRECT_URL", "/"))
def login_failure(self):
return HttpResponseRedirect(self.failure_url)
def login_success(self):
# If the user hasn't changed (because this is a session refresh instead of a
# normal login), don't call login. This prevents invaliding the user's current CSRF token
request_user = getattr(self.request, "user", None)
if (
not request_user
or not request_user.is_authenticated
or request_user != self.user
):
auth.login(self.request, self.user)
# Figure out when this id_token will expire. This is ignored unless you're
# using the SessionRefresh middleware.
expiration_interval = self.get_settings(
"OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS", 60 * 15
)
self.request.session["oidc_id_token_expiration"] = (
time.time() + expiration_interval
)
return HttpResponseRedirect(self.success_url)
def get(self, request):
"""Callback handler for OIDC authorization code flow"""
if request.GET.get("error"):
# Ouch! Something important failed.
# Delete the state entry also for failed authentication attempts
# to prevent replay attacks.
if (
"state" in request.GET
and "oidc_states" in request.session
and request.GET["state"] in request.session["oidc_states"]
):
del request.session["oidc_states"][request.GET["state"]]
request.session.save()
# Make sure the user doesn't get to continue to be logged in
# otherwise the refresh middleware will force the user to
# redirect to authorize again if the session refresh has
# expired.
if request.user.is_authenticated:
auth.logout(request)
assert not request.user.is_authenticated
elif "code" in request.GET and "state" in request.GET:
# Check instead of "oidc_state" check if the "oidc_states" session key exists!
if "oidc_states" not in request.session:
return self.login_failure()
# State, Nonce and PKCE Code Verifier are stored in the session "oidc_states"
# dictionary.
# State is the key, the value is a dictionary with the Nonce in the "nonce" field, and
# Code Verifier or None in the "code_verifier" field.
state = request.GET.get("state")
if state not in request.session["oidc_states"]:
msg = "OIDC callback state not found in session `oidc_states`!"
raise SuspiciousOperation(msg)
# Get the nonce and optional code verifier from the dictionary for further processing
# and delete the entry to prevent replay attacks.
code_verifier = request.session["oidc_states"][state].get("code_verifier")
nonce = request.session["oidc_states"][state]["nonce"]
del request.session["oidc_states"][state]
# Authenticating is slow, so save the updated oidc_states.
request.session.save()
# Reset the session. This forces the session to get reloaded from the database after
# fetching the token from the OpenID connect provider.
# Without this step we would overwrite items that are being added/removed from the
# session in parallel browser tabs.
request.session = request.session.__class__(request.session.session_key)
kwargs = {
"request": request,
"nonce": nonce,
"code_verifier": code_verifier,
}
self.user = auth.authenticate(**kwargs)
if self.user and self.user.is_active:
return self.login_success()
return self.login_failure()
def get_next_url(request, redirect_field_name):
"""Retrieves next url from request
Note: This verifies that the url is safe before returning it. If the url
is not safe, this returns None.
:arg HttpRequest request: the http request
:arg str redirect_field_name: the name of the field holding the next url
:returns: safe url or None
"""
next_url = request.GET.get(redirect_field_name)
if next_url:
kwargs = {
"url": next_url,
"require_https": import_from_settings(
"OIDC_REDIRECT_REQUIRE_HTTPS", request.is_secure()
),
}
hosts = list(import_from_settings("OIDC_REDIRECT_ALLOWED_HOSTS", []))
hosts.append(request.get_host())
kwargs["allowed_hosts"] = hosts
is_safe = url_has_allowed_host_and_scheme(**kwargs)
if is_safe:
return next_url
return None
class OIDCAuthenticationRequestView(View):
"""OIDC client authentication HTTP endpoint"""
http_method_names = ["get"]
def __init__(self, *args, **kwargs):
super(OIDCAuthenticationRequestView, self).__init__(*args, **kwargs)
self.OIDC_OP_AUTH_ENDPOINT = self.get_settings("OIDC_OP_AUTHORIZATION_ENDPOINT")
self.OIDC_RP_CLIENT_ID = self.get_settings("OIDC_RP_CLIENT_ID")
@staticmethod
def get_settings(attr, *args):
return import_from_settings(attr, *args)
def get(self, request):
"""OIDC client authentication initialization HTTP endpoint"""
state = get_random_string(self.get_settings("OIDC_STATE_SIZE", 32))
redirect_field_name = self.get_settings("OIDC_REDIRECT_FIELD_NAME", "next")
reverse_url = self.get_settings(
"OIDC_AUTHENTICATION_CALLBACK_URL", "oidc_authentication_callback"
)
params = {
"response_type": "code",
"scope": self.get_settings("OIDC_RP_SCOPES", "openid email"),
"client_id": self.OIDC_RP_CLIENT_ID,
"redirect_uri": absolutify(request, reverse(reverse_url)),
"state": state,
}
params.update(self.get_extra_params(request))
if self.get_settings("OIDC_USE_NONCE", True):
nonce = get_random_string(self.get_settings("OIDC_NONCE_SIZE", 32))
params.update({"nonce": nonce})
if self.get_settings("OIDC_USE_PKCE", False):
code_verifier_length = self.get_settings("OIDC_PKCE_CODE_VERIFIER_SIZE", 64)
# Check that code_verifier_length is between the min and max length
# defined in https://datatracker.ietf.org/doc/html/rfc7636#section-4.1
if not (43 <= code_verifier_length <= 128):
raise ValueError("code_verifier_length must be between 43 and 128")
# Generate code_verifier and code_challenge pair
code_verifier = get_random_string(code_verifier_length)
code_challenge_method = self.get_settings(
"OIDC_PKCE_CODE_CHALLENGE_METHOD", "S256"
)
code_challenge = generate_code_challenge(
code_verifier, code_challenge_method
)
# Append code_challenge to authentication request parameters
params.update(
{
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
}
)
else:
code_verifier = None
add_state_and_verifier_and_nonce_to_session(
request, state, params, code_verifier
)
request.session["oidc_login_next"] = get_next_url(request, redirect_field_name)
query = urlencode(params)
redirect_url = "{url}?{query}".format(
url=self.OIDC_OP_AUTH_ENDPOINT, query=query
)
return HttpResponseRedirect(redirect_url)
def get_extra_params(self, request):
return self.get_settings("OIDC_AUTH_REQUEST_EXTRA_PARAMS", {})
class OIDCLogoutView(View):
"""Logout helper view"""
http_method_names = ["get", "post"]
@staticmethod
def get_settings(attr, *args):
return import_from_settings(attr, *args)
@property
def redirect_url(self):
"""Return the logout url defined in settings."""
return self.get_settings("LOGOUT_REDIRECT_URL", "/")
def post(self, request):
"""Log out the user."""
logout_url = self.redirect_url
if request.user.is_authenticated:
# Check if a method exists to build the URL to log out the user
# from the OP.
logout_from_op = self.get_settings("OIDC_OP_LOGOUT_URL_METHOD", "")
if logout_from_op:
logout_url = import_string(logout_from_op)(request)
# Log out the Django user if they were logged in.
auth.logout(request)
return HttpResponseRedirect(logout_url)
def get(self, request):
"""Log out the user."""
if self.get_settings("ALLOW_LOGOUT_GET_METHOD", False):
return self.post(request)
return HttpResponseNotAllowed(["POST"])