"""Signal handlers that write audit-log entries and send notifications.

Handlers are registered for User, Risk, Control, ResidualRisk and Incident
(post_save / post_delete) and for the Control<->Risk and Incident<->Risk
many-to-many relations (m2m_changed).
"""
from datetime import date, datetime

from django.contrib.auth import get_user_model
from django.db.models import Model
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _

from .audit_context import get_current_user
from .models import (
    Control,
    Risk,
    ResidualRisk,
    AuditLog,
    Incident,
    Notification,
    NotificationKind,
    NotificationPreference,
)
from .utils import model_diff, notify_event

# ---------------------------------------------------------------------------
# General definitions & helpers
# ---------------------------------------------------------------------------

User = get_user_model()


def serialize_value(value):
    """Serialize a field value for the audit log.

    Model instances become their primary key, dates/datetimes become ISO
    strings, everything else is returned unchanged.
    """
    if isinstance(value, Model):
        return value.pk
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return value


def _pref(user: User) -> NotificationPreference | None:
    """Return the user's NotificationPreference, creating it if missing.

    Returns None when ``user`` is falsy.
    """
    if not user:
        return None
    pref = getattr(user, "notification_preference", None)
    if not pref:
        # get_or_create instead of create: the row may already exist (stale
        # relation cache or a concurrent writer), in which case a plain
        # create() would fail on the uniqueness of the user relation.
        pref, _created = NotificationPreference.objects.get_or_create(user=user)
    return pref


def _notify(users, message: str, event_code: str):
    """Create a Notification for every distinct user that wants this event."""
    for u in set(filter(None, users)):
        pref = _pref(u)
        if pref and pref.should_notify(event_code):
            Notification.objects.create(user=u, message=message)


def _risk_stakeholders(risk: Risk):
    """Return the risk owner plus all users responsible for its controls."""
    owners = [risk.owner] if risk.owner else []
    responsibles = list(
        User.objects.filter(responsible_controls__risks=risk).distinct()
    )
    return set(owners + responsibles)


def _actor(instance):
    """Return the user to attribute a change to.

    Prefers the ``_changed_by`` attribute set on the instance and falls back
    to ``get_current_user()``, so that save and delete handlers resolve the
    acting user the same way.
    """
    return getattr(instance, "_changed_by", None) or get_current_user()


def _log_creation(instance, user, model_name: str):
    """Write a "create" audit entry containing every concrete field value."""
    AuditLog.objects.create(
        user=user,
        action="create",
        model=model_name,
        object_id=instance.pk,
        changes={
            f.name: {"old": None, "new": serialize_value(getattr(instance, f.name))}
            for f in instance._meta.fields
        },
    )


def _log_update(model_cls, instance, user, model_name: str):
    """Write an "update" audit entry if ``model_diff`` reports changes.

    Returns whatever ``model_diff`` returned so callers can inspect it.
    """
    # NOTE(review): this runs from post_save, so the row fetched here has
    # presumably already been written with the new values; unless model_diff
    # compensates, the diff may always be empty. Capturing the old state in a
    # pre_save handler would be the usual fix -- confirm before relying on it.
    old = model_cls.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if changes:
        clean = {
            name: {
                "old": serialize_value(delta["old"]),
                "new": serialize_value(delta["new"]),
            }
            for name, delta in changes.items()
        }
        AuditLog.objects.create(
            user=user,
            action="update",
            model=model_name,
            object_id=instance.pk,
            changes=clean,
        )
    return changes


def _log_deletion(instance, user, model_name: str):
    """Write a "delete" audit entry (no field changes are recorded)."""
    AuditLog.objects.create(
        user=user,
        action="delete",
        model=model_name,
        object_id=instance.pk,
        changes=None,
    )


def _flag_residual_review(risk: Risk):
    """Mark the risk's residual (created if missing) and the risk for review."""
    # The "created" flag is bound to a throwaway name that is NOT ``_``:
    # binding ``_`` inside a function shadows the gettext alias for the whole
    # function body and breaks every later ``_("...")`` call there.
    resid, _created = ResidualRisk.objects.get_or_create(risk=risk)
    if not resid.review_required:
        resid.review_required = True
        resid.save()
    if risk.status != "review_required":
        # queryset.update() changes the row without calling Risk.save().
        Risk.objects.filter(pk=risk.pk).update(status="review_required")


# ---------------------------------------------------------------------------
# User
# ---------------------------------------------------------------------------

@receiver(post_save, sender=User)
def user_saved(sender, instance: User, created, **kwargs):
    """Ensure the user has notification preferences; notify staff on creation."""
    _pref(instance)
    if created:
        staff = User.objects.filter(
            is_staff=True, notification_preference__user_created=True
        )
        _notify(
            staff,
            _("User '{u}' created").format(u=instance.username),
            "user_created",
        )


@receiver(post_delete, sender=User)
def user_deleted(sender, instance: User, **kwargs):
    """Notify staff members that opted in when a user is deleted."""
    staff = User.objects.filter(
        is_staff=True, notification_preference__user_deleted=True
    )
    _notify(
        staff,
        _("User '{u}' deleted").format(u=instance.username),
        "user_deleted",
    )


# ---------------------------------------------------------------------------
# Risks
# ---------------------------------------------------------------------------

@receiver(post_save, sender=Risk)
def risk_saved(sender, instance: Risk, created, **kwargs):
    """Audit and notify the owner when a risk is created or updated."""
    user = _actor(instance)
    if created:
        _log_creation(instance, user, "Risk")
        kind = NotificationKind.RISK_CREATED
        message = _("Risk created: {t}").format(t=instance.title)
    else:
        _log_update(Risk, instance, user, "Risk")
        kind = NotificationKind.RISK_UPDATED
        message = _("Risk updated: {t}").format(t=instance.title)
    notify_event(
        kind,
        message=message,
        users=[instance.owner] if instance.owner_id else None,
        obj=instance,
    )


@receiver(post_delete, sender=Risk)
def risk_deleted(sender, instance: Risk, **kwargs):
    """Audit and notify the owner when a risk is deleted."""
    _log_deletion(instance, _actor(instance), "Risk")
    notify_event(
        NotificationKind.RISK_DELETED,
        message=_("Risk deleted: {t}").format(t=instance.title),
        users=[instance.owner] if instance.owner_id else None,
        obj=instance,
    )


# ---------------------------------------------------------------------------
# Controls
# ---------------------------------------------------------------------------

@receiver(post_save, sender=Control)
def control_saved(sender, instance: Control, created, **kwargs):
    """Flag related residuals for review, then audit and notify."""
    # Force review on related residuals
    for risk in instance.risks.all():
        _flag_residual_review(risk)

    # Audit log
    user = _actor(instance)
    if created:
        _log_creation(instance, user, "Control")
        kind = NotificationKind.CONTROL_CREATED
    else:
        _log_update(Control, instance, user, "Control")
        kind = NotificationKind.CONTROL_UPDATED

    # Notify
    notify_event(
        kind,
        message=_("Control {e}: {t}").format(
            e=_("created") if created else _("updated"), t=instance.title
        ),
        users=[instance.responsible] if instance.responsible_id else None,
        obj=instance,
    )


@receiver(post_delete, sender=Control)
def control_deleted(sender, instance: Control, **kwargs):
    """Audit and notify the responsible user when a control is deleted."""
    _log_deletion(instance, _actor(instance), "Control")
    notify_event(
        NotificationKind.CONTROL_DELETED,
        message=_("Control deleted: {t}").format(t=instance.title),
        users=[instance.responsible] if instance.responsible_id else None,
        obj=instance,
    )


@receiver(m2m_changed, sender=Control.risks.through)
def control_risks_changed(sender, instance: Control, action, **kwargs):
    """Require a residual review for a control's risks when the relation changes."""
    if action not in {"post_add", "post_remove", "post_clear"}:
        return
    # NOTE(review): after post_remove/post_clear, instance.risks no longer
    # contains the detached risks, so those are not flagged here. Also, when
    # the change is made from the Risk side (reverse=True) ``instance`` is
    # presumably a Risk rather than a Control -- confirm whether either case
    # needs handling.
    for risk in instance.risks.all():
        _flag_residual_review(risk)
        notify_event(
            NotificationKind.RESIDUAL_REVIEW_REQUIRED,
            message=_(
                "Residual review required for risk '{t}' due to control change"
            ).format(t=risk.title),
            users=_risk_stakeholders(risk),
            obj=instance,
        )


# ---------------------------------------------------------------------------
# Residual risks
# ---------------------------------------------------------------------------

@receiver(post_save, sender=ResidualRisk)
def residual_saved(sender, instance: ResidualRisk, created, **kwargs):
    """Audit and notify the risk owner when a residual is created or updated."""
    user = _actor(instance)

    if created:
        _log_creation(instance, user, "ResidualRisk")
        kind = NotificationKind.RESIDUAL_CREATED
        msg = _("Residual created for risk: {t}")
    else:
        changes = _log_update(ResidualRisk, instance, user, "ResidualRisk")
        # Special handling: a change of review_required gets its own event.
        if "review_required" in changes:
            if instance.review_required:
                kind = NotificationKind.RESIDUAL_REVIEW_REQUIRED
                msg = _("Residual review required for risk: {t}")
            else:
                kind = NotificationKind.RESIDUAL_REVIEW_COMPLETED
                msg = _("Residual review completed for risk: {t}")
        else:
            kind = NotificationKind.RESIDUAL_UPDATED
            msg = _("Residual updated for risk: {t}")

    notify_event(
        kind,
        message=msg.format(t=instance.risk.title),
        users=[instance.risk.owner] if instance.risk.owner_id else None,
        obj=instance,
    )


@receiver(post_delete, sender=ResidualRisk)
def residual_deleted(sender, instance: ResidualRisk, **kwargs):
    """Audit and notify the risk owner when a residual is deleted."""
    _log_deletion(instance, _actor(instance), "ResidualRisk")
    notify_event(
        NotificationKind.RESIDUAL_DELETED,
        message=_("Residual deleted for risk: {t}").format(t=instance.risk.title),
        users=[instance.risk.owner] if instance.risk.owner_id else None,
        obj=instance,
    )


# ---------------------------------------------------------------------------
# Incidents
# ---------------------------------------------------------------------------

@receiver(post_save, sender=Incident)
def incident_saved(sender, instance: Incident, created, **kwargs):
    """Audit and notify the reporter when an incident is created or updated."""
    user = _actor(instance)
    if created:
        _log_creation(instance, user, "Incident")
        kind = NotificationKind.INCIDENT_CREATED
    else:
        _log_update(Incident, instance, user, "Incident")
        kind = NotificationKind.INCIDENT_UPDATED
    notify_event(
        kind,
        message=_("Incident {e}: {t}").format(
            e=_("created") if created else _("updated"), t=instance.title
        ),
        users=[instance.reported_by] if instance.reported_by_id else None,
        obj=instance,
    )


@receiver(post_delete, sender=Incident)
def incident_deleted(sender, instance: Incident, **kwargs):
    """Audit and notify the reporter when an incident is deleted."""
    _log_deletion(instance, _actor(instance), "Incident")
    notify_event(
        NotificationKind.INCIDENT_DELETED,
        message=_("Incident deleted: {t}").format(t=instance.title),
        users=[instance.reported_by] if instance.reported_by_id else None,
        obj=instance,
    )


@receiver(m2m_changed, sender=Incident.related_risks.through)
def incident_risks_changed(sender, instance, action, pk_set, **kwargs):
    """Audit changes to an incident's related risks."""
    if action not in {"post_add", "post_remove", "post_clear"}:
        return
    # NOTE(review): when the relation is modified from the Risk side
    # (reverse=True) ``instance`` is presumably a Risk, yet the entry is still
    # logged under model "Incident" -- confirm whether that case can occur.
    AuditLog.objects.create(
        user=_actor(instance),
        action="update",
        model="Incident",
        object_id=instance.pk,
        # Django passes pk_set=None for the *_clear actions; list(None) would
        # raise TypeError, so record an empty id list in that case.
        changes={"related_risks": {"action": action, "ids": list(pk_set or ())}},
    )