ISO-27001-Risk-Management/risks/views.py

484 lines
18 KiB
Python
Raw Normal View History

from collections import Counter
from django.contrib import messages
from django.contrib.admin.models import LogEntry
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count
from django.db.models.functions import TruncMonth
from django.http import HttpResponseForbidden
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext_lazy as _
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from .forms import RiskStatusForm, ControlStatusForm, IncidentStatusForm, ResidualReviewForm
2025-09-16 14:15:04 +02:00
from .models import AuditLog, Risk, Control, ResidualRisk, AuditLog, Incident, Notification
from .serializers import (
ControlSerializer, RiskSerializer, ResidualRiskSerializer,
UserSerializer, AuditSerializer, IncidentSerializer,
)
# Active user model, resolved once at import time via Django's get_user_model();
# the views and viewsets in this module query it through ``User.objects``.
User = get_user_model()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _can_edit_risk(user, risk: Risk) -> bool:
    """Return True when *user* is staff or is the owner of *risk*.

    A risk without an owner (falsy ``owner_id``) is editable by staff only.
    """
    if user.is_staff:
        return True
    owner_id = risk.owner_id
    return bool(owner_id and owner_id == user.id)
def _can_edit_control(user, control: Control) -> bool:
    """Return True when *user* is staff or is the person responsible for *control*.

    A control without a responsible user (falsy ``responsible_id``) is
    editable by staff only.
    """
    if user.is_staff:
        return True
    responsible_id = control.responsible_id
    return bool(responsible_id and responsible_id == user.id)
def _can_edit_incident(user, incident: Incident) -> bool:
    """Return True when *user* is staff or is the reporter of *incident*.

    An incident without a reporter (falsy ``reported_by_id``) is editable by
    staff only.
    """
    if user.is_staff:
        return True
    reporter_id = incident.reported_by_id
    return bool(reporter_id and reporter_id == user.id)
# ---------------------------------------------------------------------------
# API ViewSets
# ---------------------------------------------------------------------------
class _ChangedByMixin:
    """Attach the requesting user to objects saved through a DRF viewset.

    Both hooks save via the serializer and then set ``_changed_by`` on the
    returned instance.
    """

    def _save_and_tag(self, serializer):
        """Save through *serializer*, then mark the instance with the request user."""
        saved = serializer.save()
        # NOTE(review): the attribute is assigned only after serializer.save()
        # has returned, so nothing that ran during that save can have seen it.
        # Presumably it is meant for audit/signal code - verify the consumer.
        saved._changed_by = self.request.user

    def perform_create(self, serializer):
        """Hook called on create; see ``_save_and_tag``."""
        self._save_and_tag(serializer)

    def perform_update(self, serializer):
        """Hook called on update; see ``_save_and_tag``."""
        self._save_and_tag(serializer)
class RiskViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """REST endpoint for Risk objects; requires an authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = RiskSerializer
    queryset = Risk.objects.all()

    def _save_as_request_user(self, serializer):
        """Save via *serializer*, tag the instance with the user, then re-save."""
        risk = serializer.save()
        risk._changed_by = self.request.user
        # NOTE(review): Django's Model.save() is documented to return early
        # when update_fields is empty, so this call probably neither writes
        # nor sends signals - confirm against the Risk model's save().
        risk.save(update_fields=[])

    def perform_create(self, serializer):
        """Create hook: persist the new risk and record who created it."""
        self._save_as_request_user(serializer)

    def perform_update(self, serializer):
        """Update hook: persist the changes and record who made them."""
        self._save_as_request_user(serializer)
class ControlViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """REST endpoint for Control objects; requires an authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = ControlSerializer
    queryset = Control.objects.all()

    def _save_as_request_user(self, serializer):
        """Save via *serializer*, tag the instance with the user, then re-save."""
        control = serializer.save()
        control._changed_by = self.request.user
        # NOTE(review): Django's Model.save() is documented to return early
        # when update_fields is empty, so this call probably neither writes
        # nor sends signals - confirm against the Control model's save().
        control.save(update_fields=[])

    def perform_create(self, serializer):
        """Create hook: persist the new control and record who created it."""
        self._save_as_request_user(serializer)

    def perform_update(self, serializer):
        """Update hook: persist the changes and record who made them."""
        self._save_as_request_user(serializer)
class ResidualRiskViewSet(viewsets.ModelViewSet):
    """REST endpoint for ResidualRisk objects; requires an authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = ResidualRiskSerializer
    queryset = ResidualRisk.objects.all()

    def _save_as_request_user(self, serializer):
        """Save via *serializer*, tag the instance with the user, then re-save."""
        residual = serializer.save()
        residual._changed_by = self.request.user
        # NOTE(review): Django's Model.save() is documented to return early
        # when update_fields is empty, so this call probably neither writes
        # nor sends signals - confirm against the ResidualRisk model's save().
        residual.save(update_fields=[])

    def perform_create(self, serializer):
        """Create hook: persist the new residual risk and record who created it."""
        self._save_as_request_user(serializer)

    def perform_update(self, serializer):
        """Update hook: persist the changes and record who made them."""
        self._save_as_request_user(serializer)
class UserViewSet(_ChangedByMixin, viewsets.ReadOnlyModelViewSet):
    """Read-only API endpoint for listing users and their responsibilities.

    The viewset is based on ``ReadOnlyModelViewSet``, which only exposes
    list/retrieve actions. The former ``perform_create``/``perform_update``
    overrides were therefore unreachable and have been removed; the hooks
    inherited from ``_ChangedByMixin`` remain available for compatibility.
    """

    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAuthenticated]
    # NOTE(review): every authenticated user can list all accounts through
    # this endpoint - confirm UserSerializer exposes no sensitive fields.
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only REST endpoint exposing AuditLog entries to authenticated users."""

    permission_classes = [IsAuthenticated]
    serializer_class = AuditSerializer
    queryset = AuditLog.objects.all()
class IncidentViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for managing incidents and their related risks."""

    queryset = Incident.objects.all()
    serializer_class = IncidentSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Create an incident reported by the requesting user.

        The class previously defined ``perform_create`` twice; the second
        definition silently replaced the first, so ``reported_by`` was never
        filled in. Both intents are merged here: the reporter is set during
        the serializer save and the instance is tagged with ``_changed_by``.
        """
        instance = serializer.save(reported_by=self.request.user)
        instance._changed_by = self.request.user
        # NOTE(review): Django's Model.save() is documented to return early
        # when update_fields is empty, so this call probably neither writes
        # nor sends signals - confirm against the Incident model's save().
        instance.save(update_fields=[])

    def perform_update(self, serializer):
        """Persist changes to an incident and tag it with the requesting user."""
        instance = serializer.save()
        instance._changed_by = self.request.user
        # NOTE(review): see perform_create regarding the empty update_fields.
        instance.save(update_fields=[])
# ---------------------------------------------------------------------------
# Web Views: Risks
# ---------------------------------------------------------------------------
@login_required
def list_risks(request):
    """List all risks, optionally filtered and sorted via query parameters.

    Query parameters:
        risk, control, owner, category, asset, process: exact-match filters;
            missing or empty values are ignored.
        sort: field name to order by (default ``title``).
        dir: ``desc`` for descending order, anything else is ascending.

    An ``order_by()`` argument that Django rejects with ``FieldError`` falls
    back to ordering by ``title`` instead of surfacing as a server error.
    """
    # Imported locally so this view does not rely on the module import block.
    from django.core.exceptions import FieldError

    qs = Risk.objects.all().select_related("owner", "residual_risk")

    # Filters: ORM lookup -> raw query-string value.
    filters = {
        "id": request.GET.get("risk"),
        "controls__id": request.GET.get("control"),
        "owner_id": request.GET.get("owner"),
        "category": request.GET.get("category"),
        "asset": request.GET.get("asset"),
        "process": request.GET.get("process"),
    }
    qs = qs.filter(**{k: v for k, v in filters.items() if v})

    # Sorting
    sort = request.GET.get("sort") or "title"
    direction = request.GET.get("dir") or "asc"
    descending = direction == "desc"
    try:
        qs = qs.order_by(f"-{sort}" if descending else sort)
    except FieldError:
        # The client supplied a name that does not resolve on Risk.
        # NOTE(review): relies on order_by() validating names eagerly, which
        # recent Django versions do - confirm for the deployed version.
        sort = "title"
        qs = qs.order_by("-title" if descending else "title")
    # NOTE(review): valid relation traversals (e.g. ``owner__...``) are still
    # accepted; consider an explicit allow-list of sortable columns.

    # distinct() collapses duplicates introduced by the controls join.
    risks = qs.distinct()

    return render(request, "risks/list_risks.html", {
        "risks": risks,
        "risk_choices": Risk.objects.all().order_by("title"),
        "control_choices": Control.objects.all().order_by("title"),
        "owner_choices": User.objects.filter(owned_risks__isnull=False).distinct().order_by("username"),
        "category_choices": (Risk.objects.exclude(category__isnull=True).exclude(category__exact="")
                             .values_list("category", flat=True).distinct().order_by("category")),
        "asset_choices": (Risk.objects.exclude(asset__isnull=True).exclude(asset__exact="")
                          .values_list("asset", flat=True).distinct().order_by("asset")),
        "process_choices": (Risk.objects.exclude(process__isnull=True).exclude(process__exact="")
                            .values_list("process", flat=True).distinct().order_by("process")),
        "current_sort": sort,
        "current_dir": direction,
    })
@login_required
def show_risk(request, id):
    """Render the detail page of one risk together with its admin log entries.

    ``id`` is the risk's primary key; an unknown key yields a 404.
    """
    risk_qs = Risk.objects.select_related("residual_risk", "owner").prefetch_related("controls")
    risk = get_object_or_404(risk_qs, pk=id)

    # Admin log entries recorded for this risk, newest first.
    risk_type = ContentType.objects.get_for_model(Risk)
    history = LogEntry.objects.filter(content_type=risk_type, object_id=risk.pk)
    history = history.order_by("-action_time")

    context = {"risk": risk, "logs": history}
    return render(request, "risks/item_risk.html", context)
2025-09-15 11:12:12 +02:00
@login_required
def mark_risk_reviewed(request, pk):
    """Mark a risk as reviewed and close it if all its controls are done.

    A control counts as done when its status is ``completed`` or
    ``verified``. On success the risk status becomes ``closed`` and an
    AuditLog entry is written with the status the risk actually had before
    (previously the entry always claimed ``review_required``). Otherwise an
    error message is shown. Always redirects to the risk detail page.
    """
    risk = get_object_or_404(Risk, pk=pk)
    # NOTE(review): this view changes state on any HTTP method and performs no
    # ownership check (unlike update_risk_status) - confirm this is intended.

    done_states = ("completed", "verified")
    # NOTE(review): all() is True for a risk without controls, so such a risk
    # is closed as well - confirm this is the desired workflow.
    if not all(c.status in done_states for c in risk.controls.all()):
        messages.error(request, _("Not all controls are completed. Risk cannot be closed yet."))
        return redirect("risks:show_risk", id=risk.pk)

    previous_status = risk.status
    risk.status = "closed"
    risk._changed_by = request.user
    risk.save(update_fields=["status", "updated_at"])

    # Write the audit trail entry with the real status transition.
    AuditLog.objects.create(
        user=request.user,
        action="reviewed",
        model="Risk",
        object_id=risk.pk,
        changes={"status": {"old": previous_status, "new": "closed"}},
    )
    messages.success(request, _("Risk has been marked as reviewed and closed."))
    return redirect("risks:show_risk", id=risk.pk)
# ---------------------------------------------------------------------------
# Web Views: Controls
# ---------------------------------------------------------------------------
@login_required
def list_controls(request):
    """Render the control overview, filtered by optional query parameters.

    Supported parameters: ``control``, ``risk``, ``status``, ``responsible``.
    Missing or empty parameters are ignored.
    """
    # (ORM lookup, query-string parameter) pairs.
    lookup_params = (
        ("id", "control"),
        ("risks__id", "risk"),
        ("status", "status"),
        ("responsible_id", "responsible"),
    )
    active = {}
    for lookup, param in lookup_params:
        value = request.GET.get(param)
        if value:
            active[lookup] = value

    base = Control.objects.all().select_related("responsible")
    # distinct() removes duplicates caused by the risks join.
    controls = base.filter(**active).order_by("title").distinct()

    responsible_users = User.objects.filter(responsible_controls__isnull=False)
    context = {
        "controls": controls,
        "control_choices": Control.objects.all().order_by("title"),
        "risk_choices": Risk.objects.all().order_by("title"),
        "responsible_choices": responsible_users.distinct().order_by("username"),
        "status_choices": Control.STATUS_CHOICES,
    }
    return render(request, "risks/list_controls.html", context)
@login_required
def show_control(request, id):
    """Render the detail page of one control together with its admin log entries.

    ``id`` is the control's primary key; an unknown key yields a 404.
    """
    control = get_object_or_404(Control, pk=id)

    # Admin log entries recorded for this control, newest first.
    control_type = ContentType.objects.get_for_model(Control)
    history = LogEntry.objects.filter(content_type=control_type, object_id=control.pk)
    history = history.order_by("-action_time")

    context = {"control": control, "logs": history}
    return render(request, "risks/item_control.html", context)
# ---------------------------------------------------------------------------
# Web Views: Incidents
# ---------------------------------------------------------------------------
@login_required
def list_incidents(request):
    """Render the incident overview, filtered by optional query parameters.

    Supported parameters: ``risk``, ``status``, ``reported_by``. Missing or
    empty parameters are ignored.
    """
    # (ORM lookup, query-string parameter) pairs.
    lookup_params = (
        ("related_risks__id", "risk"),
        ("status", "status"),
        ("reported_by", "reported_by"),
    )
    active = {}
    for lookup, param in lookup_params:
        value = request.GET.get(param)
        if value:
            active[lookup] = value

    base = Incident.objects.all().select_related("reported_by").prefetch_related("related_risks")
    # distinct() removes duplicates caused by the related_risks join.
    incidents = base.filter(**active).order_by("title").distinct()

    reporters = User.objects.filter(incidents__isnull=False)
    context = {
        "incidents": incidents,
        # The same (filtered) queryset is passed as the incident choices.
        "incident_choices": incidents,
        "risk_choices": Risk.objects.all().order_by("title"),
        "user_choices": reporters.distinct().order_by("username"),
        "status_choices": Incident.STATUS_CHOICES,
    }
    return render(request, "risks/list_incidents.html", context)
@login_required
def show_incident(request, id):
    """Render the detail page of one incident together with its admin log entries.

    ``id`` is the incident's primary key; an unknown key yields a 404.
    """
    incident = get_object_or_404(Incident, pk=id)

    # Admin log entries recorded for this incident, newest first.
    incident_type = ContentType.objects.get_for_model(Incident)
    history = LogEntry.objects.filter(content_type=incident_type, object_id=incident.pk)
    history = history.order_by("-action_time")

    context = {"incident": incident, "logs": history}
    return render(request, "risks/item_incident.html", context)
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@login_required
def dashboard(request):
    """Render the dashboard: KPI counts plus a per-month risk trend by level."""
    # Existing KPIs
    risks_total = Risk.objects.count()
    risks_by_level = Risk.objects.values("level").annotate(count=Count("id"))

    # CIA tally: a row's ``cia`` value is either a list of entries or a
    # single truthy value; empty values are skipped.
    cia_counter = Counter()
    for entry in Risk.objects.values_list("cia", flat=True):
        if isinstance(entry, list):
            cia_counter.update(entry)
        elif entry:
            cia_counter[entry] += 1

    # Residual risks flagged for review
    residual_review_required = ResidualRisk.objects.filter(review_required=True).count()

    # Controls & incidents grouped by status
    controls_by_status = Control.objects.values("status").annotate(count=Count("id"))
    incidents_status = Incident.objects.values("status").annotate(count=Count("id"))

    # Unread notifications of the current user
    notifications_unread = Notification.objects.filter(user=request.user, read=False).count()

    # Risks by level per month (trend)
    trend_rows = list(
        Risk.objects.annotate(month=TruncMonth("created_at"))
        .values("month", "level")
        .annotate(count=Count("id"))
        .order_by("month")
    )

    # Prepare the data for the chart: one series per level, one slot per month.
    month_keys = [
        row["month"].strftime("%Y-%m") if row["month"] else None
        for row in trend_rows
    ]
    months = sorted({key for key in month_keys if key})
    slot_of = {label: pos for pos, label in enumerate(months)}

    levels = {row["level"] for row in trend_rows}
    trend_data = {lvl: [0] * len(months) for lvl in levels}
    for row, key in zip(trend_rows, month_keys):
        if key:
            trend_data[row["level"]][slot_of[key]] = row["count"]

    context = {
        "risks_total": risks_total,
        "risks_by_level": risks_by_level,
        "risks_by_cia": dict(cia_counter),
        "residual_review_required": residual_review_required,
        "controls_by_status": controls_by_status,
        "incidents_status": incidents_status,
        "notifications_unread": notifications_unread,
        # Trend data
        "months": months,
        "trend_data": trend_data,
    }
    return render(request, "risks/dashboard.html", context)
# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------
@login_required
def notifications(request):
    """Show the current user's notifications, newest first.

    The ``filter`` query parameter defaults to ``unread``, which restricts
    the list to unread notifications; any other value shows all of them.
    """
    flt = request.GET.get("filter", "unread")
    own = Notification.objects.filter(user=request.user).order_by("-created_at")
    visible = own.filter(read=False) if flt == "unread" else own
    context = {"notifications": visible, "filter": flt}
    return render(request, "risks/notifications.html", context)
@login_required
def notification_mark_read(request, pk):
    """Mark one of the current user's notifications as read (POST only).

    Non-POST requests get a 403 response. A notification that does not exist
    or belongs to another user yields a 404. Redirects back to the referring
    page, or to the notification list when no referer is sent.
    """
    if request.method == "POST":
        notif = get_object_or_404(Notification, pk=pk, user=request.user)
        notif.read = True
        notif.save(update_fields=["read"])
        target = request.META.get("HTTP_REFERER") or "risks:notifications"
        return redirect(target)
    return HttpResponseForbidden()
@login_required
def notification_mark_all_read(request):
    """Mark every unread notification of the current user as read (POST only).

    Non-POST requests get a 403 response; otherwise redirects to the
    notification list.
    """
    if request.method == "POST":
        unread = Notification.objects.filter(user=request.user, read=False)
        unread.update(read=True)
        return redirect("risks:notifications")
    return HttpResponseForbidden()
# ---------------------------------------------------------------------------
# Status Updates
# ---------------------------------------------------------------------------
@login_required
def update_risk_status(request, id):
    """Change the status of a risk from a POSTed RiskStatusForm.

    Only staff or the risk owner may do this (403 otherwise). Non-POST
    requests and invalid forms change nothing. Always redirects to the risk
    detail page.
    """
    risk = get_object_or_404(Risk, pk=id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    form = RiskStatusForm(request.POST, instance=risk) if request.method == "POST" else None
    if form is not None and form.is_valid():
        updated = form.save(commit=False)
        updated._changed_by = request.user
        # Write only the status and the modification timestamp.
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Risk status updated."))

    return redirect("risks:show_risk", id=risk.pk)
@login_required
def update_control_status(request, id):
    """Change the status of a control from a POSTed ControlStatusForm.

    Only staff or the responsible user may do this (403 otherwise). Non-POST
    requests and invalid forms change nothing. Always redirects to the
    control detail page.
    """
    control = get_object_or_404(Control, pk=id)
    if not _can_edit_control(request.user, control):
        return HttpResponseForbidden()

    form = ControlStatusForm(request.POST, instance=control) if request.method == "POST" else None
    if form is not None and form.is_valid():
        updated = form.save(commit=False)
        updated._changed_by = request.user
        # Write only the status and the modification timestamp.
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Control status updated."))

    return redirect("risks:show_control", id=control.pk)
@login_required
def update_incident_status(request, id):
    """Change the status of an incident from a POSTed IncidentStatusForm.

    Only staff or the reporting user may do this (403 otherwise). Non-POST
    requests and invalid forms change nothing. Always redirects to the
    incident detail page.
    """
    incident = get_object_or_404(Incident, pk=id)
    if not _can_edit_incident(request.user, incident):
        return HttpResponseForbidden()

    form = IncidentStatusForm(request.POST, instance=incident) if request.method == "POST" else None
    if form is not None and form.is_valid():
        updated = form.save(commit=False)
        updated._changed_by = request.user
        # Write only the status and the modification timestamp.
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Incident status updated."))

    return redirect("risks:show_incident", id=incident.pk)
@login_required
def update_residual_review(request, risk_id):
    """Toggle the ``review_required`` flag on a risk's residual risk.

    Only staff or the risk owner may do this (403 otherwise). The residual
    risk record is created on demand. Non-POST requests and invalid forms
    change nothing. Always redirects to the risk detail page.
    """
    risk = get_object_or_404(Risk, pk=risk_id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    # Do not unpack the "created" flag into ``_``: that would make ``_`` a
    # local bool and break the gettext call ``_("...")`` further down.
    residual, _created = ResidualRisk.objects.get_or_create(risk=risk)

    if request.method == "POST":
        form = ResidualReviewForm(request.POST, instance=residual)
        if form.is_valid():
            obj = form.save(commit=False)
            obj._changed_by = request.user
            obj.save(update_fields=["review_required", "updated_at"])
            messages.success(request, _("Residual review flag updated."))
    return redirect("risks:show_risk", id=risk.pk)
# ---------------------------------------------------------------------------
# Risk Matrix
# ---------------------------------------------------------------------------
@login_required
def risk_matrix(request):
    """Show the gross and net (residual) risk matrices.

    Both matrices map ``impact -> likelihood -> [risks]`` over the choices
    declared on ``Risk``. A risk appears in the gross matrix at its own
    impact/likelihood and, when it has a residual risk, in the net matrix at
    the residual values. Values outside the declared choices (e.g. unset
    fields) are skipped instead of raising ``KeyError``.

    Login is required, like for every other view in this module.
    """
    risks = Risk.objects.select_related("owner", "residual_risk").all()
    impacts = sorted(Risk.IMPACT_CHOICES, key=lambda choice: choice[0])
    likelihoods = sorted(Risk.LIKELIHOOD_CHOICES, key=lambda choice: choice[0])

    def empty_matrix():
        """Build an empty impact x likelihood grid of lists."""
        return {
            impact: {likelihood: [] for likelihood, _label in likelihoods}
            for impact, _label in impacts
        }

    def place(matrix, impact, likelihood, risk):
        """Append *risk* to its cell if that cell exists in *matrix*."""
        cell = matrix.get(impact, {}).get(likelihood)
        if cell is not None:
            cell.append(risk)

    gross_matrix = empty_matrix()
    net_matrix = empty_matrix()
    for risk in risks:
        place(gross_matrix, risk.impact, risk.likelihood, risk)
        # getattr with a default covers risks that have no residual risk.
        residual = getattr(risk, "residual_risk", None)
        if residual:
            place(net_matrix, residual.impact, residual.likelihood, risk)

    return render(request, "risks/risk_matrix.html", {
        "impacts": impacts,
        "likelihoods": likelihoods,
        "gross_matrix": gross_matrix,
        "net_matrix": net_matrix,
    })