
- Updated the dashboard template to include a new section for displaying the risk trend per month using Chart.js. - Loaded the static files correctly with the addition of the static template tag. - Implemented JavaScript to render a line chart with risk data categorized by severity levels (Low, Medium, High, Critical). - Utilized CSS variables for dynamic color assignment in the chart.
458 lines
17 KiB
Python
458 lines
17 KiB
Python
from collections import Counter
|
|
from django.contrib import messages
|
|
from django.contrib.admin.models import LogEntry
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.db.models import Count
|
|
from django.db.models.functions import TruncMonth
|
|
from django.http import HttpResponseForbidden
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework import viewsets
|
|
from rest_framework.permissions import IsAuthenticated
|
|
|
|
from .forms import RiskStatusForm, ControlStatusForm, IncidentStatusForm, ResidualReviewForm
|
|
from .models import Risk, Control, ResidualRisk, AuditLog, Incident, Notification
|
|
from .serializers import (
|
|
ControlSerializer, RiskSerializer, ResidualRiskSerializer,
|
|
UserSerializer, AuditSerializer, IncidentSerializer,
|
|
)
|
|
|
|
# Active user model, resolved through Django's get_user_model() so the views
# below do not hard-code a concrete user class.
User = get_user_model()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
def _can_edit_risk(user, risk: Risk) -> bool:
    """Return True when *user* is staff or is the owner of *risk*."""
    if user.is_staff:
        return True
    owner_id = risk.owner_id
    if not owner_id:
        # A risk without an owner can only be edited by staff.
        return False
    return bool(owner_id == user.id)
|
|
|
|
|
|
def _can_edit_control(user, control: Control) -> bool:
    """Return True when *user* is staff or is responsible for *control*."""
    if user.is_staff:
        return True
    responsible_id = control.responsible_id
    if not responsible_id:
        # A control without a responsible user can only be edited by staff.
        return False
    return bool(responsible_id == user.id)
|
|
|
|
|
|
def _can_edit_incident(user, incident: Incident) -> bool:
    """Return True when *user* is staff or is the reporter of *incident*."""
    if user.is_staff:
        return True
    reporter_id = incident.reported_by_id
    if not reporter_id:
        # An incident without a reporter can only be edited by staff.
        return False
    return bool(reporter_id == user.id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# API ViewSets
|
|
# ---------------------------------------------------------------------------
|
|
class _ChangedByMixin:
    """Mixin that stamps the requesting user on objects written via the API.

    The user is stored on the model instance as ``_changed_by``.
    """

    def perform_create(self, serializer):
        """Save a new object and stamp the requesting user on it.

        The instance only exists once ``serializer.save()`` has returned, so
        for creation the stamp can only be applied after the save.
        """
        instance = serializer.save()
        instance._changed_by = self.request.user

    def perform_update(self, serializer):
        """Stamp the requesting user on the object, then save the changes."""
        actor = self.request.user
        existing = getattr(serializer, "instance", None)
        if existing is not None:
            # Stamp before saving so the user is already on the instance
            # while the save runs, matching the status-update views in this
            # module, which set ``_changed_by`` before calling ``save()``.
            existing._changed_by = actor
        instance = serializer.save()
        # Also stamp whatever the serializer returned, in case it is not the
        # same object as ``serializer.instance``.
        instance._changed_by = actor
|
|
|
|
|
|
class RiskViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for managing Risks (authenticated users only).

    Writes stamp the requesting user on the model instance as
    ``_changed_by``.
    """

    queryset = Risk.objects.all()
    serializer_class = RiskSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Save a new risk and stamp the requesting user on it."""
        instance = serializer.save()
        instance._changed_by = self.request.user
        # NOTE(review): Django's ``Model.save()`` skips the save entirely when
        # ``update_fields`` is empty, so unless the model overrides ``save()``
        # this call neither writes nor emits signals. Confirm how
        # ``_changed_by`` is meant to be picked up for newly created objects.
        instance.save(update_fields=[])

    def perform_update(self, serializer):
        """Stamp the requesting user on the risk, then save the changes."""
        actor = self.request.user
        existing = serializer.instance
        if existing is not None:
            # Stamp before saving so the user is already on the instance
            # while the save runs, as the status-update views in this module
            # do.
            existing._changed_by = actor
        instance = serializer.save()
        instance._changed_by = actor
        # Kept for backward compatibility; see the note in perform_create.
        instance.save(update_fields=[])
|
|
|
|
class ControlViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for managing Controls (authenticated users only).

    Writes stamp the requesting user on the model instance as
    ``_changed_by``.
    """

    queryset = Control.objects.all()
    serializer_class = ControlSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Save a new control and stamp the requesting user on it."""
        instance = serializer.save()
        instance._changed_by = self.request.user
        # NOTE(review): Django's ``Model.save()`` skips the save entirely when
        # ``update_fields`` is empty, so unless the model overrides ``save()``
        # this call neither writes nor emits signals. Confirm how
        # ``_changed_by`` is meant to be picked up for newly created objects.
        instance.save(update_fields=[])

    def perform_update(self, serializer):
        """Stamp the requesting user on the control, then save the changes."""
        actor = self.request.user
        existing = serializer.instance
        if existing is not None:
            # Stamp before saving so the user is already on the instance
            # while the save runs, as the status-update views in this module
            # do.
            existing._changed_by = actor
        instance = serializer.save()
        instance._changed_by = actor
        # Kept for backward compatibility; see the note in perform_create.
        instance.save(update_fields=[])
|
|
|
|
class ResidualRiskViewSet(viewsets.ModelViewSet):
    """API endpoint for Residual Risks (authenticated users only).

    Writes stamp the requesting user on the model instance as
    ``_changed_by``.
    """

    queryset = ResidualRisk.objects.all()
    serializer_class = ResidualRiskSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Save a new residual risk and stamp the requesting user on it."""
        instance = serializer.save()
        instance._changed_by = self.request.user
        # NOTE(review): Django's ``Model.save()`` skips the save entirely when
        # ``update_fields`` is empty, so unless the model overrides ``save()``
        # this call neither writes nor emits signals. Confirm how
        # ``_changed_by`` is meant to be picked up for newly created objects.
        instance.save(update_fields=[])

    def perform_update(self, serializer):
        """Stamp the requesting user on the residual risk, then save it."""
        actor = self.request.user
        existing = serializer.instance
        if existing is not None:
            # Stamp before saving so the user is already on the instance
            # while the save runs, as the status-update views in this module
            # do.
            existing._changed_by = actor
        instance = serializer.save()
        instance._changed_by = actor
        # Kept for backward compatibility; see the note in perform_create.
        instance.save(update_fields=[])
|
|
|
|
class UserViewSet(_ChangedByMixin, viewsets.ReadOnlyModelViewSet):
    """Read-only API endpoint for listing users and their responsibilities."""

    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAuthenticated]

    # No perform_create/perform_update overrides here: ReadOnlyModelViewSet
    # only provides the list and retrieve actions, so DRF never calls those
    # hooks on this viewset and the former overrides were unreachable.
|
|
|
|
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only API endpoint exposing audit log entries to authenticated users."""

    permission_classes = [IsAuthenticated]
    serializer_class = AuditSerializer
    queryset = AuditLog.objects.all()
|
|
|
|
|
|
class IncidentViewSet(_ChangedByMixin, viewsets.ModelViewSet):
    """API endpoint for managing incidents and their related risks.

    Requires an authenticated user. New incidents are recorded as reported by
    the requesting user, and writes stamp that user on the model instance as
    ``_changed_by``.
    """

    queryset = Incident.objects.all()
    serializer_class = IncidentSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Save a new incident reported by the requesting user.

        This class previously defined ``perform_create`` twice. The second
        definition replaced the first, so ``reported_by`` was never set. Both
        bodies are merged here.
        """
        actor = self.request.user
        instance = serializer.save(reported_by=actor)
        instance._changed_by = actor
        # NOTE(review): Django's ``Model.save()`` skips the save entirely when
        # ``update_fields`` is empty, so unless the model overrides ``save()``
        # this call neither writes nor emits signals. Confirm how
        # ``_changed_by`` is meant to be picked up for newly created objects.
        instance.save(update_fields=[])

    def perform_update(self, serializer):
        """Stamp the requesting user on the incident, then save the changes."""
        actor = self.request.user
        existing = serializer.instance
        if existing is not None:
            # Stamp before saving so the user is already on the instance
            # while the save runs, as the status-update views in this module
            # do.
            existing._changed_by = actor
        instance = serializer.save()
        instance._changed_by = actor
        # Kept for backward compatibility; see the note in perform_create.
        instance.save(update_fields=[])
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Web Views: Risks
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def list_risks(request):
    """List risks, optionally filtered and sorted via query parameters.

    Query parameters:
        risk, control, owner, category, asset, process: exact-match filters;
            missing or empty values are ignored.
        sort: name of a concrete ``Risk`` field to order by. Missing, unknown
            or relation-traversing names fall back to ``title``.
        dir: ``desc`` for descending order; anything else means ascending.
    """
    qs = Risk.objects.all().select_related("owner", "residual_risk")

    # Filters: only parameters that carry a value are applied.
    filters = {
        "id": request.GET.get("risk"),
        "controls__id": request.GET.get("control"),
        "owner_id": request.GET.get("owner"),
        "category": request.GET.get("category"),
        "asset": request.GET.get("asset"),
        "process": request.GET.get("process"),
    }
    qs = qs.filter(**{k: v for k, v in filters.items() if v})

    # Sorting: ``sort`` comes straight from the query string. Passing it to
    # ``order_by`` unchecked raises on unknown names and lets a client order
    # by related columns (e.g. ``owner__password``), which leaks information
    # through the resulting order. Only the model's own concrete fields are
    # accepted.
    sortable = {
        field.name
        for field in Risk._meta.get_fields()
        if getattr(field, "concrete", False)
    }
    sort = request.GET.get("sort") or "title"
    if sort not in sortable:
        sort = "title"
    direction = "desc" if request.GET.get("dir") == "desc" else "asc"
    qs = qs.order_by(f"-{sort}" if direction == "desc" else sort)

    # The ``controls__id`` filter joins a to-many relation, hence distinct().
    risks = qs.distinct()

    def _distinct_values(field):
        """Return the distinct non-null, non-empty values of a Risk field."""
        return (
            Risk.objects.exclude(**{f"{field}__isnull": True})
            .exclude(**{f"{field}__exact": ""})
            .values_list(field, flat=True)
            .distinct()
            .order_by(field)
        )

    return render(request, "risks/list_risks.html", {
        "risks": risks,
        "risk_choices": Risk.objects.all().order_by("title"),
        "control_choices": Control.objects.all().order_by("title"),
        "owner_choices": User.objects.filter(owned_risks__isnull=False).distinct().order_by("username"),
        "category_choices": _distinct_values("category"),
        "asset_choices": _distinct_values("asset"),
        "process_choices": _distinct_values("process"),
        "current_sort": sort,
        "current_dir": direction,
    })
|
|
|
|
|
|
@login_required
def show_risk(request, id):
    """Render the detail page of one risk together with its admin log entries."""
    candidates = Risk.objects.select_related("residual_risk", "owner").prefetch_related("controls")
    risk = get_object_or_404(candidates, pk=id)

    # Admin log entries recorded for this risk, newest first.
    logs = LogEntry.objects.filter(
        content_type=ContentType.objects.get_for_model(Risk),
        object_id=risk.pk,
    ).order_by("-action_time")

    context = {"risk": risk, "logs": logs}
    return render(request, "risks/item_risk.html", context)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Web Views: Controls
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def list_controls(request):
    """Render the control list, narrowed by any filters in the query string."""
    # (ORM lookup, query parameter) pairs; parameters without a value are skipped.
    lookups = (
        ("id", "control"),
        ("risks__id", "risk"),
        ("status", "status"),
        ("responsible_id", "responsible"),
    )
    active = {}
    for lookup, param in lookups:
        value = request.GET.get(param)
        if value:
            active[lookup] = value

    controls = (
        Control.objects.all()
        .select_related("responsible")
        .filter(**active)
        .order_by("title")
        .distinct()
    )

    context = {
        "controls": controls,
        "control_choices": Control.objects.all().order_by("title"),
        "risk_choices": Risk.objects.all().order_by("title"),
        "responsible_choices": User.objects.filter(responsible_controls__isnull=False).distinct().order_by("username"),
        "status_choices": Control.STATUS_CHOICES,
    }
    return render(request, "risks/list_controls.html", context)
|
|
|
|
|
|
@login_required
def show_control(request, id):
    """Render the detail page of one control together with its admin log entries."""
    control = get_object_or_404(Control, pk=id)

    # Admin log entries recorded for this control, newest first.
    logs = LogEntry.objects.filter(
        content_type=ContentType.objects.get_for_model(Control),
        object_id=control.pk,
    ).order_by("-action_time")

    context = {"control": control, "logs": logs}
    return render(request, "risks/item_control.html", context)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Web Views: Incidents
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def list_incidents(request):
    """Render the incident list, narrowed by any filters in the query string."""
    # (ORM lookup, query parameter) pairs; parameters without a value are skipped.
    lookups = (
        ("related_risks__id", "risk"),
        ("status", "status"),
        ("reported_by", "reported_by"),
    )
    active = {}
    for lookup, param in lookups:
        value = request.GET.get(param)
        if value:
            active[lookup] = value

    incidents = (
        Incident.objects.all()
        .select_related("reported_by")
        .prefetch_related("related_risks")
        .filter(**active)
        .order_by("title")
        .distinct()
    )

    context = {
        "incidents": incidents,
        # NOTE(review): the choices reuse the *filtered* queryset, unlike the
        # other list views, which offer every object — confirm this is intended.
        "incident_choices": incidents,
        "risk_choices": Risk.objects.all().order_by("title"),
        "user_choices": User.objects.filter(incidents__isnull=False).distinct().order_by("username"),
        "status_choices": Incident.STATUS_CHOICES,
    }
    return render(request, "risks/list_incidents.html", context)
|
|
|
|
|
|
@login_required
def show_incident(request, id):
    """Render the detail page of one incident together with its admin log entries."""
    incident = get_object_or_404(Incident, pk=id)

    # Admin log entries recorded for this incident, newest first.
    logs = LogEntry.objects.filter(
        content_type=ContentType.objects.get_for_model(Incident),
        object_id=incident.pk,
    ).order_by("-action_time")

    context = {"incident": incident, "logs": logs}
    return render(request, "risks/item_incident.html", context)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dashboard
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def dashboard(request):
    """Render the dashboard: KPI counts plus the per-month risk trend by level."""
    # Existing KPIs.
    risks_total = Risk.objects.count()
    risks_by_level = Risk.objects.values("level").annotate(count=Count("id"))

    # CIA tally: a risk's ``cia`` value may be a list of entries or a single
    # value; empty values are not counted.
    cia_counter = Counter()
    for entry in Risk.objects.values_list("cia", flat=True):
        if isinstance(entry, list):
            cia_counter.update(entry)
        elif entry:
            cia_counter[entry] += 1

    # Residual risks flagged for review.
    residual_review_required = ResidualRisk.objects.filter(review_required=True).count()

    # Controls and incidents grouped by status.
    controls_by_status = Control.objects.values("status").annotate(count=Count("id"))
    incidents_status = Incident.objects.values("status").annotate(count=Count("id"))

    # Unread notifications of the current user.
    notifications_unread = Notification.objects.filter(user=request.user, read=False).count()

    # Risk counts per level per creation month (trend).
    trend_rows = list(
        Risk.objects.annotate(month=TruncMonth("created_at"))
        .values("month", "level")
        .annotate(count=Count("id"))
        .order_by("month")
    )

    # Prepare the data for Chart.js: one "YYYY-MM" label per month and, per
    # level, one count per label (0 where the level has no risks that month).
    dated_rows = [
        (row["month"].strftime("%Y-%m"), row["level"], row["count"])
        for row in trend_rows
        if row["month"]
    ]
    months = sorted({label for label, _level, _count in dated_rows})
    levels = {row["level"] for row in trend_rows}

    trend_data = {lvl: [0] * len(months) for lvl in levels}
    position = {label: idx for idx, label in enumerate(months)}
    for label, level, count in dated_rows:
        trend_data[level][position[label]] = count

    context = {
        "risks_total": risks_total,
        "risks_by_level": risks_by_level,
        "risks_by_cia": dict(cia_counter),
        "residual_review_required": residual_review_required,
        "controls_by_status": controls_by_status,
        "incidents_status": incidents_status,
        "notifications_unread": notifications_unread,
        # Trend data
        "months": months,
        "trend_data": trend_data,
    }
    return render(request, "risks/dashboard.html", context)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notifications
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def notifications(request):
    """Show the current user's notifications, newest first.

    The ``filter`` query parameter defaults to ``unread``, which limits the
    list to unread notifications; any other value shows all of them.
    """
    flt = request.GET.get("filter", "unread")
    own = Notification.objects.filter(user=request.user)
    if flt == "unread":
        own = own.filter(read=False)
    own = own.order_by("-created_at")
    context = {"notifications": own, "filter": flt}
    return render(request, "risks/notifications.html", context)
|
|
|
|
|
|
@login_required
def notification_mark_read(request, pk):
    """Mark one of the current user's notifications as read.

    Only POST is accepted; any other method gets a 403 response. A
    notification that does not exist or belongs to another user yields a 404.
    Afterwards the user is sent back to the referring page when that page is
    on this host, otherwise to the notification list.
    """
    # Imported locally so this view brings its own dependency into scope.
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.method != "POST":
        return HttpResponseForbidden()

    notif = get_object_or_404(Notification, pk=pk, user=request.user)
    notif.read = True
    notif.save(update_fields=["read"])
    messages.success(request, _("Notification marked as read."))

    # The Referer header is client-controlled; redirecting to it unchecked is
    # an open redirect. Only follow it when it points back at this host.
    referer = request.META.get("HTTP_REFERER")
    if referer and url_has_allowed_host_and_scheme(
        referer,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        return redirect(referer)
    return redirect("risks:notifications")
|
|
|
|
|
|
@login_required
def notification_mark_all_read(request):
    """Mark every unread notification of the current user as read (POST only)."""
    if request.method == "POST":
        unread = Notification.objects.filter(user=request.user, read=False)
        unread.update(read=True)
        messages.success(request, _("All notifications marked as read."))
        return redirect("risks:notifications")
    # Any other HTTP method is rejected.
    return HttpResponseForbidden()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status Updates
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def update_risk_status(request, id):
    """Update a risk's status from a POSTed form, then return to its detail page.

    Responds with 403 when the user may not edit the risk.
    """
    risk = get_object_or_404(Risk, pk=id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    form = RiskStatusForm(request.POST, instance=risk) if request.method == "POST" else None
    if form is not None and form.is_valid():
        updated = form.save(commit=False)
        # Stamp the acting user on the instance before it is saved.
        updated._changed_by = request.user
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Risk status updated."))

    return redirect("risks:show_risk", id=risk.pk)
|
|
|
|
|
|
@login_required
def update_control_status(request, id):
    """Update a control's status from a POSTed form, then return to its detail page.

    Responds with 403 when the user may not edit the control.
    """
    control = get_object_or_404(Control, pk=id)
    if not _can_edit_control(request.user, control):
        return HttpResponseForbidden()

    form = ControlStatusForm(request.POST, instance=control) if request.method == "POST" else None
    if form is not None and form.is_valid():
        updated = form.save(commit=False)
        # Stamp the acting user on the instance before it is saved.
        updated._changed_by = request.user
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Control status updated."))

    return redirect("risks:show_control", id=control.pk)
|
|
|
|
|
|
@login_required
def update_incident_status(request, id):
    """Update an incident's status from a POSTed form, then return to its detail page.

    Responds with 403 when the user may not edit the incident.
    """
    incident = get_object_or_404(Incident, pk=id)
    if not _can_edit_incident(request.user, incident):
        return HttpResponseForbidden()

    form = IncidentStatusForm(request.POST, instance=incident) if request.method == "POST" else None
    if form is not None and form.is_valid():
        updated = form.save(commit=False)
        # Stamp the acting user on the instance before it is saved.
        updated._changed_by = request.user
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Incident status updated."))

    return redirect("risks:show_incident", id=incident.pk)
|
|
|
|
|
|
@login_required
def update_residual_review(request, risk_id):
    """Update the residual-risk review flag of a risk from a POSTed form.

    The residual risk record is created on demand. Responds with 403 when the
    user may not edit the risk; otherwise redirects to the risk detail page.
    """
    risk = get_object_or_404(Risk, pk=risk_id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    # The "created" flag must not be bound to ``_``: that would make ``_`` a
    # local bool for the whole function and break the ``_()`` translation
    # call below with a TypeError.
    residual, _created = ResidualRisk.objects.get_or_create(risk=risk)

    if request.method == "POST":
        form = ResidualReviewForm(request.POST, instance=residual)
        if form.is_valid():
            obj = form.save(commit=False)
            # Stamp the acting user on the instance before it is saved.
            obj._changed_by = request.user
            obj.save(update_fields=["review_required", "updated_at"])
            messages.success(request, _("Residual review flag updated."))

    return redirect("risks:show_risk", id=risk.pk)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Risk Matrix
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def risk_matrix(request):
    """Show the gross and net (residual) risk matrices.

    Both matrices map ``impact -> likelihood -> list of risks``. The gross
    matrix places each risk by its own impact and likelihood; the net matrix
    places it by the values of its residual risk, when one exists.

    Login is required, like every other web view in this module.
    """
    risks = Risk.objects.select_related("owner", "residual_risk").all()
    impacts = sorted(Risk.IMPACT_CHOICES, key=lambda choice: choice[0])
    likelihoods = sorted(Risk.LIKELIHOOD_CHOICES, key=lambda choice: choice[0])

    def _empty_matrix():
        """Build an impact -> likelihood -> [] grid covering every choice."""
        return {
            impact: {likelihood: [] for likelihood, _label in likelihoods}
            for impact, _label in impacts
        }

    def _place(matrix, impact, likelihood, risk):
        """Append *risk* to its cell; skip values that are not in the grid."""
        cell = matrix.get(impact, {}).get(likelihood)
        if cell is not None:
            cell.append(risk)

    gross_matrix = _empty_matrix()
    net_matrix = _empty_matrix()

    for risk in risks:
        # A risk whose impact/likelihood is not one of the declared choices
        # is left out instead of failing the whole page with a KeyError.
        _place(gross_matrix, risk.impact, risk.likelihood, risk)
        residual = getattr(risk, "residual_risk", None)
        if residual:
            _place(net_matrix, residual.impact, residual.likelihood, risk)

    return render(request, "risks/risk_matrix.html", {
        "impacts": impacts,
        "likelihoods": likelihoods,
        "gross_matrix": gross_matrix,
        "net_matrix": net_matrix,
    })
|