ISO-27001-Risk-Management/.venv/lib/python3.11/site-packages/django_crontab/crontab.py
2025-09-24 18:13:40 +02:00

175 lines
5.9 KiB
Python

from __future__ import print_function
import fcntl
import hashlib
import json
import logging
import os
import tempfile
import sys
from importlib import import_module
from django.conf import settings
from django_crontab.app_settings import Settings
string_type = basestring if sys.version_info[0] == 2 else str # flake8: noqa
logger = logging.getLogger(__name__)
class Crontab(object):
def __init__(self, **options):
self.verbosity = int(options.get('verbosity', 1))
self.readonly = options.get('readonly', False)
self.crontab_lines = []
self.settings = Settings(settings)
def __enter__(self):
"""
Automatically read crontab when used as with statement
"""
self.read()
return self
def __exit__(self, type, value, traceback):
"""
Automatically write back crontab when used as with statement if readonly is False
"""
if not self.readonly:
self.write()
def read(self):
"""
Reads the crontab into internal buffer
"""
self.crontab_lines = os.popen('%s -l' % self.settings.CRONTAB_EXECUTABLE).readlines()
def write(self):
"""
Writes internal buffer back to crontab
"""
fd, path = tempfile.mkstemp()
tmp = os.fdopen(fd, 'w')
for line in self.crontab_lines:
tmp.write(line)
tmp.close()
os.system('%s %s' % (self.settings.CRONTAB_EXECUTABLE, path))
os.unlink(path)
def add_jobs(self):
"""
Adds all jobs defined in CRONJOBS setting to internal buffer
"""
for job in self.settings.CRONJOBS:
# differ format and find job's suffix
if len(job) > 2 and isinstance(job[2], string_type):
# format 1 job
job_suffix = job[2]
elif len(job) > 4:
job_suffix = job[4]
else:
job_suffix = ''
self.crontab_lines.append(self.settings.CRONTAB_LINE_PATTERN % {
'time': job[0],
'comment': self.settings.CRONTAB_COMMENT,
'command': ' '.join(filter(None, [
self.settings.COMMAND_PREFIX,
self.settings.PYTHON_EXECUTABLE,
self.settings.DJANGO_MANAGE_PATH,
'crontab', 'run',
self.__hash_job(job),
'--settings=%s' % self.settings.DJANGO_SETTINGS_MODULE if self.settings.DJANGO_SETTINGS_MODULE else '',
job_suffix,
self.settings.COMMAND_SUFFIX
]))
})
if self.verbosity >= 1:
print(' adding cronjob: (%s) -> %s' % (self.__hash_job(job), job))
def show_jobs(self):
"""
Print the jobs from from crontab
"""
print(u'Currently active jobs in crontab:')
for line in self.crontab_lines[:]:
job = self.settings.CRONTAB_LINE_REGEXP.findall(line)
if job and job[0][4] == self.settings.CRONTAB_COMMENT:
if self.verbosity >= 1:
print(u'%s -> %s' % (
job[0][2].split()[4],
self.__get_job_by_hash(job[0][2][job[0][2].find('crontab run') + 12:].split()[0])
))
def remove_jobs(self):
"""
Removes all jobs defined in CRONJOBS setting from internal buffer
"""
for line in self.crontab_lines[:]:
job = self.settings.CRONTAB_LINE_REGEXP.findall(line)
if job and job[0][4] == self.settings.CRONTAB_COMMENT:
self.crontab_lines.remove(line)
if self.verbosity >= 1:
print('removing cronjob: (%s) -> %s' % (
job[0][2].split()[4],
self.__get_job_by_hash(job[0][2][job[0][2].find('crontab run') + 12:].split()[0])
))
# noinspection PyBroadException
def run_job(self, job_hash):
"""
Executes the corresponding function defined in CRONJOBS
"""
job = self.__get_job_by_hash(job_hash)
job_name = job[1]
job_args = job[2] if len(job) > 2 and not isinstance(job[2], string_type) else []
job_kwargs = job[3] if len(job) > 3 else {}
lock_file_name = None
if self.settings.LOCK_JOBS:
lock_file = open(os.path.join(tempfile.gettempdir(), 'django_crontab_%s.lock' % job_hash), 'w')
lock_file_name = lock_file.name
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
logger.warning('Tried to start cron job %s that is already running.', job)
return
module_path, function_name = job_name.rsplit('.', 1)
module = import_module(module_path)
func = getattr(module, function_name)
try:
func(*job_args, **job_kwargs)
except:
logger.exception('Failed to complete cronjob at %s', job)
if self.settings.LOCK_JOBS:
try:
fcntl.flock(lock_file, fcntl.LOCK_UN)
except IOError:
logger.exception('Error unlocking %s', lock_file_name)
return
def __hash_job(self, job):
"""
Builds an md5 hash representing the job
"""
j = json.JSONEncoder(sort_keys=True).encode(job)
h = hashlib.md5(j.encode('utf-8')).hexdigest()
return h
def __get_job_by_hash(self, job_hash):
"""
Finds the job by given hash
"""
for job in self.settings.CRONJOBS:
if self.__hash_job(job) == job_hash:
return job
raise RuntimeError(
'No job with hash %s found. It seems the crontab is out of sync with your settings.CRONJOBS. '
'Run "python manage.py crontab add" again to resolve this issue!' % job_hash
)