Skip to content

Schedulers

Schedulers are the thing that makes all magic and binds everything together. You can see it as a glue.

Usually the developer does not deal/handle the stores, executors or even triggers manually, instead that is managed by the scheduler that acts as an interface amongst them all.

Asyncz being dedicated to ASGI and asyncio brings the AsyncIOScheduler out of the box and only supports this one natively but like everything in Asyncz, you can also create your own custom scheduler that does not necessarily need to be for async. You can build your own scheduler for blocking/background applications.

In fact, Asyncz is used by Esmerald as internal scheduling system and uses the supported scheduler from Asyncz to perform its tasks.

Parameters

All schedulers contain at least:

  • global_config - A python dictionary containing configurations for the schedulers. See the examples of how to configure a scheduler.

    Default: None

  • kwargs - Any keyword parameters being passed to the scheduler up instantiation.

    Default: None

Configuring the scheduler

Due its simplificy, Asyncz provides some ways of configuring the scheduler for you.

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()
from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()

What is happening here?:

When you create the scheduler like the examples above, it is creating an AsyncIOScheduler with a MemoryStore named default and starting the asyncio event loop.

Example configuration

Let us assume you now need a very custom configuration with more than one store, executors and custom settings.

  • Two stores - A mongo and a redis.
  • Two executors - An asyncio and a thread pool.
  • Coalesce turned off for new tasks by default.
  • Maximum instance limiting to 4 for new tasks.

First option

The first way of doing the configuration is in a simple pythonic fashion.

import pytz

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
from asyncz.stores.redis import RedisStore

# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}

# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {"default": AsyncIOExecutor(), "threadpool": ThreadPoolExecutor(max_workers=20)}

# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}

# Start the scheduler
scheduler = AsyncIOScheduler(
    stores=stores, executors=executors, task_defaults=task_defaults, timezone=pytz.utc
)

Second option

The second option is by starting the scheduler and injecting a dictionary directly upon instantiation.

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Start the scheduler
scheduler = AsyncIOScheduler(
    {
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.threadpool": {
            "max_workers": "20",
            "class": "asyncz.executors.threadpool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio::AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "3",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

Third option

The third option is by starting the scheduler and use the configure method.

import pytz

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores import MongoDBStore, RedisStore

# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}

# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {"default": AsyncIOExecutor(), "threadpool": ThreadPoolExecutor(max_workers=20)}

# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}

# Start the scheduler
scheduler = AsyncIOScheduler()

## Add some tasks here or anything else (for instance 3 tasks)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.add_task(...)

scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=pytz.utc)

Starting and stopping the scheduler

Every scheduler inherits from the BaseScheduler and therefore implement the mandatory functions such as start and shutdown.

To start the scheduler simply run:

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Start the scheduler
scheduler.start()

To stop the scheduler simply run:

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Shutdown the scheduler
scheduler.shutdown()

Adding tasks

A scheduler to work needs tasks, of course and Asyncz offers some ways of adding tasks into the scheduler.

There is also a third option but that is related with the integration with ASGI frameworks, for instance esmerald which it should not be used in this agnostic context.

Add tasks

Adding a task via add_task is the most common and probably the one you will be using more times than the scheduled_task.

The add_task returns an instance of Task.

So, how can you add a task?

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def collect_www_info():
    # Add logic to collect information from the internet
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 2 minutes
scheduler.add_task(
    fn=collect_www_info,
    trigger=IntervalTrigger(minutes=2),
    max_instances=1,
    replace_existing=True,
    coalesce=True,
)

# Run every 10 minutes
scheduler.add_task(
    fn=check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Run every 10 minutes collect_www_info before sending the newsletter
feed_data = collect_www_info()

scheduler.add_task(
    fn=send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)


# Start the scheduler
scheduler.start()

What happen here is actually very simple. We created an instance of the AsyncIOScheduler and added the functions send_email_newsletter, collect_www_info, check_status to the scheduler and started it.

Why then passing CronTrigger and IntervalTrigger instances instead of simply passing cron or interval?

Well, we want to pass some attributes to the object and this way makes it cleaner and simpler.

When adding tasks there is not a specific order. You can add tasks at any given time. If the scheduler is not yet running, once it does it will add the tasks to it.

Scheduled tasks

Scheduled tasks works in the same way as add_tasks with the unique difference that the replacing_existing is always True and it is used as a decorator.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


# Run every Monday, Wednesday and Friday
@scheduler.scheduled_task(
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5")
)
def send_email_newsletter():
    # Add logic to send emails here
    ...


# Run every 2 minutes
@scheduler.scheduled_task(
    trigger=IntervalTrigger(minutes=2),
    max_instances=1,
    coalesce=True,
)
def collect_www_info():
    # Add logic to collect information from the internet
    ...


@scheduler.scheduled_task(
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    coalesce=False,
)
def check_status():
    # Logic to check a given status of whatever needed
    ...


# Start the scheduler
scheduler.start()

Deleting tasks

In the same way you can add tasks you can also remove them with the same ease and there are also different ways of removing them.

Delete task

This is probably the most common way of removing tasks from the scheduler using the task id and the store alias.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def collect_www_info():
    # Add logic to collect information from the internet
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every hour and minute 1
scheduler.add_task(
    id="status",
    fn=check_status,
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Remove the tasks by ID and store alias
scheduler.delete_task("send_newsletter")
scheduler.delete_task("status")

Delete

The delete function is probably more convenient but it requires that you store the Task somewhere ocne the instance is received and for tasks scheduled by scheduled task this method does not work, instead only the delete task will work.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# delete task
task.delete()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# delete task
task.delete()

Pause and resume task

As shown above, you can add and remove tasks but you can pause and resume tasks as well. When a task is paused, there is no next time to run since the action is no longer being validate. That can be again reactivated by resuming that same Task.

Like the previous examples, there are also multiple ways of achieving that.

Pause task

Like delete_task, you can pause a task using the id.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every hour and minute 1
scheduler.add_task(
    id="status",
    fn=check_status,
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause the tasks by ID and store alias
scheduler.pause_task("send_newsletter")
scheduler.pause_task("status")

Pause

The same is applied to the simple pause where you can do it directly via task instance.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# Pause task
task.pause()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# Pause task
task.pause()

Resume task

Resuming a task is as simple as again, passing a task id.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every hour and minute 1
scheduler.add_task(
    id="status",
    fn=check_status,
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Resume the tasks by ID and store alias
scheduler.resume_task("send_newsletter")
scheduler.resume_task("status")

Resume

Same for the resume. You can resume a task directly from the instance.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# resume task
task.resume()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# Resume task
task.resume()

Check

add_task, delete_task, pause_task and resume_task expect a mandatory task_id parameter as well an optional store name. Why the store name? Because you might want to store the tasks in different places and this points it out the right place.

Update task

As mentioned in the tasks section, internally the scheduler updates the information given to the task and then executes it.

You can update any attribute of the task by calling:

From a task instance

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Update the task
task.update(
    name="my-new-task-id",
    max_instances=5,
    coalesce=False,
)

From the scheduler

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_email_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 10 minutes
scheduler.add_task(
    id="check_status",
    fn=check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Update the task
scheduler.update_task("send_email_newsletter", coalesce=False, max_instances=4)
scheduler.update_task("check_status", coalesce=True, max_instances=3)

# Start the scheduler
scheduler.start()

Important note

All attributes can be updated but the id as this is immutable.

Reschedule tasks

You can also reschedule a task if you want/need but by change what it means is changing only the trigger by using:

Reschedule the task instance

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)

Reschedule from the scheduler

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_email_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 10 minutes
scheduler.add_task(
    id="check_status",
    fn=check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Reschedule the tasks
scheduler.reschedule_task("send_email_newsletter", trigger="cron", day_of_week="mon", hour="1")
scheduler.reschedule_task("check_status", trigger="interval", minutes=20)

# Start the scheduler
scheduler.start()

Resume and pause the tasks

Resuming and pausing task processing (all tasks) is also allowed with simple instructions.

Pausing all tasks

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every hour and minute 1
scheduler.add_task(
    id="status",
    fn=check_status,
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause all tasks
scheduler.pause()

Resuming all tasks

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every hour and minute 1
scheduler.add_task(
    id="status",
    fn=check_status,
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause all tasks
scheduler.pause()

# Resume all tasks
scheduler.resume()

Start the scheduler in the paused state

Starting the scheduler without the paused state means without the first wakeup call.

import pytz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every hour and minute 1
scheduler.add_task(
    id="status",
    fn=check_status,
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause all tasks
scheduler.start(paused=True)

BaseScheduler

The base of all available schedulers provided by Asyncz and it should be the base of any custom scheduler.

The parameters are the same as the ones described before.

from asyncz.schedulers.base import BaseScheduler

AsyncIOScheduler

This scheduler is the only one (at least for now) supported by Asyncz and as mentioned before, it inherits from the BaseScheduler.

from asyncz.schedulers import AsyncIOScheduler

This special scheduler besides the normal parameters of the scheduler, also contains some additional ones.

  • event_loop - An optional. async event_loop to be used. If nothing is provided, it will use the asyncio.get_event_loop() (global).

    Default: None

  • timeout - A timeout used for start and stop the scheduler.

    Default: None

Custom Scheduler

As mentioned before, Asyncz and the nature of its existence is to be more focused on ASGI and asyncio applications but it is not limited to it.

You can create your own scheduler for any other use case, for example a blocking or background scheduler.

Usually when creating a custom scheduler you must override at least 3 functions.

  • start() - Function used to start/wakeup the scheduler for the first time.
  • shutdown() - Function used to stop the scheduler and release the resources created up start().
  • wakeup() - Manage the timer to notify the scheduler of the changes in the store.

There are also some optional functionalities you can override if you want.

  • create_default_executor - Override this function if you want a different default executor.
from asyncz.schedulers.base import BaseScheduler
from asyncz.typing import DictAny


class MyCustomScheduler(BaseScheduler):
    def __init__(self, **kwargs: DictAny) -> None:
        super().__init__(**kwargs)

    def start(self, paused: bool = False):
        # logic for the start
        ...

    def shutdown(self, wait: bool = True):
        # logic for the shutdown
        ...

    def wakeup(self):
        # logic for the wakeup
        ...

    def create_default_executor(self):
        # logic for your default executor
        ...

Limit the number of currently executing instances

By default, only one instance of each Task is allowed to run at the same time. To change that when creating a task you can set the max_instances to the number you desire and this will let the scheduler know how many should run concurrently.

Events

It is also possible to attach event listeners to the schedule. The events are triggered on specific occasions and may carry some additional information with them regarding detauls of that specific event. Check the events section to see the available events.

import pytz
from loguru import logger

from asyncz.events.constants import TASK_ADDED, TASK_REMOVED
from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)


def my_custom_listener(event):
    if not event.exception:
        logger.info("All good")
    else:
        logger.exception("Problem with the task")


# Add event listener
scheduler.add_listener(my_custom_listener, TASK_ADDED | TASK_REMOVED)

Final thoughts

Asyncz since it is a revamp, simplified and rewritten version of APScheduler, you will find very common ground and similarities to it and that is intentional as you shouldn't be unfamiliar with a lot of concepts if you are already familiar with APScheduler.