Schedulers¶
Schedulers are the thing that makes all magic and binds everything together. You can see it as a glue.
Usually the developer does not deal/handle the stores, executors or even triggers manually, instead that is managed by the scheduler that acts as an interface amongst them all.
Asyncz being dedicated to ASGI and asyncio brings the AsyncIOScheduler out of the box and only supports this one natively but like everything in Asyncz, you can also create your own custom scheduler that does not necessarily need to be for async. You can build your own scheduler for blocking/background applications.
In fact, Asyncz is used by Esmerald as internal scheduling system and uses the supported scheduler from Asyncz to perform its tasks.
Parameters¶
All schedulers contain at least:
-
global_config - A python dictionary containing configurations for the schedulers. See the examples of how to configure a scheduler.
Default:
None
-
kwargs - Any keyword parameters being passed to the scheduler up instantiation.
Default:
None
Configuring the scheduler¶
Due its simplificy, Asyncz provides some ways of configuring the scheduler for you.
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
What is happening here?:
When you create the scheduler like the examples above, it is creating an AsyncIOScheduler
with a MemoryStore named default
and starting the
asyncio event loop.
Example configuration¶
Let us assume you now need a very custom configuration with more than one store, executors and custom settings.
- Two stores - A mongo and a redis.
- Two executors - An asyncio and a thread pool.
- Coalesce turned off for new tasks by default.
- Maximum instance limiting to 4 for new tasks.
First option¶
The first way of doing the configuration is in a simple pythonic fashion.
import pytz
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
from asyncz.stores.redis import RedisStore
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {"default": AsyncIOExecutor(), "threadpool": ThreadPoolExecutor(max_workers=20)}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
# Start the scheduler
scheduler = AsyncIOScheduler(
stores=stores, executors=executors, task_defaults=task_defaults, timezone=pytz.utc
)
Second option¶
The second option is by starting the scheduler and injecting a dictionary directly upon instantiation.
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Start the scheduler
scheduler = AsyncIOScheduler(
{
"asyncz.stores.mongo": {"type": "mongodb"},
"asyncz.stores.default": {"type": "redis", "database": "0"},
"asyncz.executors.threadpool": {
"max_workers": "20",
"class": "asyncz.executors.threadpool:ThreadPoolExecutor",
},
"asyncz.executors.default": {"class": "asyncz.executors.asyncio::AsyncIOExecutor"},
"asyncz.task_defaults.coalesce": "false",
"asyncz.task_defaults.max_instances": "3",
"asyncz.task_defaults.timezone": "UTC",
},
)
Third option¶
The third option is by starting the scheduler and use the configure
method.
import pytz
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores import MongoDBStore, RedisStore
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {"default": AsyncIOExecutor(), "threadpool": ThreadPoolExecutor(max_workers=20)}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
# Start the scheduler
scheduler = AsyncIOScheduler()
## Add some tasks here or anything else (for instance 3 tasks)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=pytz.utc)
Starting and stopping the scheduler¶
Every scheduler inherits from the BaseScheduler and therefore implement the
mandatory functions such as start
and shutdown
.
To start the scheduler simply run:
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Start the scheduler
scheduler.start()
To stop the scheduler simply run:
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Shutdown the scheduler
scheduler.shutdown()
Adding tasks¶
A scheduler to work needs tasks, of course and Asyncz offers some ways of adding tasks into the scheduler.
There is also a third option but that is related with the integration with ASGI frameworks, for instance esmerald which it should not be used in this agnostic context.
Add tasks¶
Adding a task via add_task
is the most common and probably the one you will be using more times
than the scheduled_task
.
The add_task
returns an instance of Task.
So, how can you add a task?
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def collect_www_info():
# Add logic to collect information from the internet
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 2 minutes
scheduler.add_task(
fn=collect_www_info,
trigger=IntervalTrigger(minutes=2),
max_instances=1,
replace_existing=True,
coalesce=True,
)
# Run every 10 minutes
scheduler.add_task(
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Run every 10 minutes collect_www_info before sending the newsletter
feed_data = collect_www_info()
scheduler.add_task(
fn=send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Start the scheduler
scheduler.start()
What happen here is actually very simple. We created an instance of the AsyncIOScheduler
and
added the functions send_email_newsletter
, collect_www_info
, check_status
to the scheduler
and started it.
Why then passing CronTrigger
and IntervalTrigger
instances instead of simply passing cron
or interval
?
Well, we want to pass some attributes to the object and this way makes it cleaner and simpler.
When adding tasks there is not a specific order. You can add tasks at any given time. If the scheduler is not yet running, once it does it will add the tasks to it.
Scheduled tasks¶
Scheduled tasks works in the same way as add_tasks with the unique difference that
the replacing_existing
is always True
and it is used as a decorator.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
# Run every Monday, Wednesday and Friday
@scheduler.scheduled_task(
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5")
)
def send_email_newsletter():
# Add logic to send emails here
...
# Run every 2 minutes
@scheduler.scheduled_task(
trigger=IntervalTrigger(minutes=2),
max_instances=1,
coalesce=True,
)
def collect_www_info():
# Add logic to collect information from the internet
...
@scheduler.scheduled_task(
trigger=IntervalTrigger(minutes=10),
max_instances=1,
coalesce=False,
)
def check_status():
# Logic to check a given status of whatever needed
...
# Start the scheduler
scheduler.start()
Deleting tasks¶
In the same way you can add tasks you can also remove them with the same ease and there are also different ways of removing them.
Delete task¶
This is probably the most common way of removing tasks from the scheduler using the task id and the store alias.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def collect_www_info():
# Add logic to collect information from the internet
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Remove the tasks by ID and store alias
scheduler.delete_task("send_newsletter")
scheduler.delete_task("status")
Delete¶
The delete
function is probably more convenient but it requires that you store the Task
somewhere ocne the instance is received and for tasks scheduled by scheduled task
this method does not work, instead only the delete task will work.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# delete task
task.delete()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# delete task
task.delete()
Pause and resume task¶
As shown above, you can add and remove tasks but you can pause and resume tasks as well. When a task is paused, there is no next time to run since the action is no longer being validate. That can be again reactivated by resuming that same Task.
Like the previous examples, there are also multiple ways of achieving that.
Pause task¶
Like delete_task, you can pause a task using the id.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause the tasks by ID and store alias
scheduler.pause_task("send_newsletter")
scheduler.pause_task("status")
Pause¶
The same is applied to the simple pause where you can do it directly via task instance.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# Pause task
task.pause()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# Pause task
task.pause()
Resume task¶
Resuming a task is as simple as again, passing a task id.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Resume the tasks by ID and store alias
scheduler.resume_task("send_newsletter")
scheduler.resume_task("status")
Resume¶
Same for the resume. You can resume a task directly from the instance.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# resume task
task.resume()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# Resume task
task.resume()
Check
add_task, delete_task, pause_task and resume_task expect a mandatory task_id parameter as well an optional store name. Why the store name? Because you might want to store the tasks in different places and this points it out the right place.
Update task¶
As mentioned in the tasks section, internally the scheduler updates the information given to the task and then executes it.
You can update any attribute of the task by calling:
- asyncz.tasks.Task.update() - The update method from a task instance.
- update_task - The function from the scheduler.
From a task instance¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
# Create a task
task = Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
# Update the task
task.update(
name="my-new-task-id",
max_intances=5,
coalesce=False,
)
From the scheduler¶
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_email_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 10 minutes
scheduler.add_task(
id="check_status",
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Update the task
scheduler.update_task("send_email_newsletter", coalesce=False, max_instances=4)
scheduler.update_task("check_status", coalesce=True, max_instances=3)
# Start the scheduler
scheduler.start()
Important note¶
All attributes can be updated but the id as this is immutable.
Reschedule tasks¶
You can also reschedule a task if you want/need but by change what it means is changing only the trigger by using:
- asyncz.tasks.Taskk.reschedule() - The reschedule task from the Task instance. The trigger must be the alias of the trigger object.
- reschedule_task - The function from the scheduler instance to reschedule the task.
Reschedule the task instance¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
# Create a task
task = Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)
Reschedule from the scheduler¶
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_email_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 10 minutes
scheduler.add_task(
id="check_status",
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Reschedule the tasks
scheduler.reschedule_task("send_email_newsletter", trigger="cron", day_of_week="mon", hour="1")
scheduler.reschedule_task("check_status", trigger="interval", minutes=20)
# Start the scheduler
scheduler.start()
Resume and pause the tasks¶
Resuming and pausing task processing (all tasks) is also allowed with simple instructions.
Pausing all tasks¶
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause all tasks
scheduler.pause()
Resuming all tasks¶
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause all tasks
scheduler.pause()
# Resume all tasks
scheduler.resume()
Start the scheduler in the paused state¶
Starting the scheduler without the paused state means without the first wakeup call.
import pytz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause all tasks
scheduler.start(paused=True)
BaseScheduler¶
The base of all available schedulers provided by Asyncz and it should be the base of any custom scheduler.
The parameters are the same as the ones described before.
from asyncz.schedulers.base import BaseScheduler
AsyncIOScheduler¶
This scheduler is the only one (at least for now) supported by Asyncz and as mentioned before, it inherits from the BaseScheduler.
from asyncz.schedulers import AsyncIOScheduler
This special scheduler besides the normal parameters of the scheduler, also contains some additional ones.
-
event_loop - An optional. async event_loop to be used. If nothing is provided, it will use the
asyncio.get_event_loop()
(global).Default:
None
-
timeout - A timeout used for start and stop the scheduler.
Default:
None
Custom Scheduler¶
As mentioned before, Asyncz and the nature of its existence is to be more focused on ASGI and asyncio applications but it is not limited to it.
You can create your own scheduler for any other use case, for example a blocking or background scheduler.
Usually when creating a custom scheduler you must override at least 3 functions.
- start() - Function used to start/wakeup the scheduler for the first time.
- shutdown() - Function used to stop the scheduler and release the resources created up
start()
. - wakeup() - Manage the timer to notify the scheduler of the changes in the store.
There are also some optional functionalities you can override if you want.
- create_default_executor - Override this function if you want a different default executor.
from asyncz.schedulers.base import BaseScheduler
from asyncz.typing import DictAny
class MyCustomScheduler(BaseScheduler):
def __init__(self, **kwargs: DictAny) -> None:
super().__init__(**kwargs)
def start(self, paused: bool = False):
# logic for the start
...
def shutdown(self, wait: bool = True):
# logic for the shutdown
...
def wakeup(self):
# logic for the wakeup
...
def create_default_executor(self):
# logic for your default executor
...
Limit the number of currently executing instances¶
By default, only one instance of each Task is allowed to run at the same time.
To change that when creating a task you can set the max_instances
to the number you desire and
this will let the scheduler know how many should run concurrently.
Events¶
It is also possible to attach event listeners to the schedule. The events are triggered on specific occasions and may carry some additional information with them regarding detauls of that specific event. Check the events section to see the available events.
import pytz
from loguru import logger
from asyncz.events.constants import TASK_ADDED, TASK_REMOVED
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=pytz.utc)
def my_custom_listener(event):
if not event.exception:
logger.info("All good")
else:
logger.exception("Problem with the task")
# Add event listener
scheduler.add_listener(my_custom_listener, TASK_ADDED | TASK_REMOVED)
Final thoughts¶
Asyncz since it is a revamp, simplified and rewritten version of APScheduler, you will find very common ground and similarities to it and that is intentional as you shouldn't be unfamiliar with a lot of concepts if you are already familiar with APScheduler.