ISO-27001-Risk-Management/risks/signals.py
Kevin Heyer ab01841cf2 Add risk status and notification preferences
- Introduced a new `status` field to the `Risk` model with choices for "open", "in_progress", "closed", and "review_required".
- Created a `NotificationPreference` model to manage user notification settings for various events related to risks, controls, residual risks, reviews, users, and incidents.
- Updated the admin interface to include `NotificationPreference` inline with the `User` admin.
- Enhanced signal handlers to send notifications based on user preferences for created, updated, and deleted events for users, risks, controls, and incidents.
- Modified the `check_risk_followups` utility function to update risk status and create notifications for follow-ups.
- Updated serializers and views to accommodate the new `status` field and improved risk listing functionality.
- Added a new section in the risk detail template to display related incidents.
- Removed the unused statistics view from URLs.
2025-09-10 11:54:08 +02:00

355 lines
No EOL
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import date, datetime

from django.contrib.auth import get_user_model
from django.db.models import Model
from django.db.models.signals import (
    m2m_changed,
    post_delete,
    post_save,
    pre_delete,
    pre_save,
)
from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _

from .audit_context import get_current_user
from .models import (
    AuditLog,
    Control,
    Incident,
    Notification,
    NotificationPreference,
    ResidualRisk,
    Risk,
)
from .utils import model_diff
# ---------------------------------------------------------------------------
# General definitions
# ---------------------------------------------------------------------------
# Active user model, resolved once; used below as a signal sender and in queries.
User = get_user_model()
def serialize_value(value):
    """Reduce a field value to something storable in an audit log entry.

    Model instances are reduced to their primary key, dates and datetimes to
    their ISO-8601 string; any other value is returned unchanged.
    """
    if isinstance(value, Model):
        # Use str(value) here instead if a more descriptive entry is wanted.
        return value.pk
    return value.isoformat() if isinstance(value, (datetime, date)) else value
def _pref(user: User) -> NotificationPreference | None:
    """Return the notification preferences of ``user``, creating them on demand.

    Args:
        user: The user whose preferences are wanted; may be ``None``.

    Returns:
        The user's ``NotificationPreference``, or ``None`` when no user was
        given.
    """
    if not user:
        return None
    pref = getattr(user, "notification_preference", None)
    if pref:
        return pref
    # The reverse accessor can miss a row that already exists (stale relation
    # cache on this instance, or a concurrent request created it). A blind
    # create() would then fail on the uniqueness of the user relation, so
    # reuse an existing row when there is one.
    pref, _was_created = NotificationPreference.objects.get_or_create(user=user)
    return pref
def _notify(users, message: str, event_code: str):
    """Create a notification for every given user who wants this event.

    Falsy entries (e.g. ``None``) and duplicates in ``users`` are ignored.
    """
    recipients = {candidate for candidate in users if candidate}
    for recipient in recipients:
        preference = _pref(recipient)
        if not preference or not preference.should_notify(event_code):
            continue
        Notification.objects.create(user=recipient, message=message)
def _risk_stakeholders(risk: Risk):
    """Return the risk owner plus everyone responsible for one of its controls."""
    stakeholders = set()
    if risk.owner:
        stakeholders.add(risk.owner)
    control_responsibles = User.objects.filter(
        responsible_controls__risks=risk
    ).distinct()
    stakeholders.update(control_responsibles)
    return stakeholders
# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------
@receiver(post_save, sender=User)
def user_saved(sender, instance: User, created, **kwargs):
    """Ensure a saved user has preferences and announce new users to staff."""
    # Create the preference row automatically if it is missing.
    _pref(instance)
    if not created:
        return
    # Only staff members who asked for this event.
    recipients = User.objects.filter(
        is_staff=True,
        notification_preference__user_created=True,
    )
    message = _("User '{u}' created").format(u=instance.username)
    _notify(recipients, message, "user_created")
@receiver(post_delete, sender=User)
def user_deleted(sender, instance: User, **kwargs):
    """Tell staff members who asked for it that a user was deleted."""
    recipients = User.objects.filter(
        is_staff=True,
        notification_preference__user_deleted=True,
    )
    message = _("User '{u}' deleted").format(u=instance.username)
    _notify(recipients, message, "user_deleted")
# ---------------------------------------------------------------------------
# Risks
# ---------------------------------------------------------------------------
@receiver(post_save, sender=Risk)
def risk_saved(sender, instance: Risk, created, **kwargs):
    """Notify the risk owner that the risk was created or updated."""
    if created:
        event, state = "risk_created", _("created")
    else:
        event, state = "risk_updated", _("updated")
    msg = _("Risk '{title}' {state}").format(title=instance.title, state=state)
    # A missing owner (None) is dropped by _notify.
    _notify([instance.owner], msg, event)
@receiver(post_delete, sender=Risk)
def risk_deleted(sender, instance: Risk, **kwargs):
    """Notify the owner of a deleted risk, if it still has one."""
    msg = _("Risk '{title}' deleted").format(title=instance.title)
    owner = instance.owner
    # The owner may no longer exist; then there is nobody to notify.
    if not owner:
        return
    _notify([owner], msg, "risk_deleted")
@receiver(pre_save, sender=Risk)
def _snapshot_risk_before_save(sender, instance, **kwargs):
    """Attach the currently stored row to ``instance`` before it is overwritten."""
    instance._audit_snapshot = (
        Risk.objects.filter(pk=instance.pk).first()
        if instance.pk is not None
        else None
    )


@receiver(post_save, sender=Risk)
def log_risk_save(sender, instance, created, **kwargs):
    """Write an audit log entry when a Risk is created or updated.

    On creation every field in ``_meta.fields`` is logged with ``old=None``.
    On update only the fields reported by ``model_diff`` are logged, and no
    entry is written when nothing differs.

    Args:
        sender: The model class sending the signal.
        instance: The Risk that was saved.
        created: ``True`` when this save inserted the row.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handler: explicit marker on the
    # instance first, then the user from the audit context.
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Risk",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Re-reading the row here would return the values that were just written,
    # so the diff is taken against the snapshot captured in pre_save.
    old = getattr(instance, "_audit_snapshot", None)
    if old is None:
        return
    changes = model_diff(old, instance)
    if not changes:
        return
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Risk",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(post_delete, sender=Risk)
def log_risk_delete(sender, instance, **kwargs):
    """Record the deletion of a Risk in the audit log."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()
    entry = {
        "user": actor,
        "action": "delete",
        "model": "Risk",
        "object_id": instance.pk,
        # A deletion carries no field-level changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)
# ---------------------------------------------------------------------------
# Controls
# ---------------------------------------------------------------------------
@receiver(post_save, sender=Control)
def control_saved(sender, instance: Control, created, **kwargs):
    """Flag linked risks for review and notify stakeholders about the control.

    Every risk linked to the control gets its residual risk marked
    ``review_required`` and its status set to ``"review_required"``. The
    control's responsible user and the owners of the linked risks are then
    notified with ``control_created`` or ``control_updated``.

    Args:
        sender: The model class sending the signal.
        instance: The Control that was saved.
        created: ``True`` when this save inserted the row.
        **kwargs: Remaining signal arguments (unused).
    """
    risks = list(instance.risks.all())

    for risk in risks:
        # Take element [0] rather than unpacking: unpacking into `created`
        # would overwrite the signal flag used below, and `_` is the gettext
        # alias in this module.
        resid = ResidualRisk.objects.get_or_create(risk=risk)[0]
        if not resid.review_required:
            resid.review_required = True
            resid.save()
        if risk.status != "review_required":
            # update() avoids re-triggering the Risk save signals.
            Risk.objects.filter(pk=risk.pk).update(status="review_required")

    event = "control_created" if created else "control_updated"
    msg = _("Control '{title}' {state}").format(
        title=instance.title,
        state=_("created") if created else _("updated"),
    )
    # A missing responsible user (None) is dropped by _notify.
    stakeholders = {instance.responsible} | {r.owner for r in risks if r.owner}
    _notify(stakeholders, msg, event)
@receiver(pre_delete, sender=Control)
def control_deleted(sender, instance: Control, **kwargs):
    """Notify stakeholders that a control is being deleted.

    Connected to ``pre_delete`` so the links to risks can still be read;
    once the control is deleted its many-to-many rows should be gone, and
    the risk owners would then be missed.

    Args:
        sender: The model class sending the signal.
        instance: The Control about to be deleted.
        **kwargs: Remaining signal arguments (unused).
    """
    msg = _("Control '{title}' deleted").format(title=instance.title)
    risk_owners = {r.owner for r in instance.risks.all() if r.owner}
    # A missing responsible user (None) is dropped by _notify.
    stakeholders = {instance.responsible} | risk_owners
    _notify(stakeholders, msg, "control_deleted")
@receiver(pre_save, sender=Control)
def _snapshot_control_before_save(sender, instance, **kwargs):
    """Attach the currently stored row to ``instance`` before it is overwritten."""
    instance._audit_snapshot = (
        Control.objects.filter(pk=instance.pk).first()
        if instance.pk is not None
        else None
    )


@receiver(post_save, sender=Control)
def log_control_save(sender, instance, created, **kwargs):
    """Write an audit log entry when a Control is created or updated.

    On creation every field in ``_meta.fields`` is logged with ``old=None``.
    On update only the fields reported by ``model_diff`` are logged, and no
    entry is written when nothing differs.

    Args:
        sender: The model class sending the signal.
        instance: The Control that was saved.
        created: ``True`` when this save inserted the row.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handler: explicit marker on the
    # instance first, then the user from the audit context.
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Control",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Re-reading the row here would return the values that were just written,
    # so the diff is taken against the snapshot captured in pre_save.
    old = getattr(instance, "_audit_snapshot", None)
    if old is None:
        return
    changes = model_diff(old, instance)
    if not changes:
        return
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Control",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(post_delete, sender=Control)
def log_control_delete(sender, instance, **kwargs):
    """Record the deletion of a Control in the audit log."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()
    entry = {
        "user": actor,
        "action": "delete",
        "model": "Control",
        "object_id": instance.pk,
        # A deletion carries no field-level changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)
@receiver(m2m_changed, sender=Control.risks.through)
def control_risks_changed(sender, instance: Control | Risk, action, reverse, pk_set, **kwargs):
    """Flag risks for review when their link to a control changes.

    Each affected risk gets its residual risk marked ``review_required``, its
    status set to ``"review_required"``, and its stakeholders notified.

    Args:
        sender: The intermediate model of the control/risk relation.
        instance: The object whose relation was modified. This is a Control
            when the change was made from the control side and a Risk when
            it was made from the risk side.
        action: The m2m action; only the ``post_*`` actions are handled.
        reverse: Whether the change came from the reverse side (unused; the
            type of ``instance`` is checked instead).
        pk_set: Primary keys on the *other* side of the relation, or ``None``
            for the clear actions.
        **kwargs: Remaining signal arguments (unused).
    """
    if action not in {"post_add", "post_remove", "post_clear"}:
        return

    if isinstance(instance, Risk):
        # Changed from the risk side: pk_set holds control pks here, so the
        # only affected risk is the instance itself.
        affected = [instance]
    elif pk_set:
        affected = Risk.objects.filter(pk__in=pk_set)
    else:
        # NOTE(review): after a clear the relation is already empty, so this
        # finds nothing; capturing the risks in pre_clear would be needed to
        # flag them.
        affected = instance.risks.all()

    for risk in affected:
        resid = ResidualRisk.objects.get_or_create(risk=risk)[0]
        if not resid.review_required:
            resid.review_required = True
            resid.save()
        if risk.status != "review_required":
            # update() avoids re-triggering the Risk save signals.
            Risk.objects.filter(pk=risk.pk).update(status="review_required")
        _notify(
            _risk_stakeholders(risk),
            _("Review required for risk '{t}' due to control change").format(t=risk.title),
            "review_required",
        )
# ---------------------------------------------------------------------------
# Residual risks
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=ResidualRisk)
def _remember_residual_review_flag(sender, instance, **kwargs):
    """Store the persisted ``review_required`` value before it is overwritten."""
    stored = None
    if instance.pk is not None:
        stored = (
            ResidualRisk.objects.filter(pk=instance.pk)
            .values_list("review_required", flat=True)
            .first()
        )
    instance._review_required_before = bool(stored)


@receiver(post_save, sender=ResidualRisk)
def residual_saved(sender, instance: ResidualRisk, created, **kwargs):
    """Keep the risk status in sync with the review flag and notify stakeholders.

    * ``review_required`` set while the risk is not yet in review: the risk
      status becomes ``"review_required"`` and stakeholders are told.
    * ``review_required`` switched from set to unset: a risk in
      ``"review_required"`` goes back to ``"open"`` and stakeholders are told
      the review is complete.

    In every case the ``residual_created`` / ``residual_updated`` event is
    sent afterwards. Audit logging happens in a separate handler.

    Args:
        sender: The model class sending the signal.
        instance: The ResidualRisk that was saved.
        created: ``True`` when this save inserted the row.
        **kwargs: Remaining signal arguments (unused).
    """
    risk = instance.risk
    stakeholders = _risk_stakeholders(risk)

    # The previous flag comes from pre_save: reading the row at this point
    # would only return the value that was just written.
    was_required = not created and getattr(instance, "_review_required_before", False)

    if instance.review_required and risk.status != "review_required":
        Risk.objects.filter(pk=risk.pk).update(status="review_required")
        _notify(stakeholders, _("Review required for risk '{t}'").format(t=risk.title), "review_required")
    elif was_required and not instance.review_required:
        # Review finished.
        if risk.status == "review_required":
            Risk.objects.filter(pk=risk.pk).update(status="open")
        _notify(stakeholders, _("Review completed for risk '{t}'").format(t=risk.title), "review_completed")

    # Standard created/updated event.
    event = "residual_created" if created else "residual_updated"
    _notify(
        stakeholders,
        _("Residual risk {state} for '{t}'").format(
            state=_("created") if created else _("updated"),
            t=risk.title,
        ),
        event,
    )
@receiver(post_delete, sender=ResidualRisk)
def residual_deleted(sender, instance: ResidualRisk, **kwargs):
    """Tell the risk's stakeholders that its residual risk was deleted."""
    recipients = _risk_stakeholders(instance.risk)
    message = _("Residual risk deleted for '{t}'").format(t=instance.risk.title)
    _notify(recipients, message, "residual_deleted")
@receiver(pre_save, sender=ResidualRisk)
def _snapshot_residual_before_save(sender, instance, **kwargs):
    """Attach the currently stored row to ``instance`` before it is overwritten."""
    instance._audit_snapshot = (
        ResidualRisk.objects.filter(pk=instance.pk).first()
        if instance.pk is not None
        else None
    )


@receiver(post_save, sender=ResidualRisk)
def log_residual_save(sender, instance, created, **kwargs):
    """Write an audit log entry when a ResidualRisk is created or updated.

    On creation every field in ``_meta.fields`` is logged with ``old=None``.
    On update only the fields reported by ``model_diff`` are logged, and no
    entry is written when nothing differs.

    Args:
        sender: The model class sending the signal.
        instance: The ResidualRisk that was saved.
        created: ``True`` when this save inserted the row.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handler: explicit marker on the
    # instance first, then the user from the audit context.
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="ResidualRisk",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Re-reading the row here would return the values that were just written,
    # so the diff is taken against the snapshot captured in pre_save.
    old = getattr(instance, "_audit_snapshot", None)
    if old is None:
        return
    changes = model_diff(old, instance)
    if not changes:
        return
    AuditLog.objects.create(
        user=user,
        action="update",
        model="ResidualRisk",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(post_delete, sender=ResidualRisk)
def log_residual_delete(sender, instance, **kwargs):
    """Record the deletion of a ResidualRisk in the audit log."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()
    entry = {
        "user": actor,
        "action": "delete",
        "model": "ResidualRisk",
        "object_id": instance.pk,
        # A deletion carries no field-level changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)
# ---------------------------------------------------------------------------
# Incidents
# ---------------------------------------------------------------------------
@receiver(post_save, sender=Incident)
def incident_saved(sender, instance: Incident, created, **kwargs):
    """Notify the reporter and related risk owners about a saved incident."""
    if created:
        event, state = "incident_created", _("created")
    else:
        event, state = "incident_updated", _("updated")
    # A missing reporter (None) is dropped by _notify.
    stakeholders = {instance.reported_by}
    stakeholders.update(r.owner for r in instance.related_risks.all() if r.owner)
    message = _("Incident '{t}' {s}").format(t=instance.title, s=state)
    _notify(stakeholders, message, event)
@receiver(pre_delete, sender=Incident)
def incident_deleted(sender, instance: Incident, **kwargs):
    """Notify the reporter and related risk owners that an incident is being deleted.

    Connected to ``pre_delete`` so the links to risks can still be read;
    once the incident is deleted its many-to-many rows should be gone, and
    the risk owners would then be missed.

    Args:
        sender: The model class sending the signal.
        instance: The Incident about to be deleted.
        **kwargs: Remaining signal arguments (unused).
    """
    # A missing reporter (None) is dropped by _notify.
    stakeholders = {instance.reported_by}
    stakeholders.update(r.owner for r in instance.related_risks.all() if r.owner)
    _notify(stakeholders, _("Incident '{t}' deleted").format(t=instance.title), "incident_deleted")
@receiver(pre_save, sender=Incident)
def _snapshot_incident_before_save(sender, instance, **kwargs):
    """Attach the currently stored row to ``instance`` before it is overwritten."""
    instance._audit_snapshot = (
        Incident.objects.filter(pk=instance.pk).first()
        if instance.pk is not None
        else None
    )


@receiver(post_save, sender=Incident)
def log_incident_save(sender, instance, created, **kwargs):
    """Write an audit log entry when an Incident is created or updated.

    On creation every field in ``_meta.fields`` is logged with ``old=None``.
    On update only the fields reported by ``model_diff`` are logged, and no
    entry is written when nothing differs.

    Args:
        sender: The model class sending the signal.
        instance: The Incident that was saved.
        created: ``True`` when this save inserted the row.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handler: explicit marker on the
    # instance first, then the user from the audit context.
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Incident",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Re-reading the row here would return the values that were just written,
    # so the diff is taken against the snapshot captured in pre_save.
    old = getattr(instance, "_audit_snapshot", None)
    if old is None:
        return
    changes = model_diff(old, instance)
    if not changes:
        return
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Incident",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(m2m_changed, sender=Incident.related_risks.through)
def log_incident_risks_change(sender, instance, action, reverse, model, pk_set, **kwargs):
    """Write an audit log entry when an incident's related risks change.

    Args:
        sender: The intermediate model of the incident/risk relation.
        instance: The object whose relation was modified. This is an Incident
            when the change was made from the incident side and a Risk when
            it was made from the risk side.
        action: The m2m action; only the ``post_*`` actions are logged.
        reverse: Whether the change came from the reverse side (unused; the
            type of ``instance`` is checked instead).
        model: The class of the objects referenced by ``pk_set`` (unused).
        pk_set: Primary keys on the *other* side of the relation, or ``None``
            for the clear actions.
        **kwargs: Remaining signal arguments (unused).
    """
    if action not in ("post_add", "post_remove", "post_clear"):
        return

    user = getattr(instance, "_changed_by", None) or get_current_user()
    # pk_set is None for the clear actions; list(None) would raise TypeError.
    pks = list(pk_set) if pk_set else []

    if isinstance(instance, Incident):
        entries = [(instance.pk, pks)]
    else:
        # Changed from the risk side: pk_set holds incident pks, and each of
        # those incidents gained or lost this single risk.
        entries = [(incident_pk, [instance.pk]) for incident_pk in pks]

    for incident_pk, risk_ids in entries:
        AuditLog.objects.create(
            user=user,
            action="update",
            model="Incident",
            object_id=incident_pk,
            changes={"related_risks": {"action": action, "ids": risk_ids}},
        )
@receiver(post_delete, sender=Incident)
def log_incident_delete(sender, instance, **kwargs):
    """Record the deletion of an Incident in the audit log."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()
    entry = {
        "user": actor,
        "action": "delete",
        "model": "Incident",
        "object_id": instance.pk,
        # A deletion carries no field-level changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)