447 lines
16 KiB
Python
447 lines
16 KiB
Python
from django.contrib.admin.models import LogEntry
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib import messages
|
|
from django.db.models import Count, Q
|
|
from django.http import HttpResponseForbidden
|
|
from django.shortcuts import redirect, render, get_object_or_404
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework import viewsets
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from collections import Counter, defaultdict
|
|
from .forms import RiskStatusForm, ControlStatusForm, IncidentStatusForm, ResidualReviewForm
|
|
from .models import Risk, Control, ResidualRisk, AuditLog, Incident, Notification
|
|
from .serializers import ControlSerializer, RiskSerializer, ResidualRiskSerializer, UserSerializer, AuditSerializer, IncidentSerializer
|
|
|
|
# Active user model as returned by django.contrib.auth.get_user_model();
# resolved once at import time and used by the viewsets and list views below.
User = get_user_model()
|
|
|
|
def _can_edit_risk(user, risk: Risk) -> bool:
    """Return True when *user* is staff or is the owner recorded on *risk*."""
    if user.is_staff:
        return True
    owner_id = risk.owner_id
    # A risk without an owner can only be edited by staff.
    return bool(owner_id and owner_id == user.id)
|
|
|
|
def _can_edit_control(user, control: Control) -> bool:
    """Return True when *user* is staff or is the responsible user of *control*."""
    if user.is_staff:
        return True
    responsible_id = control.responsible_id
    # A control without a responsible user can only be edited by staff.
    return bool(responsible_id and responsible_id == user.id)
|
|
|
|
def _can_edit_incident(user, incident: Incident) -> bool:
    """Return True when *user* is staff or is the reporter recorded on *incident*."""
    if user.is_staff:
        return True
    reporter_id = incident.reported_by_id
    # An incident without a reporter can only be edited by staff.
    return bool(reporter_id and reporter_id == user.id)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# API
|
|
# ---------------------------------------------------------------------------
|
|
class RiskViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing Risks.

    Provides CRUD operations to authenticated users and tags saved
    instances with the requesting user through the ``_changed_by`` attribute.
    """

    queryset = Risk.objects.all()
    serializer_class = RiskSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Create the risk and tag the new instance with the requesting user."""
        # NOTE(review): the instance is only available once save() returns,
        # so ``_changed_by`` cannot be attached before the initial save from
        # this hook; anything running during creation will not see it.
        instance = serializer.save()
        instance._changed_by = self.request.user

    def perform_update(self, serializer):
        """Tag the risk with the requesting user, then persist the update."""
        actor = self.request.user
        # Attach the actor BEFORE saving, as the HTML status views in this
        # module do; setting it only afterwards is too late for anything that
        # runs during save(). Presumably save-time hooks consume
        # ``_changed_by`` -- verify against the model signals.
        if serializer.instance is not None:
            serializer.instance._changed_by = actor
        instance = serializer.save()
        # Kept so the returned object is tagged exactly as before.
        instance._changed_by = actor
|
|
|
|
class ControlViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing Controls.

    Provides CRUD operations to authenticated users and tags saved
    instances with the requesting user through the ``_changed_by`` attribute.
    """

    queryset = Control.objects.all()
    serializer_class = ControlSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Create the control and tag the new instance with the requesting user."""
        # NOTE(review): the instance is only available once save() returns,
        # so ``_changed_by`` cannot be attached before the initial save from
        # this hook; anything running during creation will not see it.
        instance = serializer.save()
        instance._changed_by = self.request.user

    def perform_update(self, serializer):
        """Tag the control with the requesting user, then persist the update."""
        actor = self.request.user
        # Attach the actor BEFORE saving, as the HTML status views in this
        # module do; setting it only afterwards is too late for anything that
        # runs during save(). Presumably save-time hooks consume
        # ``_changed_by`` -- verify against the model signals.
        if serializer.instance is not None:
            serializer.instance._changed_by = actor
        instance = serializer.save()
        # Kept so the returned object is tagged exactly as before.
        instance._changed_by = actor
|
|
|
|
class ResidualRiskViewSet(viewsets.ModelViewSet):
    """
    API endpoint for residual risks.

    Full model viewset over all ResidualRisk rows, restricted to
    authenticated users.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = ResidualRiskSerializer
    queryset = ResidualRisk.objects.all()
|
|
|
|
class UserViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint for listing users and their responsibilities.

    Read-only: ``ReadOnlyModelViewSet`` provides only the ``list`` and
    ``retrieve`` actions, so the ``perform_create`` / ``perform_update``
    hooks that used to be defined here were never invoked and have been
    removed.
    """

    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAuthenticated]
|
|
|
|
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint for viewing audit log entries.

    Read-only viewset over all AuditLog rows, restricted to authenticated
    users.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = AuditSerializer
    queryset = AuditLog.objects.all()
|
|
|
|
class IncidentViewSet(viewsets.ModelViewSet):
    """
    API endpoint for incidents and their related risks.

    Provides CRUD operations to authenticated users. New incidents are
    saved with ``reported_by`` set to the requesting user, and saved
    instances are tagged with that user through ``_changed_by``.
    """

    queryset = Incident.objects.all()
    serializer_class = IncidentSerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Create the incident with the requesting user as its reporter."""
        # NOTE(review): the instance is only available once save() returns,
        # so ``_changed_by`` cannot be attached before the initial save from
        # this hook; anything running during creation will not see it.
        instance = serializer.save(reported_by=self.request.user)
        instance._changed_by = self.request.user

    def perform_update(self, serializer):
        """Tag the incident with the requesting user, then persist the update."""
        actor = self.request.user
        # Attach the actor BEFORE saving, as the HTML status views in this
        # module do; setting it only afterwards is too late for anything that
        # runs during save(). Presumably save-time hooks consume
        # ``_changed_by`` -- verify against the model signals.
        if serializer.instance is not None:
            serializer.instance._changed_by = actor
        instance = serializer.save()
        # Kept so the returned object is tagged exactly as before.
        instance._changed_by = actor
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Web => Risks, Controls, Incidents
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@login_required
def list_risks(request):
    """
    View for listing risks with optional filtering and sorting.

    Query parameters (all optional):
        risk, control, owner: ids used to narrow the result.
        category, asset, process: exact-match filters.
        sort: name of a concrete Risk field to order by (default ``title``).
        dir: ``asc`` (default) or ``desc``.

    Unknown ``sort`` or ``dir`` values fall back to the defaults instead of
    being passed on to the ORM.
    """
    qs = Risk.objects.all().select_related("owner", "residual_risk")

    # (GET parameter, ORM lookup) pairs; each non-empty parameter narrows
    # the queryset with its own filter() call, in this order.
    param_lookups = (
        ("risk", "id"),
        ("control", "controls__id"),
        ("owner", "owner_id"),
        ("category", "category"),
        ("asset", "asset"),
        ("process", "process"),
    )
    for param, lookup in param_lookups:
        value = request.GET.get(param)
        if value:
            qs = qs.filter(**{lookup: value})

    # ``sort`` comes straight from the query string. Passing it unchecked to
    # order_by() raises FieldError (HTTP 500) for unknown names and lets a
    # client order by fields of related models. Only concrete fields of Risk
    # are accepted; relation-spanning sort keys would have to be added to
    # this set explicitly.
    sortable_fields = {field.name for field in Risk._meta.concrete_fields}
    sort = request.GET.get("sort") or "title"
    if sort not in sortable_fields:
        sort = "title"

    direction = request.GET.get("dir") or "asc"
    if direction not in ("asc", "desc"):
        direction = "asc"

    ordering = f"-{sort}" if direction == "desc" else sort
    # distinct() collapses duplicates that the join on controls can produce.
    risks = qs.order_by(ordering).distinct()

    def distinct_values(field_name):
        """Return the sorted, distinct non-empty values of one Risk field."""
        return (
            Risk.objects.exclude(**{f"{field_name}__isnull": True})
            .exclude(**{f"{field_name}__exact": ""})
            .values_list(field_name, flat=True)
            .distinct()
            .order_by(field_name)
        )

    return render(request, "risks/list_risks.html", {
        "risks": risks,
        "risk_choices": Risk.objects.all().order_by("title"),
        "control_choices": Control.objects.all().order_by("title"),
        "owner_choices": (
            User.objects.filter(owned_risks__isnull=False)
            .distinct()
            .order_by("username")
        ),
        "category_choices": distinct_values("category"),
        "asset_choices": distinct_values("asset"),
        "process_choices": distinct_values("process"),
        "current_sort": sort,
        "current_dir": direction,
    })
|
|
|
|
@login_required
def show_risk(request, id):
    """
    Detail view for a single risk.

    Responds with 404 when no risk has the given primary key; otherwise
    renders the risk together with its admin log entries, newest first.
    """
    risk_lookup = (
        Risk.objects
        .select_related("residual_risk", "owner")
        .prefetch_related("controls")
    )
    risk = get_object_or_404(risk_lookup, pk=id)

    risk_type = ContentType.objects.get_for_model(Risk)
    history = LogEntry.objects.filter(content_type=risk_type, object_id=risk.pk)
    history = history.order_by("-action_time")

    context = {"risk": risk, "logs": history}
    return render(request, "risks/item_risk.html", context)
|
|
|
|
@login_required
def list_controls(request):
    """
    View for listing controls.

    The optional GET parameters ``control``, ``risk``, ``status`` and
    ``responsible`` narrow the list; the result is ordered by title.
    """
    params = request.GET
    controls = Control.objects.all().select_related("responsible")

    # (GET parameter, ORM lookup) pairs; each non-empty parameter narrows
    # the queryset with its own filter() call, in this order.
    for param, lookup in (
        ("control", "id"),
        ("risk", "risks__id"),
        ("status", "status"),
        ("responsible", "responsible_id"),
    ):
        value = params.get(param)
        if value:
            controls = controls.filter(**{lookup: value})

    controls = controls.order_by("title").distinct()

    context = {
        "controls": controls,
        "risks": Risk.objects.all().order_by("title"),
        # Only users referenced as responsible by at least one control.
        "users": (
            User.objects.filter(responsible_controls__isnull=False)
            .distinct()
            .order_by("username")
        ),
        "status_choices": Control.STATUS_CHOICES,
    }
    return render(request, "risks/list_controls.html", context)
|
|
|
|
@login_required
def show_control(request, id):
    """
    Detail view for a single control.

    Responds with 404 when no control has the given primary key; otherwise
    renders the control together with its admin log entries, newest first.
    """
    control = get_object_or_404(Control, pk=id)

    control_type = ContentType.objects.get_for_model(Control)
    history = LogEntry.objects.filter(content_type=control_type, object_id=control.pk)
    history = history.order_by("-action_time")

    context = {"control": control, "logs": history}
    return render(request, "risks/item_control.html", context)
|
|
|
|
@login_required
def list_incidents(request):
    """
    View for listing incidents.

    The optional GET parameters ``risk``, ``status`` and ``reported_by``
    narrow the list; the result is ordered by title.
    """
    params = request.GET
    incidents = (
        Incident.objects.all()
        .select_related("reported_by")
        .prefetch_related("related_risks")
    )

    # (GET parameter, ORM lookup) pairs; each non-empty parameter narrows
    # the queryset with its own filter() call, in this order.
    for param, lookup in (
        ("risk", "related_risks__id"),
        ("status", "status"),
        ("reported_by", "reported_by"),
    ):
        value = params.get(param)
        if value:
            incidents = incidents.filter(**{lookup: value})

    incidents = incidents.order_by("title").distinct()

    context = {
        "incidents": incidents,
        "risks": Risk.objects.all().order_by("title"),
        # Only users that have at least one related incident.
        "users": (
            User.objects.filter(incidents__isnull=False)
            .distinct()
            .order_by("username")
        ),
        "status_choices": Incident.STATUS_CHOICES,
    }
    return render(request, "risks/list_incidents.html", context)
|
|
|
|
@login_required
def show_incident(request, id):
    """
    Detail view for a single incident.

    Responds with 404 when no incident has the given primary key; otherwise
    renders the incident together with its admin log entries, newest first.
    """
    incident = get_object_or_404(Incident, pk=id)

    incident_type = ContentType.objects.get_for_model(Incident)
    history = LogEntry.objects.filter(content_type=incident_type, object_id=incident.pk)
    history = history.order_by("-action_time")

    context = {"incident": incident, "logs": history}
    return render(request, "risks/item_incident.html", context)
|
|
@login_required
def dashboard(request):
    """
    Dashboard view with KPIs.

    Collects the total number of risks, risk counts per level and per CIA
    value, the number of residual risks flagged for review, control and
    incident counts per status, and the requesting user's unread
    notification count, then renders ``risks/dashboard.html``.
    """
    # Risk overview
    risks_total = Risk.objects.count()
    risks_by_level = Risk.objects.values('level').annotate(count=Count('id'))

    # Count risks per CIA value. The field is expected to yield a list of
    # values per risk; a plain non-empty value is counted as one entry.
    cia_counter = Counter()
    for cia_value in Risk.objects.values_list('cia', flat=True):
        if isinstance(cia_value, list):
            cia_counter.update(cia_value)
        elif cia_value:
            cia_counter[cia_value] += 1

    # Residual risks flagged for review
    residual_review_required = ResidualRisk.objects.filter(review_required=True).count()

    # Controls per status
    controls_by_status = Control.objects.values('status').annotate(count=Count('id'))

    # Incidents per status
    incidents_status = Incident.objects.values('status').annotate(count=Count('id'))

    # Unread notifications of the requesting user
    notifications_unread = Notification.objects.filter(user=request.user, read=False).count()

    # Template context
    context = {
        'risks_total': risks_total,
        'risks_by_level': risks_by_level,
        # Handed to the template as a plain dict rather than a Counter.
        'risks_by_cia': dict(cia_counter),
        'residual_review_required': residual_review_required,
        'controls_by_status': controls_by_status,
        'incidents_status': incidents_status,
        'notifications_unread': notifications_unread,
    }
    return render(request, 'risks/dashboard.html', context)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notifications
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@login_required
def notifications(request):
    """
    Show the requesting user's notifications, newest first.

    The GET parameter ``filter`` defaults to ``unread``, which limits the
    list to unread notifications; any other value shows all of them.
    """
    active_filter = request.GET.get("filter", "unread")
    own_notifications = Notification.objects.filter(user=request.user).order_by("-created_at")

    if active_filter == "unread":
        visible = own_notifications.filter(read=False)
    else:
        visible = own_notifications

    # Simple pagination could be added here (optional); none is applied.
    context = {"notifications": visible, "filter": active_filter}
    return render(request, "risks/notifications.html", context)
|
|
|
|
@login_required
def notification_mark_read(request, pk):
    """
    Mark one of the requesting user's notifications as read.

    Only POST is accepted; other methods get a 403 response. Responds with
    404 when the notification does not exist or belongs to another user.
    Afterwards redirects back to the referring page when that URL points at
    this host, otherwise to the notification list.
    """
    # Imported locally so this view does not depend on a new module-level import.
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.method != "POST":
        return HttpResponseForbidden()

    notif = get_object_or_404(Notification, pk=pk, user=request.user)
    notif.read = True
    notif.save(update_fields=["read"])
    messages.success(request, _("Notification marked as read."))

    # The Referer header is client-supplied; redirecting to it unchecked
    # would let a request send the user to an arbitrary external URL.
    referer = request.META.get("HTTP_REFERER")
    if referer and url_has_allowed_host_and_scheme(
        referer,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        return redirect(referer)
    return redirect("risks:notifications")
|
|
|
|
@login_required
def notification_mark_all_read(request):
    """
    Mark every unread notification of the requesting user as read.

    Only POST is accepted; other methods get a 403 response. Redirects to
    the notification list afterwards.
    """
    if request.method == "POST":
        unread = Notification.objects.filter(user=request.user, read=False)
        unread.update(read=True)
        messages.success(request, _("All notifications marked as read."))
        return redirect("risks:notifications")
    return HttpResponseForbidden()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status Updates
|
|
# ---------------------------------------------------------------------------
|
|
@login_required
def update_risk_status(request, id):
    """
    Update the status of a risk from a submitted RiskStatusForm.

    Responds with 404 for an unknown risk and 403 when the user may not edit
    it. A valid POST saves the new status; every permitted request ends with
    a redirect to the risk's detail page.
    """
    risk = get_object_or_404(Risk, pk=id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    # Only POST requests carry a form; anything else just redirects.
    submitted = RiskStatusForm(request.POST, instance=risk) if request.method == "POST" else None
    if submitted is not None and submitted.is_valid():
        updated = submitted.save(commit=False)
        # Tag the instance with the acting user before it is saved.
        updated._changed_by = request.user
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Risk status updated."))

    return redirect("risks:show_risk", id=risk.pk)
|
|
|
|
@login_required
def update_control_status(request, id):
    """
    Update the status of a control from a submitted ControlStatusForm.

    Responds with 404 for an unknown control and 403 when the user may not
    edit it. A valid POST saves the new status; every permitted request ends
    with a redirect to the control's detail page.
    """
    control = get_object_or_404(Control, pk=id)
    if not _can_edit_control(request.user, control):
        return HttpResponseForbidden()

    # Only POST requests carry a form; anything else just redirects.
    submitted = ControlStatusForm(request.POST, instance=control) if request.method == "POST" else None
    if submitted is not None and submitted.is_valid():
        updated = submitted.save(commit=False)
        # Tag the instance with the acting user before it is saved.
        updated._changed_by = request.user
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Control status updated."))

    return redirect("risks:show_control", id=control.pk)
|
|
|
|
@login_required
def update_incident_status(request, id):
    """
    Update the status of an incident from a submitted IncidentStatusForm.

    Responds with 404 for an unknown incident and 403 when the user may not
    edit it. A valid POST saves the new status; every permitted request ends
    with a redirect to the incident's detail page.
    """
    incident = get_object_or_404(Incident, pk=id)
    if not _can_edit_incident(request.user, incident):
        return HttpResponseForbidden()

    # Only POST requests carry a form; anything else just redirects.
    submitted = IncidentStatusForm(request.POST, instance=incident) if request.method == "POST" else None
    if submitted is not None and submitted.is_valid():
        updated = submitted.save(commit=False)
        # Tag the instance with the acting user before it is saved.
        updated._changed_by = request.user
        updated.save(update_fields=["status", "updated_at"])
        messages.success(request, _("Incident status updated."))

    return redirect("risks:show_incident", id=incident.pk)
|
|
|
|
@login_required
def update_residual_review(request, risk_id):
    """
    Set or clear the review flag on a risk's residual risk.

    Responds with 404 for an unknown risk and 403 when the user may not edit
    it. On POST the ResidualRisk row is fetched (or created if missing) and a
    valid ResidualReviewForm saves the flag. Every permitted request ends
    with a redirect to the risk's detail page.
    """
    risk = get_object_or_404(Risk, pk=risk_id)
    if not _can_edit_risk(request.user, risk):
        return HttpResponseForbidden()

    if request.method == "POST":
        # Fetch or create the row only while handling a POST, so that a
        # plain GET on this URL never writes to the database.
        residual = ResidualRisk.objects.get_or_create(risk=risk)[0]
        form = ResidualReviewForm(request.POST, instance=residual)
        if form.is_valid():
            obj = form.save(commit=False)
            # Tag the instance with the acting user before it is saved.
            obj._changed_by = request.user
            obj.save(update_fields=["review_required", "updated_at"])
            messages.success(request, _("Residual review flag updated."))

    return redirect("risks:show_risk", id=risk.pk)
|
|
|
|
|
|
@login_required
def risk_matrix(request):
    """
    Render the gross and net risk matrices.

    Both matrices are nested dicts keyed by impact value, then likelihood
    value, built from ``Risk.IMPACT_CHOICES`` and ``Risk.LIKELIHOOD_CHOICES``.
    Each cell holds the list of risks placed there: the gross matrix uses the
    risk's own impact/likelihood, the net matrix uses the values of its
    residual risk when one exists. Risks whose values match no cell are
    skipped instead of raising ``KeyError``.

    Requires login, like every other view in this module.
    """
    # select_related avoids one extra query per risk for the residual risk.
    risks = Risk.objects.select_related("owner", "residual_risk").all()

    impacts = sorted(Risk.IMPACT_CHOICES, key=lambda choice: choice[0])
    likelihoods = sorted(Risk.LIKELIHOOD_CHOICES, key=lambda choice: choice[0])

    def empty_matrix():
        """Return a fresh impact -> likelihood -> [] grid."""
        return {
            impact: {likelihood: [] for likelihood, _label in likelihoods}
            for impact, _label in impacts
        }

    def place(matrix, impact, likelihood, risk):
        """Append *risk* to the matching cell, ignoring unknown coordinates."""
        cell = matrix.get(impact, {}).get(likelihood)
        if cell is not None:
            cell.append(risk)

    gross_matrix = empty_matrix()
    net_matrix = empty_matrix()

    for risk in risks:
        # Gross placement
        place(gross_matrix, risk.impact, risk.likelihood, risk)
        # Net placement, only when a residual risk exists
        residual = getattr(risk, "residual_risk", None)
        if residual:
            place(net_matrix, residual.impact, residual.likelihood, risk)

    return render(request, "risks/risk_matrix.html", {
        "impacts": impacts,
        "likelihoods": likelihoods,
        "gross_matrix": gross_matrix,
        "net_matrix": net_matrix,
    })
|