Executors

Executors are what actually run the tasks. Because Asyncz focuses on ASGI and asyncio, it ships only the AsyncIOExecutor, the ThreadPoolExecutor and the ProcessPoolExecutor out of the box, but you can also create your own custom executor if you wish.

When a task finishes, the executor notifies the scheduler, which then triggers the appropriate event.

AsyncIOExecutor

Runs tasks on the event loop. If the task is a coroutine function (defined with async def), it runs directly on the event loop; otherwise it runs in the loop's default executor, which is usually a thread pool.

Plugin Alias: asyncio

from asyncz.executors import AsyncIOExecutor

Parameters

  • scheduler - The scheduler that is starting the executor.
  • alias - The alias under which the executor was assigned to the scheduler.
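
The dispatch rule described above (coroutine functions on the loop, everything else in the loop's default executor) can be sketched with the standard library alone. This is a minimal illustration, not Asyncz's implementation: run_task, async_task and sync_task are hypothetical names introduced here.

```python
import asyncio
import inspect
import threading


async def run_task(fn, *args):
    """Hypothetical helper: run coroutine functions on the loop, others in the default executor."""
    if inspect.iscoroutinefunction(fn):
        # async def task: awaited directly on the running event loop
        return await fn(*args)
    # Plain callable: handed to the loop's default executor (a thread pool unless replaced)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fn, *args)


async def async_task():
    return threading.get_ident()


def sync_task():
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    async_thread = await run_task(async_task)
    sync_thread = await run_task(sync_task)
    # Report whether each task ran on the event loop's own thread
    return async_thread == loop_thread, sync_thread == loop_thread


async_on_loop, sync_on_loop = asyncio.run(main())
```

The practical consequence is that a blocking function does not stall the event loop, since it runs on a worker thread, while a coroutine shares the loop with everything else and should avoid blocking calls.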

ThreadPoolExecutor

An executor that runs the tasks in a concurrent.futures thread pool.

Plugin Alias: threadpool

from asyncz.executors import ThreadPoolExecutor

Parameters

  • max_workers – Maximum number of spawned threads.
  • pool_kwargs – Dict of keyword arguments to pass to the underlying ThreadPoolExecutor constructor.
  • cancel_futures – (Default: False) Cancel pending futures on shutdown. Only takes effect on Python 3.9 or later.
  • overwrite_wait – (Default: Unset) Overwrite the wait method used.

    cancel_futures: bool = False
    overwrite_wait: Optional[bool] = None
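
To see what cancelling futures on shutdown means in practice, here is a small sketch that uses the underlying concurrent.futures.ThreadPoolExecutor directly rather than the Asyncz wrapper. It assumes Python 3.9 or later; blocking_task and the two events are hypothetical helpers used only to make the ordering deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task():
    # Signal that the single worker is busy, then wait until released
    started.set()
    release.wait(timeout=5)
    return "done"


# One worker, so anything submitted while it is busy stays queued
pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocking_task)
started.wait(timeout=5)

queued = [pool.submit(lambda: "never runs") for _ in range(3)]

# cancel_futures=True cancels work that has not started yet (Python 3.9+)
pool.shutdown(wait=False, cancel_futures=True)
release.set()

result = running.result(timeout=5)
cancelled = [future.cancelled() for future in queued]
```

The task that was already running completes normally; only the queued work is dropped. With cancel_futures left at False, the queued tasks would still run before the pool finishes shutting down.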

ProcessPoolExecutor

An executor that runs the tasks in a concurrent.futures process pool.

Plugin Alias: processpool

from asyncz.executors import ProcessPoolExecutor

Parameters

  • max_workers – Maximum number of spawned processes.
  • pool_kwargs – Dict of keyword arguments to pass to the underlying ProcessPoolExecutor constructor.
  • cancel_futures – (Default: False) Cancel pending futures on shutdown. Only takes effect on Python 3.9 or later.
  • overwrite_wait – (Default: Unset) Overwrite the wait method used.

Custom executor

You can also create a custom executor by subclassing the BaseExecutor.

from asyncz.executors.base import BaseExecutor

You should also implement the start() and shutdown() methods of that executor. An executor has a few responsibilities:

  • Initialise when start() is called.
  • Release resources when shutdown() is called.
  • Track the number of instances of each task running on it and refuse to run more than the maximum.
  • Notify the scheduler of the results of the task.

All state is held in pydantic models, so keep that in mind as well.

from datetime import datetime
from typing import Any, List

from asyncz.executors.base import BaseExecutor
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks.types import TaskType


class CustomExecutor(BaseExecutor):
    """
    A new custom executor.
    """

    def start(self, scheduler: Any, alias: str): ...

    def shutdown(self, wait: bool = True): ...

    def send_task(self, task: "TaskType", run_times: List[datetime]): ...

    def do_send_task(self, task: "TaskType", run_times: List[datetime]) -> Any: ...


# Create executor
executor = CustomExecutor()

scheduler = AsyncIOScheduler()

# Add custom executor
scheduler.add_executor(executor, "custom")
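
The responsibilities listed earlier (initialise on start, release on shutdown, cap the number of running instances, report results) can be illustrated with a standalone toy class. This is only a sketch under stated assumptions: TinyExecutor, MaxInstancesReached and the on_result callback are hypothetical and do not reflect the BaseExecutor API; a real executor would subclass BaseExecutor and report to the scheduler instead.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple


class MaxInstancesReached(Exception):
    """Hypothetical error: the task already runs the maximum number of instances."""


class TinyExecutor:
    """Toy executor, not based on asyncz; it runs tasks synchronously."""

    def __init__(self, on_result: Callable[[str, Any], None]) -> None:
        self.on_result = on_result  # stands in for notifying the scheduler
        self.running: DefaultDict[str, int] = defaultdict(int)
        self.started = False

    def start(self) -> None:
        # Initialise when start() is called
        self.started = True

    def shutdown(self) -> None:
        # Release resources when shutdown() is called
        self.running.clear()
        self.started = False

    def send_task(self, task_id: str, fn: Callable[[], Any], max_instances: int = 1) -> None:
        if not self.started:
            raise RuntimeError("executor has not been started")
        # Refuse to run more than the allowed number of instances
        if self.running[task_id] >= max_instances:
            raise MaxInstancesReached(task_id)
        self.running[task_id] += 1
        try:
            result = fn()
        finally:
            self.running[task_id] -= 1
        # Report the outcome once the task is done
        self.on_result(task_id, result)


results: List[Tuple[str, Any]] = []
tiny = TinyExecutor(on_result=lambda task_id, value: results.append((task_id, value)))
tiny.start()


def reentrant() -> str:
    # Try to start a second instance of the same task while the first is running
    try:
        tiny.send_task("job", lambda: "inner")
    except MaxInstancesReached:
        return "refused second instance"
    return "ran twice"


tiny.send_task("job", reentrant)
remaining = tiny.running["job"]
tiny.shutdown()
```

Decrementing the counter in a finally block keeps the bookkeeping correct even when a task raises, so a failed run does not permanently block later runs of the same task.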