Skip to content

Triggers

Triggers decide when a task should run next. Every task has exactly one trigger instance.

Built-in trigger aliases

  • date
  • interval
  • cron
  • and
  • or
  • shutdown

DateTrigger

Runs a task once at a specific point in time.

  • Alias: date
from datetime import date
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a task once on a specific date, passing run_at as a date object.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task(number):
    """Log ``number`` at INFO level."""
    logger.info(number)


# Execute the task on December 25th, 2022
scheduler.add_task(my_task, run_at=date(2022, 12, 25), args=[25])

scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a task once, passing run_at as a date/time string.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task(number):
    """Log ``number`` at INFO level."""
    logger.info(number)


# Execute the task on December 25th, 2022 at 00:01:00
scheduler.add_task(my_task, run_at="2022-12-25 00:01:00", args=[25])

scheduler.start()

If you omit run_at, Asyncz schedules the task for immediate execution.

import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: add a task without run_at.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task(number):
    """Log ``number`` at INFO level."""
    logger.info(number)


# No run_at is given, so the task is scheduled for immediate execution
scheduler.add_task(my_task, args=[25])

scheduler.start()

IntervalTrigger

Runs a task repeatedly at a fixed interval.

  • Alias: interval

Supported parameters include:

  • weeks
  • days
  • hours
  • minutes
  • seconds
  • start_at
  • end_at
  • timezone
  • jitter
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a task repeatedly at a fixed interval.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a fixed message at INFO level."""
    logger.info("My task working")


# Run every 5 hours
scheduler.add_task(my_task, "interval", hours=5)

scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a task at a fixed interval, bounded by start_at and end_at.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a fixed message at INFO level."""
    logger.info("My task working")


# Run every 5 hours, limited to the window between start_at and end_at
scheduler.add_task(
    my_task, "interval", hours=5, start_at="2022-12-24 00:00:00", end_at="2022-12-25 23:59:59"
)

scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: register an interval task by using add_task as a decorator.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


# Run every 5 hours; the task is registered under the id "my_task_id"
@scheduler.add_task("interval", hours=5, id="my_task_id")
def my_task():
    """Log a fixed message at INFO level."""
    logger.info("My task working")


scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a task at a fixed interval with jitter applied.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a fixed message at INFO level."""
    logger.info("My task working")


# Run every 5 hours with jitter=200
# NOTE(review): jitter presumably randomizes each run time by up to 200 seconds - confirm
scheduler.add_task(my_task, "interval", hours=5, jitter=200)

scheduler.start()

CronTrigger

Runs a task according to cron-style rules.

  • Alias: cron

Useful parameters include:

  • year
  • month
  • day
  • week
  • day_of_week
  • hour
  • minute
  • second
  • start_at
  • end_at
  • timezone
  • jitter
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: schedule a task with cron-style month, day and hour rules.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a greeting at INFO level."""
    logger.info("Hello, world!")


# Schedules my_task to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
scheduler.add_task(my_task, "cron", month="6-8,11-12", day="3rd fri", hour="0-3")

scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: schedule a cron task that stops firing after end_at.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a greeting at INFO level."""
    logger.info("Hello, world!")


# Runs from Monday to Friday at 1:00 (am) until 2022-12-25 00:00:00
scheduler.add_task(my_task, "cron", day_of_week="mon-fri", hour=1, end_at="2022-12-25")

scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: register a cron-scheduled task by using add_task as a decorator.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


# Run at 1:00 (am) on the last Saturday of the month.
# The cron field is named `hour` (singular); `hours` belongs to the interval trigger.
@scheduler.add_task("cron", hour=1, id="my_task_id", day="last sat")
def my_task():
    """Log a fixed message at INFO level."""
    logger.info("My task working")


scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a cron-scheduled task every hour with jitter applied.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a fixed message at INFO level."""
    logger.info("My task working")


# Run every hour with jitter=200.
# The cron field is named `hour` (singular); `hours` belongs to the interval trigger.
# NOTE(review): jitter presumably randomizes each run time by up to 200 seconds - confirm
scheduler.add_task(my_task, "cron", hour="*", jitter=200)

scheduler.start()

Combination triggers

AndTrigger

Returns the earliest next time that all child triggers agree on.

  • Alias: and
import logging

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import AndTrigger, CronTrigger, IntervalTrigger

# Example: combine an interval trigger and a cron trigger with AndTrigger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a greeting at INFO level."""
    logger.info("Hello, world!")


# Combine the triggers: the task runs at the earliest time both child triggers agree on
trigger = AndTrigger(triggers=[IntervalTrigger(hours=5), CronTrigger(day_of_week="mon, tue")])

# Add the trigger to the task
# NOTE(review): unlike the other examples, scheduler.start() is not called here
scheduler.add_task(my_task, trigger)

OrTrigger

Returns the earliest next time produced by any child trigger.

  • Alias: or
import logging

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import CronTrigger, OrTrigger

# Example: combine two cron triggers with OrTrigger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a greeting at INFO level."""
    logger.info("Hello, world!")


# Combine the triggers: the task runs at the earliest time produced by either child
# (Saturday with hour=5, or Sunday with hour=10)
trigger = OrTrigger(
    triggers=[CronTrigger(day_of_week="sat", hour=5), CronTrigger(day_of_week="sun", hour=10)]
)

# Add the trigger to the task
# NOTE(review): unlike the other examples, scheduler.start() is not called here
scheduler.add_task(my_task, trigger)

ShutdownTrigger

Runs once when the scheduler shuts down. This is useful for cleanup hooks and lifecycle-style task patterns.

  • Alias: shutdown
import logging

from asyncz.schedulers import AsyncIOScheduler

# Example: run a task once when the scheduler shuts down.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Log a farewell message at INFO level."""
    logger.info("Goodbye!")


# Run on shutdown, in its own thread
scheduler.add_task(my_task, "shutdown")

scheduler.start()

Custom triggers

Custom triggers must subclass BaseTrigger and implement get_next_trigger_time(...).

from datetime import datetime, tzinfo
import logging
from typing import Optional, Union

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers.base import BaseTrigger

# Example: define a custom trigger and attach it to a task.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CustomTrigger(BaseTrigger):
    """Skeleton of a user-defined trigger.

    Custom triggers subclass BaseTrigger and implement
    get_next_trigger_time(...).
    """

    # Alias declared for this trigger
    alias: str = "custom"

    def get_next_trigger_time(
        self, timezone: tzinfo, previous_time: datetime, now: Optional[datetime] = None
    ) -> Union[datetime, None]:
        """Return the next time the task should run, or None.

        This skeleton has no logic yet, so it implicitly returns None.
        NOTE(review): None presumably means "no further runs" - confirm.
        """
        # Add logic for the next trigger time of the custom trigger
        ...


def get_info():
    """Log a fixed message at INFO level."""
    logger.info("info...")


# Create an instance (`...` stands in for the trigger's real constructor arguments)
trigger = CustomTrigger(...)

# Create a scheduler
scheduler = AsyncIOScheduler()

# Add custom trigger
# NOTE(review): scheduler.start() is not called in this example
scheduler.add_task(get_info, trigger)