ISO-27001-Risk-Management/risks/signals.py
Kevin Heyer 686030e4cb feat: Enhance risk management application with user auditing and improved incident handling
- Added AuditUserMiddleware to track the current user for auditing purposes.
- Introduced audit_context for managing the current user in thread-local storage.
- Updated Control and Incident models to include created_at and updated_at timestamps.
- Refactored Control and Incident serializers to handle related risks and timestamps.
- Modified views to set the _changed_by attribute for user actions.
- Enhanced incident listing and detail views to display related risks and user actions.
- Updated templates for better presentation of risks and incidents.
- Added migrations for new fields and relationships in the database.
- Improved filtering options in the incident list view.
2025-09-09 12:00:29 +02:00

236 lines
No EOL
8.3 KiB
Python

from datetime import date, datetime

from django.db.models import Model
from django.db.models.signals import m2m_changed, post_delete, post_save, pre_save
from django.dispatch import receiver

from .audit_context import get_current_user
from .models import Control, Risk, ResidualRisk, AuditLog, Incident
from .utils import model_diff
# ---------------------------------------------------------------------------
# General definitions
# ---------------------------------------------------------------------------
def serialize_value(value):
    """Convert a field value into the form stored in audit-log change sets.

    * Django model instances are replaced by their primary key.
    * ``date`` / ``datetime`` objects are replaced by their ISO-8601 string.
    * Every other value is returned untouched.
    """
    if isinstance(value, Model):
        # Use str(value) here instead if a more descriptive entry is wanted.
        serialized = value.pk
    elif isinstance(value, (datetime, date)):
        serialized = value.isoformat()
    else:
        serialized = value
    return serialized
# ---------------------------------------------------------------------------
# Risks
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=Risk)
def _snapshot_risk_before_save(sender, instance, **kwargs):
    """Stash the stored Risk row on ``instance`` for ``log_risk_save``.

    ``post_save`` fires after the row has been written, so the previous
    field values can only be captured here, before the write.
    """
    previous = None
    if instance.pk is not None:
        # ``first()`` yields None when no row with this pk exists yet
        # (e.g. a new object saved with an explicitly assigned pk).
        previous = Risk.objects.filter(pk=instance.pk).first()
    instance._audit_previous = previous


@receiver(post_save, sender=Risk)
def log_risk_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry when a Risk is created or updated.

    On creation every concrete field is recorded with ``old`` set to None.
    On update only the fields reported by ``model_diff`` are recorded, and
    no entry is written when nothing changed.

    The acting user is taken from ``instance._changed_by`` when set,
    otherwise from ``get_current_user()`` (same lookup as the delete
    handler).
    """
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Risk",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Diff against the pre-save snapshot. Re-reading the row at this point
    # would return the values that were just written and hide every change.
    old = getattr(instance, "_audit_previous", None)
    if old is None:
        return

    changes = model_diff(old, instance)
    if not changes:
        return

    AuditLog.objects.create(
        user=user,
        action="update",
        model="Risk",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(post_delete, sender=Risk)
def log_risk_delete(sender, instance, **kwargs):
    """Record a "delete" audit entry after a Risk has been removed.

    The acting user is ``instance._changed_by`` when that attribute is set
    and truthy, otherwise whatever ``get_current_user()`` returns.
    """
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()

    entry = {
        "user": actor,
        "action": "delete",
        "model": "Risk",
        "object_id": instance.pk,
        "changes": None,  # deletions carry no field-level changes
    }
    AuditLog.objects.create(**entry)
# ---------------------------------------------------------------------------
# Controls
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=Control)
def _snapshot_control_before_save(sender, instance, **kwargs):
    """Stash the stored Control row on ``instance`` for ``log_control_save``.

    ``post_save`` fires after the row has been written, so the previous
    field values can only be captured here, before the write.
    """
    previous = None
    if instance.pk is not None:
        # ``first()`` yields None when no row with this pk exists yet
        # (e.g. a new object saved with an explicitly assigned pk).
        previous = Control.objects.filter(pk=instance.pk).first()
    instance._audit_previous = previous


@receiver(post_save, sender=Control)
def log_control_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry when a Control is created or updated.

    On creation every concrete field is recorded with ``old`` set to None.
    On update only the fields reported by ``model_diff`` are recorded, and
    no entry is written when nothing changed.

    The acting user is taken from ``instance._changed_by`` when set,
    otherwise from ``get_current_user()`` (same lookup as the delete
    handler).
    """
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Control",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Diff against the pre-save snapshot. Re-reading the row at this point
    # would return the values that were just written and hide every change.
    old = getattr(instance, "_audit_previous", None)
    if old is None:
        return

    changes = model_diff(old, instance)
    if not changes:
        return

    AuditLog.objects.create(
        user=user,
        action="update",
        model="Control",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(post_delete, sender=Control)
def log_control_delete(sender, instance, **kwargs):
    """Record a "delete" audit entry after a Control has been removed.

    The acting user is ``instance._changed_by`` when that attribute is set
    and truthy, otherwise whatever ``get_current_user()`` returns.
    """
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()

    entry = {
        "user": actor,
        "action": "delete",
        "model": "Control",
        "object_id": instance.pk,
        "changes": None,
    }
    AuditLog.objects.create(**entry)
@receiver(m2m_changed, sender=Control.risks.through)
def control_risks_changed(sender, instance, action, reverse, pk_set, **kwargs):
    """Flag residual risks for review when Control <-> Risk links change.

    For every affected Risk the ResidualRisk row is fetched (or created) and
    saved with ``review_required = True``.

    Which risks are affected depends on the side the change was made from:

    * forward (``instance`` is a Control): the risks named in ``pk_set``;
      when ``pk_set`` is empty or None, all risks currently linked to the
      control.
    * reverse (``instance`` is a Risk): ``pk_set`` holds Control pks, so the
      only affected risk is ``instance`` itself.

    Clears are handled on ``pre_clear`` rather than ``post_clear``: after the
    clear the relation is already empty and the affected risks can no longer
    be read from it.
    """
    if action not in {"post_add", "post_remove", "pre_clear"}:
        return

    if reverse:
        affected_risks = [instance]
    elif pk_set:
        affected_risks = Risk.objects.filter(pk__in=pk_set)
    else:
        affected_risks = instance.risks.all()

    # Pass the acting user on so the ResidualRisk audit entry is attributed.
    changed_by = getattr(instance, "_changed_by", None) or get_current_user()

    for risk in affected_risks:
        residual, _ = ResidualRisk.objects.get_or_create(risk=risk)
        residual.review_required = True
        residual._changed_by = changed_by
        residual.save()
# ---------------------------------------------------------------------------
# Residual risks
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=ResidualRisk)
def _snapshot_residual_before_save(sender, instance, **kwargs):
    """Stash the stored ResidualRisk row on ``instance`` for ``log_residual_save``.

    ``post_save`` fires after the row has been written, so the previous
    field values can only be captured here, before the write.
    """
    previous = None
    if instance.pk is not None:
        # ``first()`` yields None when no row with this pk exists yet
        # (e.g. a new object saved with an explicitly assigned pk).
        previous = ResidualRisk.objects.filter(pk=instance.pk).first()
    instance._audit_previous = previous


@receiver(post_save, sender=ResidualRisk)
def log_residual_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry when a ResidualRisk is created or updated.

    On creation every concrete field is recorded with ``old`` set to None.
    On update only the fields reported by ``model_diff`` are recorded, and
    no entry is written when nothing changed.

    The acting user is taken from ``instance._changed_by`` when set,
    otherwise from ``get_current_user()`` (same lookup as the delete
    handler).
    """
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="ResidualRisk",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Diff against the pre-save snapshot. Re-reading the row at this point
    # would return the values that were just written and hide every change.
    old = getattr(instance, "_audit_previous", None)
    if old is None:
        return

    changes = model_diff(old, instance)
    if not changes:
        return

    AuditLog.objects.create(
        user=user,
        action="update",
        model="ResidualRisk",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(post_delete, sender=ResidualRisk)
def log_residual_delete(sender, instance, **kwargs):
    """Record a "delete" audit entry after a ResidualRisk has been removed.

    The acting user is ``instance._changed_by`` when that attribute is set
    and truthy, otherwise whatever ``get_current_user()`` returns.
    """
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()

    entry = {
        "user": actor,
        "action": "delete",
        "model": "ResidualRisk",
        "object_id": instance.pk,
        "changes": None,
    }
    AuditLog.objects.create(**entry)
# ---------------------------------------------------------------------------
# Incidents
# ---------------------------------------------------------------------------
@receiver(pre_save, sender=Incident)
def _snapshot_incident_before_save(sender, instance, **kwargs):
    """Stash the stored Incident row on ``instance`` for ``log_incident_save``.

    ``post_save`` fires after the row has been written, so the previous
    field values can only be captured here, before the write.
    """
    previous = None
    if instance.pk is not None:
        # ``first()`` yields None when no row with this pk exists yet
        # (e.g. a new object saved with an explicitly assigned pk).
        previous = Incident.objects.filter(pk=instance.pk).first()
    instance._audit_previous = previous


@receiver(post_save, sender=Incident)
def log_incident_save(sender, instance, created, **kwargs):
    """Write an AuditLog entry when an Incident is created or updated.

    On creation every concrete field is recorded with ``old`` set to None.
    On update only the fields reported by ``model_diff`` are recorded, and
    no entry is written when nothing changed.

    The acting user is taken from ``instance._changed_by`` when set,
    otherwise from ``get_current_user()`` (same lookup as the delete
    handler).
    """
    user = getattr(instance, "_changed_by", None) or get_current_user()

    if created:
        AuditLog.objects.create(
            user=user,
            action="create",
            model="Incident",
            object_id=instance.pk,
            changes={
                f.name: {
                    "old": None,
                    "new": serialize_value(getattr(instance, f.name)),
                }
                for f in instance._meta.fields
            },
        )
        return

    # Diff against the pre-save snapshot. Re-reading the row at this point
    # would return the values that were just written and hide every change.
    old = getattr(instance, "_audit_previous", None)
    if old is None:
        return

    changes = model_diff(old, instance)
    if not changes:
        return

    AuditLog.objects.create(
        user=user,
        action="update",
        model="Incident",
        object_id=instance.pk,
        changes={
            field: {
                "old": serialize_value(vals["old"]),
                "new": serialize_value(vals["new"]),
            }
            for field, vals in changes.items()
        },
    )
@receiver(m2m_changed, sender=Incident.related_risks.through)
def log_incident_risks_change(sender, instance, action, reverse, model, pk_set, **kwargs):
    """Write "update" audit entries when an Incident's related risks change.

    Only the ``post_add``, ``post_remove`` and ``post_clear`` actions are
    logged. Each entry stores the m2m action name and the risk ids involved
    under the ``related_risks`` key.

    * forward (``instance`` is an Incident): one entry for that incident
      listing the risk pks from ``pk_set``.
    * reverse (``instance`` is a Risk): ``pk_set`` holds Incident pks, so one
      entry is written per incident, each listing the single risk pk.

    ``pk_set`` is None for clear actions, so it is treated as empty. A clear
    made from the Risk side therefore writes no entries, because the
    affected incidents are not known at ``post_clear`` time.
    """
    if action not in {"post_add", "post_remove", "post_clear"}:
        return

    user = getattr(instance, "_changed_by", None) or get_current_user()
    # Sorted so the stored id list is deterministic.
    changed_ids = sorted(pk_set or ())

    if reverse:
        for incident_id in changed_ids:
            AuditLog.objects.create(
                user=user,
                action="update",
                model="Incident",
                object_id=incident_id,
                changes={"related_risks": {"action": action, "ids": [instance.pk]}},
            )
        return

    AuditLog.objects.create(
        user=user,
        action="update",
        model="Incident",
        object_id=instance.pk,
        changes={"related_risks": {"action": action, "ids": changed_ids}},
    )
@receiver(post_delete, sender=Incident)
def log_incident_delete(sender, instance, **kwargs):
    """Record a "delete" audit entry after an Incident has been removed.

    The acting user is ``instance._changed_by`` when that attribute is set
    and truthy, otherwise whatever ``get_current_user()`` returns.
    """
    actor = getattr(instance, "_changed_by", None)
    if not actor:
        actor = get_current_user()

    entry = {
        "user": actor,
        "action": "delete",
        "model": "Incident",
        "object_id": instance.pk,
        "changes": None,
    }
    AuditLog.objects.create(**entry)