ISO-27001-Risk-Management/risks/utils.py
2025-09-16 14:15:04 +02:00

169 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import date, datetime
from typing import Any, Iterable, Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.mail import send_mail
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from .models import (
AuditLog, Notification, NotificationRule,
NotificationKind, Risk, ResidualRisk,
)
User = get_user_model()
# ---------------------------------------------------------------------------
# notify_event()
# ---------------------------------------------------------------------------
def get_entity_url(obj):
if obj is None:
return None
model_name = obj.__class__.__name__.lower()
mapping = {
"risk": "risks:show_risk",
"control": "risks:show_control",
"residualrisk": "risks:show_risk",
"incident": "risks:show_incident",
"user": "admin:user_detail",
}
view_name = mapping.get(model_name)
if view_name:
return reverse(view_name, args=[obj.pk])
return None
def notify_event(kind: str, *, message: str, users: Optional[Iterable[Any]] = None, obj=None):
"""
Generates in-app notifications and/or emails depending on the NotificationRule.
- users: Basic recipients (owner/responsible/reporter) can be None.
- staff/extra recipients are added from the rule.
"""
rule = NotificationRule.objects.filter(kind=kind).first()
# Defaults (no rule → in-app only)
enabled_in_app = True
enabled_email = False
recipients_users = set()
extra_emails = []
# Base recipients
if users:
recipients_users.update(u for u in users if u and getattr(u, "is_active", False))
# Rule overrides
if rule:
enabled_in_app = rule.enabled_in_app
enabled_email = rule.enabled_email
if rule.to_staff:
recipients_users.update(User.objects.filter(is_staff=True, is_active=True))
extra_emails = _split_emails(rule.extra_recipients)
url = get_entity_url(obj)
# In-App Notifications
if enabled_in_app:
for u in recipients_users:
Notification.objects.create(user=u, message=message, target_url=url)
# Email Notifications
if enabled_email:
emails = [u.email for u in recipients_users if u and u.email] + extra_emails
emails = list(dict.fromkeys(emails))
if emails:
body = f"{message}\n\n{url}" if url else message
send_mail(
_("Notification"),
body,
getattr(settings, "DEFAULT_FROM_EMAIL", "webmaster@localhost"),
emails,
fail_silently=True,
)
# ---------------------------------------------------------------------------
# model_diff()
# ---------------------------------------------------------------------------
def model_diff(old, new, fields=None):
"""
Compare two model instances and return a dict of changed fields.
- old: previous model instance (from DB)
- new: updated model instance (unsaved)
- fields: optional list of fields to check
"""
changes = {}
opts = new._meta
if fields is None:
fields = [f.name for f in opts.fields]
for field_name in fields:
old_value = getattr(old, field_name, None)
new_value = getattr(new, field_name, None)
if old_value != new_value:
changes[field_name] = {"old": old_value, "new": new_value}
return changes
# ---------------------------------------------------------------------------
# check_risk_followups()
# ---------------------------------------------------------------------------
def check_risk_followups():
"""
Check if follow-ups need attention and create notifications.
Ensures no duplicate notifications per risk per day.
"""
today = now().date()
risks = Risk.objects.filter(follow_up__lte=today).select_related("owner")
for risk in risks:
# Status aktualisieren (außer wenn bereits closed/review_required)
if risk.status not in ("closed", "review_required"):
Risk.objects.filter(pk=risk.pk).update(status="review_required")
# ResidualRisk sicherstellen + Review-Flag setzen
resid, _ = ResidualRisk.objects.get_or_create(risk=risk)
if not resid.review_required:
resid.review_required = True
resid.save()
# Notification (einmalig pro Risk/Tag)
message = _("Follow-up reached: review required for risk '{t}'").format(t=risk.title)
notification, created = Notification.objects.get_or_create(
user=risk.owner,
message=message,
defaults={"read": False, "sent": False},
)
if created:
AuditLog.objects.create(
user=None,
action="create",
model="Notification",
object_id=notification.pk,
changes={
"message": notification.message,
"user": risk.owner.username if risk.owner else None,
},
)
notify_event(
NotificationKind.RISK_REVIEW_REQUIRED,
message=message,
users=[risk.owner] if risk.owner_id else None,
obj=risk,
)
# ---------------------------------------------------------------------------
# _split_emails()
# ---------------------------------------------------------------------------
def _split_emails(value: str) -> list[str]:
"""Normalize a comma/newline-separated list of emails into a clean list."""
if not value:
return []
raw = value.replace("\n", ",").split(",")
return [e.strip() for e in raw if "@" in e and e.strip()]