"""Signal receivers that write AuditLog entries and flag residual risks for review."""
from datetime import date, datetime

from django.db.models import Model
from django.db.models.signals import m2m_changed, post_delete, post_save, pre_save
from django.dispatch import receiver

from .audit_context import get_current_user
from .models import AuditLog, Control, Incident, ResidualRisk, Risk
from .utils import model_diff

# ---------------------------------------------------------------------------
# General definitions
# ---------------------------------------------------------------------------

# Attribute under which the pre-save database state is stashed on an instance.
_SNAPSHOT_ATTR = "_audit_pre_save_snapshot"
# Attribute under which a Control's risk pks are stashed before a clear().
_CLEARED_RISKS_ATTR = "_audit_cleared_risk_pks"

_POST_M2M_ACTIONS = frozenset({"post_add", "post_remove", "post_clear"})


def serialize_value(value):
    """Convert a field value into the form stored in ``AuditLog.changes``.

    Model instances are reduced to their primary key, dates and datetimes to
    ISO-8601 strings; every other value is returned unchanged.
    """
    if isinstance(value, Model):
        return value.pk  # or str(value) if more detail is wanted
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return value


def _acting_user(instance):
    """Return the user recorded on the instance, else the current-context user."""
    return getattr(instance, "_changed_by", None) or get_current_user()


@receiver(pre_save, sender=Risk)
@receiver(pre_save, sender=Control)
@receiver(pre_save, sender=ResidualRisk)
@receiver(pre_save, sender=Incident)
def _capture_pre_save_state(sender, instance, raw=False, **kwargs):
    """Stash the stored row on the instance before it is overwritten.

    By the time ``post_save`` fires the database already holds the new
    values, so the previous state has to be read here for an update diff to
    contain anything.
    """
    snapshot = None
    # Skip the lookup for fixture loads (raw) and for rows that cannot exist yet.
    if not raw and instance.pk is not None:
        snapshot = sender.objects.filter(pk=instance.pk).first()
    setattr(instance, _SNAPSHOT_ATTR, snapshot)


def _log_save(instance, created, model_name):
    """Write the "create" or "update" audit entry for a saved instance.

    On creation every concrete field is recorded with ``old`` set to None.
    On update the stashed pre-save snapshot is diffed against the instance via
    ``model_diff``; nothing is written when there is no snapshot or no change.
    """
    if created:
        # Drop any stale snapshot (e.g. pk assigned manually before first save).
        instance.__dict__.pop(_SNAPSHOT_ATTR, None)
        AuditLog.objects.create(
            user=_acting_user(instance),
            action="create",
            model=model_name,
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    previous = instance.__dict__.pop(_SNAPSHOT_ATTR, None)
    if previous is None:
        return

    changes = model_diff(previous, instance)
    if not changes:
        return

    AuditLog.objects.create(
        user=_acting_user(instance),
        action="update",
        model=model_name,
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )


def _log_delete(instance, model_name):
    """Write the "delete" audit entry for a removed instance."""
    AuditLog.objects.create(
        user=_acting_user(instance),
        action="delete",
        model=model_name,
        object_id=instance.pk,
        changes=None,  # no fields to track on deletion
    )


# ---------------------------------------------------------------------------
# Risks
# ---------------------------------------------------------------------------


@receiver(post_save, sender=Risk)
def log_risk_save(sender, instance, created, **kwargs):
    """Audit the creation or update of a Risk."""
    _log_save(instance, created, "Risk")


@receiver(post_delete, sender=Risk)
def log_risk_delete(sender, instance, **kwargs):
    """
    Signal that runs after a Risk is deleted.
    """
    _log_delete(instance, "Risk")


# ---------------------------------------------------------------------------
# Controls
# ---------------------------------------------------------------------------


@receiver(post_save, sender=Control)
def log_control_save(sender, instance, created, **kwargs):
    """Audit the creation or update of a Control."""
    _log_save(instance, created, "Control")


@receiver(post_delete, sender=Control)
def log_control_delete(sender, instance, **kwargs):
    """Audit the deletion of a Control."""
    _log_delete(instance, "Control")


@receiver(m2m_changed, sender=Control.risks.through)
def control_risks_changed(sender, instance, action, reverse, pk_set, **kwargs):
    """Mark the residual risk of every affected Risk as needing review.

    Runs when the Control<->Risk relation changes from either side. A
    ResidualRisk row is created for a risk that has none yet.
    """
    # The instance type tells which side triggered the change, independent of
    # which model declares the many-to-many field.
    changed_from_risk = isinstance(instance, Risk)

    if action == "pre_clear":
        # After the clear the relation is empty, so remember the risks now.
        if not changed_from_risk:
            setattr(
                instance,
                _CLEARED_RISKS_ATTR,
                list(instance.risks.values_list("pk", flat=True)),
            )
        return

    if action not in _POST_M2M_ACTIONS:
        return

    if changed_from_risk:
        # pk_set holds Control pks here; the affected risk is the instance itself.
        affected_risks = [instance]
    elif action == "post_clear":
        cleared_pks = instance.__dict__.pop(_CLEARED_RISKS_ATTR, ())
        affected_risks = Risk.objects.filter(pk__in=cleared_pks)
    elif pk_set:
        affected_risks = Risk.objects.filter(pk__in=pk_set)
    else:
        # Empty pk_set on add/remove: fall back to all risks of the control,
        # as the original implementation did.
        affected_risks = instance.risks.all()

    for risk in affected_risks:
        residual, _ = ResidualRisk.objects.get_or_create(risk=risk)
        residual.review_required = True
        residual.save()


# ---------------------------------------------------------------------------
# Residual risks
# ---------------------------------------------------------------------------


@receiver(post_save, sender=ResidualRisk)
def log_residual_save(sender, instance, created, **kwargs):
    """Audit the creation or update of a ResidualRisk."""
    _log_save(instance, created, "ResidualRisk")


@receiver(post_delete, sender=ResidualRisk)
def log_residual_delete(sender, instance, **kwargs):
    """Audit the deletion of a ResidualRisk."""
    _log_delete(instance, "ResidualRisk")


# ---------------------------------------------------------------------------
# Incidents
# ---------------------------------------------------------------------------


@receiver(post_save, sender=Incident)
def log_incident_save(sender, instance, created, **kwargs):
    """Audit the creation or update of an Incident."""
    _log_save(instance, created, "Incident")


@receiver(m2m_changed, sender=Incident.related_risks.through)
def log_incident_risks_change(sender, instance, action, reverse, model, pk_set, **kwargs):
    """Audit changes to the risks linked to an Incident.

    Each entry is recorded against the Incident, whichever side of the
    relation triggered the change.
    """
    if action not in _POST_M2M_ACTIONS:
        return

    # pk_set is None for "post_clear"; sorting keeps the logged ids stable.
    related_pks = sorted(pk_set) if pk_set else []

    if isinstance(instance, Incident):
        entries = [(instance.pk, related_pks)]
    else:
        # Changed from the Risk side: pk_set holds Incident pks. A clear from
        # this side carries no pks, so the incidents cannot be identified and
        # nothing is logged for it.
        entries = [(incident_pk, [instance.pk]) for incident_pk in related_pks]

    user = _acting_user(instance)
    for incident_pk, risk_pks in entries:
        AuditLog.objects.create(
            user=user,
            action="update",
            model="Incident",
            object_id=incident_pk,
            changes={"related_risks": {"action": action, "ids": risk_pks}},
        )


@receiver(post_delete, sender=Incident)
def log_incident_delete(sender, instance, **kwargs):
    """Audit the deletion of an Incident."""
    _log_delete(instance, "Incident")