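# ISO-27001-Risk-Management/risks/signals.py
#
# Django signal receivers that write AuditLog entries for Risk, Control,
# ResidualRisk and Incident changes, and flag residual risks for review.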

from datetime import date, datetime
from django.db.models import Model
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from .models import Control, Risk, ResidualRisk, AuditLog, Incident
from .utils import model_diff
# ---------------------------------------------------------------------------
# General definitions
# ---------------------------------------------------------------------------
def serialize_value(value):
    """Reduce a field value to a simpler representation for audit entries.

    Model instances collapse to their primary key, ``date`` and ``datetime``
    objects become ISO 8601 strings, and every other value is returned
    unchanged.
    """
    if isinstance(value, Model):
        # Use str(value) here instead if a more descriptive entry is wanted.
        return value.pk
    return value.isoformat() if isinstance(value, (datetime, date)) else value
# ---------------------------------------------------------------------------
# Risks
# ---------------------------------------------------------------------------
@receiver(post_save, sender=Risk)
def log_risk_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry whenever a Risk is created or updated.

    On creation, every field in ``instance._meta.fields`` is recorded with
    ``old`` set to ``None``. On update, only the fields reported by
    ``model_diff`` are recorded, and nothing is written when the diff is
    empty. All values pass through ``serialize_value`` before being stored.

    The acting user is read from the optional ``instance._changed_by``
    attribute and defaults to ``None``.
    """
    user = getattr(instance, "_changed_by", None)

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Risk",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # NOTE(review): post_save runs after the row has been written, so this
    # re-read may already hold the new values and yield an empty diff.
    # Confirm whether the previous state should be captured in pre_save.
    old = Risk.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if not changes:
        return

    clean_changes = {
        field: {
            "old": serialize_value(vals["old"]),
            "new": serialize_value(vals["new"]),
        }
        for field, vals in changes.items()
    }
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Risk",
        object_id=instance.pk,
        # Store the serialized diff; the raw one may contain model instances
        # or date objects.
        changes=clean_changes,
    )
@receiver(post_delete, sender=Risk)
def log_risk_delete(sender, instance, **kwargs):
    """Record a "delete" AuditLog entry after a Risk has been removed."""
    acting_user = getattr(instance, "_changed_by", None)
    entry = {
        "user": acting_user,
        "action": "delete",
        "model": "Risk",
        "object_id": instance.pk,
        # A deletion carries no per-field changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)
# ---------------------------------------------------------------------------
# Controls
# ---------------------------------------------------------------------------
@receiver(post_save, sender=Control)
def log_control_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry whenever a Control is created or updated.

    On creation, every field in ``instance._meta.fields`` is recorded with
    ``old`` set to ``None``. On update, only the fields reported by
    ``model_diff`` are recorded, and nothing is written when the diff is
    empty. All values pass through ``serialize_value`` before being stored.

    The acting user is read from the optional ``instance._changed_by``
    attribute and defaults to ``None``.
    """
    user = getattr(instance, "_changed_by", None)

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Control",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # NOTE(review): post_save runs after the row has been written, so this
    # re-read may already hold the new values and yield an empty diff.
    # Confirm whether the previous state should be captured in pre_save.
    old = Control.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if not changes:
        return

    clean_changes = {
        field: {
            "old": serialize_value(vals["old"]),
            "new": serialize_value(vals["new"]),
        }
        for field, vals in changes.items()
    }
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Control",
        object_id=instance.pk,
        # Store the serialized diff; the raw one may contain model instances
        # or date objects.
        changes=clean_changes,
    )
@receiver(post_delete, sender=Control)
def log_control_delete(sender, instance, **kwargs):
    """Record a "delete" AuditLog entry after a Control has been removed."""
    acting_user = getattr(instance, "_changed_by", None)
    entry = {
        "user": acting_user,
        "action": "delete",
        "model": "Control",
        "object_id": instance.pk,
        # A deletion carries no per-field changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)
@receiver(post_save, sender=Control)
def update_residual_risk_on_control_change(sender, instance, **kwargs):
    """Keep the residual risk of a control's risk in step with the control.

    Each time a Control is saved, a ResidualRisk for ``instance.risk`` is
    fetched or created. When the control's status is "completed" or
    "verified", that residual risk is flagged with ``review_required`` and
    saved.
    """
    # get_or_create guarantees a ResidualRisk row exists for this risk.
    residual, _ = ResidualRisk.objects.get_or_create(risk=instance.risk)

    # Only finished or verified controls trigger a review.
    if instance.status not in ["completed", "verified"]:
        return

    residual.review_required = True
    residual.save()
# ---------------------------------------------------------------------------
# Residual risks
# ---------------------------------------------------------------------------
@receiver(post_save, sender=ResidualRisk)
def log_residual_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry whenever a ResidualRisk is created or updated.

    A creation logs every field in ``instance._meta.fields`` with ``old`` set
    to ``None``. An update logs the serialized result of ``model_diff`` and
    is skipped when that diff is empty.
    """
    if created:
        snapshot = {}
        acting_user = getattr(instance, "_changed_by", None)
        for field in instance._meta.fields:
            snapshot[field.name] = {
                "old": None,
                "new": serialize_value(getattr(instance, field.name)),
            }
        AuditLog.objects.create(
            user=acting_user,
            action="create",
            model="ResidualRisk",
            object_id=instance.pk,
            changes=snapshot,
        )
        return

    stored = ResidualRisk.objects.get(pk=instance.pk)
    diff = model_diff(stored, instance)
    if not diff:
        return

    serialized = {}
    for name, pair in diff.items():
        serialized[name] = {
            "old": serialize_value(pair["old"]),
            "new": serialize_value(pair["new"]),
        }
    AuditLog.objects.create(
        user=getattr(instance, "_changed_by", None),
        action="update",
        model="ResidualRisk",
        object_id=instance.pk,
        changes=serialized,
    )
@receiver(post_delete, sender=ResidualRisk)
def log_residual_delete(sender, instance, **kwargs):
    """Record a "delete" AuditLog entry after a ResidualRisk is removed."""
    acting_user = getattr(instance, "_changed_by", None)
    entry = {
        "user": acting_user,
        "action": "delete",
        "model": "ResidualRisk",
        "object_id": instance.pk,
        # A deletion carries no per-field changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)
# ---------------------------------------------------------------------------
# Incidents
# ---------------------------------------------------------------------------
@receiver(post_save, sender=Incident)
def log_incident_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry whenever an Incident is created or updated.

    On creation, every field in ``instance._meta.fields`` is recorded with
    ``old`` set to ``None``. On update, only the fields reported by
    ``model_diff`` are recorded, and nothing is written when the diff is
    empty. All values pass through ``serialize_value`` before being stored,
    matching the Risk, Control and ResidualRisk handlers.

    The acting user is read from the optional ``instance._changed_by``
    attribute and defaults to ``None``.
    """
    user = getattr(instance, "_changed_by", None)

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Incident",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    # Serialize here too, so related objects and dates are
                    # stored the same way as in the other create handlers.
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # NOTE(review): post_save runs after the row has been written, so this
    # re-read may already hold the new values and yield an empty diff.
    # Confirm whether the previous state should be captured in pre_save.
    old = Incident.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if not changes:
        return

    clean_changes = {
        field: {
            "old": serialize_value(vals["old"]),
            "new": serialize_value(vals["new"]),
        }
        for field, vals in changes.items()
    }
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Incident",
        object_id=instance.pk,
        # Store the serialized diff; the raw one may contain model instances
        # or date objects.
        changes=clean_changes,
    )
@receiver(m2m_changed, sender=Incident.related_risks.through)
def log_incident_risks_change(sender, instance, action, reverse, model, pk_set, **kwargs):
    """Write an AuditLog entry when an Incident's related risks change.

    Only the "post_add", "post_remove" and "post_clear" actions are logged;
    every other m2m action is ignored. The entry stores the action name and
    the affected primary keys under the ``related_risks`` key.
    """
    if action not in ("post_add", "post_remove", "post_clear"):
        return

    # Django sends pk_set=None for the clear actions; list(None) would raise
    # TypeError, so log an empty id list in that case.
    ids = list(pk_set) if pk_set is not None else []

    # NOTE(review): when ``reverse`` is True the relation was modified from
    # the other side, so ``instance`` is presumably not an Incident even
    # though the entry is labelled "Incident" — confirm the intended handling.
    AuditLog.objects.create(
        user=getattr(instance, "_changed_by", None),
        action="update",
        model="Incident",
        object_id=instance.pk,
        changes={"related_risks": {"action": action, "ids": ids}},
    )
@receiver(post_delete, sender=Incident)
def log_incident_delete(sender, instance, **kwargs):
    """Record a "delete" AuditLog entry after an Incident has been removed."""
    acting_user = getattr(instance, "_changed_by", None)
    entry = {
        "user": acting_user,
        "action": "delete",
        "model": "Incident",
        "object_id": instance.pk,
        # A deletion carries no per-field changes.
        "changes": None,
    }
    AuditLog.objects.create(**entry)