
Tasks

Tasks are a big part of Asyncz. They are special objects that carry instructions and parameters; they are created, sent to the scheduler, and then executed.

Importing a task is as simple as:

from asyncz.tasks import Task

Parameters

  • id - The unique identifier of this task.
  • name - The description of this task.
  • fn - The callable function to execute.
  • args - Positional arguments to the callable.
  • kwargs - Keyword arguments to the callable.
  • coalesce - Whether to only run the task once when several run times are due.
  • trigger - The trigger object that controls the schedule of this task.
  • executor - The name of the executor that will run this task.
  • mistrigger_grace_time - How late, in seconds, this task's execution is allowed to be (None means "allow the task to run no matter how late it is").
  • max_instances - The maximum number of concurrently executing instances allowed for this task.
  • next_run_time - The next scheduled run time of this task. (Note: unlike in add_task, this does not have the pause-mode effect.)
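To make the interplay of coalesce and mistrigger_grace_time more concrete, here is a minimal sketch in plain Python. It is a simplified model and not Asyncz's own code: the helper runs_to_execute and its arguments are hypothetical names introduced only for illustration.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def runs_to_execute(
    due_times: List[datetime],
    now: datetime,
    coalesce: bool,
    mistrigger_grace_time: Optional[float],
) -> List[datetime]:
    """Hypothetical helper: decide which overdue run times still get executed."""
    if mistrigger_grace_time is None:
        # None means "run no matter how late"
        allowed = list(due_times)
    else:
        grace = timedelta(seconds=mistrigger_grace_time)
        # drop every run that is later than the grace period allows
        allowed = [t for t in due_times if now - t <= grace]
    if coalesce and allowed:
        # several due runs collapse into a single execution (the latest one)
        return allowed[-1:]
    return allowed


now = datetime(2024, 1, 1, 8, 10)
due = [datetime(2024, 1, 1, 8, minute) for minute in (1, 6, 9)]

all_runs = runs_to_execute(due, now, coalesce=False, mistrigger_grace_time=None)
one_run = runs_to_execute(due, now, coalesce=True, mistrigger_grace_time=None)
recent_runs = runs_to_execute(due, now, coalesce=False, mistrigger_grace_time=300)
no_runs = runs_to_execute(due, now, coalesce=True, mistrigger_grace_time=30)
```

In this model, three missed runs are all executed without coalescing, collapse into one with coalescing, and are dropped entirely when each of them is later than the grace time.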

Create a task

Creating a task is as simple as:

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

scheduler.start()

# or manually submit one with a changed trigger

t = Task(
    id="my-task2",
    fn=check_status,
    max_instances=1,
    coalesce=True,
)

scheduler.add_task(
    t, trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1)
)

Update a task

You can also update a specific task and its properties directly.

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Update the task
task.update(
    name="my-new-task-id",
    max_instances=5,
    coalesce=False,
)

Internally, the task uses the given scheduler to apply the update; the scheduler then executes it.

Warning

All attributes can be updated except the id, which is immutable.

Reschedule a task

You can also reschedule a task when needed. Rescheduling means changing only its trigger.

The trigger must be given by the alias of the trigger object (for example, "cron").

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)

Tasks with lifecycle

Sometimes tasks need a setup and a cleanup routine. This can be implemented with generators, whether asynchronous or synchronous.

Generators have two relevant methods: send and throw (asynchronous generators have asend and athrow). The send methods take a parameter, which becomes the value of the yield expression inside the generator:

def generator():
    print(yield)

g = generator()
# start the generator; the first value sent must be None
g.send(None)
# prints "hello world", then raises StopIteration because the generator ends
g.send("hello world")
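The same mechanism can carry a setup phase, repeated work, and a cleanup phase. The following is a minimal sketch using only the standard library; the names worker and log are hypothetical and chosen for illustration.

```python
from typing import Generator, List


def worker(log: List[str]) -> Generator[None, bool, None]:
    log.append("setup")
    try:
        running = yield  # wait for the first tick
        while running:
            log.append("tick")
            running = yield  # wait for the next tick
    finally:
        # runs when the loop ends or when an exception is thrown in
        log.append("cleanup")


log: List[str] = []
gen = worker(log)
gen.send(None)  # prime the generator: runs the setup up to the first yield
gen.send(True)  # one unit of work
gen.send(True)  # another unit of work
try:
    gen.send(False)  # leaves the loop; the generator finishes
except StopIteration:
    pass

# throw() injects an exception at the paused yield; cleanup still runs
throw_log: List[str] = []
failing = worker(throw_log)
failing.send(None)
try:
    failing.throw(RuntimeError("boom"))
except RuntimeError:
    pass
```

Sending False ends the loop, so the caller has to expect StopIteration from that last send.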

Now let's adapt this for tasks with lifecycle:

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler()


def lifecycle_task():
    # setup
    ...
    # wrap generator.send so that it can be registered as a task
    scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)

# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()

Note the make_function and make_async_function decorators. They are required because the generator methods have no signature; this matters especially for asend/athrow.
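The idea behind such a wrapper can be sketched in a few lines. This is not the implementation from asyncz.utils; wrap_callable is a hypothetical stand-in that only illustrates why an ordinary Python function is easier to introspect than a generator method implemented in C, which, depending on the interpreter version, may not expose a signature.

```python
import inspect
from typing import Any, Callable, Generator


def wrap_callable(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical stand-in: put a plain Python function around fn."""

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return fn(*args, **kwargs)

    return wrapper


def counter() -> Generator[int, int, None]:
    total = 0
    while True:
        value = yield total
        total += value


gen = counter()
gen.send(None)  # prime the generator

send = wrap_callable(gen.send)
# The wrapper is an ordinary function, so inspect can always read its signature.
params = list(inspect.signature(send).parameters)
result = send(5)
```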

This is also possible with asynchronous generators.

Warning

Lifecycle tasks are only valid for the memory store. Generators cannot be pickled.
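The pickling limitation can be checked directly with the standard library. A short sketch, with the hypothetical names lifecycle and picklable:

```python
import pickle
from typing import Generator


def lifecycle() -> Generator[None, bool, None]:
    running = yield
    while running:
        running = yield


gen = lifecycle()
gen.send(None)

try:
    pickle.dumps(gen)
    picklable = True
except TypeError:
    # CPython refuses to pickle generator objects
    picklable = False
```

Any store that has to serialize its tasks hits this error, which is why a generator-based task has to stay in memory.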

Tasks with lifecycle in multi-processing environments

Natively, tasks with a lifecycle cannot be added to stores other than the MemoryStore. But both paradigms can be combined by using multiple stores or by keeping the lifecycle task out of the stores.

We have a memory store for the lifecycle and, optionally, another store for the ticks in the lifecycle. For multi-process synchronization there is asyncz.locks.FileLockProtected.
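To illustrate what a non-blocking, file-based lock between processes can look like, here is a minimal sketch built on atomic file creation. It is an assumption-laden illustration and not how Asyncz implements its lock: try_file_lock is a hypothetical helper, and unlike an OS-level lock it would leave a stale file behind if the process crashed.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def try_file_lock(path: str) -> Iterator[bool]:
    """Hypothetical non-blocking lock: yields True only to the holder."""
    try:
        # O_EXCL makes the creation atomic: only one process can succeed
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        yield False  # somebody else holds the lock; do not wait
        return
    try:
        yield True
    finally:
        os.close(fd)
        os.unlink(path)  # release the lock


lock_path = os.path.join(tempfile.mkdtemp(), "demo.lock")

with try_file_lock(lock_path) as first:
    with try_file_lock(lock_path) as second:
        results = (first, second)

with try_file_lock(lock_path) as third:
    results += (third,)
```

The second attempt fails while the first holder is active, and a later attempt succeeds once the lock has been released.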

Here are some options:

Option 1: Multiple life-cycle tasks, tick only one

Here the setup of the life-cycle tasks is executed simultaneously in every process. For example, if the setup opens a database connection and there are four worker processes, there will be four connections to the database after the setup.

When this is not a problem, this is the easiest way.

lifecycle:

Tick only one lifecycle task, with shutdown
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task():
    # setup
    ...
    # wrap generator.send so that it can be registered as a task
    scheduler.add_task(
        make_function(generator.send), args=[False], trigger="shutdown", store="memory"
    )
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)


# must be a globally referenceable function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()

The memory store is only required for the shutdown task. It can be left out when there are no shutdown tasks or when the shutdown tasks use a globally referenceable function like lifecycle_tick.

Tick only one lifecycle task, without shutdown
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
    },
)


def lifecycle_task():
    # setup
    ...
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)


# must be a globally referenceable function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

scheduler.start()
...
# No shutdown task is registered here, so stopping does not drive the generator into its cleanup
scheduler.stop()

Option 2: Single life-cycle task, setup on demand

When only one task should, for example, create connections to a database, this is the way to go.

Here we split the setup and the cleanup process into two phases each.

Next to the global setup/cleanup, there is a file-lock-protected setup. In it we start the clients, and in the lock-protected cleanup phase we clean the clients up (e.g. by disconnecting).

The clever part of the design is that whenever a process is stopped, the next scheduler picks up:

Only one concurrent lifecycle task
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
from asyncz.locks import FileLockProtected

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task(name: str):
    # setup initial
    ...
    # initialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # don't block the generator
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                running = yield
                if not running:
                    break
                continue
            # delayed setup phase, only executed when the lock was grabbed, e.g. for creating db clients
            ...
            # wrap generator.send so that it can be registered as a task
            scheduler.add_task(
                make_function(generator.send), args=[False], trigger="shutdown", store="memory"
            )
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # cleanup the loop setup
                ...
            except Exception:
                # log
                ...
            # break the loop
            break
    # extra cleanup, which is always executed unless an exception was raised


# setup task
generator = lifecycle_task("foo")
generator.send(None)


# must be a globally referenceable function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

# better use a context manager or lifespan wrapper (.asgi) to clean up on unexpected errors
with scheduler:
    ...
    # Now the shutdown task is executed and the generator progresses in the cleanup

Can we simplify this? Yes. By sacrificing execution accuracy of the background job, we can remove the store lock from the scheduler and remove the file store. When a worker process is stopped, one cycle may be skipped. When this is not a problem, this is the way to go.

Only one concurrent lifecycle task with lower accuracy
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
from asyncz.locks import FileLockProtected

# Create a scheduler
scheduler = AsyncIOScheduler()


def lifecycle_task(name: str):
    # setup initial
    ...
    # initialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # don't block the generator
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                running = yield
                if not running:
                    break
                continue
            # delayed setup phase, only executed when the lock was grabbed, e.g. for creating db clients
            ...
            # wrap generator.send so that it can be registered as a task
            scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # cleanup the loop setup
                ...
            except Exception:
                # log
                ...
            # break the loop
            break
    # extra cleanup, which is always executed unless an exception was raised


# setup task
generator = lifecycle_task("foo")
generator.send(None)


# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

# better use a context manager or lifespan wrapper (.asgi) to clean up on unexpected errors
with scheduler:
    ...
    # Now the shutdown task is executed and the generator progresses in the cleanup
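The hand-over behaviour of Option 2 can be simulated in a single process without Asyncz. In the sketch below a plain set stands in for the cross-process file lock and two generators stand in for two worker processes; held, lifecycle_task (in this simplified form), and send are hypothetical names used only for illustration.

```python
from typing import Generator, List, Optional, Set

held: Set[str] = set()  # stands in for the cross-process file lock


def lifecycle_task(name: str, log: List[str]) -> Generator[None, bool, None]:
    while True:
        if "lock" in held:
            # another worker is active: stay idle until the next tick
            running = yield
            if not running:
                break
            continue
        held.add("lock")
        try:
            log.append(f"{name}: setup")  # delayed setup, lock holder only
            running = yield
            while running:
                log.append(f"{name}: work")
                running = yield
            log.append(f"{name}: cleanup")
        finally:
            held.discard("lock")  # let the next worker take over
        break


def send(gen: Generator[None, bool, None], value: Optional[bool]) -> None:
    try:
        gen.send(value)
    except StopIteration:
        pass  # the generator finished its cleanup


log: List[str] = []
a = lifecycle_task("a", log)
b = lifecycle_task("b", log)
send(a, None)   # a grabs the lock and runs its delayed setup
send(b, None)   # b finds the lock taken and idles
send(a, True)   # a works
send(b, True)   # b is still idle
send(a, False)  # a shuts down and releases the lock
send(b, True)   # b now grabs the lock; this tick only performs the setup
send(b, True)   # b works
send(b, False)  # b shuts down
```

In this model the tick on which a worker takes over is spent on the setup, which mirrors the skipped cycle mentioned for the simplified variant.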

Conclusions

These are only some options. In real-life setups it is even possible to mix the non-simplified options.