from collections import Counter

from django.contrib import messages
from django.contrib.admin.models import LogEntry
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count
from django.db.models.functions import TruncMonth
from django.http import HttpResponseForbidden
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext_lazy as _
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated

from .forms import RiskStatusForm, ControlStatusForm, IncidentStatusForm, ResidualReviewForm
from .models import AuditLog, Risk, Control, ResidualRisk, Incident, Notification
from .serializers import (
    ControlSerializer,
    RiskSerializer,
    ResidualRiskSerializer,
    UserSerializer,
    AuditSerializer,
    IncidentSerializer,
)

User = get_user_model()


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _can_edit_risk(user, risk: Risk) -> bool:
    """Return True when ``user`` is staff or is the owner of ``risk``."""
    return bool(user.is_staff or (risk.owner_id and risk.owner_id == user.id))


def _can_edit_control(user, control: Control) -> bool:
    """Return True when ``user`` is staff or is responsible for ``control``."""
    return bool(user.is_staff or (control.responsible_id and control.responsible_id == user.id))


def _can_edit_incident(user, incident: Incident) -> bool:
    """Return True when ``user`` is staff or reported ``incident``."""
    return bool(user.is_staff or (incident.reported_by_id and incident.reported_by_id == user.id))


# ---------------------------------------------------------------------------
# API ViewSets
# ---------------------------------------------------------------------------

class _ChangedByMixin:
    """Attach the requesting user to instances saved through a viewset.

    The user is stored on the instance as ``_changed_by``, the same attribute
    the web views in this module set before calling ``save()``.
    """

    def _save_as_request_user(self, serializer, **save_kwargs):
        """Save ``serializer`` and tag the resulting instance with the user.

        Extra keyword arguments are forwarded to ``serializer.save()``.
        Returns the saved model instance.
        """
        user = self.request.user

        # On updates the instance already exists, so tag it *before* the save
        # so that anything observing the save can see who made the change.
        # On creates there is no instance yet (``serializer.instance`` is None).
        if serializer.instance is not None:
            serializer.instance._changed_by = user

        instance = serializer.save(**save_kwargs)
        instance._changed_by = user
        # NOTE(review): stock ``Model.save()`` returns early (no DB write, no
        # signals) for an empty ``update_fields``; this call is kept for
        # backward compatibility and only has an effect if the model
        # overrides ``save()``. Confirm how ``_changed_by`` is consumed.
        instance.save(update_fields=[])
        return instance

    def perform_create(self, serializer):
        """Create the object and record the requesting user on it."""
        self._save_as_request_user(serializer)

    def perform_update(self, serializer):
        """Update the object and record the requesting user on it."""
        self._save_as_request_user(serializer)


class RiskViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for managing Risks."""

    queryset = Risk.objects.all()
    serializer_class = RiskSerializer
    permission_classes = [IsAuthenticated]


class ControlViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for managing Controls."""

    queryset = Control.objects.all()
    serializer_class = ControlSerializer
    permission_classes = [IsAuthenticated]


class ResidualRiskViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for Residual Risks."""

    queryset = ResidualRisk.objects.all()
    serializer_class = ResidualRiskSerializer
    permission_classes = [IsAuthenticated]


class UserViewSet(_ChangedByMixin, viewsets.ReadOnlyModelViewSet):
    """API endpoint for listing users and their responsibilities."""

    # Read-only viewset: the create/update hooks inherited from the mixin are
    # never invoked here; the mixin is kept as a base class for compatibility.
    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAuthenticated]
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
    """API endpoint for viewing audit logs."""

    queryset = AuditLog.objects.all()
    serializer_class = AuditSerializer
    permission_classes = [IsAuthenticated]


class IncidentViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for listing incidents and their related risks."""

    queryset = Incident.objects.all()
    serializer_class = IncidentSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Create an incident reported by, and attributed to, the requester.

        The class previously defined ``perform_create`` twice; the second
        definition replaced the first, so ``reported_by`` was never set.
        Both behaviours are merged here.
        """
        instance = serializer.save(reported_by=self.request.user)
        instance._changed_by = self.request.user
        instance.save(update_fields=[])

    def perform_update(self, serializer):
        """Update an incident and record the requesting user on it."""
        instance = serializer.save()
        instance._changed_by = self.request.user
        instance.save(update_fields=[])


# ---------------------------------------------------------------------------
# Web Views: Risks
# ---------------------------------------------------------------------------

@login_required
def list_risks(request):
    """List all risks with filters and sorting.

    Query parameters: ``risk``, ``control``, ``owner``, ``category``,
    ``asset``, ``process`` (filters; empty values are ignored), ``sort``
    (field name, default ``title``) and ``dir`` (``asc``/``desc``).
    An unknown ``sort`` field falls back to ``title`` instead of failing.
    """
    # Local import: keeps this fix self-contained within the view.
    from django.core.exceptions import FieldError

    qs = Risk.objects.all().select_related("owner", "residual_risk")

    # Filters
    filters = {
        "id": request.GET.get("risk"),
        "controls__id": request.GET.get("control"),
        "owner_id": request.GET.get("owner"),
        "category": request.GET.get("category"),
        "asset": request.GET.get("asset"),
        "process": request.GET.get("process"),
    }
    qs = qs.filter(**{k: v for k, v in filters.items() if v})

    # Sorting
    sort = request.GET.get("sort") or "title"
    direction = request.GET.get("dir") or "asc"
    try:
        qs = qs.order_by(f"-{sort}" if direction == "desc" else sort)
    except FieldError:
        # ``sort`` comes straight from the query string; an unknown field
        # name must not turn into a server error.
        # NOTE(review): any resolvable lookup path is still accepted here;
        # consider restricting ``sort`` to an explicit allow-list.
        sort = "title"
        qs = qs.order_by("-title" if direction == "desc" else "title")

    risks = qs.distinct()

    return render(request, "risks/list_risks.html", {
        "risks": risks,
        "risk_choices": Risk.objects.all().order_by("title"),
        "control_choices": Control.objects.all().order_by("title"),
        "owner_choices": User.objects.filter(owned_risks__isnull=False).distinct().order_by("username"),
        "category_choices": (Risk.objects.exclude(category__isnull=True).exclude(category__exact="")
                             .values_list("category", flat=True).distinct().order_by("category")),
        "asset_choices": (Risk.objects.exclude(asset__isnull=True).exclude(asset__exact="")
                          .values_list("asset", flat=True).distinct().order_by("asset")),
        "process_choices": (Risk.objects.exclude(process__isnull=True).exclude(process__exact="")
                            .values_list("process", flat=True).distinct().order_by("process")),
        "current_sort": sort,
        "current_dir": direction,
    })


@login_required
def show_risk(request, id):
    """Show single risk details + logs."""
    risk = get_object_or_404(
        Risk.objects.select_related("residual_risk", "owner").prefetch_related("controls"),
        pk=id,
    )
    ct = ContentType.objects.get_for_model(Risk)
    logs = LogEntry.objects.filter(content_type=ct, object_id=risk.pk).order_by("-action_time")
    return render(request, "risks/item_risk.html", {"risk": risk, "logs": logs})


@login_required
def mark_risk_reviewed(request, pk):
    """Mark a risk as reviewed and close if all controls are completed.

    The risk is closed only when every linked control has status
    ``completed`` or ``verified`` (a risk without controls also qualifies).
    Always redirects back to the risk detail page.
    """
    # NOTE(review): unlike update_risk_status, this view performs no
    # _can_edit_risk check and accepts any HTTP method - confirm intended.
    risk = get_object_or_404(Risk, pk=pk)

    all_done = all(c.status in ("completed", "verified") for c in risk.controls.all())
    if all_done:
        # Remember the real previous status so the audit entry is accurate.
        previous_status = risk.status
        risk.status = "closed"
        risk._changed_by = request.user
        risk.save(update_fields=["status", "updated_at"])

        # Write the audit log entry
        AuditLog.objects.create(
            user=request.user,
            action="reviewed",
            model="Risk",
            object_id=risk.pk,
            changes={"status": {"old": previous_status, "new": "closed"}},
        )

        messages.success(request, _("Risk has been marked as reviewed and closed."))
    else:
        messages.error(request, _("Not all controls are completed. Risk cannot be closed yet."))

    return redirect("risks:show_risk", id=risk.pk)


# ---------------------------------------------------------------------------
# Web Views: Controls
# ---------------------------------------------------------------------------

@login_required
def list_controls(request):
    """List all controls with filters.

    Query parameters: ``control``, ``risk``, ``status``, ``responsible``;
    empty values are ignored. Results are ordered by title.
    """
    qs = Control.objects.all().select_related("responsible")

    filters = {
        "id": request.GET.get("control"),
        "risks__id": request.GET.get("risk"),
        "status": request.GET.get("status"),
        "responsible_id": request.GET.get("responsible"),
    }
    qs = qs.filter(**{k: v for k, v in filters.items() if v})

    controls = qs.order_by("title").distinct()

    return render(request, "risks/list_controls.html", {
        "controls": controls,
        "control_choices": Control.objects.all().order_by("title"),
        "risk_choices": Risk.objects.all().order_by("title"),
        "responsible_choices": User.objects.filter(responsible_controls__isnull=False).distinct().order_by("username"),
        "status_choices": Control.STATUS_CHOICES,
    })


@login_required
def show_control(request, id):
    """Show single control details + logs."""
    control = get_object_or_404(Control, pk=id)
    ct = ContentType.objects.get_for_model(Control)
    logs = LogEntry.objects.filter(content_type=ct, object_id=control.pk).order_by("-action_time")
    return render(request, "risks/item_control.html", {"control": control, "logs": logs})


# ---------------------------------------------------------------------------
# Web Views: Incidents
# ---------------------------------------------------------------------------

@login_required
def list_incidents(request):
    """List all incidents with filters.

    Query parameters: ``risk``, ``status``, ``reported_by``; empty values
    are ignored. Results are ordered by title.
    """
    qs = Incident.objects.all().select_related("reported_by").prefetch_related("related_risks")

    filters = {
        "related_risks__id": request.GET.get("risk"),
        "status": request.GET.get("status"),
        "reported_by": request.GET.get("reported_by"),
    }
    qs = qs.filter(**{k: v for k, v in filters.items() if v})

    incidents = qs.order_by("title").distinct()

    return render(request, "risks/list_incidents.html", {
        "incidents": incidents,
        # NOTE(review): unlike list_controls, the choices here are the
        # already-filtered result set - confirm this is intended.
        "incident_choices": incidents,
        "risk_choices": Risk.objects.all().order_by("title"),
        "user_choices": User.objects.filter(incidents__isnull=False).distinct().order_by("username"),
        "status_choices": Incident.STATUS_CHOICES,
    })


@login_required
def show_incident(request, id):
    """Show single incident details + logs."""
    incident = get_object_or_404(Incident, pk=id)
    ct = ContentType.objects.get_for_model(Incident)
    logs = LogEntry.objects.filter(content_type=ct, object_id=incident.pk).order_by("-action_time")
    return render(request, "risks/item_incident.html", {"incident": incident, "logs": logs})


# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------

@login_required
def dashboard(request):
    """Render the dashboard with KPI counts and a per-month risk trend.

    Context: ``risks_total``, ``risks_by_level``, ``risks_by_cia``,
    ``residual_review_required``, ``controls_by_status``,
    ``incidents_status``, ``notifications_unread``, ``months`` (sorted
    ``YYYY-MM`` labels) and ``trend_data`` (level -> counts aligned with
    ``months``).
    """
    # Existing KPIs
    risks_total = Risk.objects.count()
    risks_by_level = Risk.objects.values("level").annotate(count=Count("id"))

    # CIA counter: a stored value may be a list of entries or a single value.
    cia_counter = Counter()
    for cia_value in Risk.objects.values_list("cia", flat=True):
        if isinstance(cia_value, list):
            cia_counter.update(cia_value)
        elif cia_value:
            cia_counter[cia_value] += 1

    # Residual Risks
    residual_review_required = ResidualRisk.objects.filter(review_required=True).count()

    # Controls & Incidents
    controls_by_status = Control.objects.values("status").annotate(count=Count("id"))
    incidents_status = Incident.objects.values("status").annotate(count=Count("id"))

    # Notifications
    notifications_unread = Notification.objects.filter(user=request.user, read=False).count()

    # Risks by Level per Month (Trend)
    risks_trend_qs = (
        Risk.objects.annotate(month=TruncMonth("created_at"))
        .values("month", "level")
        .annotate(count=Count("id"))
        .order_by("month")
    )

    # Prepare data for Chart.js. Each month label is formatted once and
    # looked up through a dict instead of list.index() inside the loop.
    trend_rows = list(risks_trend_qs)
    dated_rows = [
        (row["month"].strftime("%Y-%m"), row)
        for row in trend_rows
        if row["month"]
    ]
    months = sorted({label for label, _row in dated_rows})
    month_index = {label: pos for pos, label in enumerate(months)}

    # Every level seen gets a zero-filled series, including levels that only
    # occur on rows without a month.
    trend_data = {row["level"]: [0] * len(months) for row in trend_rows}
    for label, row in dated_rows:
        trend_data[row["level"]][month_index[label]] = row["count"]

    context = {
        "risks_total": risks_total,
        "risks_by_level": risks_by_level,
        "risks_by_cia": dict(cia_counter),
        "residual_review_required": residual_review_required,
        "controls_by_status": controls_by_status,
        "incidents_status": incidents_status,
        "notifications_unread": notifications_unread,
        # Trend data
        "months": months,
        "trend_data": trend_data,
    }
    return render(request, "risks/dashboard.html", context)


# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------

@login_required
def notifications(request):
    """View own notifications with optional filter.

    ``?filter=unread`` (the default) shows only unread notifications; any
    other value shows all of the user's notifications, newest first.
    """
    flt = request.GET.get("filter", "unread")
    qs = Notification.objects.filter(user=request.user).order_by("-created_at")
    if flt == "unread":
        qs = qs.filter(read=False)
    return render(request, "risks/notifications.html", {"notifications": qs, "filter": flt})


@login_required
def notification_mark_read(request, pk):
    """Mark single notification as read.

    Only POST is accepted (other methods get a 403). Redirects back to the
    referring page when it is on this host, otherwise to the notification
    list.
    """
    # Local import: keeps this fix self-contained within the view.
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.method != "POST":
        return HttpResponseForbidden()
    notif = get_object_or_404(Notification, pk=pk, user=request.user)
    notif.read = True
    notif.save(update_fields=["read"])

    # The Referer header is client-supplied; only follow it when it points
    # back at this host, to avoid redirecting users to an arbitrary site.
    referer = request.META.get("HTTP_REFERER")
    if referer and url_has_allowed_host_and_scheme(
        referer,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        return redirect(referer)
    return redirect("risks:notifications")


@login_required
def notification_mark_all_read(request):
    """Mark all notifications as read.

    Only POST is accepted (other methods get a 403).
    """
    if request.method != "POST":
        return HttpResponseForbidden()
    Notification.objects.filter(user=request.user, read=False).update(read=True)
    return redirect("risks:notifications")


# ---------------------------------------------------------------------------
# Status Updates
# ---------------------------------------------------------------------------
@login_required
def update_risk_status(request, id):
    """Update risk status.

    Returns 403 unless the user is staff or the risk's owner. On a valid
    POST the new status is saved with the user recorded as ``_changed_by``.
    Always redirects to the risk detail page otherwise.
    """
    risk = get_object_or_404(Risk, pk=id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    if request.method == "POST":
        form = RiskStatusForm(request.POST, instance=risk)
        if form.is_valid():
            obj = form.save(commit=False)
            # Tag before saving so the change can be attributed to the user.
            obj._changed_by = request.user
            obj.save(update_fields=["status", "updated_at"])
            messages.success(request, _("Risk status updated."))
    return redirect("risks:show_risk", id=risk.pk)


@login_required
def update_control_status(request, id):
    """Update control status.

    Returns 403 unless the user is staff or responsible for the control.
    On a valid POST the new status is saved with the user recorded as
    ``_changed_by``. Always redirects to the control detail page otherwise.
    """
    control = get_object_or_404(Control, pk=id)
    if not _can_edit_control(request.user, control):
        return HttpResponseForbidden()

    if request.method == "POST":
        form = ControlStatusForm(request.POST, instance=control)
        if form.is_valid():
            obj = form.save(commit=False)
            obj._changed_by = request.user
            obj.save(update_fields=["status", "updated_at"])
            messages.success(request, _("Control status updated."))
    return redirect("risks:show_control", id=control.pk)


@login_required
def update_incident_status(request, id):
    """Update incident status.

    Returns 403 unless the user is staff or reported the incident. On a
    valid POST the new status is saved with the user recorded as
    ``_changed_by``. Always redirects to the incident detail page otherwise.
    """
    incident = get_object_or_404(Incident, pk=id)
    if not _can_edit_incident(request.user, incident):
        return HttpResponseForbidden()

    if request.method == "POST":
        form = IncidentStatusForm(request.POST, instance=incident)
        if form.is_valid():
            obj = form.save(commit=False)
            obj._changed_by = request.user
            obj.save(update_fields=["status", "updated_at"])
            messages.success(request, _("Incident status updated."))
    return redirect("risks:show_incident", id=incident.pk)


@login_required
def update_residual_review(request, risk_id):
    """Toggle residual risk review flag.

    Returns 403 unless the user may edit the risk. The residual risk record
    is created on demand. On a valid POST the flag is saved with the user
    recorded as ``_changed_by``. Always redirects to the risk detail page
    otherwise.
    """
    risk = get_object_or_404(Risk, pk=risk_id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    # Do not unpack into ``_``: that would shadow the gettext alias inside
    # this function and make the ``_()`` call below fail with a TypeError.
    residual, _created = ResidualRisk.objects.get_or_create(risk=risk)

    if request.method == "POST":
        form = ResidualReviewForm(request.POST, instance=residual)
        if form.is_valid():
            obj = form.save(commit=False)
            obj._changed_by = request.user
            obj.save(update_fields=["review_required", "updated_at"])
            messages.success(request, _("Residual review flag updated."))
    return redirect("risks:show_risk", id=risk.pk)


# ---------------------------------------------------------------------------
# Risk Matrix
# ---------------------------------------------------------------------------

@login_required
def risk_matrix(request):
    """Show gross/net risk matrix.

    Builds two ``impact -> likelihood -> [risks]`` grids: the gross matrix
    from each risk's own impact/likelihood and the net matrix from its
    residual risk, when one exists. Values outside the declared choices are
    skipped rather than raising.
    """
    risks = Risk.objects.select_related("owner", "residual_risk").all()

    impacts = sorted(Risk.IMPACT_CHOICES, key=lambda choice: choice[0])
    likelihoods = sorted(Risk.LIKELIHOOD_CHOICES, key=lambda choice: choice[0])

    def _empty_grid():
        """Return a fresh impact x likelihood grid of empty lists."""
        return {
            impact: {likelihood: [] for likelihood, _label in likelihoods}
            for impact, _label in impacts
        }

    def _place(grid, impact, likelihood, risk):
        """Append ``risk`` to its cell, ignoring values not on the grid."""
        row = grid.get(impact)
        if row is not None and likelihood in row:
            row[likelihood].append(risk)

    gross_matrix = _empty_grid()
    net_matrix = _empty_grid()

    for r in risks:
        _place(gross_matrix, r.impact, r.likelihood, r)
        rr = getattr(r, "residual_risk", None)
        if rr:
            _place(net_matrix, rr.impact, rr.likelihood, r)

    return render(request, "risks/risk_matrix.html", {
        "impacts": impacts,
        "likelihoods": likelihoods,
        "gross_matrix": gross_matrix,
        "net_matrix": net_matrix,
    })