
- Updated `item_incident.html` to implement ERP-style tabs for better navigation and added action icons for editing and deleting incidents. - Enhanced the overview tab with translated labels and improved layout for incident details. - Introduced linked risks and history tabs with appropriate translations and table structures. - Modified `item_risk.html` to include action icons for editing and deleting risks. - Refined `list_controls.html` to improve filter section layout and added translations for filter labels. - Updated `list_incidents.html` to enhance filter functionality and table layout, including translations for headers and buttons. - Improved `list_risks.html` by adding an action icon for adding new risks. - Adjusted `notifications.html` to enhance the display of new notifications with improved formatting and links.
346 lines
13 KiB
Python
346 lines
13 KiB
Python
from datetime import date, datetime
|
|
from django.contrib.auth import get_user_model
|
|
from django.db.models import Model
|
|
from django.db.models.signals import post_save, post_delete, m2m_changed
|
|
from django.dispatch import receiver
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from .audit_context import get_current_user
|
|
from .models import (
|
|
Control, Risk, ResidualRisk, AuditLog, Incident,
|
|
Notification, NotificationKind, NotificationPreference
|
|
)
|
|
from .utils import model_diff, notify_event
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# General definitions & helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
def serialize_value(value):
    """Convert *value* into the form stored in audit-log entries.

    Model instances are reduced to their primary key, ``date`` and
    ``datetime`` objects to their ISO-format string; any other value is
    returned unchanged.
    """
    if isinstance(value, Model):
        serialized = value.pk
    elif isinstance(value, (datetime, date)):
        serialized = value.isoformat()
    else:
        serialized = value
    return serialized
|
|
|
|
|
|
def _pref(user: User) -> NotificationPreference | None:
    """Return the NotificationPreference of *user*, creating it if missing.

    Returns ``None`` when *user* is falsy.
    """
    if user:
        existing = getattr(user, "notification_preference", None)
        # Nothing usable on the user yet, so create a fresh preference.
        return existing or NotificationPreference.objects.create(user=user)
    return None
|
|
|
|
|
|
def _notify(users, message: str, event_code: str):
    """Create a Notification for each distinct user that wants *event_code*.

    Falsy entries in *users* are ignored. A user only receives the
    notification when ``should_notify(event_code)`` on their preference
    object is truthy.
    """
    recipients = {candidate for candidate in users if candidate}
    for recipient in recipients:
        preference = _pref(recipient)
        if not preference or not preference.should_notify(event_code):
            continue
        Notification.objects.create(user=recipient, message=message)
|
|
|
|
|
|
def _risk_stakeholders(risk: Risk):
    """Collect the users concerned by *risk* as a set.

    The set holds the risk owner (when set) plus every user matched by
    the ``responsible_controls__risks`` lookup for this risk.
    """
    stakeholders = set()
    if risk.owner:
        stakeholders.add(risk.owner)
    control_responsibles = User.objects.filter(
        responsible_controls__risks=risk
    ).distinct()
    stakeholders.update(control_responsibles)
    return stakeholders
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@receiver(post_save, sender=User)
def user_saved(sender, instance: User, created, **kwargs):
    """Ensure the saved user has preferences; announce new users to staff."""
    _pref(instance)
    if not created:
        return

    # Only staff members who opted in to "user_created" are candidates.
    recipients = User.objects.filter(
        is_staff=True, notification_preference__user_created=True
    )
    message = _("User '{u}' created").format(u=instance.username)
    _notify(recipients, message, "user_created")
|
|
|
|
|
|
@receiver(post_delete, sender=User)
def user_deleted(sender, instance: User, **kwargs):
    """Announce the deletion of a user to staff who opted in."""
    recipients = User.objects.filter(
        is_staff=True, notification_preference__user_deleted=True
    )
    message = _("User '{u}' deleted").format(u=instance.username)
    _notify(recipients, message, "user_deleted")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Risks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@receiver(post_save, sender=Risk)
def risk_saved(sender, instance: Risk, created, **kwargs):
    """Audit a risk save and notify the risk owner.

    On creation, every field in ``instance._meta.fields`` is logged with
    ``old`` set to ``None``. On update, only the fields reported by
    ``model_diff`` are logged; no audit entry is written for an empty diff.
    The owner (when set) is passed to ``notify_event`` in both cases.
    """
    # Fall back to the audit-context user, as the delete handlers in this
    # module do, so saves without ``_changed_by`` are still attributed.
    user = getattr(instance, "_changed_by", None) or get_current_user()
    recipients = [instance.owner] if instance.owner_id else None

    if created:
        # Initial audit log
        initial = {
            field.name: {
                "old": None,
                "new": serialize_value(getattr(instance, field.name)),
            }
            for field in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Risk",
            object_id=instance.pk,
            changes=initial,
        )
        notify_event(
            NotificationKind.RISK_CREATED,
            message=_("Risk created: {t}").format(t=instance.title),
            users=recipients,
        )
        return

    # Diff audit log
    # NOTE(review): post_save runs after the row is written, so this lookup
    # presumably already returns the new values -- confirm that model_diff
    # can still detect changes here.
    old = Risk.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if changes:
        clean = {
            name: {
                "old": serialize_value(delta["old"]),
                "new": serialize_value(delta["new"]),
            }
            for name, delta in changes.items()
        }
        AuditLog.objects.create(
            user=user,
            action="update",
            model="Risk",
            object_id=instance.pk,
            changes=clean,
        )
    # NOTE(review): assumed to fire on every update (like the control and
    # incident handlers), not only when the diff is non-empty -- confirm.
    notify_event(
        NotificationKind.RISK_UPDATED,
        message=_("Risk updated: {t}").format(t=instance.title),
        users=recipients,
    )
|
|
|
|
|
|
@receiver(post_delete, sender=Risk)
def risk_deleted(sender, instance: Risk, **kwargs):
    """Record the deletion of a risk in the audit log and notify its owner."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        # No explicit author on the instance: use the audit-context user.
        actor = get_current_user()

    AuditLog.objects.create(
        user=actor,
        action="delete",
        model="Risk",
        object_id=instance.pk,
        changes=None,
    )

    if instance.owner_id:
        recipients = [instance.owner]
    else:
        recipients = None
    notify_event(
        NotificationKind.RISK_DELETED,
        message=_("Risk deleted: {t}").format(t=instance.title),
        users=recipients,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Controls
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@receiver(post_save, sender=Control)
def control_saved(sender, instance: Control, created, **kwargs):
    """Flag linked risks for review, audit the save and notify.

    For every risk linked to the control, the residual risk is created if
    needed and marked ``review_required`` and the risk status is set to
    ``"review_required"``. An audit entry is then written (all fields on
    creation, the ``model_diff`` result on update, nothing for an empty
    diff) and the responsible user, when set, is passed to ``notify_event``.
    """
    # Force review on related residuals
    for risk in instance.risks.all():
        # Do not unpack the "created" flag into ``_``: that would make ``_``
        # a local name and break the gettext calls further down.
        residual, _was_created = ResidualRisk.objects.get_or_create(risk=risk)
        if not residual.review_required:
            residual.review_required = True
            residual.save()
        if risk.status != "review_required":
            # QuerySet.update() writes the column without calling
            # Risk.save(), so the Risk post_save handler is not triggered.
            Risk.objects.filter(pk=risk.pk).update(status="review_required")

    # Audit log
    # Fall back to the audit-context user, as the delete handlers in this
    # module do, so saves without ``_changed_by`` are still attributed.
    user = getattr(instance, "_changed_by", None) or get_current_user()
    if created:
        initial = {
            field.name: {
                "old": None,
                "new": serialize_value(getattr(instance, field.name)),
            }
            for field in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Control",
            object_id=instance.pk,
            changes=initial,
        )
        kind = NotificationKind.CONTROL_CREATED
    else:
        # NOTE(review): post_save runs after the row is written, so this
        # lookup presumably already returns the new values -- confirm that
        # model_diff can still detect changes here.
        old = Control.objects.get(pk=instance.pk)
        changes = model_diff(old, instance)
        if changes:
            clean = {
                name: {
                    "old": serialize_value(delta["old"]),
                    "new": serialize_value(delta["new"]),
                }
                for name, delta in changes.items()
            }
            AuditLog.objects.create(
                user=user,
                action="update",
                model="Control",
                object_id=instance.pk,
                changes=clean,
            )
        # Assigned on every update so the notification below always has a
        # kind, even when the diff is empty.
        kind = NotificationKind.CONTROL_UPDATED

    # Notify
    notify_event(
        kind,
        message=_("Control {e}: {t}").format(
            e=_("created") if created else _("updated"), t=instance.title
        ),
        users=[instance.responsible] if instance.responsible_id else None,
    )
|
|
|
|
|
|
@receiver(post_delete, sender=Control)
def control_deleted(sender, instance: Control, **kwargs):
    """Record the deletion of a control and notify its responsible user."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        # No explicit author on the instance: use the audit-context user.
        actor = get_current_user()

    AuditLog.objects.create(
        user=actor,
        action="delete",
        model="Control",
        object_id=instance.pk,
        changes=None,
    )

    if instance.responsible_id:
        recipients = [instance.responsible]
    else:
        recipients = None
    notify_event(
        NotificationKind.CONTROL_DELETED,
        message=_("Control deleted: {t}").format(t=instance.title),
        users=recipients,
    )
|
|
|
|
|
|
@receiver(m2m_changed, sender=Control.risks.through)
def control_risks_changed(sender, instance: Control, action, **kwargs):
    """Flag the risks linked to a control for review when its links change.

    Runs for the ``post_add``, ``post_remove`` and ``post_clear`` actions.
    Each risk currently linked to the control gets its residual risk marked
    ``review_required`` and its status set to ``"review_required"``, and
    its stakeholders are notified with the ``"review_required"`` event code.
    """
    if action not in {"post_add", "post_remove", "post_clear"}:
        return

    # NOTE(review): after post_remove/post_clear the detached risks are no
    # longer in ``instance.risks``, so only still-linked risks are flagged.
    # When the relation is changed from the Risk side, ``instance`` is
    # presumably a Risk rather than a Control -- confirm both cases.
    for risk in instance.risks.all():
        # Do not unpack the "created" flag into ``_``: that would make ``_``
        # a local name and break the gettext call below.
        residual, _was_created = ResidualRisk.objects.get_or_create(risk=risk)
        if not residual.review_required:
            residual.review_required = True
            residual.save()
        if risk.status != "review_required":
            # QuerySet.update() writes the column without calling
            # Risk.save(), so the Risk post_save handler is not triggered.
            Risk.objects.filter(pk=risk.pk).update(status="review_required")
        # NOTE(review): assumed to be sent for every linked risk, not only
        # when the status actually changed -- confirm.
        _notify(
            _risk_stakeholders(risk),
            _("Review required for risk '{t}' due to control change").format(t=risk.title),
            "review_required",
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Residual risks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@receiver(post_save, sender=ResidualRisk)
def residual_saved(sender, instance: ResidualRisk, created, **kwargs):
    """Audit a residual-risk save and notify the owner of its risk.

    On creation, every field in ``instance._meta.fields`` is logged with
    ``old`` set to ``None``. On update, the ``model_diff`` result is logged
    (nothing for an empty diff) and the notification kind depends on
    whether ``review_required`` is part of the diff.
    """
    # Fall back to the audit-context user, as the delete handlers in this
    # module do; saves triggered by other handlers carry no ``_changed_by``.
    user = getattr(instance, "_changed_by", None) or get_current_user()

    # Audit log
    if created:
        initial = {
            field.name: {
                "old": None,
                "new": serialize_value(getattr(instance, field.name)),
            }
            for field in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="ResidualRisk",
            object_id=instance.pk,
            changes=initial,
        )
        notify_event(
            NotificationKind.RESIDUAL_CREATED,
            message=_("Residual created for risk: {t}").format(t=instance.risk.title),
            users=[instance.risk.owner] if instance.risk.owner_id else None,
        )
        return

    # NOTE(review): post_save runs after the row is written, so this lookup
    # presumably already returns the new values -- confirm that model_diff
    # can still detect changes here.
    old = ResidualRisk.objects.get(pk=instance.pk)
    changes = model_diff(old, instance)
    if changes:
        clean = {
            name: {
                "old": serialize_value(delta["old"]),
                "new": serialize_value(delta["new"]),
            }
            for name, delta in changes.items()
        }
        AuditLog.objects.create(
            user=user,
            action="update",
            model="ResidualRisk",
            object_id=instance.pk,
            changes=clean,
        )

    # Special handling: review_required
    if "review_required" not in changes:
        kind = NotificationKind.RESIDUAL_UPDATED
        msg = _("Residual updated for risk: {t}")
    elif instance.review_required:
        kind = NotificationKind.RESIDUAL_REVIEW_REQUIRED
        msg = _("Residual review required for risk: {t}")
    else:
        kind = NotificationKind.RESIDUAL_REVIEW_COMPLETED
        msg = _("Residual review completed for risk: {t}")

    notify_event(
        kind,
        message=msg.format(t=instance.risk.title),
        users=[instance.risk.owner] if instance.risk.owner_id else None,
    )
|
|
|
|
|
|
@receiver(post_delete, sender=ResidualRisk)
def residual_deleted(sender, instance: ResidualRisk, **kwargs):
    """Record the deletion of a residual risk and notify the risk owner."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        # No explicit author on the instance: use the audit-context user.
        actor = get_current_user()

    AuditLog.objects.create(
        user=actor,
        action="delete",
        model="ResidualRisk",
        object_id=instance.pk,
        changes=None,
    )

    text = _("Residual deleted for risk: {t}").format(t=instance.risk.title)
    if instance.risk.owner_id:
        recipients = [instance.risk.owner]
    else:
        recipients = None
    notify_event(
        NotificationKind.RESIDUAL_DELETED,
        message=text,
        users=recipients,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Incidents
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@receiver(post_save, sender=Incident)
def incident_saved(sender, instance: Incident, created, **kwargs):
    """Audit an incident save and notify the reporting user.

    On creation, every field in ``instance._meta.fields`` is logged with
    ``old`` set to ``None``. On update, only the fields reported by
    ``model_diff`` are logged; no audit entry is written for an empty diff.
    The reporter (when set) is passed to ``notify_event`` in both cases.
    """
    # Fall back to the audit-context user, as the delete handlers in this
    # module do, so saves without ``_changed_by`` are still attributed.
    user = getattr(instance, "_changed_by", None) or get_current_user()
    if created:
        initial = {
            field.name: {
                "old": None,
                "new": serialize_value(getattr(instance, field.name)),
            }
            for field in instance._meta.fields
        }
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Incident",
            object_id=instance.pk,
            changes=initial,
        )
        kind = NotificationKind.INCIDENT_CREATED
    else:
        # NOTE(review): post_save runs after the row is written, so this
        # lookup presumably already returns the new values -- confirm that
        # model_diff can still detect changes here.
        old = Incident.objects.get(pk=instance.pk)
        changes = model_diff(old, instance)
        if changes:
            clean = {
                name: {
                    "old": serialize_value(delta["old"]),
                    "new": serialize_value(delta["new"]),
                }
                for name, delta in changes.items()
            }
            AuditLog.objects.create(
                user=user,
                action="update",
                model="Incident",
                object_id=instance.pk,
                changes=clean,
            )
        # Assigned on every update so the notification below always has a
        # kind, even when the diff is empty.
        kind = NotificationKind.INCIDENT_UPDATED

    notify_event(
        kind,
        message=_("Incident {e}: {t}").format(
            e=_("created") if created else _("updated"), t=instance.title
        ),
        users=[instance.reported_by] if instance.reported_by_id else None,
    )
|
|
|
|
|
|
@receiver(post_delete, sender=Incident)
def incident_deleted(sender, instance: Incident, **kwargs):
    """Record the deletion of an incident and notify its reporter."""
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        # No explicit author on the instance: use the audit-context user.
        actor = get_current_user()

    AuditLog.objects.create(
        user=actor,
        action="delete",
        model="Incident",
        object_id=instance.pk,
        changes=None,
    )

    if instance.reported_by_id:
        recipients = [instance.reported_by]
    else:
        recipients = None
    notify_event(
        NotificationKind.INCIDENT_DELETED,
        message=_("Incident deleted: {t}").format(t=instance.title),
        users=recipients,
    )
|
|
|
|
|
|
@receiver(m2m_changed, sender=Incident.related_risks.through)
def incident_risks_changed(sender, instance, action, pk_set, **kwargs):
    """Write an audit entry when an incident's related risks change.

    Runs for the ``post_add``, ``post_remove`` and ``post_clear`` actions
    and records the action name together with the affected primary keys
    under the ``related_risks`` key of the audit entry.
    """
    if action not in {"post_add", "post_remove", "post_clear"}:
        return

    user = getattr(instance, "_changed_by", None) or get_current_user()
    # Django passes ``pk_set=None`` for the clear actions; log an empty id
    # list instead of failing on ``list(None)``.
    affected_ids = list(pk_set) if pk_set else []
    # NOTE(review): when the relation is changed from the Risk side,
    # ``instance`` is presumably a Risk and ``pk_set`` holds incident ids,
    # yet the entry is still filed under model "Incident" -- confirm.
    AuditLog.objects.create(
        user=user,
        action="update",
        model="Incident",
        object_id=instance.pk,
        changes={"related_risks": {"action": action, "ids": affected_ids}},
    )
|