PolarSPARC

Using Python APScheduler


Bhaskar S 09/04/2022


Introduction

Oftentimes, there is a need for an application to fetch some data from a source (be it internal or external) on a regular interval (say, daily at around 7.00 AM EST). This is where the Python APScheduler library comes in handy.

APScheduler (short for Advanced Python Scheduler) is an open source Python library that allows one to schedule jobs (or tasks) for execution on a regular periodic basis (such as hourly, daily, weekly, etc.).

By default, a scheduled job's data in APScheduler is serialized and stored in-memory. This implies that the scheduled job does NOT survive a system crash or a system restart. To ensure a scheduled job survives a system crash or a system restart, APScheduler can be configured to use a database as the jobstore, so that the data related to the scheduled jobs is persisted to disk.

The APScheduler library comprises the following core components:

Triggers :: determine when a job should run next (date, interval, or cron based)

Job Stores :: hold the scheduled jobs, either in memory (the default) or in a database

Executors :: submit a job's callable to a thread or process pool when it is due to run

Schedulers :: bind the other components together and expose the API to add, modify, and remove jobs

Installation and Setup

Installation and setup will be on a Linux desktop running Ubuntu 22.04 LTS. Note that the stable Python version on Ubuntu is 3.10.

For our demonstration, we will create a directory called APScheduler under the user's home directory by executing the following command in a terminal window:

$ mkdir -p $HOME/APScheduler

Next, we will create a project specific Python virtual environment using the venv module. In order to do that, we first need to install the package for venv by executing the following command in a terminal window:

$ sudo apt install -y python3.10-venv

The Python venv module allows one to create lightweight virtual environments, each with its own directory structure, isolated from the system specific directory structure. To create a Python virtual environment, execute the following command(s) in the terminal window:

$ cd $HOME/APScheduler

$ python3 -m venv venv

This will create a directory called venv under the current directory. One needs to activate the newly created virtual environment by executing the following command in the terminal window:

$ source venv/bin/activate

On successful virtual environment activation, the prompt will be prefixed with (venv).

We will now install the following Python modules:

apscheduler :: the Advanced Python Scheduler library

sqlalchemy :: the SQL toolkit used by APScheduler to persist jobs to a database

Execute the following command(s) in the terminal window (with venv activated):

$ pip install apscheduler sqlalchemy

Next, we will install sqlite, a small, fast, self-contained, highly-reliable, full-featured, open-source SQL database engine, along with a GUI browser for it, by executing the following command in a terminal window:

$ sudo apt install -y sqlite3 sqlitebrowser

For storing the sqlite database file, we will create a data directory under the directory APScheduler by executing the following command in the terminal window:

$ mkdir -p $HOME/APScheduler/data


Hands-on Python APScheduler

The following is the Python script called sample-1.py that demonstrates a simple scheduling application:


sample-1.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   04 Sep 2022
#

import logging
import time
from apscheduler.schedulers.background import BackgroundScheduler

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)

logger = logging.getLogger('sample-1')


def task():
    logger.info('Started sample-1 task...')
    time.sleep(1)
    logger.info('Completed sample-1 task !!!')


def main():
    scheduler = BackgroundScheduler(daemon=True)
    scheduler.add_job(task, trigger='interval', seconds=60, misfire_grace_time=5*60)
    scheduler.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        scheduler.shutdown()


if __name__ == '__main__':
    main()

Some aspects of sample-1.py from the above need a little explanation.

The try-except block with the sleep in a loop is important, as it keeps the main thread alive while the scheduler executes jobs on a background thread. Without it, the program would terminate immediately and nothing would happen. Also, the option misfire_grace_time gives the job a window (5 minutes here) within which it is still allowed to run if its scheduled time was missed.

To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about 3 minutes before interrupting by pressing CTRL-C:

$ python3 sample-1.py

The following would be a typical output:

Output.1

INFO 2022-09-04 14:14:18,803 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 14:14:18,803 - Added job "task" to job store "default"
INFO 2022-09-04 14:14:18,803 - Scheduler started
INFO 2022-09-04 14:15:18,803 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:15:18 EDT)" (scheduled at 2022-09-04 14:15:18.803045-04:00)
INFO 2022-09-04 14:15:18,803 - Started sample-1 task...
INFO 2022-09-04 14:15:19,805 - Completed sample-1 task !!!
INFO 2022-09-04 14:15:19,805 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:16:18 EDT)" executed successfully
INFO 2022-09-04 14:16:18,804 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:17:18 EDT)" (scheduled at 2022-09-04 14:16:18.803045-04:00)
INFO 2022-09-04 14:16:18,804 - Started sample-1 task...
INFO 2022-09-04 14:16:19,805 - Completed sample-1 task !!!
INFO 2022-09-04 14:16:19,805 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:17:18 EDT)" executed successfully
<CTRL-C>
INFO 2022-09-04 14:16:23,814 - Scheduler has been shut down

As indicated in the introduction, the default jobstore used by APScheduler is in-memory. So, if sample-1.py crashes and restarts, it will appear as though it is a fresh start.

For the next demonstration, we will configure the sqlite database as the jobstore.

The following is the Python script sample-2.py that is functionally the same as the previous case, except that it uses a persistent jobstore:


sample-2.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   04 Sep 2022
#

import logging
import time
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)

logger = logging.getLogger('sample-2')

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}

def task():
    logger.info('Started sample-2 task...')
    time.sleep(2)
    logger.info('Completed sample-2 task !!!')


def main():
    job_id = 'sample-2'
    scheduler = BackgroundScheduler(jobstores=jobstores, daemon=True)
    scheduler.add_job(task, id=job_id, trigger='interval', seconds=60, misfire_grace_time=5*60)
    scheduler.start()
    try:
        while True:
            scheduler.print_jobs(jobstore="default")
            time.sleep(15)
    except KeyboardInterrupt:
        scheduler.remove_job(job_id)
        scheduler.shutdown()


if __name__ == '__main__':
    main()

Some aspects of sample-2.py from the above need a little explanation.

Notice how the jobstore is configured during the instantiation of the BackgroundScheduler using the option jobstores, which takes a Python dictionary mapping an alias (default in this case) to a jobstore instance.

To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about 3 minutes before interrupting by pressing CTRL-C:

$ python3 sample-2.py

The following would be a typical output:

Output.2

INFO 2022-09-04 19:55:36,543 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 19:55:36,554 - Added job "task" to job store "default"
INFO 2022-09-04 19:55:36,554 - Scheduler started
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
INFO 2022-09-04 19:56:36,551 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)" (scheduled at 2022-09-04 19:56:36.543060-04:00)
INFO 2022-09-04 19:56:36,551 - Started sample-2 task...
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 19:57:36 EDT)
INFO 2022-09-04 19:56:38,555 - Completed sample-2 task !!!
INFO 2022-09-04 19:56:38,555 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 19:57:36 EDT)" executed successfully
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 19:57:36 EDT)
<CTRL-C>
INFO 2022-09-04 19:56:52,103 - Removed job sample-2
INFO 2022-09-04 19:56:52,103 - Scheduler has been shut down

Launch the sqlite browser and access the database /home/alice/APScheduler/data/jobs.db (before pressing CTRL-C). The following illustration shows the row from the job table:

SQLite Browser
Figure.1
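The same row can also be inspected from a terminal using the sqlite3 command line shell; APScheduler persists jobs to a table named apscheduler_jobs by default:

```shell
$ sqlite3 /home/alice/APScheduler/data/jobs.db "SELECT id, next_run_time FROM apscheduler_jobs;"
```

This should display the job id (sample-2) along with the timestamp of its next scheduled run.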

What if we want to be notified of either the success or the failure of a job? This is where the job event listener comes to the rescue.

For the next demonstration, we will register a job event listener, which will get invoked after each job execution.

The following is the Python script sample-3.py that is functionally the same as the previous case, except that it uses a job status listener:


sample-3.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   04 Sep 2022
#

import logging
import time
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)

logger = logging.getLogger('sample-3')

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}


def task():
    logger.info('Started sample-3 task...')
    time.sleep(3)
    logger.info('Completed sample-3 task !!!')


def job_status_listener(event):
    if event.exception:
        logger.error('The job [%s] encountered exception ...' % event.job_id)
    else:
        logger.info('The job [%s] succeeded !!!' % event.job_id)


def main():
    job_id = 'sample-3'
    scheduler = BackgroundScheduler(jobstores=jobstores, daemon=True)
    scheduler.add_job(task, id=job_id, trigger='interval', seconds=60, misfire_grace_time=5 * 60)
    scheduler.add_listener(job_status_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED)
    scheduler.start()
    try:
        while True:
            scheduler.print_jobs(jobstore="default")
            time.sleep(30)
    except KeyboardInterrupt:
        scheduler.remove_job(job_id)
        scheduler.shutdown()


if __name__ == '__main__':
    main()

Some aspects of sample-3.py from the above need a little explanation.

Notice how the listener function job_status_listener is registered using the method add_listener, along with a bitmask of the event types of interest (EVENT_JOB_ERROR | EVENT_JOB_EXECUTED). When one of those events fires, the listener receives an event object; the attribute event.exception is set if the job raised an exception, and event.job_id identifies the job that triggered the event.

To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about 3 minutes before interrupting by pressing CTRL-C:

$ python3 sample-3.py

The following would be a typical output:

Output.3

INFO 2022-09-04 20:24:59,910 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 20:24:59,915 - Added job "task" to job store "default"
INFO 2022-09-04 20:24:59,915 - Scheduler started
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 20:25:59 EDT)
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 20:25:59 EDT)
INFO 2022-09-04 20:25:59,918 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 20:25:59 EDT)" (scheduled at 2022-09-04 20:25:59.909965-04:00)
INFO 2022-09-04 20:25:59,919 - Started sample-3 task...
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 20:26:59 EDT)
INFO 2022-09-04 20:26:02,920 - Completed sample-3 task !!!
INFO 2022-09-04 20:26:02,920 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 20:26:59 EDT)" executed successfully
INFO 2022-09-04 20:26:02,920 - The job [sample-3] succeeded !!!
Jobstore default:
    task (trigger: interval[0:01:00], next run at: 2022-09-04 20:26:59 EDT)
<CTRL-C>
INFO 2022-09-04 20:26:31,947 - Removed job sample-3
INFO 2022-09-04 20:26:31,947 - Scheduler has been shut down

Now that we have a basic understanding and working knowledge of the core parts of APScheduler, it is time to tackle a real use-case. Typically, a job (or a task) needs some input parameters (arguments to the task function). For example, the job may be to process a daily feed from a particular location in the filesystem. Also, let us assume that the daily feed arrives at around 7 AM EST. In reality, the feed may be delayed a little. In such cases, one may have to adjust the job's schedule to retry after a fixed interval till the feed is successfully processed. This use-case is the subject of our next demonstration.

The following is the Python script sample-4.py that will look for a file (dummy.dat in this case) in a specific folder (/tmp in this case) and adjust the job's schedule on exception to run more frequently:


sample-4.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   04 Sep 2022
#

import logging
import time
import os
import zoneinfo
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)

logger = logging.getLogger('sample-4')

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}

tz_NYC = zoneinfo.ZoneInfo('America/New_York')
scheduler = BackgroundScheduler(jobstores=jobstores, misfire_grace_time=5*60, daemon=True)


def task(jid, root, file):
    logger.info('Started [%s] task...' % jid)
    time.sleep(2)
    if not os.path.exists(os.path.join(root, file)):
        raise FileNotFoundError
    logger.info('Completed [%s] task !!!' % jid)


def job_status_listener(event):
    if event.exception:
        logger.error('*** The job [%s] encountered exception !!!' % event.job_id)
        # Failure - reschedule for sooner
        scheduler.reschedule_job(event.job_id, trigger='interval', seconds=15)
    else:
        logger.info('The job [%s] succeeded.' % event.job_id)
        # Success - Back to default
        scheduler.reschedule_job(event.job_id, trigger='cron', day_of_week='mon-fri', minute='*/1', timezone=tz_NYC)


def main():
    job_id = 'sample-4'
    root = '/tmp'
    file = 'dummy.dat'
    scheduler.add_job(task, id=job_id, args=[job_id, root, file], trigger='cron', day_of_week='mon-fri',
                      minute='*/1', timezone=tz_NYC)
    scheduler.add_listener(job_status_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED)
    scheduler.start()
    try:
        while True:
            scheduler.print_jobs(jobstore="default")
            time.sleep(30)
    except KeyboardInterrupt:
        scheduler.remove_job(job_id)
        scheduler.shutdown()


if __name__ == '__main__':
    main()

Some aspects of sample-4.py from the above need a little explanation.

Notice how the task function now takes arguments, which are passed in via the args option of add_job. When the expected file is absent, the task raises a FileNotFoundError, which surfaces to the listener as an EVENT_JOB_ERROR event. On failure, the listener reschedules the job (using reschedule_job) to retry every 15 seconds; on success, it restores the regular weekday cron schedule.

To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about a minute or so:

$ python3 sample-4.py

The following would be a typical output:

Output.4

INFO 2022-09-04 20:32:41,726 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 20:32:41,733 - Added job "task" to job store "default"
INFO 2022-09-04 20:32:41,733 - Scheduler started
Jobstore default:
    task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:33:00 EDT)
INFO 2022-09-04 20:33:00,008 - Running job "task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:33:00 EDT)" (scheduled at 2022-09-04 20:33:00-04:00)
INFO 2022-09-04 20:33:00,009 - Started [sample-4] task...
ERROR 2022-09-04 20:33:02,011 - Job "task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:34:00 EDT)" raised an exception
Traceback (most recent call last):
  File "/home/alice/APScheduler/venv/lib/python3.10/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/home/alice/APScheduler/sample-4.py", line 31, in task
    raise FileNotFoundError
FileNotFoundError
ERROR 2022-09-04 20:33:02,012 - *** The job [sample-4] encountered exception !!!
Jobstore default:
    task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:17 EDT)
INFO 2022-09-04 20:33:17,017 - Running job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:32 EDT)" (scheduled at 2022-09-04 20:33:17.013184-04:00)
INFO 2022-09-04 20:33:17,018 - Started [sample-4] task...
ERROR 2022-09-04 20:33:19,020 - Job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:32 EDT)" raised an exception
Traceback (most recent call last):
  File "/home/alice/APScheduler/venv/lib/python3.10/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/home/alice/APScheduler/sample-4.py", line 31, in task
    raise FileNotFoundError
FileNotFoundError
ERROR 2022-09-04 20:33:19,021 - *** The job [sample-4] encountered exception !!!

Open another terminal window and execute the following commands:

$ cd /tmp

$ > dummy.dat

Wait for a few seconds and then press CTRL-C on the terminal running sample-4.py and we will observe the following output:

Output.5

INFO 2022-09-04 20:33:34,025 - Running job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:49 EDT)" (scheduled at 2022-09-04 20:33:34.021253-04:00)
INFO 2022-09-04 20:33:34,026 - Started [sample-4] task...
INFO 2022-09-04 20:33:36,028 - Completed [sample-4] task !!!
INFO 2022-09-04 20:33:36,028 - Job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:49 EDT)" executed successfully
INFO 2022-09-04 20:33:36,028 - The job [sample-4] succeeded.
Jobstore default:
    task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:34:00 EDT)
<CTRL-C>
INFO 2022-09-04 20:33:46,254 - Removed job sample-4
INFO 2022-09-04 20:33:46,254 - Scheduler has been shut down

The following is the link to the Github Repo that provides all the code samples from this article:


References

Advanced Python Scheduler Documentation



© PolarSPARC