Triggers

In simple terms, a trigger contains the scheduling logic that tells the scheduler when to execute a task. Internally, each task has its own trigger, which determines when that task should run next.

There are five types of triggers available within Asyncz.

All triggers subclass BaseTrigger, and any custom trigger should do the same.

DateTrigger

The DateTrigger is the simplest way of scheduling a task. It schedules a task to be executed once at a given time.

alias - date

Parameters

  • run_at - The date/time to run the task at.

    Default: None

  • timezone - The time zone for the run_at if it does not have one already.

    Default: None
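
The interplay between run_at and timezone can be pictured with a small standard-library sketch. The helper resolve_run_at below is hypothetical and not part of Asyncz; it only illustrates the idea that the fallback time zone is applied when run_at carries none, and it assumes a plain date means midnight of that day.

```python
from datetime import date, datetime, time, timedelta, timezone, tzinfo
from typing import Union


def resolve_run_at(run_at: Union[str, date, datetime], tz: tzinfo) -> datetime:
    """Turn a run_at value into a timezone-aware datetime (hypothetical helper)."""
    if isinstance(run_at, str):
        # Accepts ISO-like text such as "2022-12-25 00:01:00"
        run_at = datetime.fromisoformat(run_at)
    elif not isinstance(run_at, datetime):
        # Assumption: a plain date is taken to mean midnight on that day
        run_at = datetime.combine(run_at, time.min)
    if run_at.tzinfo is None:
        # Only naive values receive the fallback time zone
        run_at = run_at.replace(tzinfo=tz)
    return run_at


# A fixed-offset zone keeps the sketch independent of the system tz database
cet = timezone(timedelta(hours=1), "CET")

from_text = resolve_run_at("2022-12-25 00:01:00", cet)
from_date = resolve_run_at(date(2022, 12, 25), cet)
already_aware = resolve_run_at(datetime(2022, 12, 25, tzinfo=timezone.utc), cet)
```

A value that already has a time zone, such as already_aware, is returned unchanged.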

Examples

from datetime import date

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task(number):
    logger.info(number)


# Execute the task on December 25th, 2022
scheduler.add_task(my_task, run_at=date(2022, 12, 25), args=[25])

scheduler.start()

You can also pass the date as text.

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task(number):
    logger.info(number)


# Execute the task on December 25th, 2022
scheduler.add_task(my_task, run_at="2022-12-25 00:01:00", args=[25])

scheduler.start()

Do you want to run the task immediately after adding it? Then don't specify any date.

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task(number):
    logger.info(number)


# Execute the task immediately, since no run_at is given
scheduler.add_task(my_task, args=[25])

scheduler.start()

IntervalTrigger

If you want to run a task periodically, this is the trigger for you. You can also specify start_at and end_at to restrict the runs to a window that suits your needs.

alias - interval

Parameters

  • weeks - Number of weeks to wait.

    Default: 0

  • days - Number of days to wait.

    Default: 0

  • hours - Number of hours to wait.

    Default: 0

  • minutes - Number of minutes to wait.

    Default: 0

  • seconds - Number of seconds to wait.

    Default: 0

  • start_at - Starting point for the interval calculation.

    Default: None

  • end_at - Latest possible date/time to trigger on.

    Default: None

  • timezone - Time zone to use for the date/time calculations.

    Default: None

  • jitter - Delay the task execution by jitter seconds at most.

    Default: None
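
To make the roles of start_at, end_at and the interval fields concrete, here is a minimal standard-library sketch. The function next_interval_time is hypothetical and only approximates the idea of an interval calculation; the arithmetic Asyncz actually performs may differ.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_interval_time(
    start_at: datetime,
    interval: timedelta,
    now: datetime,
    end_at: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return the first start_at + k * interval that is not before now."""
    if now <= start_at:
        candidate = start_at
    else:
        # Ceiling division: whole intervals needed to reach or pass now
        steps = -((start_at - now) // interval)
        candidate = start_at + steps * interval
    if end_at is not None and candidate > end_at:
        return None  # the window has closed, nothing is left to run
    return candidate


start = datetime(2022, 12, 24, 0, 0, 0)
end = datetime(2022, 12, 25, 23, 59, 59)
every_5_hours = timedelta(hours=5)

first = next_interval_time(start, every_5_hours, datetime(2022, 12, 24, 7, 30), end)
last = next_interval_time(start, every_5_hours, datetime(2022, 12, 25, 21, 0), end)
after_window = next_interval_time(start, every_5_hours, datetime(2022, 12, 25, 21, 0, 1), end)
```

Here first falls on 10:00 on December 24th, last on 21:00 on December 25th, and after_window is None because the next slot would lie past end.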

Examples

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("My task working")


# Run every 5 hours
scheduler.add_task(my_task, "interval", hours=5)

scheduler.start()

Use start_at and end_at to limit the window in which the task runs.

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("My task working")


# Run every 5 hours, limited to the given window
scheduler.add_task(
    my_task, "interval", hours=5, start_at="2022-12-24 00:00:00", end_at="2022-12-25 23:59:59"
)

scheduler.start()

What about using the scheduled_task decorator?

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


# Run every 5 hours
@scheduler.scheduled_task("interval", hours=5, id="my_task_id")
def my_task():
    logger.info("My task working")


scheduler.start()

What is the jitter? It is a random component added to the execution time. This is useful when multiple servers run the same tasks and you don't want them to run a task at exactly the same moment, or when you want to prevent the same task from running concurrently.

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("My task working")


# Run every 5 hours, delayed by up to 200 seconds of jitter
scheduler.add_task(my_task, "interval", hours=5, jitter=200)

scheduler.start()
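
The effect of jitter can be sketched without the scheduler. The helper apply_jitter below is hypothetical; it assumes the delay is drawn uniformly between 0 and jitter seconds, which matches the "at most" wording of the parameter but is not confirmed as the exact distribution Asyncz uses.

```python
import random
from datetime import datetime, timedelta
from typing import Optional


def apply_jitter(run_time: datetime, jitter: Optional[int], rng: random.Random) -> datetime:
    """Delay run_time by a random amount between 0 and jitter seconds."""
    if not jitter:
        return run_time  # no jitter configured, keep the planned time
    return run_time + timedelta(seconds=rng.uniform(0, jitter))


planned = datetime(2022, 12, 24, 5, 0, 0)

# Two servers with different random state pick different delays
server_a = apply_jitter(planned, 200, random.Random(1))
server_b = apply_jitter(planned, 200, random.Random(2))
```

Both servers still run within 200 seconds of the planned time, only no longer at the same instant.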

CronTrigger

This works like the UNIX cron utility and is the most powerful trigger in Asyncz. The possibilities are endless, which can be daunting if you are not familiar with how UNIX cron operates.

Tip

Get familiar with the UNIX cron and take advantage of the CronTrigger with ease.

A visual explanation of a UNIX cron expression might also help.

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of the month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
│ │ │ │ │                                   7 is also Sunday on some systems)
│ │ │ │ │
* * * * * <command to execute>

Like the IntervalTrigger, you can also specify start_at and end_at parameters.

Since Asyncz is a revamp of APScheduler, it inherited the CronTrigger as it was beautifully designed there, which means you can omit fields that you don't need.

alias - cron

Parameters

  • year - 4-digit value.

    Default: None

  • month - Month (1-12).

    Default: None

  • day - Day of the month (1-31).

    Default: None

  • week - ISO week (1-53).

    Default: None

  • day_of_week - Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun).

    Default: None

  • hour - Hour (0-23).

    Default: None

  • minute - Minute (0-59).

    Default: None

  • second - Second (0-59).

    Default: None

  • start_at - Earliest possible date/time to trigger on (inclusive).

    Default: None

  • end_at - Latest possible date/time to trigger on (inclusive).

    Default: None

  • timezone - Time zone to use for the date/time calculations (defaults to scheduler timezone).

    Default: None

  • jitter - Delay the task executions by jitter seconds at most.

    Default: None

Expressions

The table below lists all the expressions available for use in the fields from year to second.

Multiple expressions can be given in a single field (comma separated).

Expression  Field  Description
*           any    Fire on every value
*/n         any    Fire every n values, starting from the minimum
a-b         any    Fire on any value within the a-b range (a must be smaller than b)
a-b/n       any    Fire every n values within the a-b range
xth y       day    Fire on the x-th occurrence of weekday y within the month
last x      day    Fire on the last occurrence of weekday x within the month
last        day    Fire on the last day within the month
x,y,z       any    Fire on any matching expression; can combine any number of any of the above expressions
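
As an illustration of how the numeric expressions read, the following sketch expands a field expression into the set of values it matches. The function expand_field is hypothetical and is not the parser Asyncz uses; it covers only the numeric forms (a single value, *, */n, a-b, a-b/n and comma-separated lists), not names or the "xth y" and "last" forms.

```python
from typing import Set


def expand_field(expression: str, minimum: int, maximum: int) -> Set[int]:
    """Expand a numeric cron field expression into the set of matching values."""
    values: Set[int] = set()
    for part in expression.split(","):
        part = part.strip()
        # An optional "/n" suffix gives the step, defaulting to 1
        range_part, _, step_part = part.partition("/")
        step = int(step_part) if step_part else 1
        if range_part == "*":
            first, last = minimum, maximum
        elif "-" in range_part:
            low, high = range_part.split("-")
            first, last = int(low), int(high)
        else:
            first = last = int(range_part)
        if not (minimum <= first <= last <= maximum) or step < 1:
            raise ValueError(f"invalid expression: {part!r}")
        values.update(range(first, last + 1, step))
    return values


every_15_minutes = expand_field("*/15", 0, 59)
months = expand_field("6-8,11-12", 1, 12)
stepped_hours = expand_field("0-12/4", 0, 23)
```

With these inputs, every_15_minutes holds 0, 15, 30 and 45, months holds 6, 7, 8, 11 and 12, and stepped_hours holds 0, 4, 8 and 12.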

Info

The month and day_of_week accept abbreviated English month and weekday names. Example: jan - dec and mon - sun respectively.

Daylight saving time behaviour

Asyncz derives from APScheduler and inherits similar behaviour, which means the cron trigger works with so-called "wall clock" time. If the selected time zone observes DST (daylight saving time), be aware that this may cause unexpected behaviour with the cron trigger when entering or leaving DST.

Example of a problematic schedule:

# In the Europe/London timezone, this will not execute at all on the last Sunday morning of March,
# because the clocks jump from 01:00 straight to 02:00.
# Likewise, it will execute twice on the last Sunday morning of October
scheduler.add_task(my_task, 'cron', hour=1, minute=25)

Examples

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("Hello, world!")


# Schedules my_task to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
scheduler.add_task(my_task, "cron", month="6-8,11-12", day="3rd fri", hour="0-3")

scheduler.start()

Use start_at and end_at to limit the window in which the task runs.

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("Hello, world!")


# Runs from Monday to Friday at 1:00 (am) until 2022-12-25 00:00:00
scheduler.add_task(my_task, "cron", day_of_week="mon-fri", hour=1, end_at="2022-12-25")

scheduler.start()

What about using the scheduled_task decorator?

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


# Run at 01:00 on the last Saturday of the month
@scheduler.scheduled_task("cron", hour=1, id="my_task_id", day="last sat")
def my_task():
    logger.info("My task working")


scheduler.start()

What about some jitter?

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("My task working")


# Run every hour, delayed by up to 200 seconds of jitter
scheduler.add_task(my_task, "cron", hour="*", jitter=200)

scheduler.start()

Combination

A combinator is a special trigger that joins different triggers in one place. Think of the AND and OR operators in SQL; this works in a similar fashion.

There are two combinators available, the OrTrigger and the AndTrigger.

AndTrigger

Always returns the earliest next trigger time that all the given triggers can agree on. The trigger is considered finished when any of the given triggers has finished its schedule.

alias - and

Parameters

  • triggers - List of triggers to combine.
  • jitter - Delay the task execution by the jitter seconds at most.

    Default: None

Examples

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import AndTrigger, CronTrigger, IntervalTrigger

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("Hello, world!")


# Combine the triggers
trigger = AndTrigger(triggers=[IntervalTrigger(hours=5), CronTrigger(day_of_week="mon, tue")])

# Add the trigger to the task
scheduler.add_task(my_task, trigger)
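
The "earliest time all triggers agree on" rule can be sketched with toy triggers. Everything below is hypothetical and independent of Asyncz: every builds a stand-in trigger as a plain function, and and_next shows one way the agreement search could work. It assumes the triggers eventually agree or finish; otherwise the loop would not terminate.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional, Sequence

# A toy trigger: given "now", return its next run time, or None when finished
ToyTrigger = Callable[[datetime], Optional[datetime]]


def every(hours: int, start: datetime, end: Optional[datetime] = None) -> ToyTrigger:
    step = timedelta(hours=hours)

    def next_time(now: datetime) -> Optional[datetime]:
        # Ceiling division finds the first multiple of step at or after now
        steps = max(0, -((start - now) // step))
        candidate = start + steps * step
        return None if end is not None and candidate > end else candidate

    return next_time


def and_next(triggers: Sequence[ToyTrigger], now: datetime) -> Optional[datetime]:
    while True:
        times = [trigger(now) for trigger in triggers]
        if any(t is None for t in times):
            return None  # one trigger has finished, so the combination has too
        if min(times) == max(times):
            return times[0]  # every trigger agrees on this moment
        now = max(times)  # skip ahead to the latest proposal and ask again


midnight = datetime(2022, 12, 24)
one_am = midnight + timedelta(hours=1)

agreed = and_next([every(2, midnight), every(3, midnight)], one_am)
finished = and_next(
    [every(2, midnight, end=midnight + timedelta(hours=4)), every(3, midnight)], one_am
)
```

A 2-hour and a 3-hour toy trigger first agree at 06:00, so agreed is 06:00; in the second call the 2-hour trigger ends at 04:00, so finished is None.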

OrTrigger

Always returns the earliest next trigger time produced by any of the given triggers. The trigger is considered finished when all of the given triggers have finished their schedules.

alias - or

Parameters

  • triggers - List of triggers to combine.
  • jitter - Delay the task execution by the jitter seconds at most.

    Default: None

Examples

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import CronTrigger, OrTrigger

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("Hello, world!")


# Combine the triggers
trigger = OrTrigger(
    triggers=[CronTrigger(day_of_week="sat", hour=5), CronTrigger(day_of_week="sun", hour=10)]
)

# Add the trigger to the task
scheduler.add_task(my_task, trigger)
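
The OR rule is simpler and can be sketched the same way. The names every and or_next below are hypothetical stand-ins, not Asyncz functions: each toy trigger is a plain function that returns its next run time, and the combination takes the earliest one among the triggers that are still active.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional, Sequence

# A toy trigger: given "now", return its next run time, or None when finished
ToyTrigger = Callable[[datetime], Optional[datetime]]


def every(hours: int, start: datetime, end: Optional[datetime] = None) -> ToyTrigger:
    step = timedelta(hours=hours)

    def next_time(now: datetime) -> Optional[datetime]:
        # Ceiling division finds the first multiple of step at or after now
        steps = max(0, -((start - now) // step))
        candidate = start + steps * step
        return None if end is not None and candidate > end else candidate

    return next_time


def or_next(triggers: Sequence[ToyTrigger], now: datetime) -> Optional[datetime]:
    # Ignore finished triggers; the combination ends only when none are left
    times = [t for t in (trigger(now) for trigger in triggers) if t is not None]
    return min(times) if times else None


midnight = datetime(2022, 12, 24)
one_am = midnight + timedelta(hours=1)

earliest = or_next([every(2, midnight), every(3, midnight)], one_am)
one_left = or_next([every(2, midnight, end=midnight), every(3, midnight)], one_am)
all_done = or_next([every(2, midnight, end=midnight)], one_am)
```

Here earliest is 02:00, one_left is 03:00 because only the 3-hour trigger is still active, and all_done is None.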

ShutdownTrigger

The ShutdownTrigger is a special trigger: it is executed once when the scheduler shuts down and uses only the function, its arguments, and the store of a task.

It is useful to implement a lifecycle pattern.

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()


def my_task():
    logger.info("Goodbye!")


# Run on shutdown in its own thread
scheduler.add_task(my_task, "shutdown")

scheduler.start()

To ease integration with lifecycle generators, the exceptions StopIteration and StopAsyncIteration are ignored.

BaseTrigger

All triggers subclass BaseTrigger, and any custom trigger must do the same.

from asyncz.triggers.base import BaseTrigger

Custom trigger

If the provided triggers do not cover your use case, you can create your own. A custom trigger must implement get_next_trigger_time().

from datetime import datetime, tzinfo
from typing import Optional, Union

from loguru import logger

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):
    alias: str = "custom"

    def get_next_trigger_time(
        self, timezone: tzinfo, previous_time: datetime, now: Optional[datetime] = None
    ) -> Union[datetime, None]:
        # Add logic for the next trigger time of the custom trigger
        ...


def get_info():
    logger.info("info...")


# Create an instance
trigger = CustomTrigger(...)

# Create a scheduler
scheduler = AsyncIOScheduler()

# Add custom trigger
scheduler.add_task(get_info, trigger)
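
What the body of get_next_trigger_time() might look like is sketched below. EveryUntil is a hypothetical plain class, written without BaseTrigger so that it runs without Asyncz installed; a real custom trigger would subclass BaseTrigger and declare its fields the way Asyncz expects. The sketch also assumes that previous_time is None on the first call and that returning None marks the trigger as finished.

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Optional


class EveryUntil:
    """Fires every `seconds` seconds until `until` (stand-in for a BaseTrigger subclass)."""

    def __init__(self, seconds: int, until: datetime) -> None:
        self.step = timedelta(seconds=seconds)
        self.until = until

    def get_next_trigger_time(
        self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
    ) -> Optional[datetime]:
        now = now or datetime.now(timezone)
        if previous_time is None:
            candidate = now  # first run: as soon as possible
        else:
            # Never schedule in the past, even if earlier runs were missed
            candidate = max(previous_time + self.step, now)
        return candidate if candidate <= self.until else None


UTC = timezone.utc
start = datetime(2022, 12, 25, 0, 0, 0, tzinfo=UTC)
trigger = EveryUntil(30, until=start + timedelta(minutes=1))

# Walk the schedule the way a scheduler might, feeding each result back in
runs = []
previous = None
while True:
    upcoming = trigger.get_next_trigger_time(UTC, previous, start)
    if upcoming is None:
        break
    runs.append(upcoming)
    previous = upcoming
```

With a 30-second step and a one-minute limit, runs ends up with three entries: the start time, 30 seconds later, and 60 seconds later.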

Alias

Every trigger has an alias. As the examples above show, the alias can be passed in place of a trigger instance.
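
One way to picture alias resolution is a lookup table from alias to trigger class. The sketch below is purely illustrative: the Toy classes, REGISTRY and resolve_trigger are hypothetical names, and how Asyncz resolves aliases internally is not shown here.

```python
from typing import Any, Dict, Type, Union


class ToyTrigger:
    """Stand-in base class; real triggers subclass BaseTrigger instead."""

    alias: str = ""

    def __init__(self, **options: Any) -> None:
        self.options = options


class ToyIntervalTrigger(ToyTrigger):
    alias = "interval"


class ToyCronTrigger(ToyTrigger):
    alias = "cron"


# Map each alias to the class it stands for
REGISTRY: Dict[str, Type[ToyTrigger]] = {
    cls.alias: cls for cls in (ToyIntervalTrigger, ToyCronTrigger)
}


def resolve_trigger(trigger: Union[str, ToyTrigger], **options: Any) -> ToyTrigger:
    """Accept either a ready-made instance or an alias plus trigger options."""
    if isinstance(trigger, ToyTrigger):
        return trigger
    if trigger not in REGISTRY:
        raise LookupError(f"no trigger registered under alias {trigger!r}")
    return REGISTRY[trigger](**options)


by_alias = resolve_trigger("interval", hours=5)
by_instance = resolve_trigger(ToyCronTrigger(hour=1))
```

Passing "interval" with hours=5 and passing a ready-made instance both end with a trigger object, which mirrors the two call styles used in the examples above.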