Tasks
Tasks are a big part of Asyncz. A task is a special object that holds instructions and parameters; it is created or sent to the scheduler, which then executes it.
Importing a task is as simple as:
```python
from asyncz.tasks import Task
```
Parameters
- id - The unique identifier of this task.
- name - The description of this task.
- fn - The callable to execute.
- args - Positional arguments to the callable.
- kwargs - Keyword arguments to the callable.
- coalesce - Whether to run the task only once when several run times are due.
- trigger - The trigger object that controls the schedule of this task.
- executor - The name of the executor that will run this task.
- mistrigger_grace_time - How late (in seconds) this task's execution is allowed to be. None means "allow the task to run no matter how late it is".
- max_instances - The maximum number of concurrently executing instances allowed for this task.
- next_run_time - The next scheduled run time of this task. (Note: unlike in add_task, setting it here does not have the pause-mode effect.)
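To make the interplay of coalesce and mistrigger_grace_time more concrete, here is a simplified model in plain Python. It is not Asyncz's implementation: the helper run_times_to_execute is hypothetical and assumes that coalescing keeps only the most recent due run time, and that any run time later than the grace period is skipped.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def run_times_to_execute(
    due: List[datetime],
    now: datetime,
    coalesce: bool,
    mistrigger_grace_time: Optional[float],
) -> List[datetime]:
    """Hypothetical model: which of the due run times would actually run."""
    # Coalescing collapses several missed run times into the most recent one.
    candidates = due[-1:] if coalesce and due else list(due)
    if mistrigger_grace_time is None:
        # None means "run no matter how late it is".
        return candidates
    grace = timedelta(seconds=mistrigger_grace_time)
    # Drop run times that are later than the allowed grace period.
    return [t for t in candidates if now - t <= grace]


now = datetime(2024, 1, 1, 12, 0, 0)
# Three missed run times: 15, 10 and 5 minutes ago.
due = [now - timedelta(minutes=m) for m in (15, 10, 5)]
print(run_times_to_execute(due, now, coalesce=True, mistrigger_grace_time=None))
print(run_times_to_execute(due, now, coalesce=False, mistrigger_grace_time=600))
```

With coalesce=True only the latest missed run survives; without coalescing, every missed run inside the grace period would be executed.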
Create a task
Creating a task is as simple as:
```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

scheduler.start()

# or create a task without a trigger and submit it manually, passing the trigger to add_task
t = Task(
    id="my-task2",
    fn=check_status,
    max_instances=1,
    coalesce=True,
)

scheduler.add_task(
    t, trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1)
)
```
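The CronTrigger above lists every day of the week together with hour=8 and minute=1, so the task is due once a day at 08:01. The following plain-Python sketch computes the next such run time. next_daily_run is a hypothetical helper that ignores time zones and DST; it is not Asyncz's trigger code.

```python
from datetime import datetime, timedelta


def next_daily_run(after: datetime, hour: int, minute: int) -> datetime:
    """Hypothetical helper: the next hh:mm strictly after `after`."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        # Today's slot has already passed (or is right now), so use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2024, 1, 1, 7, 0), hour=8, minute=1))
print(next_daily_run(datetime(2024, 1, 1, 9, 0), hour=8, minute=1))
```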
Update a task
You can also update a specific task and its properties directly.
```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Update the task (the name is only a description; the id stays "my-task")
task.update(
    name="my-new-name",
    max_instances=5,
    coalesce=False,
)
```
Internally, the task uses the given scheduler to apply the update; the scheduler then executes the updated task.
Warning
All attributes can be updated except the id, which is immutable.
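The rule "everything except the id" can be modelled with a small stand-in class. TaskSketch is hypothetical and only mirrors the update rule described above; the real Task may raise a different error when the id is passed.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TaskSketch:
    """Hypothetical stand-in for a task; only models the update rules."""

    id: str
    name: str = ""
    max_instances: int = 1
    coalesce: bool = True

    def update(self, **changes: Any) -> "TaskSketch":
        if "id" in changes:
            # The id identifies the task in its store, so it cannot change.
            raise ValueError("the task id cannot be changed")
        for key, value in changes.items():
            if not hasattr(self, key):
                raise AttributeError(f"unknown task attribute: {key}")
            setattr(self, key, value)
        return self


task = TaskSketch(id="my-task", name="my-func", max_instances=3)
task.update(name="my-new-name", max_instances=5, coalesce=False)
print(task)
```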
Reschedule a task
You can also reschedule a task when needed, which means changing only its trigger.
The trigger must be given as the alias of the trigger object (for example "cron"), followed by the arguments for that trigger.
```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Reschedule the task: the task already knows its own id,
# so only the trigger alias and the trigger arguments are passed
task.reschedule(trigger="cron", hour=10, minute=5)
```
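Passing an alias plus keyword arguments implies that the alias is looked up and the remaining arguments are used to build the new trigger. The sketch below models that lookup with a hypothetical registry (TRIGGER_ALIASES, CronSketch, IntervalSketch and build_trigger are illustrative names, not Asyncz's API).

```python
from typing import Any, Callable, Dict


class CronSketch:
    """Hypothetical trigger class that simply records its arguments."""

    def __init__(self, **fields: Any) -> None:
        self.fields = fields


class IntervalSketch:
    """Hypothetical trigger class that simply records its arguments."""

    def __init__(self, **fields: Any) -> None:
        self.fields = fields


# Hypothetical alias registry: alias string -> trigger class.
TRIGGER_ALIASES: Dict[str, Callable[..., Any]] = {
    "cron": CronSketch,
    "interval": IntervalSketch,
}


def build_trigger(alias: str, **trigger_args: Any) -> Any:
    try:
        trigger_cls = TRIGGER_ALIASES[alias]
    except KeyError:
        raise ValueError(f"unknown trigger alias: {alias!r}") from None
    # The remaining keyword arguments configure the new trigger.
    return trigger_cls(**trigger_args)


new_trigger = build_trigger("cron", hour=10, minute=5)
print(type(new_trigger).__name__, new_trigger.fields)
```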
Tasks with lifecycle
Sometimes tasks need a setup and a cleanup routine. This can be implemented with generators, whether synchronous or asynchronous.
Generators have two methods: send and throw (asynchronous generators have asend and athrow). The send methods take a parameter, which becomes the value of the yield expression:
```python
def generator():
    print(yield)


g = generator()
# start the generator; the first value sent must be None
g.send(None)
# prints "hello world"; the generator then finishes and raises StopIteration
g.send("hello world")
```
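The same send protocol already gives a setup / tick / cleanup lifecycle without any scheduler. In this plain-Python sketch the calls that a scheduler would make (one send(None) to start, send(True) per tick, send(False) on shutdown) are made by hand; the names lifecycle and events are illustrative.

```python
from typing import Generator, List, Optional


def lifecycle(events: List[str]) -> Generator[None, Optional[bool], None]:
    events.append("setup")
    running = yield  # wait for the first tick
    while running:
        events.append("tick")
        running = yield
    events.append("cleanup")


events: List[str] = []
gen = lifecycle(events)
gen.send(None)  # runs the setup and stops at the first yield
gen.send(True)  # what an interval task would do
gen.send(True)
try:
    gen.send(False)  # what a shutdown task would do
except StopIteration:
    pass  # the generator finished after its cleanup
print(events)  # prints ['setup', 'tick', 'tick', 'cleanup']
```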
Now let's adapt this for tasks with a lifecycle:
```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler()


def lifecycle_task():
    # setup
    ...
    # generator.send has to be wrapped with make_function so it can be used as a task
    scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)

# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
```
Note the make_function and make_async_function decorators. They are required because the generator methods have no signature. This is especially important for asend/athrow.
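What such a wrapper has to achieve can be sketched in a few lines: wrap the signature-less bound method in an ordinary Python function, so that inspect can read its parameters. wrap_with_signature is a hypothetical stand-in, not the actual make_function. It deliberately avoids functools.wraps, because inspect.signature follows __wrapped__ back to the original builtin method.

```python
import inspect
from typing import Any, Callable


def wrap_with_signature(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Hypothetical stand-in for make_function: a plain one-argument function."""

    def wrapper(value: Any) -> Any:
        return fn(value)

    # Keep a readable name for logs; the signature comes from `wrapper` itself.
    wrapper.__name__ = getattr(fn, "__name__", "wrapped")
    wrapper.__qualname__ = wrapper.__name__
    return wrapper


def counter():
    total = 0
    while True:
        value = yield total
        total += value


gen = counter()
next(gen)  # start the generator
send = wrap_with_signature(gen.send)
print(inspect.signature(send))
print(send(5))
```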
This is also possible with asynchronous generators.
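For the asynchronous variant, the same lifecycle is driven with asend instead of send, and completion is signalled by StopAsyncIteration instead of StopIteration. This plain asyncio sketch makes the scheduler's calls by hand; the names are illustrative.

```python
import asyncio
from typing import AsyncGenerator, List, Optional


async def lifecycle(events: List[str]) -> AsyncGenerator[None, Optional[bool]]:
    events.append("setup")  # e.g. open an async client
    running = yield
    while running:
        events.append("tick")
        await asyncio.sleep(0)  # placeholder for real async work
        running = yield
    events.append("cleanup")  # e.g. close the client


async def main() -> List[str]:
    events: List[str] = []
    agen = lifecycle(events)
    await agen.asend(None)  # start: run the setup up to the first yield
    await agen.asend(True)  # an interval tick
    try:
        await agen.asend(False)  # shutdown: leave the loop and clean up
    except StopAsyncIteration:
        pass  # async generators signal completion this way
    return events


print(asyncio.run(main()))  # prints ['setup', 'tick', 'cleanup']
```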
Warning
Lifecycle tasks only work with the memory store, because generators cannot be pickled.
Tasks with lifecycle in multi-processing environments
Natively, tasks with a lifecycle cannot be added to any store other than the MemoryStore. However, both paradigms can be combined, either by using multiple stores or by keeping the lifecycle task out of the stores.
We use a memory store for the lifecycle and, optionally, another store for the ticks of the lifecycle.
For multi-process synchronization there is asyncz.locks.FileLockProtected.
Here are some options:
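FileLockProtected's implementation is not shown in this section. To illustrate what a non-blocking inter-process lock does conceptually (as used with file_lock.protected(False) further down), here is a POSIX-only sketch built on fcntl.flock with LOCK_NB. try_file_lock is a hypothetical name, and the sketch does not work on Windows.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def try_file_lock(path: str) -> Iterator[bool]:
    """Hypothetical non-blocking inter-process lock; yields whether it was acquired."""
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    acquired = False
    try:
        try:
            # LOCK_NB makes flock fail immediately instead of waiting.
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            acquired = True
        except BlockingIOError:
            pass  # another process (or another open file description) holds the lock
        yield acquired
    finally:
        if acquired:
            fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


lock_path = os.path.join(tempfile.mkdtemp(), "asyncz_demo.lock")
with try_file_lock(lock_path) as first:
    # flock treats separately opened descriptors independently, even in one process.
    with try_file_lock(lock_path) as second:
        print(first, second)
```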
Option 1: Multiple lifecycle tasks, tick only one
Here the setup of the lifecycle tasks is executed simultaneously in every process. For example, when setting up database connections with four worker processes, there will be four connections to the database after the setup.
If this is not a problem, this is the easiest way.
lifecycle:
```python
import tempfile

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task():
    # setup
    ...
    # generator.send has to be wrapped with make_function so it can be used as a task
    scheduler.add_task(
        make_function(generator.send), args=[False], trigger="shutdown", store="memory"
    )
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)


# must be a globally referenceable (module-level) function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
```
The memory store is only required for the shutdown task. It can be left out when there are no shutdown tasks, or when the shutdown tasks use a globally referenceable function like lifecycle_tick.
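Why "globally referenceable" matters can be checked with pickle directly, assuming the file store serializes tasks with pickle (which the note that generators cannot be pickled suggests). A module-level function is pickled as a reference to its qualified name, while a nested function, such as the closure that make_function returns, or a generator object cannot be pickled. can_pickle and the other names are illustrative.

```python
import pickle


def lifecycle_tick() -> str:
    # Module-level: pickle stores only a reference such as "module.lifecycle_tick".
    return "tick"


def make_local_tick():
    def local_tick() -> str:
        return "tick"

    return local_tick


def gen():
    yield


def can_pickle(obj) -> bool:
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(can_pickle(lifecycle_tick))     # module-level function
print(can_pickle(make_local_tick()))  # nested function
print(can_pickle(gen()))              # generator object
```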
```python
import tempfile

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
    },
)


def lifecycle_task():
    # setup
    ...
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)


# must be a globally referenceable (module-level) function
def lifecycle_tick():
    generator.send(True)


# the shutdown task is also a module-level function, so no memory store is needed
def lifecycle_shutdown():
    try:
        generator.send(False)
    except StopIteration:
        # the generator has finished its cleanup
        pass


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)
scheduler.add_task(lifecycle_shutdown, trigger="shutdown")

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
```
Option 2: Single lifecycle task, setup on demand
When only one task should, for example, create connections to a database, this is the way to go.
Here we split both the setup and the cleanup process into two phases.
Besides the global setup/cleanup, there is a file-lock-protected setup: here we start the clients, and in the lock-protected cleanup phase we clean them up again (e.g. by disconnecting).
The clever part of the design is that whenever a process is stopped, the scheduler of another process picks up the task:
```python
import tempfile

from asyncz.locks import FileLockProtected
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task(name: str):
    # initial setup
    ...
    # initialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # try to grab the lock without blocking the generator
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                running = yield
                if not running:
                    break
                continue
            # delayed setup phase, only executed when the lock was grabbed, e.g. for creating db clients
            ...
            # generator.send has to be wrapped with make_function so it can be used as a task
            scheduler.add_task(
                make_function(generator.send), args=[False], trigger="shutdown", store="memory"
            )
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # clean up what the delayed setup created
                ...
            except Exception:
                # log
                ...
            # break the outer loop
            break
    # extra cleanup, always executed unless an exception was raised


# setup task
generator = lifecycle_task("foo")
generator.send(None)


# must be a globally referenceable (module-level) function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

# better use a context manager or the lifespan wrapper (.asgi) so that cleanup also runs on unexpected errors
with scheduler:
    ...
# Now the shutdown task is executed and the generator progresses in the cleanup
```
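The takeover can be simulated in a single process. In this sketch two generators stand in for two worker processes, and a threading.Lock acquired with blocking=False stands in for FileLockProtected; protected, worker and stop are hypothetical names, and no scheduler is involved. Worker "a" holds the lock until it is stopped, after which worker "b" grabs it on its next tick and runs the delayed setup.

```python
import threading
from contextlib import contextmanager
from typing import Generator, Iterator, List, Optional

Worker = Generator[None, Optional[bool], None]


@contextmanager
def protected(lock: threading.Lock) -> Iterator[bool]:
    """Hypothetical stand-in for file_lock.protected(False): never blocks."""
    acquired = lock.acquire(blocking=False)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()


def worker(name: str, lock: threading.Lock, log: List[str]) -> Worker:
    while True:
        with protected(lock) as got_the_lock:
            if not got_the_lock:
                running = yield
                if not running:
                    break
                continue  # retry the lock on the next tick
            log.append(f"{name}: setup")  # delayed, lock-protected setup
            running = yield
            while running:
                log.append(f"{name}: tick")
                running = yield
            log.append(f"{name}: cleanup")  # lock-protected cleanup
            break


def stop(gen: Worker) -> None:
    try:
        gen.send(False)
    except StopIteration:
        pass


lock = threading.Lock()
log: List[str] = []
a = worker("a", lock, log)
b = worker("b", lock, log)
a.send(None)  # "process" a grabs the lock and sets up
b.send(None)  # b does not get the lock and just waits
a.send(True)
b.send(True)  # still no lock for b
stop(a)       # a shuts down and releases the lock
b.send(True)  # b's next tick grabs the lock and sets up
b.send(True)
print(log)
```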
Can we simplify this? Yes. By sacrificing execution accuracy of the background task, we can remove the store lock (lock_path) from the scheduler and drop the file store. When a worker process is stopped, one cycle may then be skipped. If that is acceptable, this is the way to go.
```python
from asyncz.locks import FileLockProtected
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler()


def lifecycle_task(name: str):
    # initial setup
    ...
    # initialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # try to grab the lock without blocking the generator
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                running = yield
                if not running:
                    break
                continue
            # delayed setup phase, only executed when the lock was grabbed, e.g. for creating db clients
            ...
            # generator.send has to be wrapped with make_function so it can be used as a task
            scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # clean up what the delayed setup created
                ...
            except Exception:
                # log
                ...
            # break the outer loop
            break
    # extra cleanup, always executed unless an exception was raised


# setup task
generator = lifecycle_task("foo")
generator.send(None)

# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

# better use a context manager or the lifespan wrapper (.asgi) so that cleanup also runs on unexpected errors
with scheduler:
    ...
# Now the shutdown task is executed and the generator progresses in the cleanup
```
Conclusions
These are only some of the options. In real-life setups it is even possible to mix the non-simplified options.