ISO-27001-Risk-Management/risks/views.py

267 lines
8.8 KiB
Python
Raw Normal View History

from django.contrib.admin.models import LogEntry
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count, Q
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from django.shortcuts import render, get_object_or_404
from collections import Counter
from .models import Risk, Control, ResidualRisk, AuditLog, Incident, Notification
from .serializers import ControlSerializer, RiskSerializer, ResidualRiskSerializer, UserSerializer, AuditSerializer, IncidentSerializer
User = get_user_model()
# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
class RiskViewSet(viewsets.ModelViewSet):
"""
API endpoint for managing Risks.
Provides CRUD operations.
"""
queryset = Risk.objects.all()
serializer_class = RiskSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
class ControlViewSet(viewsets.ModelViewSet):
"""
API endpoint for managing Controls.
Provides CRUD operations.
"""
queryset = Control.objects.all()
serializer_class = ControlSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
class ResidualRiskViewSet(viewsets.ModelViewSet):
queryset = ResidualRisk.objects.all()
serializer_class = ResidualRiskSerializer
permission_classes = [IsAuthenticated]
class UserViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint for listing users and their responsibilities.
"""
queryset = User.objects.all()
serializer_class = UserSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint for view audit logging.
"""
queryset = AuditLog.objects.all()
serializer_class = AuditSerializer
permission_classes = [IsAuthenticated]
class IncidentViewSet(viewsets.ModelViewSet):
"""
API endpoint for listing incidents and its related risks.
"""
queryset = Incident.objects.all()
serializer_class = IncidentSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save(reported_by=self.request.user)
instance._changed_by = self.request.user
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
# ---------------------------------------------------------------------------
# Web => Risks, Controls, Incidents
# ---------------------------------------------------------------------------
@login_required
def stats(request):
return render(request, "risks/statistics.html")
@login_required
def list_risks(request):
qs = Risk.objects.all().select_related("owner")
# GET-Parameter lesen
risk_id = request.GET.get("risk")
control_id = request.GET.get("control")
owner_id = request.GET.get("owner")
if risk_id:
qs = qs.filter(id=risk_id)
if control_id:
qs = qs.filter(controls__id=control_id)
if owner_id:
qs = qs.filter(owner_id=owner_id)
risks = qs.order_by("title").distinct()
controls = Control.objects.all().order_by("title")
owners = User.objects.filter(owned_risks__isnull=False).distinct().order_by("username")
return render(request, "risks/list_risks.html", {
"risks": risks,
"controls": controls,
"owners": owners,
})
@login_required
def show_risk(request, id):
risk = get_object_or_404(
Risk.objects.select_related("residual_risk", "owner").prefetch_related("controls"),
pk=id,
)
ct = ContentType.objects.get_for_model(Risk)
logs = LogEntry.objects.filter(content_type=ct, object_id=risk.pk).order_by("-action_time")
return render(request, "risks/item_risk.html", {"risk": risk, "logs": logs})
@login_required
def list_controls(request):
qs = Control.objects.all().select_related("responsible")
control_id = request.GET.get("control")
risk_id = request.GET.get("risk")
status = request.GET.get("status")
responsible_id = request.GET.get("responsible")
if control_id:
qs = qs.filter(id=control_id)
if risk_id:
qs = qs.filter(risks__id=risk_id) # FIX
if status:
qs = qs.filter(status=status)
if responsible_id:
qs = qs.filter(responsible_id=responsible_id)
controls = qs.order_by("title").distinct()
risks = Risk.objects.all().order_by("title")
users = User.objects.filter(responsible_controls__isnull=False).distinct().order_by("username")
return render(request, "risks/list_controls.html", {
"controls": controls,
"risks": risks,
"users": users,
"status_choices": Control.STATUS_CHOICES,
})
@login_required
def show_control(request, id):
control = get_object_or_404(Control, pk=id)
ct = ContentType.objects.get_for_model(Control)
logs = LogEntry.objects.filter(
content_type=ct,
object_id=control.pk
).order_by("-action_time")
return render(request, "risks/item_control.html", {"control": control, "logs": logs})
@login_required
def list_incidents(request):
qs = Incident.objects.all().select_related("reported_by").prefetch_related("related_risks")
risk_id = request.GET.get("risk")
status = request.GET.get("status")
reported_by = request.GET.get("reported_by")
if risk_id:
qs = qs.filter(related_risks__id=risk_id) # FIX
if status:
qs = qs.filter(status=status)
if reported_by:
qs = qs.filter(reported_by=reported_by)
incidents = qs.order_by("title").distinct()
risks = Risk.objects.all().order_by("title")
users = User.objects.filter(incidents__isnull=False).distinct().order_by("username") # sinnvoller
return render(request, "risks/list_incidents.html", {
"incidents": incidents,
"risks": risks,
"users": users,
"status_choices": Incident.STATUS_CHOICES,
})
@login_required
def show_incident(request, id):
incident = get_object_or_404(Incident, pk=id)
ct = ContentType.objects.get_for_model(Incident)
logs = LogEntry.objects.filter(
content_type=ct,
object_id=incident.pk
).order_by("-action_time")
return render(request, "risks/item_incident.html", {"incident": incident, "logs": logs})
# ---------------------------------------------------------------------------
# Web => Dashboard
# ---------------------------------------------------------------------------
@login_required
def dashboard(request):
# Risikoübersicht
risks_total = Risk.objects.count()
risks_by_level = Risk.objects.values('level').annotate(count=Count('id'))
# CIA-Zähler für MultiSelectField
risks_cia = Risk.objects.values_list('cia', flat=True)
cia_counter = Counter()
for cia_list in risks_cia:
if isinstance(cia_list, list): # MultiSelectField gibt Liste zurück
for c in cia_list:
cia_counter[c] += 1
elif cia_list: # Falls irgendwie noch ein String drin ist
cia_counter[cia_list] += 1
# Residualrisiken
residual_review_required = ResidualRisk.objects.filter(review_required=True).count()
# Kontrollen
controls_by_status = Control.objects.values('status').annotate(count=Count('id'))
# Incidents
incidents_status = Incident.objects.values('status').annotate(count=Count('id'))
# Benachrichtigungen
notifications_unread = Notification.objects.filter(user=request.user, read=False).count()
print(type(cia_counter), cia_counter)
# Context für Template
context = {
'risks_total': risks_total,
'risks_by_level': risks_by_level,
'risks_by_cia': dict(cia_counter), # <-- hier Counter in dict umwandeln
'residual_review_required': residual_review_required,
'controls_by_status': controls_by_status,
'incidents_status': incidents_status,
'notifications_unread': notifications_unread,
}
return render(request, 'risks/dashboard.html', context)