ISO-27001-Risk-Management/risks/views.py
Kevin Heyer ebfcbddd5c Implement notification system and status update forms
- Added Notification model with admin interface for managing notifications.
- Created context processor to count unread notifications for the user.
- Introduced forms for updating the status of Risk, Control, Incident, and ResidualRisk.
- Added views for displaying and managing notifications, including marking them as read.
- Updated URLs to include routes for notifications and status updates.
- Enhanced templates to support notifications display and status update forms.
- Improved CSS for avatar and badge display in the navbar.
- Translated various static texts to support internationalization.
2025-09-10 13:44:03 +02:00

385 lines
No EOL
13 KiB
Python

from django.contrib.admin.models import LogEntry
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib.contenttypes.models import ContentType
from django.contrib import messages
from django.db.models import Count, Q
from django.http import HttpResponseForbidden
from django.shortcuts import redirect, render, get_object_or_404
from django.utils.translation import gettext_lazy as _
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from collections import Counter
from .forms import RiskStatusForm, ControlStatusForm, IncidentStatusForm, ResidualReviewForm
from .models import Risk, Control, ResidualRisk, AuditLog, Incident, Notification
from .serializers import ControlSerializer, RiskSerializer, ResidualRiskSerializer, UserSerializer, AuditSerializer, IncidentSerializer
User = get_user_model()
def _can_edit_risk(user, risk: Risk) -> bool:
return bool(user.is_staff or (risk.owner_id and risk.owner_id == user.id))
def _can_edit_control(user, control: Control) -> bool:
return bool(user.is_staff or (control.responsible_id and control.responsible_id == user.id))
def _can_edit_incident(user, incident: Incident) -> bool:
return bool(user.is_staff or (incident.reported_by_id and incident.reported_by_id == user.id))
# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
class RiskViewSet(viewsets.ModelViewSet):
"""
API endpoint for managing Risks.
Provides CRUD operations.
"""
queryset = Risk.objects.all()
serializer_class = RiskSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
class ControlViewSet(viewsets.ModelViewSet):
"""
API endpoint for managing Controls.
Provides CRUD operations.
"""
queryset = Control.objects.all()
serializer_class = ControlSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
class ResidualRiskViewSet(viewsets.ModelViewSet):
"""
API endpoint for Residual risks.
"""
queryset = ResidualRisk.objects.all()
serializer_class = ResidualRiskSerializer
permission_classes = [IsAuthenticated]
class UserViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint for listing users and their responsibilities.
"""
queryset = User.objects.all()
serializer_class = UserSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint for view audit logging.
"""
queryset = AuditLog.objects.all()
serializer_class = AuditSerializer
permission_classes = [IsAuthenticated]
class IncidentViewSet(viewsets.ModelViewSet):
"""
API endpoint for listing incidents and its related risks.
"""
queryset = Incident.objects.all()
serializer_class = IncidentSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save(reported_by=self.request.user)
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
# ---------------------------------------------------------------------------
# Web => Risks, Controls, Incidents
# ---------------------------------------------------------------------------
@login_required
def list_risks(request):
"""
View for listing all Risks
"""
qs = Risk.objects.all().select_related("owner")
# GET-Parameter lesen
risk_id = request.GET.get("risk")
control_id = request.GET.get("control")
owner_id = request.GET.get("owner")
if risk_id:
qs = qs.filter(id=risk_id)
if control_id:
qs = qs.filter(controls__id=control_id)
if owner_id:
qs = qs.filter(owner_id=owner_id)
risks = qs.order_by("title").distinct()
controls = Control.objects.all().order_by("title")
owners = User.objects.filter(owned_risks__isnull=False).distinct().order_by("username")
return render(request, "risks/list_risks.html", {
"risks": risks,
"controls": controls,
"owners": owners,
})
@login_required
def show_risk(request, id):
"""
View for single risk
"""
risk = get_object_or_404(
Risk.objects.select_related("residual_risk", "owner").prefetch_related("controls"),
pk=id,
)
ct = ContentType.objects.get_for_model(Risk)
logs = LogEntry.objects.filter(content_type=ct, object_id=risk.pk).order_by("-action_time")
return render(request, "risks/item_risk.html", {"risk": risk, "logs": logs})
@login_required
def list_controls(request):
"""
View for listing all Controls
"""
qs = Control.objects.all().select_related("responsible")
control_id = request.GET.get("control")
risk_id = request.GET.get("risk")
status = request.GET.get("status")
responsible_id = request.GET.get("responsible")
if control_id:
qs = qs.filter(id=control_id)
if risk_id:
qs = qs.filter(risks__id=risk_id) # FIX
if status:
qs = qs.filter(status=status)
if responsible_id:
qs = qs.filter(responsible_id=responsible_id)
controls = qs.order_by("title").distinct()
risks = Risk.objects.all().order_by("title")
users = User.objects.filter(responsible_controls__isnull=False).distinct().order_by("username")
return render(request, "risks/list_controls.html", {
"controls": controls,
"risks": risks,
"users": users,
"status_choices": Control.STATUS_CHOICES,
})
@login_required
def show_control(request, id):
control = get_object_or_404(Control, pk=id)
ct = ContentType.objects.get_for_model(Control)
logs = LogEntry.objects.filter(
content_type=ct,
object_id=control.pk
).order_by("-action_time")
return render(request, "risks/item_control.html", {"control": control, "logs": logs})
@login_required
def list_incidents(request):
"""
View for listing all Incidents
"""
qs = Incident.objects.all().select_related("reported_by").prefetch_related("related_risks")
risk_id = request.GET.get("risk")
status = request.GET.get("status")
reported_by = request.GET.get("reported_by")
if risk_id:
qs = qs.filter(related_risks__id=risk_id) # FIX
if status:
qs = qs.filter(status=status)
if reported_by:
qs = qs.filter(reported_by=reported_by)
incidents = qs.order_by("title").distinct()
risks = Risk.objects.all().order_by("title")
users = User.objects.filter(incidents__isnull=False).distinct().order_by("username") # sinnvoller
return render(request, "risks/list_incidents.html", {
"incidents": incidents,
"risks": risks,
"users": users,
"status_choices": Incident.STATUS_CHOICES,
})
@login_required
def show_incident(request, id):
incident = get_object_or_404(Incident, pk=id)
ct = ContentType.objects.get_for_model(Incident)
logs = LogEntry.objects.filter(
content_type=ct,
object_id=incident.pk
).order_by("-action_time")
return render(request, "risks/item_incident.html", {"incident": incident, "logs": logs})
@login_required
def dashboard(request):
"""
Dashboardview with KPIs
"""
# Risikoübersicht
risks_total = Risk.objects.count()
risks_by_level = Risk.objects.values('level').annotate(count=Count('id'))
# CIA-Zähler für MultiSelectField
risks_cia = Risk.objects.values_list('cia', flat=True)
cia_counter = Counter()
for cia_list in risks_cia:
if isinstance(cia_list, list): # MultiSelectField gibt Liste zurück
for c in cia_list:
cia_counter[c] += 1
elif cia_list: # Falls irgendwie noch ein String drin ist
cia_counter[cia_list] += 1
# Residualrisiken
residual_review_required = ResidualRisk.objects.filter(review_required=True).count()
# Kontrollen
controls_by_status = Control.objects.values('status').annotate(count=Count('id'))
# Incidents
incidents_status = Incident.objects.values('status').annotate(count=Count('id'))
# Benachrichtigungen
notifications_unread = Notification.objects.filter(user=request.user, read=False).count()
print(type(cia_counter), cia_counter)
# Context für Template
context = {
'risks_total': risks_total,
'risks_by_level': risks_by_level,
'risks_by_cia': dict(cia_counter), # <-- hier Counter in dict umwandeln
'residual_review_required': residual_review_required,
'controls_by_status': controls_by_status,
'incidents_status': incidents_status,
'notifications_unread': notifications_unread,
}
return render(request, 'risks/dashboard.html', context)
# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------
@login_required
def notifications(request):
"""Eigene Benachrichtigungen ansehen + filtern"""
flt = request.GET.get("filter", "unread")
qs = Notification.objects.filter(user=request.user).order_by("-created_at")
if flt == "unread":
qs = qs.filter(read=False)
# Einfache Pagination (optional)
return render(request, "risks/notifications.html", {
"notifications": qs,
"filter": flt,
})
@login_required
def notification_mark_read(request, pk):
if request.method != "POST":
return HttpResponseForbidden()
notif = get_object_or_404(Notification, pk=pk, user=request.user)
notif.read = True
notif.save(update_fields=["read"])
messages.success(request, _("Notification marked as read."))
return redirect(request.META.get("HTTP_REFERER") or "risks:notifications")
@login_required
def notification_mark_all_read(request):
if request.method != "POST":
return HttpResponseForbidden()
Notification.objects.filter(user=request.user, read=False).update(read=True)
messages.success(request, _("All notifications marked as read."))
return redirect("risks:notifications")
# ---------------------------------------------------------------------------
# Status Updates
# ---------------------------------------------------------------------------
@login_required
def update_risk_status(request, id):
risk = get_object_or_404(Risk, pk=id)
if not _can_edit_risk(request.user, risk):
return HttpResponseForbidden()
if request.method == "POST":
form = RiskStatusForm(request.POST, instance=risk)
if form.is_valid():
obj = form.save(commit=False)
obj._changed_by = request.user
obj.save(update_fields=["status", "updated_at"])
messages.success(request, _("Risk status updated."))
return redirect("risks:show_risk", id=risk.pk)
@login_required
def update_control_status(request, id):
control = get_object_or_404(Control, pk=id)
if not _can_edit_control(request.user, control):
return HttpResponseForbidden()
if request.method == "POST":
form = ControlStatusForm(request.POST, instance=control)
if form.is_valid():
obj = form.save(commit=False)
obj._changed_by = request.user
obj.save(update_fields=["status", "updated_at"])
messages.success(request, _("Control status updated."))
return redirect("risks:show_control", id=control.pk)
@login_required
def update_incident_status(request, id):
incident = get_object_or_404(Incident, pk=id)
if not _can_edit_incident(request.user, incident):
return HttpResponseForbidden()
if request.method == "POST":
form = IncidentStatusForm(request.POST, instance=incident)
if form.is_valid():
obj = form.save(commit=False)
obj._changed_by = request.user
obj.save(update_fields=["status", "updated_at"])
messages.success(request, _("Incident status updated."))
return redirect("risks:show_incident", id=incident.pk)
@login_required
def update_residual_review(request, risk_id):
"""Review-Flag (Restrisiko) setzen/lösen"""
risk = get_object_or_404(Risk, pk=risk_id)
if not _can_edit_risk(request.user, risk):
return HttpResponseForbidden()
residual, created_resid = ResidualRisk.objects.get_or_create(risk=risk)
if request.method == "POST":
form = ResidualReviewForm(request.POST, instance=residual)
if form.is_valid():
obj = form.save(commit=False)
obj._changed_by = request.user
obj.save(update_fields=["review_required", "updated_at"])
messages.success(request, _("Residual review flag updated."))
return redirect("risks:show_risk", id=risk.pk)