Add custom User model, OIDC backend, DB flexibility (SQLite/Postgres/MySQL), secured API endpoints, and initial Risk/Control models with enums, score calculation, and groups seeding.

This commit is contained in:
= 2025-09-07 20:52:19 +02:00
parent 37168500d1
commit 7f90f67f2a
47 changed files with 1288 additions and 6 deletions

70
TODO Normal file
View file

@ -0,0 +1,70 @@
✅Risiken
✅-Titel
✅-Asset
✅-Prozess
✅-Kategorie
✅-Eintrittswahrscheinlichkeit vor Maßnahmen
✅-Schadenshöhe vor Maßnahmen
✅-Score vor Maßnahmen (Berechnet sich aus Eintrittswahrscheinlichkeit und Schadenshöhe)
✅-Stufe vor Maßnahmen (Errechnet sich aus dem Score)
✅-Risikoeigner
✅-Maßnahmen (Ein Risiko kann mehrere Maßnahmen haben)
✅-Restrisiko
✅-Wiedervorlage
✅-Schutzziele
✅Eintrittswahrscheinlichkeiten
✅-1, sehr gering, Voraussichtliches Auftreten seltener als einmal in 5 Jahren.
✅-2, gering, Voraussichtliches Auftreten einmal in 1 bis 5 Jahren.
✅-3, wahrscheinlich, Voraussichtliches Auftreten einmal pro Jahr oder häufiger.
✅-4, sehr wahrscheinlich, Voraussichtliches Auftreten mehrmals pro Jahr oder monatlich.
✅Schadenshöhen
✅-1, niedrig, (z.B. Schaden < 1.000 €, geringer operativer Einfluss)
✅-2, mittel, (z.B. Schaden 1.000 € -5.000 €, lokaler Einfluss)
✅-3, hoch, (z.B. Schaden 5.000 € -15.000 €, Einfluss auf ein Team)
✅-4, erheblich, (z.B. Schaden 50.000 € -100.000 €, regionaler Einfluss)
✅-5, kritisch, (z.B. Schaden > 100.000 €, existenzbedrohend)
✅Maßnahmen
✅-Titel
✅-Status
✅-Frist
✅-Verantwortlicher
✅-Beschreibung
✅-Wiki-Link
✅Maßnahmenstatus
✅-Geplant, Die Maßnahme wurde identifiziert und im Risikoregister erfasst, die Umsetzung hat jedoch noch nicht begonnen. Dies ist der Ausgangsstatus für jede neue Maßnahme.
✅-In Bearbeitung, Die Umsetzung der Maßnahme hat begonnen.
✅-Abgeschlossen, Die Maßnahme wurde vollständig umgesetzt (Triggert Neubewertung durch Risikoeigner)
✅-Überprüft, Die Wirksamkeit der abgeschlossenen Maßnahme wurde verifiziert und bestätigt.
✅-Abgelehnt/Verworfen, Eine geplante Maßnahme wird nicht umgesetzt, weil sie entweder nicht mehr relevant ist, die Kosten zu hoch sind oder eine alternative, effektivere Maßnahme gefunden wurde. Dies muss gut dokumentiert und begründet werden.
✅Restrisiko
✅-Risiko identifikation
✅-Eintrittswahrscheinlichkeit nach Maßnahmen
✅-Schadenshöhe nach Maßnahmen
✅-Score nach Maßnahmen (Berechnet sich aus Eintrittswahrscheinlichkeit und Schadenshöhe)
✅-Stufe nach Maßnahmen (Errechnet sich aus dem Score)
✅Schutzziele (CIA)
✅-Verfügbarkeit
✅-Integrität
✅-Vertraulichkeit
✅Benutzer
✅-Benutzer ist Risikoverantwortlicher
✅-Benutzer ist Maßnahmenverantwortlicher
✅Audit
✅-Logging
✅-Audit-Trail
✅Vorfälle
Benachrichtigungen
Workflow
Riskomatrix

Binary file not shown.

Binary file not shown.

32
config/auth_backends.py Normal file
View file

@ -0,0 +1,32 @@
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from django.contrib.auth.models import Group
class CustomOIDCBackend(OIDCAuthenticationBackend):
"""
Custom authentication backend for OIDC.
- Ensures users are created/updated from OIDC claims
- Maps 'groups' claim from IdP into Django Groups
"""
def create_user(self, claims):
user = super().create_user(claims)
user.email = claims.get("email", "")
user.is_sso_user = True
user.save()
self._update_groups(user, claims)
return user
def update_user(self, user, claims):
user.email = claims.get("email", user.email)
self._update_groups(user, claims)
user.save()
return user
def _update_groups(self, user, claims):
"""
Synchronize groups from IdP claims to Django Groups.
"""
groups = claims.get("groups", [])
for g in groups:
group, _ = Group.objects.get_or_create(name=g)
user.groups.add(group)

View file

@ -34,9 +34,11 @@ INSTALLED_APPS = [
"django.contrib.sessions", "django.contrib.sessions",
"django.contrib.messages", "django.contrib.messages",
"django.contrib.staticfiles", "django.contrib.staticfiles",
"django_crontab",
# Third-party apps # Third-party apps
"rest_framework", "rest_framework",
"risks",
] ]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -64,7 +66,7 @@ WSGI_APPLICATION = "config.wsgi.application"
TEMPLATES = [ TEMPLATES = [
{ {
"BACKEND": "django.template.backends.django.DjangoTemplates", "BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [], # can be extended if custom templates are needed "DIRS": [BASE_DIR / "templates"],
"APP_DIRS": True, "APP_DIRS": True,
"OPTIONS": { "OPTIONS": {
"context_processors": [ "context_processors": [
@ -120,6 +122,10 @@ else: # default: SQLite
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Authentication & password validation # Authentication & password validation
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
AUTH_USER_MODEL = "risks.User"
AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend", # local auth
]
AUTH_PASSWORD_VALIDATORS = [ AUTH_PASSWORD_VALIDATORS = [
{"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator"}, {"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator"},
{"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator"}, {"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator"},
@ -130,7 +136,7 @@ AUTH_PASSWORD_VALIDATORS = [
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Internationalization # Internationalization
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
LANGUAGE_CODE = "en-us" LANGUAGE_CODE = "de"
TIME_ZONE = "UTC" TIME_ZONE = "UTC"
USE_I18N = True USE_I18N = True
USE_TZ = True USE_TZ = True
@ -139,6 +145,9 @@ USE_TZ = True
# Static files # Static files
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
STATIC_URL = "static/" STATIC_URL = "static/"
STATICFILES_DIRS = [
BASE_DIR / "static",
]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Default primary key field type # Default primary key field type
@ -167,6 +176,10 @@ if SSO_ENABLED:
INSTALLED_APPS += ["mozilla_django_oidc"] INSTALLED_APPS += ["mozilla_django_oidc"]
MIDDLEWARE += ["mozilla_django_oidc.middleware.SessionRefresh"] MIDDLEWARE += ["mozilla_django_oidc.middleware.SessionRefresh"]
AUTHENTICATION_BACKENDS.append(
"config.auth_backends.CustomOIDCBackend",
)
LOGIN_URL = "/oidc/authenticate/" LOGIN_URL = "/oidc/authenticate/"
LOGIN_REDIRECT_URL = "/" LOGIN_REDIRECT_URL = "/"
LOGOUT_REDIRECT_URL = "/" LOGOUT_REDIRECT_URL = "/"
@ -181,3 +194,11 @@ if SSO_ENABLED:
OIDC_RP_SIGN_ALGO = "RS256" OIDC_RP_SIGN_ALGO = "RS256"
OIDC_STORE_ID_TOKEN = True OIDC_STORE_ID_TOKEN = True
OIDC_STORE_ACCESS_TOKEN = True OIDC_STORE_ACCESS_TOKEN = True
# ---------------------------------------------------------------------------
# Crojabs via Djnago-Crontabs
# ---------------------------------------------------------------------------
CRONJOBS = [
("0 8 * * *", "risks.utils.check_risk_followups"),
]

View file

@ -1,13 +1,28 @@
from api.views import ping, secure_ping
from django.conf import settings from django.conf import settings
from django.contrib import admin from django.contrib import admin
from django.urls import path, include from django.urls import path, include
from api.views import ping, secure_ping from django.shortcuts import render
from rest_framework import routers
from risks.views import RiskViewSet, ControlViewSet, ResidualRiskViewSet, UserViewSet, AuditViewSet
# DEBUG
def home_view(request):
return render(request, "base.html")
router = routers.DefaultRouter()
router.register(r"risks", RiskViewSet)
router.register(r"controls", ControlViewSet)
router.register(r"residual-risks", ResidualRiskViewSet)
router.register(r"users", UserViewSet)
router.register(r"logs", AuditViewSet)
urlpatterns = [ urlpatterns = [
path("admin/", admin.site.urls), path("admin/", admin.site.urls),
path("api/ping/", ping), # Public healthcheck endpoint path("api/ping/", ping), # Public healthcheck endpoint
path("api/secure-ping/", secure_ping) # Protected API endpoint path("api/secure-ping/", secure_ping), # Protected API endpoint
path("api/", include(router.urls)),
path("", home_view, name="home"), # DEBUG
] ]
# Add OIDC routes only if Single Sign-On is enabled # Add OIDC routes only if Single Sign-On is enabled

Binary file not shown.

View file

@ -3,9 +3,11 @@ certifi==2025.8.3
cffi==1.17.1 cffi==1.17.1
charset-normalizer==3.4.3 charset-normalizer==3.4.3
cryptography==45.0.7 cryptography==45.0.7
Django==4.2.24 Django==5.2.6
django-crontab==0.7.1
django-environ==0.12.0 django-environ==0.12.0
django-filter==25.1 django-filter==25.1
django-multiselectfield==1.0.1
djangorestframework==3.16.1 djangorestframework==3.16.1
idna==3.10 idna==3.10
josepy==2.1.0 josepy==2.1.0

0
risks/__init__.py Normal file
View file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

112
risks/admin.py Normal file
View file

@ -0,0 +1,112 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from .models import User, Risk, ResidualRisk, Control, Incident
@admin.register(User)
class UserAdmin(BaseUserAdmin):
fieldsets = BaseUserAdmin.fieldsets + (
("SSO Information", {"fields": ("is_sso_user",)}),
)
list_display = ("username", "email", "is_staff", "is_superuser", "is_sso_user",
"owned_risks_count", "responsible_controls_count")
def owned_risks_count(self, obj):
return obj.risks_owned.count()
owned_risks_count.short_description = "Risks Owned"
def responsible_controls_count(self, obj):
return obj.controls_responsible.count()
responsible_controls_count.short_description = "Controls Responsible"
class ControlInline(admin.TabularInline):
model = Control
extra = 1
fields = ("title", "status", "due_date", "responsible", "wiki_link")
autocomplete_fields = ("responsible",)
class ResidualRiskInline(admin.StackedInline):
"""
Inline editor for ResidualRisk, linked one-to-one with Risk
"""
model = ResidualRisk
extra = 0
can_delete = False # Since each Risk can have at most one residual risk
readonly_fields = ("score", "level", "review_required")
fields = ("likelihood", "impact", "score", "level", "review_required")
@admin.register(Risk)
class RiskAdmin(admin.ModelAdmin):
list_display = (
"title",
"owner",
"score",
"level",
"likelihood",
"impact",
"follow_up",
)
list_filter = ("level", "likelihood", "impact", "owner")
search_fields = ("title", "asset", "process", "category")
inlines = [ControlInline, ResidualRiskInline]
def save_model(self, request, obj, form, change):
obj._changed_by = request.user
super().save_model(request, obj, form, change)
def delete_model(self, request, obj):
obj._changed_by = request.user
super().delete_model(request, obj)
@admin.register(ResidualRisk)
class ResidualRiskAdmin(admin.ModelAdmin):
list_display = (
"risk",
"score",
"level",
"likelihood",
"impact",
"review_required"
)
list_filter = ("level", "likelihood", "impact", "review_required")
def save_model(self, request, obj, form, change):
obj._changed_by = request.user
super().save_model(request, obj, form, change)
def delete_model(self, request, obj):
obj._changed_by = request.user
super().delete_model(request, obj)
@admin.register(Control)
class ControlAdmin(admin.ModelAdmin):
list_display = ("title", "status", "due_date", "responsible", "risk")
list_filter = ("status", "due_date")
search_fields = ("title", "description")
autocomplete_fields = ("responsible", "risk")
def save_model(self, request, obj, form, change):
obj._changed_by = request.user
super().save_model(request, obj, form, change)
def delete_model(self, request, obj):
obj._changed_by = request.user
super().delete_model(request, obj)
@admin.register(Incident)
class IncidentAdmin(admin.ModelAdmin):
list_display = ("title", "date_reported", "reported_by", "status")
list_filter = ("status", "date_reported", "reported_by")
filter_horizontal = ("related_risks",)
search_fields = ("title", "description")
autocomplete_fields = ("related_risks",)
def save_model(self, request, obj, form, change):
obj._changed_by = request.user
super().save_model(request, obj, form, change)
def delete_model(self, request, obj):
obj._changed_by = request.user
super().delete_model(request, obj)

10
risks/apps.py Normal file
View file

@ -0,0 +1,10 @@
from django.apps import AppConfig
class RisksConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'risks'
def ready(self):
# Import signals when app is ready
import risks.signals

View file

@ -0,0 +1,79 @@
# Generated by Django 5.2.6 on 2025-09-05 20:00
import django.contrib.auth.models
import django.contrib.auth.validators
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('is_sso_user', models.BooleanField(default=False)),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
'verbose_name': 'user',
'verbose_name_plural': 'users',
'abstract': False,
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
migrations.CreateModel(
name='Risk',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('title', models.CharField(max_length=255)),
('asset', models.CharField(blank=True, max_length=255, null=True)),
('process', models.CharField(blank=True, max_length=255, null=True)),
('category', models.CharField(blank=True, max_length=255, null=True)),
('likelihood', models.IntegerField(choices=[(1, 'Very low occurs less than once every 5 years'), (2, 'Low once every 15 years'), (3, 'Likely once per year or more'), (4, 'Very likely multiple times per year/monthly')], default=1)),
('impact', models.IntegerField(choices=[(1, 'Low (< 1,000 € minor operational impact)'), (2, 'Medium (1,0005,000 € local impact)'), (3, 'High (5,00015,000 € team-level impact)'), (4, 'Severe (50,000100,000 € regional impact)'), (5, 'Critical (> 100,000 € existential threat)')], default=1)),
('score', models.IntegerField(editable=False)),
('level', models.CharField(editable=False, max_length=50)),
('follow_up', models.DateField(blank=True, null=True)),
('confidentiality', models.BooleanField(default=False)),
('integrity', models.BooleanField(default=False)),
('availability', models.BooleanField(default=False)),
('owner', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='owned_risks', to=settings.AUTH_USER_MODEL)),
],
),
migrations.CreateModel(
name='Control',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('title', models.CharField(max_length=255)),
('status', models.CharField(choices=[('planned', 'Planned'), ('in_progress', 'In progress'), ('completed', 'Completed'), ('verified', 'Verified'), ('rejected', 'Rejected')], default='planned', max_length=20)),
('due_date', models.DateField(blank=True, null=True)),
('description', models.TextField(blank=True, null=True)),
('wiki_link', models.URLField(blank=True, null=True)),
('responsible', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='responsible_controls', to=settings.AUTH_USER_MODEL)),
('risk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='controls', to='risks.risk')),
],
),
]

View file

@ -0,0 +1,25 @@
# Generated by Django 5.2.6 on 2025-09-06 10:52
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('risks', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='ResidualRisk',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('likelihood', models.IntegerField(choices=[(1, 'Very low occurs less than once every 5 years'), (2, 'Low once every 15 years'), (3, 'Likely once per year or more'), (4, 'Very likely multiple times per year/monthly')], default=1)),
('impact', models.IntegerField(choices=[(1, 'Low (< 1,000 € minor operational impact)'), (2, 'Medium (1,0005,000 € local impact)'), (3, 'High (5,00015,000 € team-level impact)'), (4, 'Severe (50,000100,000 € regional impact)'), (5, 'Critical (> 100,000 € existential threat)')], default=1)),
('score', models.IntegerField(editable=False)),
('level', models.CharField(editable=False, max_length=50)),
('risk', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='residual_risk', to='risks.risk')),
],
),
]

View file

@ -0,0 +1,19 @@
# Generated by Django 5.2.6 on 2025-09-06 11:39
import multiselectfield.db.fields
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('risks', '0002_residualrisk'),
]
operations = [
migrations.AddField(
model_name='risk',
name='cia',
field=multiselectfield.db.fields.MultiSelectField(blank=True, choices=[(1, 'Confidentiality'), (2, 'Integrity'), (3, 'Availability')], max_length=100, null=True),
),
]

View file

@ -0,0 +1,44 @@
# Generated by Django 5.2.6 on 2025-09-07 09:52
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('risks', '0003_risk_cia'),
]
operations = [
migrations.RemoveField(
model_name='risk',
name='availability',
),
migrations.RemoveField(
model_name='risk',
name='confidentiality',
),
migrations.RemoveField(
model_name='risk',
name='integrity',
),
migrations.AddField(
model_name='residualrisk',
name='review_required',
field=models.BooleanField(default=False),
),
migrations.CreateModel(
name='AuditLog',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('action', models.CharField(choices=[('create', 'Created'), ('update', 'Updated'), ('delete', 'Deleted')], max_length=10)),
('model', models.CharField(max_length=100)),
('object_id', models.CharField(max_length=50)),
('changes', models.JSONField(blank=True, null=True)),
('timestamp', models.DateTimeField(auto_now_add=True)),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='audit_logs', to=settings.AUTH_USER_MODEL)),
],
),
]

View file

@ -0,0 +1,27 @@
# Generated by Django 5.2.6 on 2025-09-07 10:37
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('risks', '0004_remove_risk_availability_remove_risk_confidentiality_and_more'),
]
operations = [
migrations.CreateModel(
name='Incidents',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('title', models.CharField(max_length=255)),
('description', models.TextField(blank=True, null=True)),
('date_reported', models.DateField(blank=True, null=True)),
('status', models.CharField(choices=[('open', 'Opened'), ('in_progress', 'In Progress'), ('close', 'Closed')], max_length=12)),
('related_risks', models.ManyToManyField(blank=True, related_name='incidents', to='risks.risk')),
('reported_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='incidents', to=settings.AUTH_USER_MODEL)),
],
),
]

View file

@ -0,0 +1,17 @@
# Generated by Django 5.2.6 on 2025-09-07 16:55
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('risks', '0005_incidents'),
]
operations = [
migrations.RenameModel(
old_name='Incidents',
new_name='Incident',
),
]

View file

@ -0,0 +1,26 @@
# Generated by Django 5.2.6 on 2025-09-07 18:31
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('risks', '0006_rename_incidents_incident'),
]
operations = [
migrations.CreateModel(
name='Notification',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('message', models.TextField()),
('created_at', models.DateTimeField(auto_now_add=True)),
('read', models.BooleanField(default=False)),
('send', models.BooleanField(default=False)),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='notifications', to=settings.AUTH_USER_MODEL)),
],
),
]

View file

Binary file not shown.

234
risks/models.py Normal file
View file

@ -0,0 +1,234 @@
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.db import models
from multiselectfield import MultiSelectField
class User(AbstractUser):
"""
Custom user model to support both local and SSO users.
"""
is_sso_user = models.BooleanField(default=False)
@property
def risks_owned(self):
""" All risks where the user is the risk owner. """
return self.owned_risks.all()
@property
def controls_responsible(self):
""" All controls where the user is responsible. """
return self.responsible_controls.all()
class Risk(models.Model):
"""
Represents an information security risk.
"""
LIKELIHOOD_CHOICES = [
(1, "Very low occurs less than once every 5 years"),
(2, "Low once every 15 years"),
(3, "Likely once per year or more"),
(4, "Very likely multiple times per year/monthly"),
]
IMPACT_CHOICES = [
(1, "Low (< 1,000 € minor operational impact)"),
(2, "Medium (1,0005,000 € local impact)"),
(3, "High (5,00015,000 € team-level impact)"),
(4, "Severe (50,000100,000 € regional impact)"),
(5, "Critical (> 100,000 € existential threat)"),
]
CIA_CHOICES = [
(1, "Confidentiality"),
(2, "Integrity"),
(3, "Availability")
]
# Basic information
title = models.CharField(max_length=255)
asset = models.CharField(max_length=255, blank=True, null=True)
process = models.CharField(max_length=255, blank=True, null=True)
category = models.CharField(max_length=255, blank=True, null=True)
# CIA Protection Goals
cia = MultiSelectField(choices=CIA_CHOICES, max_length=100, blank=True, null=True)
# Risk evaluation before controls
likelihood = models.IntegerField(
choices=LIKELIHOOD_CHOICES,
default=1
)
impact = models.IntegerField(
choices=IMPACT_CHOICES,
default=1
)
# Calculated fields
score = models.IntegerField(editable=False)
level = models.CharField(max_length=50, editable=False)
# Ownership
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.SET_NULL,
null=True,
related_name="owned_risks"
)
# Reminder / follow-up date
follow_up = models.DateField(blank=True, null=True)
def save(self, *args, **kwargs):
# Calculate risk score
self.score = self.likelihood * self.impact
# Determine level based on score
if self.score <= 4:
self.level = "Low"
elif self.score <= 8:
self.level = "Medium"
elif self.score <= 12:
self.level = "High"
else:
self.level = "Critical"
super().save(*args, **kwargs)
def __str__(self):
return f"{self.title} (Score: {self.score}, Level: {self.level})"
class ResidualRisk(models.Model):
"""
Residual Risk after implementing controls
"""
risk = models.OneToOneField(
Risk,
on_delete=models.CASCADE,
related_name="residual_risk")
likelihood = models.IntegerField(
choices=Risk.LIKELIHOOD_CHOICES,
default=1
)
impact = models.IntegerField(
choices=Risk.IMPACT_CHOICES,
default=1
)
score = models.IntegerField(editable=False)
level = models.CharField(max_length=50, editable=False)
review_required = models.BooleanField(default=False)
def save(self, *args, **kwargs):
# Load previous state (if it exists)
if self.pk:
old = ResidualRisk.objects.get(pk=self.pk)
if old.likelihood != self.likelihood or old.impact != self.impact:
self.review_required = False
self.score = self.likelihood * self.impact
# Determine level based on score
if self.score <= 4:
self.level = "Low"
elif self.score <= 8:
self.level = "Medium"
elif self.score <= 12:
self.level = "High"
else:
self.level = "Critical"
super().save(*args, **kwargs)
def __str__(self):
return f"Residual Risk for {self.risk.title} (Score: {self.score}, Level: {self.level})"
class Control(models.Model):
"""
A security control/measure linked to a risk.
"""
STATUS_CHOICES = [
("planned", "Planned"),
("in_progress", "In progress"),
("completed", "Completed"),
("verified", "Verified"),
("rejected", "Rejected"),
]
title = models.CharField(max_length=255)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default="planned")
due_date = models.DateField(blank=True, null=True)
responsible = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.SET_NULL,
null=True,
related_name="responsible_controls"
)
description = models.TextField(blank=True, null=True)
wiki_link = models.URLField(blank=True, null=True)
# Relation to risk
risk = models.ForeignKey(Risk, on_delete=models.CASCADE, related_name="controls")
def __str__(self):
return f"{self.title} ({self.get_status_display()})"
class AuditLog(models.Model):
"""
Generic audit log entry for tracking changes.
"""
ACTION_CHOICES = [
("create", "Created"),
("update", "Updated"),
("delete", "Deleted"),
]
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True, blank=True,
on_delete=models.SET_NULL,
related_name="audit_logs"
)
action = models.CharField(max_length=10, choices=ACTION_CHOICES)
model = models.CharField(max_length=100)
object_id = models.CharField(max_length=50)
changes = models.JSONField(null=True, blank=True)
timestamp = models.DateTimeField(auto_now_add=True)
def __str__(self):
return f"[{self.timestamp}] {self.user} {self.action} {self.model}({self.object_id})"
class Incident(models.Model):
"""
Incidents and related risks
"""
STATUS_CHOICES = [
("open", "Opened"),
("in_progress", "In Progress"),
("close", "Closed"),
]
title = models.CharField(max_length=255)
description = models.TextField(blank=True, null=True)
date_reported = models.DateField(blank=True, null=True)
reported_by = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, blank=True, on_delete=models.SET_NULL, related_name="incidents")
status = models.CharField(max_length=12, choices=STATUS_CHOICES)
related_risks = models.ManyToManyField("Risk", blank=True, related_name="incidents")
class Notification(models.Model):
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="notifications")
message = models.TextField()
#related_objects =
created_at = models.DateTimeField(auto_now_add=True)
read = models.BooleanField(default=False) # Read in WebApp
sent = models.BooleanField(default=False) # Sent via Mail (optional)
def __str__(self):
user_display = self.user.username if self.user else "System"
return f"{user_display}: {self.message[:50]}..."

105
risks/serializers.py Normal file
View file

@ -0,0 +1,105 @@
from django.contrib.auth import get_user_model
from rest_framework import serializers
from .models import Risk, Control, ResidualRisk, AuditLog, Incident
class ResidualRiskSerializer(serializers.ModelSerializer):
class Meta:
model = ResidualRisk
fields = [
"id",
"risk",
"likelihood",
"impact",
"score",
"level",
"review_required",
]
read_only_fields = ["score", "level"]
class ControlSerializer(serializers.ModelSerializer):
class Meta:
model = Control
fields = [
"id",
"title",
"status",
"due_date",
"responsible",
"description",
"wiki_link",
"risk",
]
class RiskSerializer(serializers.ModelSerializer):
# Nested representation of related controls
controls = ControlSerializer(many=True, read_only=True)
class Meta:
model = Risk
fields = [
"id",
"title",
"asset",
"process",
"category",
"likelihood",
"impact",
"score",
"level",
"owner",
"follow_up",
"confidentiality",
"integrity",
"availability",
"controls",
]
class AuditSerializer(serializers.ModelSerializer):
class Meta:
model = AuditLog
fields = [
"id",
"user",
"action",
"model",
"object_id",
"changes",
"timestamp",
]
User = get_user_model()
class UserSerializer(serializers.ModelSerializer):
risks_owned = serializers.PrimaryKeyRelatedField(many=True, read_only=True)
controls_responsible = serializers.PrimaryKeyRelatedField(many=True, read_only=True)
class Meta:
model = User
fields = [
"id",
"username",
"email",
"is_sso_user",
"risks_owned",
"controls_responsible",
]
class RiskSummarySerializer(serializers.ModelSerializer):
class Meta:
model = Risk
fields = ["id", "title", "score", "level"]
class IncidentSerializer(serializers.ModelSerializer):
related_risks = RiskSummarySerializer(many=True, read_only=True)
class Meta:
model = Incident
fields = [
"id",
"title",
"description",
"date_reported",
"status",
"related_risks",
]

171
risks/signals.py Normal file
View file

@ -0,0 +1,171 @@
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from .models import Control, Risk, ResidualRisk, AuditLog, Incident
from .utils import model_diff
@receiver(post_save, sender=Control)
def update_residual_risk_on_control_change(sender, instance, **kwargs):
"""
Whenever a control is saved, check if the related risk has a residual risk.
If a control is completed or verified, flag the residual risk for review.
"""
risk = instance.risk
# Ensure residual risk exists
residual, created = ResidualRisk.objects.get_or_create(risk=risk)
# If a control is marked as completed or verified, we mark residual risk for review
if instance.status in ["completed", "verified"]:
residual.review_required = True
residual.save()
@receiver(post_save, sender=Risk)
def log_risk_save(sender, instance, created, **kwargs):
if created:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="create",
model="Risk",
object_id=instance.pk,
changes={f.name: {"old": None, "new": getattr(instance, f.name)} for f in instance._meta.fields},
)
else:
old = Risk.objects.get(pk=instance.pk)
changes = model_diff(old, instance)
if changes:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="update",
model="Risk",
object_id=instance.pk,
changes=changes,
)
@receiver(post_delete, sender=Risk)
def log_risk_delete(sender, instance, **kwargs):
"""
Signal that runs after a Risk is deleted.
"""
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="delete",
model="Risk",
object_id=instance.pk,
changes=None, # no fields to track on deletion
)
@receiver(post_save, sender=Control)
def log_control_save(sender, instance, created, **kwargs):
if created:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="create",
model="Control",
object_id=instance.pk,
changes={f.name: {"old": None, "new": getattr(instance, f.name)} for f in instance._meta.fields},
)
else:
old = Control.objects.get(pk=instance.pk)
changes = model_diff(old, instance)
if changes:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="update",
model="Control",
object_id=instance.pk,
changes=changes,
)
@receiver(post_delete, sender=Control)
def log_control_delete(sender, instance, **kwargs):
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="delete",
model="Control",
object_id=instance.pk,
changes=None,
)
@receiver(post_save, sender=ResidualRisk)
def log_residual_save(sender, instance, created, **kwargs):
if created:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="create",
model="ResidualRisk",
object_id=instance.pk,
changes={f.name: {"old": None, "new": getattr(instance, f.name)} for f in instance._meta.fields},
)
else:
old = ResidualRisk.objects.get(pk=instance.pk)
changes = model_diff(old, instance)
if changes:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="update",
model="ResidualRisk",
object_id=instance.pk,
changes=changes,
)
@receiver(post_delete, sender=ResidualRisk)
def log_residual_delete(sender, instance, **kwargs):
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="delete",
model="ResidualRisk",
object_id=instance.pk,
changes=None,
)
@receiver(post_save, sender=Incident)
def log_incident_save(sender, instance, created, **kwargs):
if created:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="create",
model="Incident",
object_id=instance.pk,
changes={f.name: {"old": None, "new": getattr(instance, f.name)} for f in instance._meta.fields},
)
else:
old = Incident.objects.get(pk=instance.pk)
changes = model_diff(old, instance)
if changes:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="update",
model="Incident",
object_id=instance.pk,
changes=changes,
)
@receiver(m2m_changed, sender=Incident.related_risks.through)
def log_incident_risks_change(sender, instance, action, reverse, model, pk_set, **kwargs):
if action in ["post_add", "post_remove", "post_clear"]:
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="update",
model="Incident",
object_id=instance.pk,
changes={"related_risks": {"action": action, "ids": list(pk_set)}},
)
@receiver(post_delete, sender=Incident)
def log_incident_delete(sender, instance, **kwargs):
AuditLog.objects.create(
user=getattr(instance, "_changed_by", None),
action="delete",
model="Incident",
object_id=instance.pk,
changes=None,
)

3
risks/tests.py Normal file
View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

49
risks/utils.py Normal file
View file

@ -0,0 +1,49 @@
from django.utils.timezone import now
from .models import AuditLog, Risk, Notification
def model_diff(old, new, fields=None):
"""
Compare two model instances and return a dict of changed fields.
- old: previous model instance (from DB)
- new: updated model instance (unsaved)
- fields: optional list of fields to check
"""
changes = {}
opts = new._meta
if fields is None:
fields = [f.name for f in opts.fields]
for field_name in fields:
old_value = getattr(old, field_name, None)
new_value = getattr(new, field_name, None)
if old_value != new_value:
changes[field_name] = {"old": old_value, "new": new_value}
return changes
def check_risk_followups():
"""
Check if follow ups need attention and create notifications.
Ensures no duplicate notifications per risk per day
"""
today = now().date()
risks = Risk.objects.filter(follow_up__lte=today)
for risk in risks:
if risk.owner:
notification, created = Notification.objects.get_or_create(
user=risk.owner,
message=f"Follow-up required for risk: {risk.title}",
defaults={"read": False, "sent": False},
)
if created:
AuditLog.objects.create(
user=None, # system action
action="create",
model="Notification",
object_id=notification.pk,
changes={"message": notification.message, "user": risk.owner.username},
)

94
risks/views.py Normal file
View file

@ -0,0 +1,94 @@
from django.contrib.auth import get_user_model
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from .models import Risk, Control, ResidualRisk, AuditLog, Incident
from .serializers import ControlSerializer, RiskSerializer, ResidualRiskSerializer, UserSerializer, AuditSerializer, IncidentSerializer
class RiskViewSet(viewsets.ModelViewSet):
"""
API endpoint for managing Risks.
Provides CRUD operations.
"""
queryset = Risk.objects.all()
serializer_class = RiskSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()
class ControlViewSet(viewsets.ModelViewSet):
"""
API endpoint for managing Controls.
Provides CRUD operations.
"""
queryset = Control.objects.all()
serializer_class = ControlSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()
class ResidualRiskViewSet(viewsets.ModelViewSet):
queryset = ResidualRisk.objects.all()
serializer_class = ResidualRiskSerializer
permission_classes = [IsAuthenticated]
User = get_user_model()
class UserViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint for listing users and their responsibilities.
"""
queryset = User.objects.all()
serializer_class = UserSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()
class AuditViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint for view audit logging.
"""
queryset = AuditLog.objects.all()
serializer_class = AuditSerializer
permission_classes = [IsAuthenticated]
class IncidentViewSet(viewsets.ModelViewSet):
"""
API endpoint for listing incidents and its related risks.
"""
queryset = Incident.objects.all()
serializer_class = IncidentSerializer
permission_classes = [IsAuthenticated]
def perform_create(self, serializer):
instance = serializer.save(reported_by=self.request.user)
instance._changed_by = self.request.user
instance.save()
def perform_update(self, serializer):
instance = serializer.save()
instance._changed_by = self.request.user
instance.save()

3
static/css/bulma.min.css vendored Normal file

File diff suppressed because one or more lines are too long

37
static/css/design.css Normal file
View file

@ -0,0 +1,37 @@
html, body {
height: 100%;
}
.sidebar {
position: fixed;
top: 60px;
left: 0;
width: 220px;
height: calc(100% - 60px);
padding: 1rem;
border-right: 1px solid #ddd;
overflow-y: auto;
}
.content {
margin-left: 220px;
padding: 2rem;
padding-top: 80px;
}
.navbar.is-fixed-top {
z-index: 1000;
}
@media (max-width: 768px) {
.sidebar {
position: relative;
width: 100%;
height: auto;
border-right: none;
padding-top: 0;
}
.content {
margin-left: 0;
padding-top: 2rem;
}
}

0
static/css/print.css Normal file
View file

87
templates/base.html Normal file
View file

@ -0,0 +1,87 @@
{% load static %}
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Risiko Management</title>
<link rel="stylesheet" href="{% static '/css/bulma.min.css' %}">
<link rel="stylesheet" href="{% static '/css/design.css' %}">
<link rel="stylesheet" href="{% static '/css/print.css' %}" media="print">
<meta name="description" content="Effizientes Risiko Management für Unternehmen: Analysieren, bewerten und steuern Sie Risiken sicher und zuverlässig.">
<link rel="icon" href="favicon.ico">
<link rel="icon" href="favicon.svg" type="image/svg+xml">
<link rel="apple-touch-icon" href="apple-touch-icon.png">
</head>
<body>
<!-- Obere Navbar -->
<nav class="navbar is-fixed-top has-shadow">
<div class="navbar-brand">
<a class="navbar-item" href="#">
<strong>Risiko Management</strong>
</a>
<a role="button" class="navbar-burger" aria-label="menu" aria-expanded="false" data-target="navbarMenu">
<span aria-hidden="true"></span>
<span aria-hidden="true"></span>
<span aria-hidden="true"></span>
</a>
</div>
<div id="navbarMenu" class="navbar-menu">
<div class="navbar-end">
<div class="navbar-item has-dropdown is-hoverable">
<a class="navbar-link">
Profil
</a>
<div class="navbar-dropdown is-right">
<a class="navbar-item" href="#">Einstellungen</a>
<a class="navbar-item" href="#">Benachrichtigungen</a>
<hr class="navbar-divider">
<a class="navbar-item" href="#">Abmelden</a>
</div>
</div>
</div>
</div>
</nav>
<!-- Sidebar -->
<aside class="menu sidebar">
<p class="menu-label">Allgemein</p>
<ul class="menu-list">
<li><a class="is-active" href="#">Dashboard</a></li>
<li><a href="#">Aufgaben</a></li>
<li><a href="#">Statistiken</a></li>
</ul>
<p class="menu-label">Verwaltung</p>
<ul class="menu-list">
<li><a href="#">Risiken</a></li>
<li><a href="#">Maßnahmen</a></li>
<li><a href="#">Adminbereich</a></li>
</ul>
</aside>
<!-- Hauptinhalt -->
<main class="content">
<h1 class="title">Willkommen zum Dashboard</h1>
<p>Hier ist der Hauptinhalt des Dashboards.</p>
</main>
<!-- Bulma Navbar Burger Script -->
<script>
document.addEventListener('DOMContentLoaded', () => {
const $navbarBurgers = Array.prototype.slice.call(document.querySelectorAll('.navbar-burger'), 0);
if ($navbarBurgers.length > 0) {
$navbarBurgers.forEach( el => {
el.addEventListener('click', () => {
const target = el.dataset.target;
const $target = document.getElementById(target);
el.classList.toggle('is-active');
$target.classList.toggle('is-active');
});
});
}
});
</script>
</body>
</html>