Have you ever wondered what handles the tasks? Those are the executors. Because Asyncz is focused on ASGI and asyncio, it only provides the AsyncIOExecutor, the ThreadPoolExecutor and the ProcessPoolExecutor out of the box, but you can also create your own custom executor if you wish.
When a task finishes, the executor notifies the scheduler, which then triggers the appropriate event.
The default executor, which runs on the event loop. If the task is a coroutine,
it runs directly in the event loop; otherwise it runs in the loop's default executor,
which is usually a thread pool.
from asyncz.executors import AsyncIOExecutor
- scheduler - The scheduler that is starting the executor.
- alias - The alias of the executor like it was assigned to the scheduler.
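The dispatch rule described above can be sketched with plain asyncio. This is only an illustration of the idea, not Asyncz's implementation: `run_task`, `async_task` and `blocking_task` are hypothetical names, and the sketch assumes the loop's default executor is a thread pool.

```python
import asyncio
import inspect
import threading


async def run_task(func, *args):
    """Hypothetical helper: coroutines run on the loop, anything else in the default executor."""
    if inspect.iscoroutinefunction(func):
        # Coroutine functions are awaited directly on the event loop.
        return await func(*args)
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor (normally a thread pool).
    return await loop.run_in_executor(None, func, *args)


async def async_task():
    # Reports the thread the coroutine runs on.
    return threading.get_ident()


def blocking_task():
    # Reports the thread the plain function runs on.
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    on_loop = await run_task(async_task)
    in_pool = await run_task(blocking_task)
    # True means "ran on the event loop thread".
    return on_loop == loop_thread, in_pool == loop_thread


results = asyncio.run(main())
```

The coroutine stays on the event loop thread, while the plain function is handed to a worker thread so it cannot block the loop.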
An executor that runs the tasks in a concurrent.futures thread pool.
from asyncz.executors import ThreadPoolExecutor
- max_workers – Maximum number of spawned threads.
- pool_kwargs – Dict of keyword arguments to pass to the underlying ThreadPoolExecutor constructor.
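To make the two parameters concrete, here is a minimal sketch that uses the standard library pool directly. It assumes, as the parameter description says, that `pool_kwargs` is forwarded to the `concurrent.futures.ThreadPoolExecutor` constructor; the values and the `current_thread_name` helper are purely illustrative.

```python
import threading
from concurrent.futures import ThreadPoolExecutor as StdThreadPoolExecutor

# Illustrative values only; they mirror the two parameters listed above.
max_workers = 2
pool_kwargs = {"thread_name_prefix": "asyncz-worker"}


def current_thread_name():
    # Returns the name of the worker thread that executes the call.
    return threading.current_thread().name


# What the underlying pool would look like with those settings.
with StdThreadPoolExecutor(max_workers=max_workers, **pool_kwargs) as pool:
    names = [pool.submit(current_thread_name).result() for _ in range(4)]
```

Every call runs on one of at most two threads, and each thread name starts with the prefix supplied through `pool_kwargs`.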
from asyncz.executors import ProcessPoolExecutor
- max_workers – Maximum number of spawned processes.
- pool_kwargs – Dict of keyword arguments to pass to the underlying ProcessPoolExecutor constructor.
You can also create a custom executor by subclassing BaseExecutor.
from asyncz.executors.base import BaseExecutor
You should also implement the start() and shutdown() methods of that same executor.
The executor has some responsibilities, such as:
- Initialise when start() is called.
- Release resources when shutdown() is called.
- Track the number of instances of each task running on it and refusing to run more than the maximum.
- Notify the scheduler of the results of the task.
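The last two responsibilities can be illustrated with a small toy model. This is a sketch of the bookkeeping only, under stated assumptions: `InstanceTracker`, its `submit` method and the `notify` callback are hypothetical names and do not reflect the BaseExecutor API.

```python
from collections import defaultdict
from concurrent.futures import Future


class InstanceTracker:
    """Toy model of instance tracking and result notification (not Asyncz code)."""

    def __init__(self, notify):
        self.running = defaultdict(int)  # task id -> instances currently in flight
        self.notify = notify             # stands in for the scheduler callback

    def submit(self, task_id, max_instances, future):
        # Refuse to run more instances than the task allows.
        if self.running[task_id] >= max_instances:
            raise RuntimeError(f"{task_id}: maximum of {max_instances} running instances reached")
        self.running[task_id] += 1
        future.add_done_callback(lambda done: self._finished(task_id, done))

    def _finished(self, task_id, future):
        # Free the slot, then report the result to the "scheduler".
        self.running[task_id] -= 1
        self.notify(task_id, future.result())


events = []
tracker = InstanceTracker(notify=lambda task_id, result: events.append((task_id, result)))

first = Future()
tracker.submit("report", max_instances=1, future=first)

try:
    # A second instance while the first is still running is refused.
    tracker.submit("report", max_instances=1, future=Future())
    refused = False
except RuntimeError:
    refused = True

# Completing the first instance frees the slot and notifies the callback.
first.set_result("done")
```

A real executor would do the same bookkeeping around the work it submits to its pool or event loop, then hand the outcome back to the scheduler so the matching event can fire.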
All the states are done via pydantic models so you should also keep that in mind.
from datetime import datetime
from typing import Any, List

from asyncz.executors.base import BaseExecutor
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks.types import TaskType


class CustomExecutor(BaseExecutor):
    """
    A new custom executor.
    """

    def start(self, scheduler: Any, alias: str):
        ...

    def shutdown(self, wait: bool = True):
        ...

    def send_task(self, task: "TaskType", run_times: List[datetime]):
        ...

    def do_send_task(self, task: "TaskType", run_times: List[datetime]) -> Any:
        ...


# Create executor
executor = CustomExecutor()

scheduler = AsyncIOScheduler()

# Add custom executor
scheduler.add_executor(executor, "custom")