jobschleuder
Brief overview
jobschleuder is a miniApp for scheduling jobs such as backups. It uses a database-based registry to store the jobs and a scheduler to run them. The workers can run on multiple hosts. The scheduler and the workers communicate via a RabbitMQ message queue.
Let it run
usage: jobschleuder.py [-h] [--loglevel LOGLEVEL] [--loghandler LOGHANDLER]
[--uuid UUID] [--config CONFIG] [--debug-veritas]
{registry,scheduler,worker,listener} ...
positional arguments:
{registry,scheduler,worker,listener}
registry task registry
scheduler task scheduler
worker task worker
listener lister to process webhooks
options:
-h, --help show this help message and exit
--loglevel LOGLEVEL used loglevel
--loghandler LOGHANDLER
used log handler
--uuid UUID database logger uuid
--config CONFIG updater config file
--debug-veritas enable veritas debug logging
basic structure
The basic structure is a database with three tables. The jobs table contains the jobs, the registry table contains the cron schedule of each job and the schedule table contains the next run of each job.
Each job has a job, a description, a preprocessing, a postprocessing and an arguments field. The job is the name of the job to run and the description is a short description of it. The preprocessing is the name of the function to run before the job, the postprocessing is the name of the function to run after the job, and the arguments field is a JSON field with the arguments for the job.
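The fields described above can be pictured as a small record type. This is only an illustrative sketch: the class name Job and the example values are assumptions, and only the field names are taken from the jobs table.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Job:
    """One row of the jobs table (field names follow the table columns)."""
    job: str                                 # name of the job to run
    description: Optional[str] = None        # short description
    preprocessing: Optional[str] = None      # function to run before the job
    postprocessing: Optional[str] = None     # function to run after the job
    arguments: dict[str, Any] = field(default_factory=dict)  # JSON arguments


# Example values only; they mirror the import file shown in the registry section.
backup = Job(
    job="simple_config_backup",
    description="backup devices",
    preprocessing="preprocessing_backup",
    arguments={"where": "name=lab.local", "sot": True},
)
print(backup)
```

Only job is mandatory here, which matches the NOT NULL constraint on that column in the database structure.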
registry
usage: jobschleuder.py registry [-h] [--import IMPORT_FILENAME] [--show-jobs] [--show-registry]
options:
-h, --help show this help message and exit
--import IMPORT_FILENAME, -i IMPORT_FILENAME
import file to registry
--show-jobs, -sj show all jobs
--show-registry, -sr show registry
The registry command imports jobs into the database. The import file is a YAML file with the following structure:
---
jobs:
  - id: hello world
    job: hello_world
    schedule: "*/1 * * * *"
    arguments:
      hello: world
  - id: backup devices
    job: simple_config_backup
    schedule: "0 1 * * *"
    preprocessing: preprocessing_backup
    arguments:
      where: name=lab.local
      # if sot is set to true, the preprocessing gets the sot object in kwargs
      sot: True
      exclude:
        # devices is a list of hostnames to exclude
        devices: ['labx.local']
        # pattern is a list of patterns to exclude
        pattern: ['.*xx']
  - id: retry failed backups
    job: simple_config_backup
    schedule: "0 1 * * *"
    preprocessing: get_failed_backups
    arguments:
      # if sot is set to true, the preprocessing gets the sot object in kwargs
      sot: True
To import jobs from a YAML file into the database, use the --import option. All jobs in the registry are removed before the new jobs are imported.
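Because an import replaces the whole registry, it can be worth checking a file before importing it. The following is a minimal sketch and not part of jobschleuder: the function check_import is hypothetical, the file is assumed to be already parsed into a dict (for example with PyYAML's yaml.safe_load), and the five-field cron rule is an assumption based on the examples above.

```python
def check_import(data: dict) -> list[str]:
    """Return a list of problems found; an empty list means the file looks sane."""
    problems = []
    for position, entry in enumerate(data.get("jobs", []), start=1):
        label = entry.get("id", f"entry {position}")
        # jobs.job is NOT NULL in the database
        if not entry.get("job"):
            problems.append(f"{label}: missing 'job'")
        # registry.schedule is NOT NULL; five cron fields are assumed
        schedule = str(entry.get("schedule", ""))
        if len(schedule.split()) != 5:
            problems.append(f"{label}: schedule must have five cron fields")
        if not isinstance(entry.get("arguments", {}), dict):
            problems.append(f"{label}: 'arguments' must be a mapping")
    return problems


# Stand-in for the parsed import file; the second entry is deliberately broken.
parsed = {
    "jobs": [
        {"id": "hello world", "job": "hello_world",
         "schedule": "*/1 * * * *", "arguments": {"hello": "world"}},
        {"id": "broken", "schedule": "0 1 * *"},
    ]
}
for problem in check_import(parsed):
    print(problem)
```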
To see all jobs in the database, use the --show-jobs option.
./jobschleuder.py registry --show-jobs
All Jobs
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━┓
┃ id ┃ job ┃ descr ┃ pre ┃ post ┃ args ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━┩
│ 4 │ simple_config_backup │ simple_config_backup │ get_missing_devices │ │ {} │
└────┴──────────────────────┴──────────────────────┴─────────────────────┴──────┴──────┘
The registry table contains all jobs together with their schedule. The schedule is a cron-like expression that is used to calculate the next run of the job.
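How a next run can be derived from a cron-like expression is sketched below. This is not the code jobschleuder uses (a real implementation may well rely on a cron library); the functions expand and next_run are hypothetical. The sketch supports only numbers, `*`, ranges, lists and steps, and it requires all five fields to match, whereas standard cron treats day-of-month and day-of-week as alternatives when both are restricted.

```python
from datetime import datetime, timedelta


def expand(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field such as "*/15", "1-5" or "0,30" into a set of ints."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        step_size = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            first, last = base.split("-")
            start, end = int(first), int(last)
        else:
            start = int(base)
            end = high if step else start
        values.update(range(start, end + 1, step_size))
    return values


def next_run(schedule: str, after: datetime) -> datetime:
    """Return the first full minute after `after` that matches the schedule."""
    minute, hour, dom, month, dow = schedule.split()
    minutes, hours = expand(minute, 0, 59), expand(hour, 0, 23)
    days, months = expand(dom, 1, 31), expand(month, 1, 12)
    weekdays = expand(dow, 0, 6)  # cron convention: 0 = Sunday
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search at most one year ahead
        cron_weekday = (candidate.weekday() + 1) % 7  # Python: 0 = Monday
        if (candidate.minute in minutes and candidate.hour in hours
                and candidate.day in days and candidate.month in months
                and cron_weekday in weekdays):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no run within one year for schedule {schedule!r}")


print(next_run("*/1 * * * *", datetime(2024, 1, 1, 12, 0, 30)))
print(next_run("0 1 * * *", datetime(2024, 1, 1, 12, 0)))
```

Stepping minute by minute is slow but easy to verify, which is why it was chosen for this sketch.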
To see the scheduled jobs, use the --show-registry option.
./jobschleuder.py registry --show-registry
Job registry
┏━━━━┳━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ id ┃ job ┃ description ┃ schedule ┃
┡━━━━╇━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ 4 │ 4 │ simple_config_backup │ */1 * * * * │
└────┴─────┴──────────────────────┴─────────────┘
scheduler
usage: jobschleuder.py scheduler [-h] [--init] [--schedule] [--show-scheduled-jobs]
[--show-all-scheduled-jobs] [--run-now RUN_NOW] [--no-daemon]
options:
-h, --help show this help message and exit
--init, -i clean old values and initialize scheduler
--schedule, -s start scheduling jobs
--show-scheduled-jobs, -sj
show scheduled jobs
--show-all-scheduled-jobs, -aj
show all scheduled jobs
--run-now RUN_NOW, -rn RUN_NOW
run job now
--no-daemon, -nd exit after schedule jobs
To initialize the scheduler, use the --init option. This command removes the old schedule, reads all jobs from the registry, calculates the next run of each job and writes the result to the schedule table.
To start the scheduler, use the --schedule option. The scheduler runs in a loop, reads the schedule table and schedules the jobs. If the next run of a job is in the past, the job is scheduled to run now.
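The rule that overdue jobs run immediately can be sketched as a simple comparison against the current time. The function split_due and the row layout are assumptions made for illustration; only the column names id, job and next_run come from the schedule table.

```python
from datetime import datetime, timedelta, timezone


def split_due(rows: list[dict], now: datetime) -> tuple[list[dict], list[dict]]:
    """Split schedule rows into (due, pending); overdue rows count as due."""
    due = [row for row in rows if row["next_run"] <= now]
    pending = [row for row in rows if row["next_run"] > now]
    return due, pending


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    {"id": 1, "job": 4, "next_run": now - timedelta(hours=2)},    # missed run
    {"id": 2, "job": 5, "next_run": now + timedelta(minutes=5)},  # still in the future
]
due, pending = split_due(rows, now)
print("run now:", [row["id"] for row in due])
print("wait:", [row["id"] for row in pending])
```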
To see the scheduled jobs, use the --show-scheduled-jobs option. To see all scheduled jobs, use the --show-all-scheduled-jobs option. If you use the --no-daemon option, the scheduler runs once and exits.
To schedule a job to run now, use the --run-now option with the id of the job. This is the id of the job in the jobs table. You can list all jobs with the registry command and the --show-jobs option.
worker
usage: jobschleuder.py worker [-h] [--id ID] [--worker WORKER]
options:
-h, --help show this help message and exit
--id ID worker id
--worker WORKER number of workers to start
To start a worker, use the worker command of jobschleuder. You can start multiple workers with the --worker option. Each worker has a worker id, which identifies the worker in the log. Each worker receives jobs from the queue and acknowledges (acks) a job after it is done.
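The receive, run, ack cycle of a worker can be illustrated without a broker. The sketch below uses Python's in-process queue.Queue as a stand-in for the RabbitMQ queue, with task_done() playing the role of the ack; it is not the jobschleuder worker code, and the function names and message layout are assumptions.

```python
import queue
import threading


def worker(worker_id: int, jobs: queue.Queue, results: list) -> None:
    """Take jobs from the queue until a None sentinel arrives."""
    while True:
        job = jobs.get()
        if job is None:          # sentinel: no more work for this worker
            jobs.task_done()
            break
        try:
            # The real worker would call the plugin here.
            results.append((worker_id, job["job"]))
        finally:
            jobs.task_done()     # "ack" only after the job has been handled


jobs: queue.Queue = queue.Queue()
results: list = []
threads = [threading.Thread(target=worker, args=(i, jobs, results)) for i in range(2)]
for thread in threads:
    thread.start()
for name in ("hello_world", "simple_config_backup", "hello_world"):
    jobs.put({"job": name})
for _ in threads:
    jobs.put(None)               # one sentinel per worker
jobs.join()                      # returns once every message has been acked
for thread in threads:
    thread.join()
print(sorted(name for _, name in results))
```

Acknowledging after the work is finished means a job that crashes a worker is not silently lost, which is the usual reason for choosing this order.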
database structure
CREATE TABLE IF NOT EXISTS public.jobs
(
    id serial,
    job character varying(50) NOT NULL,
    description character varying(100),
    preprocessing character varying(50),
    postprocessing character varying(50),
    arguments json,
    PRIMARY KEY (id)
);

CREATE TABLE public.registry
(
    id serial,
    job integer NOT NULL,
    schedule character varying(100) NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT job_id FOREIGN KEY (job)
        REFERENCES public.jobs (id) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE CASCADE
        NOT VALID
);

CREATE TABLE public.schedule
(
    id serial,
    job integer NOT NULL,
    next_run timestamp with time zone NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT job_id FOREIGN KEY (job)
        REFERENCES public.registry (id) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE CASCADE
        NOT VALID
);
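The two ON DELETE CASCADE constraints chain the tables together: deleting a job also deletes its registry entry, which in turn deletes its schedule entry. The sketch below demonstrates this with an in-memory SQLite database as a stand-in for PostgreSQL; the column types are simplified and the inserted values are examples only.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled
con.executescript("""
CREATE TABLE jobs (
    id INTEGER PRIMARY KEY,
    job TEXT NOT NULL
);
CREATE TABLE registry (
    id INTEGER PRIMARY KEY,
    job INTEGER NOT NULL REFERENCES jobs (id) ON DELETE CASCADE,
    schedule TEXT NOT NULL
);
CREATE TABLE schedule (
    id INTEGER PRIMARY KEY,
    job INTEGER NOT NULL REFERENCES registry (id) ON DELETE CASCADE,
    next_run TEXT NOT NULL
);
""")

con.execute("INSERT INTO jobs (id, job) VALUES (4, 'simple_config_backup')")
con.execute("INSERT INTO registry (id, job, schedule) VALUES (4, 4, '0 1 * * *')")
con.execute("INSERT INTO schedule (job, next_run) VALUES (4, '2024-01-02T01:00:00+00:00')")

# Deleting the job cascades to registry and from there to schedule.
con.execute("DELETE FROM jobs WHERE id = 4")
remaining = [
    con.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    for table in ("jobs", "registry", "schedule")
]
print(remaining)
```

Note that schedule.job references registry.id, not jobs.id, so the value stored there is the id of the registry entry.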
implementing your own jobs
The preprocessing is a function from the plugins directory that is called before the job is run. It receives the job arguments and, if sot is set to true in the arguments, the sot object in kwargs. It returns a list of jobs to run. These jobs are added to the queue and run by the workers.
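As an illustration of a preprocessing step that turns arguments into a list of jobs, the sketch below applies the exclude settings from the import file to a list of hostnames. Everything here is hypothetical: the function name, the shape of the returned job entries and the choice to match each pattern against the full hostname are assumptions, and a real preprocessing function would get its device list from the sot object rather than from a parameter.

```python
import re


def preprocessing_sketch(hostnames: list[str], arguments: dict) -> list[dict]:
    """Return one job entry per hostname that is not excluded."""
    exclude = arguments.get("exclude", {})
    excluded_names = set(exclude.get("devices", []))
    patterns = [re.compile(pattern) for pattern in exclude.get("pattern", [])]
    jobs = []
    for hostname in hostnames:
        if hostname in excluded_names:
            continue  # excluded by exact hostname
        if any(pattern.fullmatch(hostname) for pattern in patterns):
            continue  # excluded by pattern (full match assumed)
        jobs.append({"device": hostname})
    return jobs


arguments = {"exclude": {"devices": ["labx.local"], "pattern": [".*xx"]}}
hosts = ["lab1.local", "labx.local", "corexx", "lab2.local"]
print(preprocessing_sketch(hosts, arguments))
```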
To implement your own preprocessing function, create a Python file in the plugins directory. To mark a function as a preprocessing method, use a decorator, e.g. @jobschleuder("nornir_config_backup"). The function has to return a list of jobs to run.
To initialize the plugin, use a function registered under the name of the job with the suffix :on_startup. This function is called to initialize the plugin. It has to return a dictionary with the values to pass to the plugin.
from veritas.plugin import jobschleuder


@jobschleuder("nornir_config_backup")
def nornir_config_backup(*args: list, **kwargs: dict) -> None:
    # ... your code here ...
    pass


@jobschleuder("nornir_config_backup:on_startup")
def init() -> dict:
    # ... this code is called to init the plugin ...
    # ... create profile, local_config_file and sot here ...
    # all return values are passed to the plugin
    return {'profile': profile, 'local_config_file': local_config_file, 'sot': sot}
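To make the naming convention concrete, here is a minimal sketch of how a name-based decorator registry with an :on_startup hook could work. It is not the veritas implementation: register, REGISTRY and run are hypothetical names, and passing the startup values to the job as keyword arguments is an assumption.

```python
from typing import Callable

# Hypothetical registry mapping a registered name to its function.
REGISTRY: dict[str, Callable] = {}


def register(name: str) -> Callable[[Callable], Callable]:
    """Decorator factory that stores the function under the given name."""
    def decorator(func: Callable) -> Callable:
        REGISTRY[name] = func
        return func
    return decorator


@register("hello_world:on_startup")
def init() -> dict:
    # Values returned here are handed to the job when it runs.
    return {"greeting": "hello"}


@register("hello_world")
def hello_world(*args, **kwargs) -> str:
    return f"{kwargs['greeting']} {kwargs['hello']}"


def run(job: str, arguments: dict):
    """Look up the job by name, call its optional startup hook, then the job."""
    startup = REGISTRY.get(f"{job}:on_startup")
    extra = startup() if startup else {}
    return REGISTRY[job](**extra, **arguments)


print(run("hello_world", {"hello": "world"}))
```

Looking functions up by name is what ties a plugin to the job name stored in the database.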
A job is implemented as a plain Python file that must be placed in the plugins directory. The name of the job (more precisely, the name passed to the decorator) must be the same as the job name in the database. At startup, all known plugins are loaded. To be loaded, a plugin must be specified in your config file.
... other options ...
plugins:
  # there is a dummy hello world plugin to illustrate how to use plugins
  # you can implement your own and add the name to the list of plugins
  hello_world:
    plugin_dir: plugins
    plugin: hello_world
  simple_config_backup:
    plugin_dir: plugins
    plugin: simple_config_backup
  nornir_config_backup:
    plugin_dir: plugins
    plugin: nornir_config_backup
  start_miniapp:
    plugin_dir: plugins
    plugin: start_miniapp
  summarize_backups:
    plugin_dir: plugins
    plugin: summarize_backups