Schedulers

The scheduler is the glue that binds everything together.

Usually the developer does not handle the stores, executors or even triggers manually. Instead, they are managed by the scheduler, which acts as an interface between them all.

Asyncz is dedicated to ASGI and asyncio, so it brings the AsyncIOScheduler out of the box and only supports this one natively. Like everything in Asyncz, you can also create your own custom scheduler, which does not necessarily need to be async. For example, you can build your own scheduler for blocking/background applications.

In fact, Esmerald uses Asyncz as its internal scheduling system and relies on the supported scheduler from Asyncz to perform its tasks.

Parameters

All schedulers contain at least:

  • global_config - A Python dictionary containing configurations for the scheduler. See the examples of how to configure a scheduler.

    Default: None

  • kwargs - Any keyword parameters passed to the scheduler upon instantiation.

    Default: None

Configuring the scheduler

Due to its simplicity, Asyncz provides several ways of configuring the scheduler.

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Alternatively, import the scheduler from the package root
from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()

What is happening here?

When you create the scheduler as in the examples above, it creates an AsyncIOScheduler with a MemoryStore named default and starts the asyncio event loop.

Example configuration

Let us assume you now need a custom configuration with more than one store, more than one executor and custom settings.

  • Two stores - A MongoDB store and a Redis store.
  • Two executors - An asyncio executor and a thread pool executor.
  • Coalesce turned off for new tasks by default.
  • Maximum instances limited to 4 for new tasks by default.

First option

The first way of doing the configuration is in a simple pythonic fashion.

from datetime import timezone as tz

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
from asyncz.stores.redis import RedisStore

# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}

# Define the executors
# Override the default to be the AsyncIOExecutor
executors = {
    "default": AsyncIOExecutor(),
    "threadpool": ThreadPoolExecutor(max_workers=20),
}

# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}

# Create the scheduler
scheduler = AsyncIOScheduler(
    stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc
)

# Start the scheduler
with scheduler:
    # note: you can also use start() and shutdown() manually
    # Nesting is also not a problem (start and shutdown are refcounted and only the outermost scope does start and shutdown the scheduler)

    scheduler.start()

    scheduler.shutdown()

# manually you have more control like:
scheduler.start(paused=True)
scheduler.resume()

# noop because not outermost scope
with scheduler:
    ...
scheduler.shutdown(wait=False)
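
The refcounting mentioned in the comments above can be pictured with a small stand-in class. This is only a sketch of the general pattern and not Asyncz's implementation; RefCountedService and its attributes are hypothetical names used for illustration.

```python
class RefCountedService:
    """Toy service whose start/shutdown calls are reference counted."""

    def __init__(self) -> None:
        self.refcount = 0
        self.running = False

    def start(self) -> bool:
        # Only the first (outermost) start actually starts the service.
        self.refcount += 1
        if self.refcount == 1:
            self.running = True
            return True
        return False

    def shutdown(self) -> bool:
        # Only the call that drops the count back to zero stops the service.
        if self.refcount == 0:
            return False
        self.refcount -= 1
        if self.refcount == 0:
            self.running = False
            return True
        return False

    def __enter__(self) -> "RefCountedService":
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.shutdown()


service = RefCountedService()
with service:
    with service:  # nested scope: no second start happens here
        nested_refcount = service.refcount
    still_running_after_inner_exit = service.running
stopped_after_outer_exit = not service.running
```

In this model the inner with block only increments and decrements the counter, so the service keeps running until the outermost scope exits.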

Second option

The second option is to pass a configuration dictionary directly upon instantiation.

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(
    global_config={
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.pool": {
            "max_workers": "20",
            "class": "asyncz.executors.pool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "4",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

# Start the scheduler
with scheduler:
    ...
    # your code
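
To make the shape of global_config easier to picture, here is a rough sketch of how prefixed, dotted keys could be grouped into nested sections. The helper group_config is hypothetical and written only for illustration; Asyncz may parse the dictionary differently.

```python
from typing import Any


def group_config(global_config: dict[str, Any], prefix: str = "asyncz.") -> dict[str, Any]:
    """Group 'asyncz.<section>.<key>' entries into nested dictionaries."""
    grouped: dict[str, Any] = {}
    for full_key, value in global_config.items():
        if not full_key.startswith(prefix):
            continue  # ignore keys that belong to something else
        parts = full_key[len(prefix):].split(".")
        node = grouped
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return grouped


config = group_config(
    {
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "4",
        "unrelated.option": True,
    }
)
```

Note that the values stay strings in this sketch; converting "false" or "4" into a boolean or an integer would be a separate step.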

Third option

The third option is to create the scheduler first and then use the setup method.

from datetime import timezone as tz

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores import MongoDBStore, RedisStore

# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}

# Define the executors
# Override the default to be the AsyncIOExecutor
executors = {
    "default": AsyncIOExecutor(),
    "threadpool": ThreadPoolExecutor(max_workers=20),
}

# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}

# Create the scheduler
scheduler = AsyncIOScheduler()

# Add some tasks here or anything else (for instance, 3 tasks)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.add_task(...)

scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc)

# Start the scheduler
with scheduler:
    ...
    # your code

Multi-processing mode

Asyncz schedulers have an optional multiprocessing mode. It can be activated by setting the lock_path option, e.g. to "/tmp/asyncz_{store}_{pgrp}.lock".

This defines a per-store process lock via a file.

Parameters:

  • {store} - Replaced by the store name (no format string). Should be provided.
  • {ppid} - Replaced by the ppid of the process. Formatting possible.
  • {pgrp} - Replaced by the pgrp of the process. Formatting possible.

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(
    global_config={
        "asyncz.lock_path": "/tmp/asyncz_super_project_{store}_store.pid",
        "asyncz.startup_delay": 2,
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.pool": {
            "max_workers": "20",
            "class": "asyncz.executors.pool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "3",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

# Start the scheduler
with scheduler:
    ...
    # your code

Note

You may want to set an explicit startup_delay if the initially started tasks behave spuriously. By default, a startup_delay of 1 second is used when lock_path is not empty. Otherwise the default is 0.
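
As a rough illustration of the placeholders and the default delay described in this section, the sketch below expands a lock path template with plain str.format. Both helper functions are hypothetical, the real expansion inside Asyncz may work differently, and os.getpgrp is only available on Unix-like systems.

```python
import os
from typing import Optional


def expand_lock_path(template: str, store: str) -> str:
    """Fill in the placeholders of a lock path template for one store."""
    return template.format(store=store, ppid=os.getppid(), pgrp=os.getpgrp())


def default_startup_delay(lock_path: Optional[str]) -> float:
    """Mirror the documented default: 1 second with a lock path, otherwise 0."""
    return 1.0 if lock_path else 0.0


template = "/tmp/asyncz_{store}_{pgrp}.lock"
lock_file = expand_lock_path(template, "default")
```

Because the store name is part of the file name, each store ends up with its own lock file in this model.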

Changing logger name and class

Asyncz uses a custom way of logging: it builds up a dictionary-like store of loggers that follow the standard logger interface. They are retrieved from schedulers via their alias name plus a prefix.

For example: asyncz.schedulers, asyncz.stores.default, asyncz.executors.default

The scheduler has an optional parameter named logger_name. If set, the scheduler's logger becomes:

asyncz.schedulers.<name specified>

By default asyncz uses loguru as logger (when available) and falls back to classical logging.

If this is not desired, there are two options:

  • Set loggers_class to asyncz.schedulers.base:ClassicLogging (or the class itself instead of the string) when creating a scheduler object, either via the global config or directly as a parameter.
  • Set asyncz.schedulers.base.default_loggers_class to ClassicLogging (defined in the same file; only the class is possible here).
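
The naming scheme described above can be reproduced with the standard library alone. This is a minimal sketch using the logging module; the helper scheduler_logger and the name "billing" are made up for illustration and are not part of Asyncz.

```python
import logging
from typing import Optional


def scheduler_logger(logger_name: Optional[str] = None) -> logging.Logger:
    """Return a standard-library logger named after the scheduler prefix."""
    name = "asyncz.schedulers"
    if logger_name:
        name = f"{name}.{logger_name}"
    return logging.getLogger(name)


default_logger = scheduler_logger()
custom_logger = scheduler_logger("billing")
```

Since standard loggers are hierarchical, a handler or level configured on asyncz.schedulers also applies to asyncz.schedulers.billing unless it is overridden there.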

Starting and stopping the scheduler

Every scheduler inherits from BaseScheduler and therefore implements the mandatory functions such as start and shutdown.

To start the scheduler simply run:

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Start the scheduler
scheduler.start()

To stop the scheduler simply run:

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Shutdown the scheduler
scheduler.shutdown()

Adding tasks

A scheduler needs tasks to work, of course, and Asyncz offers some ways of adding tasks to the scheduler.

There is also a third option, but it is related to the integration with ASGI frameworks, for instance Esmerald, and should not be used in this agnostic context.

Add tasks

Adding a task via add_task is the most common way.

add_task returns an instance of Task.

So, how can you add a task?

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
from asyncz.tasks import Task

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter(feed_data=None):
    # Add logic to send emails here
    ...


def collect_www_info():
    # Add logic to collect information from the internet
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 2 minutes
scheduler.add_task(
    collect_www_info,
    trigger=IntervalTrigger(minutes=2),
    max_instances=1,
    replace_existing=True,
    coalesce=True,
)

# Run every 10 minutes
scheduler.add_task(
    check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Collect the data once and pass it as an argument to the newsletter task, which runs every 10 minutes
feed_data = collect_www_info()

scheduler.add_task(
    fn_or_task=send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Add Task object

task = Task(
    fn=send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)
# you can update most attributes here. Note: a task can be only submitted once
scheduler.add_task(task)

# Use Task as decorator (leave fn empty)
decorator = scheduler.add_task(
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)
decorator(send_email_newsletter)


# Start the scheduler
scheduler.start()

# Add paused Task
scheduler.add_task(
    send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
    # this pauses the task on submit
    next_run_time=None,
)

What happens here is actually very simple. We created an instance of the AsyncIOScheduler, added the functions send_email_newsletter, collect_www_info and check_status to the scheduler, and started it.

Why pass CronTrigger and IntervalTrigger instances instead of simply passing cron or interval?

Because we want to pass some attributes to the trigger object, and this way is cleaner and simpler.

Tasks do not need to be added in any specific order, and you can add them at any time. If the scheduler is not yet running, the tasks are added to it once it starts.

Parameters

  • fn_or_task - (positional or via this name). The callable function to execute or the task to submit.
  • id - The unique identifier of this task. Leave empty to autogenerate an id or switch to the
  • name - The description of this task.
  • args - Positional arguments to the callable.
  • kwargs - Keyword arguments to the callable.
  • coalesce - Whether to only run the task once when several run times are due.
  • trigger - The trigger object that controls the schedule of this task.
  • executor - The name of the executor that will run this task.
  • mistrigger_grace_time - How late (in seconds) this task's execution is allowed to be (None means "allow the task to run no matter how late it is").
  • max_instances - The maximum number of concurrently executing instances allowed for this task.
  • next_run_time - The next scheduled run time of this task. If set to None, the task starts paused.
  • replace_existing - The submitted task replaces an existing task with the same id. Otherwise a ConflictId error is thrown.
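
To make coalesce and mistrigger_grace_time more concrete, here is a simplified model based only on the descriptions above. The function runs_to_execute is hypothetical and is not the scheduler's actual algorithm; it just shows how several overdue run times could be reduced.

```python
from datetime import datetime, timedelta
from typing import Optional


def runs_to_execute(
    due: list[datetime],
    now: datetime,
    coalesce: bool,
    mistrigger_grace_time: Optional[int],
) -> list[datetime]:
    """Pick which overdue run times would still be executed."""
    # With coalesce enabled, several due run times collapse into the latest one.
    candidates = due[-1:] if coalesce else list(due)
    if mistrigger_grace_time is None:
        return candidates  # None: run no matter how late
    grace = timedelta(seconds=mistrigger_grace_time)
    return [run_time for run_time in candidates if now - run_time <= grace]


now = datetime(2024, 1, 1, 12, 0, 0)
due = [now - timedelta(minutes=4), now - timedelta(minutes=2), now - timedelta(seconds=5)]

coalesced = runs_to_execute(due, now, coalesce=True, mistrigger_grace_time=None)
within_grace = runs_to_execute(due, now, coalesce=False, mistrigger_grace_time=150)
```

In this example the coalesced call keeps only the most recent run time, while the second call drops the run that is 240 seconds late because it exceeds the 150 second grace time.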

Note

add_task has a special pause mode: next_run_time can be set to None to start a Task paused. This also works with Task objects.

Tip

When submitting a Task object, most attributes can be changed by providing arguments such as trigger, name and other kwargs. However, the task is updated in place and no copy is made. This has interesting effects: for example, a decorator mode Task can be turned into a normal one by providing an id, and it is then submitted in place.

Add tasks as decorator

When leaving out the fn parameter, you get back a decorator mode Task.

It has two submodes:

  • with provided id
  • without provided id

In both modes, a copy of the task is created and submitted (with the function applied to it).

With provided id

The Task copy is submitted to the scheduler with replace_existing=True. Other tasks with the same id are replaced. The task.id can be used to retrieve the copy.

Without provided id

The Task copy is submitted to the scheduler with replace_existing=False and has an autogenerated id.

The decorated function also gets an attribute named asyncz_tasks that contains the copy. If multiple decorator mode tasks are applied, all of the copies are saved in the asyncz_tasks attribute of the function.

If this behavior is unwanted, scheduler.add_task can be used in a partial.

Examples

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


# Run every Monday, Wednesday and Friday
@scheduler.add_task(
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5")
)
def send_email_newsletter():
    # Add logic to send emails here
    ...


# Run every 2 minutes
@scheduler.add_task(
    trigger=IntervalTrigger(minutes=2),
    max_instances=1,
    coalesce=True,
)
def collect_www_info():
    # Add logic to collect information from the internet
    ...


@scheduler.add_task(
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    coalesce=False,
)
def check_status():
    # Logic to check a given status of whatever needed
    ...


# has now asyncz_tasks containing the copy
hasattr(check_status, "asyncz_tasks")

# Start the scheduler
scheduler.start()

Deleting tasks

Just as you can add tasks, you can also remove them with the same ease, and there are different ways of doing so.

Delete task

This is probably the most common way of removing tasks from the scheduler using the task id and the store alias.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def collect_www_info():
    # Add logic to collect information from the internet
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run at minute 1 of every hour
status = scheduler.add_task(
    check_status,
    id="status",
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Remove the tasks by ID or task object
scheduler.delete_task("send_newsletter")
scheduler.delete_task(status)

Delete

The delete function is probably more convenient, but it requires that you keep the Task instance somewhere once it is returned. For tasks scheduled with the task decorator this method does not work; only delete_task will work.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run at minute 1 of every hour
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# delete task
task.delete()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# delete task
task.delete()

Pause and resume task

As shown above, you can add and remove tasks, but you can also pause and resume them. When a task is paused, it has no next run time, since it is no longer being scheduled. It can be reactivated by resuming that same Task.

Like the previous examples, there are also multiple ways of achieving that.
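
The effect of pausing and resuming can be pictured with a tiny model in which a paused task simply has no next run time. ToyTask below is a hypothetical stand-in with a fixed interval and is not the Asyncz Task class.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task that runs at a fixed interval."""

    interval: timedelta
    next_run_time: Optional[datetime] = None

    def pause(self) -> None:
        # A paused task has no next run time, so it is never due.
        self.next_run_time = None

    def resume(self, now: datetime) -> None:
        # Resuming computes a fresh next run time from the current moment.
        self.next_run_time = now + self.interval

    def is_due(self, now: datetime) -> bool:
        return self.next_run_time is not None and self.next_run_time <= now


now = datetime(2024, 1, 1, 8, 0)
task = ToyTask(interval=timedelta(minutes=10), next_run_time=now)
task.pause()
due_while_paused = task.is_due(now + timedelta(hours=1))
task.resume(now)
due_after_resume = task.is_due(now + timedelta(minutes=10))
```

This is also why passing next_run_time=None when adding a task can be read as "start paused" in this model.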

Pause task

Like delete_task, you can pause a task using the id.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run at minute 1 of every hour
status = scheduler.add_task(
    check_status,
    id="status",
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause the tasks by ID or task object
scheduler.pause_task("send_newsletter")
scheduler.pause_task(status)

Pause

The same applies to the simple pause, which you can call directly on the task instance.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run at minute 1 of every hour
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# Pause task
task.pause()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# Pause task
task.pause()

Resume task

Resuming a task is, again, as simple as passing a task id.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run at minute 1 of every hour
status = scheduler.add_task(
    check_status,
    id="status",
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Resume the tasks by ID or task object
scheduler.resume_task("send_newsletter")
scheduler.resume_task(status)

Resume

Same for the resume. You can resume a task directly from the instance.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run at minute 1 of every hour
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# resume task
task.resume()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# Resume task
task.resume()

Check

delete_task, pause_task and resume_task expect a mandatory task id (or task object) as well as an optional store name. Why the store name? Because you might want to store the tasks in different places, and the store name points to the right one.

Update task

As mentioned in the tasks section, internally the scheduler updates the information given to the task and then executes it.

You can update any attribute of the task by calling:

From a task instance

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Update the task
task.update(
    name="my-new-task-id",
    max_instances=5,
    coalesce=False,
)

From the scheduler

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_email_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 10 minutes
check_status = scheduler.add_task(
    check_status,
    id="check_status",
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Update the task by id or object
scheduler.update_task("send_email_newsletter", coalesce=False, max_instances=4)
scheduler.update_task(check_status, coalesce=True, max_instances=3)

# Start the scheduler
scheduler.start()

Important note

All attributes can be updated except the id, which is immutable.

Reschedule tasks

You can also reschedule a task if needed. Rescheduling means changing only the trigger, by using:

Reschedule the task instance

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Reschedule the task
task.reschedule(trigger="cron", hour=10, minute=5)

Reschedule from the scheduler

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_email_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 10 minutes
check_status = scheduler.add_task(
    check_status,
    id="check_status",
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Reschedule the tasks
scheduler.reschedule_task("send_email_newsletter", trigger="cron", day_of_week="mon", hour="1")
scheduler.reschedule_task(check_status, trigger="interval", minutes=20)

# Start the scheduler
scheduler.start()

Resume and pause the tasks

Resuming and pausing task processing (all tasks) is also allowed with simple instructions.

Pausing all tasks

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run at minute 1 of every hour
scheduler.add_task(
    check_status,
    id="status",
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause all tasks
scheduler.pause()

Resuming all tasks

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run at minute 1 of every hour
scheduler.add_task(
    check_status,
    id="status",
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Pause all tasks
scheduler.pause()

# Resume all tasks
scheduler.resume()

Start the scheduler in the paused state

Starting the scheduler in the paused state means starting it without the first wakeup call.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    id="send_newsletter",
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run at minute 1 of every hour
scheduler.add_task(
    check_status,
    id="status",
    trigger=CronTrigger(hour="0-23", minute="1"),
)

# Start the scheduler in the paused state
scheduler.start(paused=True)

BaseScheduler

The base of all schedulers provided by Asyncz. It should also be the base of any custom scheduler.

The parameters are the same as the ones described before.

from asyncz.schedulers.base import BaseScheduler

AsyncIOScheduler

This scheduler has a mostly synchronous interface. It is handy for a synchronous environment and supports asynchronous functions. Because of the synchronous interface, it has a slight delay when shutting down.

from asyncz.schedulers import AsyncIOScheduler

Besides the normal scheduler parameters, this scheduler has some additional ones.

  • event_loop - An optional asyncio event loop to be used. If nothing is provided, asyncio.get_event_loop() (global) is used when isolated_event_loop is False.

    Default: None

  • isolated_event_loop - Instead of using an existing event_loop a new one is used.

    Default: False

  • timeout - A timeout used for starting and stopping the scheduler.

    Default: None

NativeAsyncIOScheduler

This scheduler uses an async start/shutdown interface and is very handy for asynchronous environments because it has no shutdown delay and fewer sync/async switches.

from asyncz.schedulers import NativeAsyncIOScheduler

Besides the normal scheduler parameters, this scheduler has some additional ones.

  • isolated_event_loop - Instead of using an existing event_loop a new one is used.

    Default: False

  • timeout - A timeout used for starting and stopping the scheduler.

    Default: None

Note: in contrast to AsyncIOScheduler it is not possible to provide an event_loop (except via isolated_event_loop).

Custom Scheduler

As mentioned before, Asyncz is primarily focused on ASGI and asyncio applications, but it is not limited to them.

You can create your own scheduler for any other use case, for example a blocking or background scheduler.

Usually when creating a custom scheduler you must override at least 3 functions.

  • start() - Function used to start/wakeup the scheduler for the first time.
  • shutdown() - Function used to stop the scheduler and release the resources created upon start().
  • wakeup() - Manage the timer to notify the scheduler of the changes in the store.

There are also some optional functionalities you can override if you want.

  • create_default_executor - Override this function if you want a different default executor.

from typing import Any

from asyncz.schedulers.base import BaseScheduler


class MyCustomScheduler(BaseScheduler):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    def start(self, paused: bool = False):
        # logic for the start
        ...

    def shutdown(self, wait: bool = True):
        # logic for the shutdown
        ...

    def wakeup(self):
        # logic for the wakeup
        ...

    def create_default_executor(self):
        # logic for your default executor
        ...

Limit the number of currently executing instances

By default, only one instance of each Task is allowed to run at the same time. To change that, set max_instances to the desired number when creating a task. This lets the scheduler know how many instances may run concurrently.
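
The idea behind max_instances can be sketched as a simple per-task counter. InstanceLimiter is a hypothetical helper written for this illustration; it does not reflect how Asyncz executors track running instances internally.

```python
from collections import defaultdict


class InstanceLimiter:
    """Track running instances per task id and refuse runs above the limit."""

    def __init__(self) -> None:
        self._running: dict[str, int] = defaultdict(int)

    def try_start(self, task_id: str, max_instances: int = 1) -> bool:
        if self._running[task_id] >= max_instances:
            return False  # limit reached: this run would be skipped
        self._running[task_id] += 1
        return True

    def finish(self, task_id: str) -> None:
        # Called when one running instance completes.
        self._running[task_id] -= 1


limiter = InstanceLimiter()
first = limiter.try_start("status", max_instances=2)
second = limiter.try_start("status", max_instances=2)
third = limiter.try_start("status", max_instances=2)  # refused: two already running
limiter.finish("status")
fourth = limiter.try_start("status", max_instances=2)  # accepted again
```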

Events

It is also possible to attach event listeners to the scheduler. The events are triggered on specific occasions and may carry some additional information regarding the details of that specific event. Check the events section to see the available events.

from datetime import timezone as tz
from loguru import logger

from asyncz.events.constants import TASK_ADDED, TASK_REMOVED
from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def my_custom_listener(event):
    # Use getattr in case the event type does not carry an exception attribute
    if not getattr(event, "exception", None):
        logger.info("All good")
    else:
        logger.exception("Problem with the task")


# Add event listener
scheduler.add_listener(my_custom_listener, TASK_ADDED | TASK_REMOVED)
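
Combining event codes with | suggests they behave like bit flags. The sketch below shows that pattern with made-up values; ToyEvent, add_listener and dispatch are hypothetical names, and the real constants live in asyncz.events.constants.

```python
from enum import IntFlag
from typing import Callable


class ToyEvent(IntFlag):
    """Hypothetical event codes used only for this illustration."""

    TASK_ADDED = 1
    TASK_REMOVED = 2
    TASK_EXECUTED = 4


listeners: list[tuple[Callable[[ToyEvent], None], ToyEvent]] = []
received: list[ToyEvent] = []


def add_listener(callback: Callable[[ToyEvent], None], mask: ToyEvent) -> None:
    listeners.append((callback, mask))


def dispatch(event_code: ToyEvent) -> None:
    for callback, mask in listeners:
        if event_code & mask:  # only listeners that subscribed to this code
            callback(event_code)


add_listener(received.append, ToyEvent.TASK_ADDED | ToyEvent.TASK_REMOVED)
dispatch(ToyEvent.TASK_ADDED)
dispatch(ToyEvent.TASK_EXECUTED)  # ignored: not part of the listener's mask
dispatch(ToyEvent.TASK_REMOVED)
```

With a mask like this, one listener can subscribe to any combination of events without registering a separate callback for each of them.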

Final thoughts

Since Asyncz is a revamped, simplified and rewritten version of APScheduler, you will find a lot of common ground and similarities with it. That is intentional: if you are already familiar with APScheduler, most concepts here should not feel unfamiliar.