Triggers¶
Triggers decide when a task should run next. Every task has exactly one trigger instance.
Built-in trigger aliases¶
date, interval, cron, and, or, shutdown
DateTrigger¶
Runs a task once at a specific point in time.
- Alias: date
from datetime import date
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task(number):
    """Log the number passed in by the scheduler."""
    logger.info(number)


# Execute the task on December 25th, 2022
# (run_at must match the date described here; it previously pointed at the 24th).
scheduler.add_task(my_task, run_at=date(2022, 12, 25), args=[25])

scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task(number):
    """Log the number handed over by the scheduler."""
    logger.info(number)


scheduler = AsyncIOScheduler()

# run_at is given as a textual timestamp here:
# execute the task on December 25th, 2022 at 00:01.
scheduler.add_task(
    my_task,
    run_at="2022-12-25 00:01:00",
    args=[25],
)
scheduler.start()
If you omit run_at, Asyncz schedules the task for immediate execution.
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task(number):
    """Log the number passed in by the scheduler."""
    logger.info(number)


# No run_at is given, so the task is scheduled for immediate execution.
scheduler.add_task(my_task, args=[25])

scheduler.start()
IntervalTrigger¶
Runs a task repeatedly at a fixed interval.
- Alias: interval
Supported parameters include:
weeks, days, hours, minutes, seconds, start_at, end_at, timezone, jitter
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Emit a log line each time the scheduler fires."""
    logger.info("My task working")


scheduler = AsyncIOScheduler()

# Repeat the task every 5 hours
scheduler.add_task(my_task, "interval", hours=5)
scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Emit a log line each time the scheduler fires."""
    logger.info("My task working")


scheduler = AsyncIOScheduler()

# Repeat every 5 hours, but only between start_at and end_at
scheduler.add_task(
    my_task,
    "interval",
    hours=5,
    start_at="2022-12-24 00:00:00",
    end_at="2022-12-25 23:59:59",
)
scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


# Decorator form: register my_task to repeat every 5 hours under an explicit id
@scheduler.add_task("interval", hours=5, id="my_task_id")
def my_task():
    """Emit a log line each time the scheduler fires."""
    logger.info("My task working")


scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Emit a log line each time the scheduler fires."""
    logger.info("My task working")


# Run every 5 hours, with jitter=200 randomizing each run time
# (this example shows jitter; it does not limit the start/end window).
scheduler.add_task(my_task, "interval", hours=5, jitter=200)

scheduler.start()
CronTrigger¶
Runs a task according to cron-style rules.
- Alias: cron
Useful parameters include:
year, month, day, week, day_of_week, hour, minute, second, start_at, end_at, timezone, jitter
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Log a greeting each time the cron rule matches."""
    logger.info("Hello, world!")


scheduler = AsyncIOScheduler()

# Run my_task on the third Friday of June, July, August, November
# and December at 00:00, 01:00, 02:00 and 03:00
scheduler.add_task(
    my_task,
    "cron",
    month="6-8,11-12",
    day="3rd fri",
    hour="0-3",
)
scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Log a greeting each time the cron rule matches."""
    logger.info("Hello, world!")


scheduler = AsyncIOScheduler()

# Monday through Friday at 1:00 (am), stopping at 2022-12-25 00:00:00
scheduler.add_task(
    my_task,
    "cron",
    day_of_week="mon-fri",
    hour=1,
    end_at="2022-12-25",
)
scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


# Decorator form: run on the last Saturday of the month, in hour 1.
# The cron field is `hour` (singular); `hours` belongs to the interval trigger.
@scheduler.add_task("cron", hour=1, id="my_task_id", day="last sat")
def my_task():
    """Emit a log line each time the cron rule matches."""
    logger.info("My task working")


scheduler.start()
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = AsyncIOScheduler()


def my_task():
    """Emit a log line each time the cron rule matches."""
    logger.info("My task working")


# Run every hour, with jitter=200 randomizing each run time.
# The cron field is `hour` (singular); `hours` belongs to the interval trigger.
scheduler.add_task(my_task, "cron", hour="*", jitter=200)

scheduler.start()
Combination triggers¶
AndTrigger¶
Returns the earliest next time that all child triggers agree on.
- Alias: and
import logging

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import AndTrigger, CronTrigger, IntervalTrigger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Log a greeting each time the combined trigger fires."""
    logger.info("Hello, world!")


scheduler = AsyncIOScheduler()

# Build the two child triggers, then combine them with AndTrigger
every_five_hours = IntervalTrigger(hours=5)
monday_or_tuesday = CronTrigger(day_of_week="mon, tue")
trigger = AndTrigger(triggers=[every_five_hours, monday_or_tuesday])

# Register the task with the combined trigger
scheduler.add_task(my_task, trigger)
OrTrigger¶
Returns the earliest next time produced by any child trigger.
- Alias: or
import logging

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import CronTrigger, OrTrigger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Log a greeting each time either child trigger fires."""
    logger.info("Hello, world!")


scheduler = AsyncIOScheduler()

# Build the two child triggers, then combine them with OrTrigger
saturday_at_five = CronTrigger(day_of_week="sat", hour=5)
sunday_at_ten = CronTrigger(day_of_week="sun", hour=10)
trigger = OrTrigger(triggers=[saturday_at_five, sunday_at_ten])

# Register the task with the combined trigger
scheduler.add_task(my_task, trigger)
ShutdownTrigger¶
Runs once when the scheduler shuts down. This is useful for cleanup hooks and lifecycle-style task patterns.
- Alias: shutdown
import logging

from asyncz.schedulers import AsyncIOScheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def my_task():
    """Log a farewell message when the scheduler shuts down."""
    logger.info("Goodbye!")


scheduler = AsyncIOScheduler()

# Run once on scheduler shutdown, in its own thread
scheduler.add_task(my_task, "shutdown")
scheduler.start()
Custom triggers¶
Custom triggers must subclass BaseTrigger and implement get_next_trigger_time(...).
from datetime import datetime, tzinfo
import logging
from typing import Optional, Union

from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers.base import BaseTrigger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CustomTrigger(BaseTrigger):
    """Skeleton of a user-defined trigger, registered under the alias "custom"."""

    alias: str = "custom"

    def get_next_trigger_time(
        self,
        timezone: tzinfo,
        previous_time: datetime,
        now: Optional[datetime] = None,
    ) -> Union[datetime, None]:
        """Compute the next trigger time; left as a placeholder in this example."""
        # Add logic for the next trigger time of the custom trigger
        ...


def get_info():
    """Log a short informational message."""
    logger.info("info...")


# Instantiate the custom trigger (constructor arguments elided in this example)
trigger = CustomTrigger(...)

# Set up the scheduler
scheduler = AsyncIOScheduler()

# Register the task with the custom trigger
scheduler.add_task(get_info, trigger)