jobschleuder

Brief overview

jobschleuder is a miniApp to schedule jobs like backups. It uses a database-based registry to store the jobs and a scheduler to run them. The workers can run on multiple hosts. The scheduler and the workers communicate via a RabbitMQ message queue.

Let it run

usage: jobschleuder.py [-h] [--loglevel LOGLEVEL] [--loghandler LOGHANDLER]
                       [--uuid UUID] [--config CONFIG] [--debug-veritas]
                       {registry,scheduler,worker,listener} ...

positional arguments:
{registry,scheduler,worker,listener}
    registry            task registry
    scheduler           task scheduler
    worker              task worker
    listener            lister to process webhooks

options:
-h, --help            show this help message and exit
--loglevel LOGLEVEL   used loglevel
--loghandler LOGHANDLER
                        used log handler
--uuid UUID           database logger uuid
--config CONFIG       updater config file
--debug-veritas       enable veritas debug logging

basic structure

The basic structure is a database with three tables. The jobs table contains the jobs, the registry table contains the cron-like schedule of each job and the schedule table contains the next run of each registered job.

Each job has five fields: job, description, preprocessing, postprocessing and arguments. The job field is the name of the job to run and the description is a short description of the job. The preprocessing field is the name of the function to run before the job, the postprocessing field is the name of the function to run after the job, and the arguments field is a JSON field with the arguments for the job.

registry

usage: jobschleuder.py registry [-h] [--import IMPORT_FILENAME] [--show-jobs] [--show-registry]

options:
-h, --help            show this help message and exit
--import IMPORT_FILENAME, -i IMPORT_FILENAME
                        import file to registry
--show-jobs, -sj      show all jobs
--show-registry, -sr  show registry

The registry is used to import jobs to the database. The import file is a yaml file with the following structure:

---
jobs:
  - id: hello world
    job: hello_world
    schedule: "*/1 * * * *"
    arguments:
      hello: world

  - id: backup devices
    job: simple_config_backup
    schedule: "0 1 * * *"
    preprocessing: preprocessing_backup
    arguments:
      where: name=lab.local
      # if sot is set to true the preprocessing get the sot object in kwargs
      sot: True
    exclude:
        # devices is a list of hostnames to exclude
        devices: ['labx.local']
        # pattern is a list of pattern to exclude
        pattern: ['.*xx']

  - id: retry failed backups
    job: simple_config_backup
    schedule: "0 1 * * *"
    preprocessing: get_failed_backups
    arguments:
      # if sot is set to true the preprocessing get the sot object in kwargs
      sot: True
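
The exclude section above lists hostnames and patterns to skip. As a minimal sketch of how such a filter could work: the function name filter_devices is hypothetical, and matching each pattern against the whole hostname is an assumption, since the README does not say how jobschleuder applies the patterns.

```python
import re


def filter_devices(hostnames, exclude):
    """Drop hostnames listed in exclude['devices'] or matching exclude['pattern'].

    Assumption: a pattern has to match the whole hostname (re.fullmatch).
    """
    excluded_names = set(exclude.get("devices", []))
    patterns = [re.compile(p) for p in exclude.get("pattern", [])]
    kept = []
    for name in hostnames:
        if name in excluded_names:
            continue
        if any(p.fullmatch(name) for p in patterns):
            continue
        kept.append(name)
    return kept


exclude = {"devices": ["labx.local"], "pattern": [".*xx"]}
hosts = ["lab1.local", "labx.local", "switchxx", "router1"]
print(filter_devices(hosts, exclude))  # ['lab1.local', 'router1']
```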

To import jobs from a yaml file to the database use the --import option. All jobs in the registry are removed before the new jobs are imported.

To see all jobs in the database use the --show-jobs option.

./jobschleuder.py registry --show-jobs
                                    All Jobs
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━┓
┃ id ┃ job                  ┃ descr                ┃ pre                 ┃ post ┃ args ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━┩
│ 4  │ simple_config_backup │ simple_config_backup │ get_missing_devices │      │ {}   │
└────┴──────────────────────┴──────────────────────┴─────────────────────┴──────┴──────┘

The registry table contains all jobs together with their schedule. The schedule is a cron-like expression and is used to calculate the next run of the job.
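
To illustrate how a next run can be derived from such a schedule, here is a minimal sketch using only the standard library. It is not the jobschleuder implementation. The names expand_field and next_run are hypothetical, only plain numbers, ranges, lists and */n steps are handled, and day-of-month and day-of-week are combined with AND, which is simpler than what a real cron does.

```python
from datetime import datetime, timedelta


def expand_field(field, low, high):
    """Expand one cron field (e.g. '*/15', '1-5', '0,30') into a set of ints."""
    values = set()
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            start, end = low, high
        elif "-" in rng:
            start, end = (int(x) for x in rng.split("-"))
        else:
            start = end = int(rng)
        values.update(range(start, end + 1, step))
    return values


def next_run(expr, after):
    """Return the first minute strictly after `after` that matches `expr`."""
    minute, hour, dom, month, dow = expr.split()
    minutes = expand_field(minute, 0, 59)
    hours = expand_field(hour, 0, 23)
    days = expand_field(dom, 1, 31)
    months = expand_field(month, 1, 12)
    weekdays = {d % 7 for d in expand_field(dow, 0, 7)}  # 0 and 7 are Sunday
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search at most one year ahead
        cron_weekday = (candidate.weekday() + 1) % 7  # Python: Monday is 0
        if (candidate.minute in minutes and candidate.hour in hours
                and candidate.day in days and candidate.month in months
                and cron_weekday in weekdays):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no run found within a year for {expr!r}")


print(next_run("0 1 * * *", datetime(2024, 1, 1, 12, 0)))  # 2024-01-02 01:00:00
```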

To see the scheduled jobs use the --show-registry option.

./jobschleuder.py registry --show-registry
                Job registry
┏━━━━┳━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ id ┃ job ┃ description          ┃ schedule    ┃
┡━━━━╇━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ 4  │ 4   │ simple_config_backup │ */1 * * * * │
└────┴─────┴──────────────────────┴─────────────┘

scheduler

usage: jobschleuder.py scheduler [-h] [--init] [--schedule] [--show-scheduled-jobs]
                                 [--show-all-scheduled-jobs] [--run-now RUN_NOW] [--no-daemon]

options:
-h, --help            show this help message and exit
--init, -i            clean old values and initialize scheduler
--schedule, -s        start scheduling jobs
--show-scheduled-jobs, -sj
                        show scheduled jobs
--show-all-scheduled-jobs, -aj
                        show all scheduled jobs
--run-now RUN_NOW, -rn RUN_NOW
                        run job now
--no-daemon, -nd      exit after schedule jobs

To initialize the scheduler use the --init option. This command removes the old schedule, reads all jobs from the registry, calculates the next run of each job and writes it to the schedule table.

To start the scheduler use the --schedule option. The scheduler runs in a loop: it reads the schedule table and schedules the jobs. If the next run of a job is in the past the job is scheduled to run now.
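
One pass of such a loop could look like the following sketch. It is an illustration under assumptions, not the actual scheduler: run_pending, publish and compute_next are hypothetical names, the schedule table is replaced by a plain dict, and publishing to the queue is replaced by a callback.

```python
from datetime import datetime, timedelta, timezone


def run_pending(schedule, now, publish, compute_next):
    """Publish every job whose next run is due and store its following run.

    schedule maps a job id to its next run. A next run that lies in the
    past counts as due, so the job is published immediately.
    """
    published = []
    for job_id, next_run in list(schedule.items()):
        if next_run <= now:
            publish(job_id)
            published.append(job_id)
            schedule[job_id] = compute_next(job_id, now)
    return published


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
schedule = {4: now - timedelta(hours=3), 5: now + timedelta(minutes=10)}
sent = []
# compute_next is a stand-in here: every job simply runs again in one minute
run_pending(schedule, now, sent.append, lambda job_id, t: t + timedelta(minutes=1))
print(sent)  # [4]
```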

To see the scheduled jobs use the --show-scheduled-jobs option. To see all scheduled jobs use the --show-all-scheduled-jobs option. If you use the --no-daemon option the scheduler runs once and exits.

To schedule a job to run now use the --run-now option with the id of the job. This id is the id of the job in the jobs table. You can see a list of all jobs using the registry command and the option --show-jobs.

worker

usage: jobschleuder.py worker [-h] [--id ID] [--worker WORKER]

options:
-h, --help       show this help message and exit
--id ID          worker id
--worker WORKER  number of workers to start

To start a worker use the worker command of the jobschleuder. You can start multiple workers with the --worker option. Each worker has a worker id, which is used to identify the worker in the log. Each worker receives jobs from the queue and acks a job only after the job is done.
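
The receive, run, ack order can be sketched with the standard library. This is only a stand-in: queue.Queue replaces the RabbitMQ queue, task_done() plays the role of the ack, and the worker and hello_world functions are hypothetical examples, not the code of the jobschleuder.

```python
import queue
import threading


def hello_world(hello):
    """Example job body; a real job would come from a plugin."""
    return f"hello {hello}"


def worker(worker_id, jobs, results):
    """Take jobs from the queue, run them and ack each one after it is done."""
    while True:
        job = jobs.get()
        if job is None:  # sentinel: no more work for this worker
            jobs.task_done()
            return
        try:
            results.append((worker_id, hello_world(**job["arguments"])))
        finally:
            jobs.task_done()  # the "ack" happens only after the job finished


jobs = queue.Queue()
results = []
threads = [threading.Thread(target=worker, args=(i, jobs, results)) for i in range(2)]
for thread in threads:
    thread.start()
for name in ("world", "lab"):
    jobs.put({"job": "hello_world", "arguments": {"hello": name}})
for _ in threads:
    jobs.put(None)
jobs.join()  # returns once every job has been acked
for thread in threads:
    thread.join()
print(sorted(text for _, text in results))  # ['hello lab', 'hello world']
```

Acking after the job is done means that a job whose worker dies halfway is not silently lost, because the broker can hand an unacknowledged message to another worker.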

database structure

CREATE TABLE IF NOT EXISTS public.jobs
(
    id serial,
    job character varying(50) NOT NULL,
    description character varying(100),
    preprocessing character varying(50),
    postprocessing character varying(50),
    arguments json,
    PRIMARY KEY (id)
);
CREATE TABLE public.registry
(
    id serial,
    job integer NOT NULL,
    schedule character varying(100) NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT job_id FOREIGN KEY (job)
        REFERENCES public.jobs (id) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE CASCADE
        NOT VALID
);
CREATE TABLE public.schedule
(
    id serial,
    job integer NOT NULL,
    next_run timestamp with time zone NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT job_id FOREIGN KEY (job)
        REFERENCES public.registry (id) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE CASCADE
        NOT VALID
);
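
Note that schedule.job references registry.id and registry.job references jobs.id, both with ON DELETE CASCADE. The following sketch shows the effect of this chain. It uses an in-memory SQLite database with a simplified schema as a stand-in for PostgreSQL, so the column types differ from the statements above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only on request
conn.executescript("""
CREATE TABLE jobs (
    id INTEGER PRIMARY KEY,
    job TEXT NOT NULL,
    arguments TEXT
);
CREATE TABLE registry (
    id INTEGER PRIMARY KEY,
    job INTEGER NOT NULL REFERENCES jobs (id) ON DELETE CASCADE,
    schedule TEXT NOT NULL
);
CREATE TABLE schedule (
    id INTEGER PRIMARY KEY,
    job INTEGER NOT NULL REFERENCES registry (id) ON DELETE CASCADE,
    next_run TEXT NOT NULL
);
""")

job_id = conn.execute(
    "INSERT INTO jobs (job, arguments) VALUES (?, ?)",
    ("hello_world", '{"hello": "world"}'),
).lastrowid
registry_id = conn.execute(
    "INSERT INTO registry (job, schedule) VALUES (?, ?)",
    (job_id, "*/1 * * * *"),
).lastrowid
conn.execute(
    "INSERT INTO schedule (job, next_run) VALUES (?, ?)",
    (registry_id, "2024-01-01T12:01:00+00:00"),
)

# deleting the job removes its registry entry and, through it, the schedule entry
conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
remaining = [
    conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    for table in ("jobs", "registry", "schedule")
]
print(remaining)  # [0, 0, 0]
```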

implementing your own jobs

The preprocessing is a function that is called before the job is run. It is looked up in the plugins directory. The preprocessing function gets the job arguments and, if sot is set to true in the arguments, the sot object. It returns a list of jobs to run. The jobs are added to the queue and run by the worker.

To implement your own preprocessing function create a python file in the plugins directory. To mark a function as a preprocessing function use a decorator, e.g. @jobschleuder("nornir_config_backup"). The function has to return a list of jobs to run.

To initialize the plugin use a function with the name of the job and the suffix :on_startup. This function is called to initialize the plugin. The function has to return a dictionary with the values to pass to the plugin.

from veritas.plugin import jobschleuder

@jobschleuder("nornir_config_backup")
def nornir_config_backup(*args, **kwargs) -> None:
    # ... your code here ...
    pass

@jobschleuder("nornir_config_backup:on_startup")
def init() -> dict:
    # ... this code is called to init the plugin ...
    # it has to define profile, local_config_file and sot

    # ... all return values are passed to the plugin ...
    return {'profile': profile, 'local_config_file': local_config_file, 'sot': sot}

The job you want to implement is a simple python file. The file must be placed into the plugin directory. The name of the job (precisely, the name passed to the decorator) must be the same as the job name in the database. At startup all known plugins are loaded. To load a plugin it must be specified in your config file.

... other options ...

plugins:
  # there is a dummy hello world plugin to illustrate how to use plugins
  # you can implement your own and add the name to the list of plugins
  hello_world:
    plugin_dir: plugins
    plugin: hello_world
  simple_config_backup:
    plugin_dir: plugins
    plugin: simple_config_backup
  nornir_config_backup:
    plugin_dir: plugins
    plugin: nornir_config_backup
  start_miniapp:
    plugin_dir: plugins
    plugin: start_miniapp
  summarize_backups:
    plugin_dir: plugins
    plugin: summarize_backups
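
Loading the files named by plugin_dir and plugin could be done with importlib, as in this sketch. It is an assumption about how such a loader might look, not the loader of the jobschleuder. load_plugins is a hypothetical name, and the plugin file is created in a temporary directory only to keep the example self-contained.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_plugins(plugins_config, base_dir):
    """Import <base_dir>/<plugin_dir>/<plugin>.py for every config entry."""
    loaded = {}
    for name, entry in plugins_config.items():
        path = Path(base_dir) / entry["plugin_dir"] / f"{entry['plugin']}.py"
        spec = importlib.util.spec_from_file_location(entry["plugin"], path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file, including its decorators
        loaded[name] = module
    return loaded


with tempfile.TemporaryDirectory() as base:
    plugin_dir = Path(base) / "plugins"
    plugin_dir.mkdir()
    (plugin_dir / "hello_world.py").write_text(
        "def hello_world(**kwargs):\n    return 'hello ' + kwargs['hello']\n"
    )
    # same shape as the plugins section of the config file
    config = {"hello_world": {"plugin_dir": "plugins", "plugin": "hello_world"}}
    modules = load_plugins(config, base)
    greeting = modules["hello_world"].hello_world(hello="world")

print(greeting)  # hello world
```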