# ISO-27001-Risk-Management/risks/signals.py
#
# Django signal handlers for the risks app: audit logging and notifications.

from datetime import date, datetime

from django.contrib.auth import get_user_model
from django.db.models import Model
from django.db.models.signals import m2m_changed, post_delete, post_save, pre_save
from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _

from .audit_context import get_current_user
from .models import (
    Control, Risk, ResidualRisk, AuditLog, Incident,
    Notification, NotificationKind, NotificationPreference
)
from .utils import model_diff, notify_event
# ---------------------------------------------------------------------------
# General definitions & helpers
# ---------------------------------------------------------------------------
# Resolve the active user model once; it is used below as a signal sender,
# for queries, and in type annotations.
User = get_user_model()
def serialize_value(value):
    """Reduce a field value to the form stored in audit-log entries.

    Model instances become their primary key, ``date``/``datetime`` objects
    become ISO-8601 strings, and every other value is returned unchanged.
    """
    if isinstance(value, Model):
        serialized = value.pk
    elif isinstance(value, (datetime, date)):
        serialized = value.isoformat()
    else:
        serialized = value
    return serialized
def _pref(user: User) -> NotificationPreference | None:
    """Return the user's NotificationPreference, creating it when missing.

    Args:
        user: A user instance, or a falsy value.

    Returns:
        The preference object for ``user``, or ``None`` if ``user`` is falsy.
    """
    if not user:
        return None
    existing = getattr(user, "notification_preference", None)
    if existing:
        return existing
    # get_or_create rather than create: the relation cached on this instance
    # may be stale, or another request may have inserted the row meanwhile.
    # A blind create would then duplicate it or trip a uniqueness constraint.
    # The flag is deliberately not named "_" so gettext's "_" is not shadowed.
    pref, _was_created = NotificationPreference.objects.get_or_create(user=user)
    return pref
def _notify(users, message: str, event_code: str):
    """Store a Notification for every distinct user who wants ``event_code``.

    Falsy entries in ``users`` are ignored and duplicates are collapsed.
    """
    recipients = {candidate for candidate in users if candidate}
    for recipient in recipients:
        preference = _pref(recipient)
        if not preference:
            continue
        if preference.should_notify(event_code):
            Notification.objects.create(user=recipient, message=message)
def _risk_stakeholders(risk: Risk):
    """Collect the risk's owner plus the users responsible for its controls."""
    stakeholders = set()
    if risk.owner:
        stakeholders.add(risk.owner)
    stakeholders.update(
        User.objects.filter(responsible_controls__risks=risk).distinct()
    )
    return stakeholders
# ---------------------------------------------------------------------------
# User
# ---------------------------------------------------------------------------
@receiver(post_save, sender=User)
def user_saved(sender, instance: User, created, **kwargs):
    """Ensure the user has preferences; tell opted-in staff about new users."""
    _pref(instance)
    if not created:
        return
    recipients = User.objects.filter(
        is_staff=True, notification_preference__user_created=True
    )
    text = _("User '{u}' created").format(u=instance.username)
    _notify(recipients, text, "user_created")
@receiver(post_delete, sender=User)
def user_deleted(sender, instance: User, **kwargs):
    """Tell opted-in staff members that a user account was deleted."""
    text = _("User '{u}' deleted").format(u=instance.username)
    recipients = User.objects.filter(
        is_staff=True, notification_preference__user_deleted=True
    )
    _notify(recipients, text, "user_deleted")
# ---------------------------------------------------------------------------
# Risks
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=Risk)
def _risk_remember_persisted(sender, instance: Risk, raw=False, **kwargs):
    """Stash the currently stored row on the instance before it is overwritten."""
    stored = None
    if not raw and instance.pk is not None:
        stored = Risk.objects.filter(pk=instance.pk).first()
    instance._pre_save_state = stored


@receiver(post_save, sender=Risk)
def risk_saved(sender, instance: Risk, created, **kwargs):
    """Write an audit entry and notify the owner when a risk is saved.

    On creation every concrete field is logged with ``old=None``. On update
    only the fields reported by ``model_diff`` are logged, and no audit entry
    is written when nothing changed. The owner (if any) is notified either way.

    Args:
        sender: The model class sending the signal.
        instance: The risk that was saved.
        created: True when the row was inserted rather than updated.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handlers: explicit marker first,
    # then get_current_user().
    user = getattr(instance, "_changed_by", None) or get_current_user()
    recipients = [instance.owner] if instance.owner_id else None

    if created:
        initial = {
            f.name: {"old": None, "new": serialize_value(getattr(instance, f.name))}
            for f in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Risk",
            object_id=instance.pk,
            changes=initial,
        )
        notify_event(
            NotificationKind.RISK_CREATED,
            message=_("Risk created: {t}").format(t=instance.title),
            users=recipients,
        )
        return

    # post_save fires after the row has been written, so re-reading it here
    # would return the new values and the diff would always be empty. Compare
    # against the snapshot taken in pre_save instead.
    old = getattr(instance, "_pre_save_state", None)
    if old is None:
        # No snapshot available (e.g. raw save): fall back to the stored row.
        old = Risk.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if changes:
        clean = {
            name: {"old": serialize_value(v["old"]), "new": serialize_value(v["new"])}
            for name, v in changes.items()
        }
        AuditLog.objects.create(
            user=user,
            action="update",
            model="Risk",
            object_id=instance.pk,
            changes=clean,
        )
    notify_event(
        NotificationKind.RISK_UPDATED,
        message=_("Risk updated: {t}").format(t=instance.title),
        users=recipients,
    )
@receiver(post_delete, sender=Risk)
def risk_deleted(sender, instance: Risk, **kwargs):
    """Record a risk's deletion in the audit log and notify its owner."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()
    AuditLog.objects.create(
        user=actor,
        action="delete",
        model="Risk",
        object_id=instance.pk,
        changes=None,
    )
    text = _("Risk deleted: {t}").format(t=instance.title)
    recipients = None
    if instance.owner_id:
        recipients = [instance.owner]
    notify_event(NotificationKind.RISK_DELETED, message=text, users=recipients)
# ---------------------------------------------------------------------------
# Controls
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=Control)
def _control_remember_persisted(sender, instance: Control, raw=False, **kwargs):
    """Stash the currently stored row on the instance before it is overwritten."""
    stored = None
    if not raw and instance.pk is not None:
        stored = Control.objects.filter(pk=instance.pk).first()
    instance._pre_save_state = stored


@receiver(post_save, sender=Control)
def control_saved(sender, instance: Control, created, **kwargs):
    """Flag related residuals for review, then audit and notify.

    Every risk linked to the control gets its ResidualRisk marked
    ``review_required`` and its own status set to ``"review_required"``.
    Afterwards an audit entry is written (all fields on create, the
    ``model_diff`` result on update) and the responsible user is notified.

    Args:
        sender: The model class sending the signal.
        instance: The control that was saved.
        created: True when the row was inserted rather than updated.
        **kwargs: Remaining signal arguments (unused).
    """
    for risk in instance.risks.all():
        # The flag must not be bound to "_": that would make "_" a local
        # name for the whole function and break the gettext calls below.
        resid, _was_created = ResidualRisk.objects.get_or_create(risk=risk)
        if not resid.review_required:
            resid.review_required = True
            resid.save()
        if risk.status != "review_required":
            # Queryset update: changes the status without calling Risk.save().
            Risk.objects.filter(pk=risk.pk).update(status="review_required")

    # Same actor resolution as the delete handlers.
    user = getattr(instance, "_changed_by", None) or get_current_user()
    if created:
        initial = {
            f.name: {"old": None, "new": serialize_value(getattr(instance, f.name))}
            for f in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Control",
            object_id=instance.pk,
            changes=initial,
        )
        kind = NotificationKind.CONTROL_CREATED
    else:
        # post_save fires after the write, so a fresh read would equal the
        # instance; diff against the pre_save snapshot instead.
        old = getattr(instance, "_pre_save_state", None)
        if old is None:
            old = Control.objects.get(pk=instance.pk)
        changes = model_diff(old, instance)
        if changes:
            clean = {
                name: {"old": serialize_value(v["old"]), "new": serialize_value(v["new"])}
                for name, v in changes.items()
            }
            AuditLog.objects.create(
                user=user,
                action="update",
                model="Control",
                object_id=instance.pk,
                changes=clean,
            )
        kind = NotificationKind.CONTROL_UPDATED

    notify_event(
        kind,
        message=_("Control {e}: {t}").format(
            e=_("created") if created else _("updated"), t=instance.title
        ),
        users=[instance.responsible] if instance.responsible_id else None,
    )
@receiver(post_delete, sender=Control)
def control_deleted(sender, instance: Control, **kwargs):
    """Audit a control's removal and inform the user responsible for it."""
    acting_user = getattr(instance, "_changed_by", None) or get_current_user()
    audit_fields = {
        "user": acting_user,
        "action": "delete",
        "model": "Control",
        "object_id": instance.pk,
        "changes": None,
    }
    AuditLog.objects.create(**audit_fields)
    text = _("Control deleted: {t}").format(t=instance.title)
    recipients = [instance.responsible] if instance.responsible_id else None
    notify_event(NotificationKind.CONTROL_DELETED, message=text, users=recipients)
@receiver(m2m_changed, sender=Control.risks.through)
def control_risks_changed(sender, instance: Control, action, **kwargs):
    """Require a residual review when the control/risk links change.

    Runs only for the ``post_add``, ``post_remove`` and ``post_clear``
    actions. Each affected risk gets its ResidualRisk flagged, its status set
    to ``"review_required"``, and its stakeholders notified.

    Args:
        sender: The intermediate (through) model of the relation.
        instance: The object whose relation was modified. Django passes the
            object on whichever side the change was made from, so this may
            be a Risk rather than a Control.
        action: The m2m_changed action name.
        **kwargs: Remaining signal arguments (unused).
    """
    if action not in {"post_add", "post_remove", "post_clear"}:
        return

    if isinstance(instance, Risk):
        # Relation edited from the risk side: only that risk is affected.
        affected = [instance]
    else:
        affected = instance.risks.all()

    for risk in affected:
        # The flag must not be bound to "_": that would shadow gettext's "_"
        # and make the message call below fail.
        resid, _was_created = ResidualRisk.objects.get_or_create(risk=risk)
        if not resid.review_required:
            resid.review_required = True
            resid.save()
        if risk.status != "review_required":
            Risk.objects.filter(pk=risk.pk).update(status="review_required")
        notify_event(
            NotificationKind.RESIDUAL_REVIEW_REQUIRED,
            message=_("Residual review required for risk '{t}' due to control change").format(t=risk.title),
            users=_risk_stakeholders(risk),
        )
# ---------------------------------------------------------------------------
# Residual risks
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=ResidualRisk)
def _residual_remember_persisted(sender, instance: ResidualRisk, raw=False, **kwargs):
    """Stash the currently stored row on the instance before it is overwritten."""
    stored = None
    if not raw and instance.pk is not None:
        stored = ResidualRisk.objects.filter(pk=instance.pk).first()
    instance._pre_save_state = stored


@receiver(post_save, sender=ResidualRisk)
def residual_saved(sender, instance: ResidualRisk, created, **kwargs):
    """Write an audit entry and notify the risk owner when a residual is saved.

    On update the notification kind depends on the diff: a change of
    ``review_required`` yields "review required" or "review completed",
    anything else yields a plain "updated" notification.

    Args:
        sender: The model class sending the signal.
        instance: The residual risk that was saved.
        created: True when the row was inserted rather than updated.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handlers.
    user = getattr(instance, "_changed_by", None) or get_current_user()
    risk = instance.risk
    recipients = [risk.owner] if risk.owner_id else None

    if created:
        initial = {
            f.name: {"old": None, "new": serialize_value(getattr(instance, f.name))}
            for f in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="ResidualRisk",
            object_id=instance.pk,
            changes=initial,
        )
        notify_event(
            NotificationKind.RESIDUAL_CREATED,
            message=_("Residual created for risk: {t}").format(t=risk.title),
            users=recipients,
        )
        return

    # post_save fires after the write, so a fresh read would equal the
    # instance and "review_required" could never show up in the diff.
    # Compare against the pre_save snapshot instead.
    old = getattr(instance, "_pre_save_state", None)
    if old is None:
        old = ResidualRisk.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if changes:
        clean = {
            name: {"old": serialize_value(v["old"]), "new": serialize_value(v["new"])}
            for name, v in changes.items()
        }
        AuditLog.objects.create(
            user=user,
            action="update",
            model="ResidualRisk",
            object_id=instance.pk,
            changes=clean,
        )

    if "review_required" not in changes:
        kind = NotificationKind.RESIDUAL_UPDATED
        msg = _("Residual updated for risk: {t}")
    elif instance.review_required:
        kind = NotificationKind.RESIDUAL_REVIEW_REQUIRED
        msg = _("Residual review required for risk: {t}")
    else:
        kind = NotificationKind.RESIDUAL_REVIEW_COMPLETED
        msg = _("Residual review completed for risk: {t}")
    notify_event(kind, message=msg.format(t=risk.title), users=recipients)
@receiver(post_delete, sender=ResidualRisk)
def residual_deleted(sender, instance: ResidualRisk, **kwargs):
    """Log the removal of a residual risk and notify the parent risk's owner."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()
    AuditLog.objects.create(
        user=actor,
        action="delete",
        model="ResidualRisk",
        object_id=instance.pk,
        changes=None,
    )
    text = _("Residual deleted for risk: {t}").format(t=instance.risk.title)
    recipients = None
    if instance.risk.owner_id:
        recipients = [instance.risk.owner]
    notify_event(NotificationKind.RESIDUAL_DELETED, message=text, users=recipients)
# ---------------------------------------------------------------------------
# Incidents
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=Incident)
def _incident_remember_persisted(sender, instance: Incident, raw=False, **kwargs):
    """Stash the currently stored row on the instance before it is overwritten."""
    stored = None
    if not raw and instance.pk is not None:
        stored = Incident.objects.filter(pk=instance.pk).first()
    instance._pre_save_state = stored


@receiver(post_save, sender=Incident)
def incident_saved(sender, instance: Incident, created, **kwargs):
    """Write an audit entry and notify the reporter when an incident is saved.

    Args:
        sender: The model class sending the signal.
        instance: The incident that was saved.
        created: True when the row was inserted rather than updated.
        **kwargs: Remaining signal arguments (unused).
    """
    # Same actor resolution as the delete handlers.
    user = getattr(instance, "_changed_by", None) or get_current_user()
    if created:
        initial = {
            f.name: {"old": None, "new": serialize_value(getattr(instance, f.name))}
            for f in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Incident",
            object_id=instance.pk,
            changes=initial,
        )
        kind = NotificationKind.INCIDENT_CREATED
    else:
        # post_save fires after the write, so a fresh read would equal the
        # instance; diff against the pre_save snapshot instead.
        old = getattr(instance, "_pre_save_state", None)
        if old is None:
            old = Incident.objects.get(pk=instance.pk)
        changes = model_diff(old, instance)
        if changes:
            clean = {
                name: {"old": serialize_value(v["old"]), "new": serialize_value(v["new"])}
                for name, v in changes.items()
            }
            AuditLog.objects.create(
                user=user,
                action="update",
                model="Incident",
                object_id=instance.pk,
                changes=clean,
            )
        kind = NotificationKind.INCIDENT_UPDATED

    notify_event(
        kind,
        message=_("Incident {e}: {t}").format(
            e=_("created") if created else _("updated"), t=instance.title
        ),
        users=[instance.reported_by] if instance.reported_by_id else None,
    )
@receiver(post_delete, sender=Incident)
def incident_deleted(sender, instance: Incident, **kwargs):
    """Audit an incident's deletion and notify the user who reported it."""
    acting_user = getattr(instance, "_changed_by", None) or get_current_user()
    audit_fields = {
        "user": acting_user,
        "action": "delete",
        "model": "Incident",
        "object_id": instance.pk,
        "changes": None,
    }
    AuditLog.objects.create(**audit_fields)
    text = _("Incident deleted: {t}").format(t=instance.title)
    recipients = [instance.reported_by] if instance.reported_by_id else None
    notify_event(NotificationKind.INCIDENT_DELETED, message=text, users=recipients)
@receiver(m2m_changed, sender=Incident.related_risks.through)
def incident_risks_changed(sender, instance, action, pk_set, **kwargs):
    """Audit changes to the incident/risk links.

    Runs only for the ``post_add``, ``post_remove`` and ``post_clear``
    actions and writes "update" audit entries for the affected incidents.

    Args:
        sender: The intermediate (through) model of the relation.
        instance: The object whose relation was modified; an Incident, or a
            Risk when the change was made from the risk side.
        action: The m2m_changed action name.
        pk_set: Primary keys on the other side of the relation; Django passes
            ``None`` for the clear actions.
        **kwargs: Remaining signal arguments (unused).
    """
    if action not in {"post_add", "post_remove", "post_clear"}:
        return

    user = getattr(instance, "_changed_by", None) or get_current_user()
    # pk_set is None on post_clear; list(None) would raise TypeError.
    # Sorting keeps the logged ids in a stable order.
    ids = sorted(pk_set) if pk_set else []

    if not isinstance(instance, Incident):
        # Changed from the risk side: ``instance`` is the risk and ``ids`` are
        # incident keys, so log one entry per incident. After a clear the
        # affected incidents are no longer known, hence nothing is logged.
        for incident_pk in ids:
            AuditLog.objects.create(
                user=user,
                action="update",
                model="Incident",
                object_id=incident_pk,
                changes={"related_risks": {"action": action, "ids": [instance.pk]}},
            )
        return

    AuditLog.objects.create(
        user=user,
        action="update",
        model="Incident",
        object_id=instance.pk,
        changes={"related_risks": {"action": action, "ids": ids}},
    )