Skip to content

API Reference

This page collects the main public Asyncz types in one place.

Schedulers

asyncz.schedulers.asyncio.AsyncIOScheduler

AsyncIOScheduler(global_config=None, **kwargs)

Bases: BaseScheduler

A scheduler that runs on an asyncio event loop.

This scheduler is typically used with asyncio, which means that any ASGI framework can also use it internally if needed. For example, Ravyn and Starlette.

PARAMETER DESCRIPTION
event_loop

AsyncIO event loop to use. Defaults to the global event loop.

isolated_event_loop

Use a fresh, isolated event_loop instead of the existing one.

Source code in asyncz/schedulers/base.py
def __init__(
    self,
    global_config: Optional[Any] = None,
    **kwargs: Any,
) -> None:
    """
    Initialize the scheduler's plugin registries, locks and state, then
    apply the given configuration via ``setup``.

    Args:
        global_config: Optional scheduler-wide configuration passed to ``setup``.
        **kwargs: Additional configuration options forwarded to ``setup``.
    """
    super().__init__()
    # Plugin-name registries, seeded from the library defaults; the
    # *_classes dicts cache resolved classes lazily.
    self.trigger_plugins = dict(defaults.triggers.items())
    self.trigger_classes: dict[str, type[TriggerType]] = {}
    self.executor_plugins: dict[str, str] = dict(defaults.executors.items())
    self.executor_classes: dict[str, type[ExecutorType]] = {}
    self.store_plugins: dict[str, str] = dict(defaults.stores.items())
    self.store_classes: dict[str, type[StoreType]] = {}
    # Live executor and store instances, each registry guarded by its own lock.
    self.executors: dict[str, ExecutorType] = {}
    self.executor_lock: RLock = self.create_lock()
    self.stores: dict[str, StoreType] = {}
    self.store_processing_lock: LockProtectedProtocol = self.create_processing_lock()
    self.store_lock: RLock = self.create_lock()
    self.listeners: list[Any] = []
    self.listeners_lock: RLock = self.create_lock()
    # Tasks added before startup are parked here as
    # (task, replace_existing, has_explicit_next_run_time) tuples.
    self.pending_tasks: list[tuple[TaskType, bool, bool]] = []
    self.state: Union[SchedulerState, Any] = SchedulerState.STATE_STOPPED

    # Reference counter consulted by shutdown() to decide whether other
    # users of this scheduler remain.
    self.ref_counter: int = 0
    self.ref_lock: Lock = Lock()
    # Per-id integer counts; presumably running-instance counts per task
    # id — TODO confirm against the executor code.
    self.instances: dict[str, int] = defaultdict(lambda: 0)
    self.setup(global_config, **kwargs)

isolated_event_loop class-attribute instance-attribute

isolated_event_loop = False

event_loop_thread class-attribute instance-attribute

event_loop_thread = None

timer class-attribute instance-attribute

timer = None

event_loop class-attribute instance-attribute

event_loop = None

loggers instance-attribute

loggers

instances instance-attribute

instances = defaultdict(lambda: 0)

running property

running

Return True if the scheduler has been started. This is a shortcut for scheduler.state != SchedulerState.STATE_STOPPED.

trigger_plugins instance-attribute

trigger_plugins = dict(items())

trigger_classes instance-attribute

trigger_classes = {}

executor_plugins instance-attribute

executor_plugins = dict(items())

executor_classes instance-attribute

executor_classes = {}

store_plugins instance-attribute

store_plugins = dict(items())

store_classes instance-attribute

store_classes = {}

executors instance-attribute

executors = {}

executor_lock instance-attribute

executor_lock = create_lock()

stores instance-attribute

stores = {}

store_processing_lock instance-attribute

store_processing_lock = create_processing_lock()

store_lock instance-attribute

store_lock = create_lock()

listeners instance-attribute

listeners = []

listeners_lock instance-attribute

listeners_lock = create_lock()

pending_tasks instance-attribute

pending_tasks = []

state instance-attribute

state = STATE_STOPPED

ref_counter instance-attribute

ref_counter = 0

ref_lock instance-attribute

ref_lock = Lock()

start

start(paused=False)
Source code in asyncz/schedulers/asyncio.py
def start(self, paused: bool = False) -> bool:
    """
    Resolve an event loop if needed, then start the scheduler.

    Args:
        paused: When True, start the scheduler without processing tasks
            until it is resumed.
    """
    # Resolve the event loop lazily: prefer the currently running loop,
    # unless an isolated loop was requested.
    if not self.event_loop:
        try:
            if self.isolated_event_loop:
                # Force the except branch so a dedicated loop is created
                # instead of reusing any running one.
                raise RuntimeError()
            self.event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # No usable running loop: spin one up in a daemon thread and
            # block until _init_new_loop signals it is ready.
            event = Event()
            self.event_loop_thread = Thread(
                target=self._init_new_loop, args=[event], daemon=True
            )
            self.event_loop_thread.start()
            event.wait()
    return super().start(paused)

shutdown

shutdown(wait=True)
Source code in asyncz/schedulers/asyncio.py
def shutdown(self, wait: bool = True) -> bool:
    """
    Shut this scheduler reference down.

    Returns:
        True when other references to the scheduler are still held after
        this call, False when this was the last one.
    """
    # Snapshot the counter before _shutdown runs; this call's own
    # reference has not been decremented yet, hence the `> 1` check.
    remaining_refs = self.ref_counter
    self._shutdown(wait)
    return remaining_refs > 1

start_timer

start_timer(wait_seconds=None)
Source code in asyncz/schedulers/asyncio.py
def start_timer(self, wait_seconds: Optional[float] = None) -> None:
    """
    (Re)arm the wakeup timer on the event loop.

    Any previously armed timer is cancelled first. When ``wait_seconds``
    is None the timer stays disarmed.
    """
    self.stop_timer()
    if wait_seconds is None:
        return
    self.timer = self.event_loop.call_later(wait_seconds, self.wakeup)

stop_timer

stop_timer()
Source code in asyncz/schedulers/asyncio.py
def stop_timer(self) -> None:
    """Cancel and clear the wakeup timer, if one is currently armed."""
    # getattr guards against the attribute not existing yet (e.g. before start).
    timer = getattr(self, "timer", None)
    if not timer:
        return
    timer.cancel()
    self.timer = None

wakeup

wakeup()
Source code in asyncz/schedulers/asyncio.py
@run_in_event_loop
def wakeup(self) -> None:
    # Re-plan the scheduler: cancel the current timer, process any due
    # tasks, then re-arm the timer with the wait returned by
    # process_tasks (seconds until the next run, or None to stay idle).
    self.stop_timer()
    wait_seconds = self.process_tasks()
    self.start_timer(wait_seconds)

create_default_executor

create_default_executor()
Source code in asyncz/schedulers/asyncio.py
def create_default_executor(self) -> BaseExecutor:
    # The asyncio scheduler defaults to an asyncio-native executor so
    # tasks run on the same event loop the scheduler uses.
    return AsyncIOExecutor()

pause

pause()

Pause task processing in the scheduler.

This will prevent the scheduler from waking up to do task processing until resume is called. It will not however stop any already running task processing.

Source code in asyncz/schedulers/base.py
def pause(self) -> None:
    """
    Pause task processing in the scheduler.

    This will prevent the scheduler from waking up to do task processing until resume
    is called. It will not however stop any already running task processing.

    Raises:
        SchedulerNotRunningError: If the scheduler is stopped.
    """
    if self.state == SchedulerState.STATE_STOPPED:
        raise SchedulerNotRunningError()
    elif self.state == SchedulerState.STATE_RUNNING:
        # Pausing an already-paused scheduler is a silent no-op; only a
        # running scheduler transitions and emits the event.
        self.state = SchedulerState.STATE_PAUSED
        self.loggers[self.logger_name].info("Paused scheduler task processing.")
        self.dispatch_event(SchedulerEvent(code=SCHEDULER_PAUSED))

resume

resume()

Resume task processing in the scheduler.

Source code in asyncz/schedulers/base.py
def resume(self) -> None:
    """
    Resume task processing in the scheduler.

    Only has an effect when the scheduler is currently paused; resuming
    immediately wakes the scheduler up so due tasks get processed.

    Raises:
        SchedulerNotRunningError: If the scheduler is stopped.
    """
    if self.state == SchedulerState.STATE_STOPPED:
        # Instantiate the exception explicitly for consistency with pause().
        raise SchedulerNotRunningError()
    elif self.state == SchedulerState.STATE_PAUSED:
        self.state = SchedulerState.STATE_RUNNING
        self.loggers[self.logger_name].info("Resumed scheduler task processing.")
        self.dispatch_event(SchedulerEvent(code=SCHEDULER_RESUMED))
        self.wakeup()

add_executor

add_executor(executor, alias='default', **executor_options)
Source code in asyncz/schedulers/base.py
def add_executor(
    self,
    executor: Union[ExecutorType, str],
    alias: str = "default",
    **executor_options: Any,
) -> None:
    """
    Adds an executor to this scheduler under the given alias.

    The executor may be a ready instance or the name of an executor
    plugin, in which case ``executor_options`` are passed to the plugin's
    constructor.

    Raises:
        ValueError: If an executor with the same alias already exists.
        TypeError: If ``executor`` is neither an executor instance nor a string.
    """
    with self.executor_lock:
        if alias in self.executors:
            raise ValueError(
                f"This scheduler already has an executor by the alias of '{alias}'."
            )

        if isinstance(executor, ExecutorType):
            self.executors[alias] = executor
        elif isinstance(executor, str):
            self.executors[alias] = executor = self.create_plugin_instance(
                PluginInstance.EXECUTOR, executor, executor_options
            )
        else:
            raise TypeError(
                f"Expected an executor instance or a string, got {executor.__class__.__name__} instead."
            )
        # Executors added to a started scheduler must begin working right away.
        if self.state != SchedulerState.STATE_STOPPED:
            cast("ExecutorType", executor).start(self, alias)

    self.dispatch_event(SchedulerEvent(code=EXECUTOR_ADDED, alias=alias))

remove_executor

remove_executor(alias, shutdown=True)

Removes the executor by the given alias from this scheduler.

Source code in asyncz/schedulers/base.py
def remove_executor(self, alias: str, shutdown: bool = True) -> None:
    """
    Detach the executor registered under ``alias`` from this scheduler.

    The executor is shut down afterwards unless ``shutdown`` is False,
    and an EXECUTOR_REMOVED event is dispatched in either case.
    """
    with self.executor_lock:
        # lookup_executor validates the alias before it is dropped.
        removed = self.lookup_executor(alias)
        self.executors.pop(alias)

    if shutdown:
        removed.shutdown()

    self.dispatch_event(SchedulerEvent(code=EXECUTOR_REMOVED, alias=alias))

add_store

add_store(store, alias='default', **store_options)

Adds a task store to this scheduler.

Any extra keyword arguments will be passed to the task store plugin's constructor, assuming that the first argument is the name of a task store plugin.

Source code in asyncz/schedulers/base.py
def add_store(
    self, store: Union[StoreType, str], alias: str = "default", **store_options: Any
) -> None:
    """
    Adds a task store to this scheduler.

    Any extra keyword arguments will be passed to the task store plugin's constructor, assuming
    that the first argument is the name of a task store plugin.

    Raises:
        ValueError: If a store with the same alias already exists.
        TypeError: If ``store`` is neither a store instance nor a string.
    """
    with self.store_lock:
        if alias in self.stores:
            raise ValueError(
                f"This scheduler already has a task store by the alias of '{alias}'."
            )

        if isinstance(store, StoreType):
            self.stores[alias] = store
        elif isinstance(store, str):
            self.stores[alias] = store = self.create_plugin_instance(
                PluginInstance.STORE, store, store_options
            )
        else:
            raise TypeError(
                f"Expected a task store instance or a string, got {store.__class__.__name__} instead."
            )

        # Stores added to a started scheduler must be started right away.
        if self.state != SchedulerState.STATE_STOPPED:
            cast("StoreType", store).start(self, alias)

    self.dispatch_event(SchedulerEvent(code=STORE_ADDED, alias=alias))

    # A new store may contain due tasks, so re-plan immediately.
    if self.state == SchedulerState.STATE_RUNNING:
        self.wakeup()

remove_store

remove_store(alias, shutdown=True)

Removes the task store by the given alias from this scheduler.

Source code in asyncz/schedulers/base.py
def remove_store(self, alias: str, shutdown: bool = True) -> None:
    """
    Detach the task store registered under ``alias`` from this scheduler.

    The store is shut down afterwards unless ``shutdown`` is False, and a
    STORE_REMOVED event is dispatched in either case.
    """
    with self.store_lock:
        # lookup_store validates the alias before it is dropped.
        removed = self.lookup_store(alias)
        self.stores.pop(alias)

    if shutdown:
        removed.shutdown()

    self.dispatch_event(SchedulerEvent(code=STORE_REMOVED, alias=alias))

add_listener

add_listener(callback, mask=ALL_EVENTS)

add_listener(callback, mask=EVENT_ALL)

Adds a listener for scheduler events.

When a matching event occurs, callback is executed with the event object as its sole argument. If the mask parameter is not provided, the callback will receive events of all types.

PARAMETER DESCRIPTION
callback

any callable that takes one argument.

TYPE: Any

mask

bitmask that indicates which events should be listened to.

TYPE: Union[int, str] DEFAULT: ALL_EVENTS

Source code in asyncz/schedulers/base.py
def add_listener(self, callback: Any, mask: Union[int, str] = ALL_EVENTS) -> None:
    """
    Register a listener for scheduler events.

    Whenever an event matching ``mask`` is dispatched, ``callback`` is
    invoked with the event object as its only argument. With the default
    mask the callback receives every event type.

    Args:
        callback: Any callable that takes one argument.
        mask: Bitmask that indicates which events should be listened to.
    """
    entry = (callback, mask)
    with self.listeners_lock:
        self.listeners.append(entry)

remove_listener

remove_listener(callback)

Removes a previously added event listener.

Source code in asyncz/schedulers/base.py
def remove_listener(self, callback: Any) -> bool:
    """
    Remove a previously added event listener.

    Returns:
        True when a matching listener was found and removed (only the
        first match is dropped), False otherwise.
    """
    with self.listeners_lock:
        position = next(
            (
                index
                for index, (registered, _mask) in enumerate(self.listeners)
                if callback == registered
            ),
            None,
        )
        if position is not None:
            del self.listeners[position]
            return True
    return False

add_task

add_task(
    fn_or_task=None,
    trigger=None,
    args=None,
    kwargs=None,
    id=None,
    name=None,
    mistrigger_grace_time=undefined,
    coalesce=undefined,
    max_instances=undefined,
    next_run_time=undefined,
    store=None,
    executor=None,
    replace_existing=False,
    fn=None,
    **trigger_args,
)

Adds the given task to the task list and wakes up the scheduler if it's already running.

Any option that defaults to undefined will be replaced with the corresponding default value when the task is scheduled (which happens when the scheduler is started, or immediately if the scheduler is already running).

The fn argument can be given either as a callable object or a textual reference in the package.module:some.object format, where the first half (separated by :) is an importable module and the second half is a reference to the callable object, relative to the module.

The trigger argument can either be

1. The alias name of the trigger (e.g. date, interval or cron), in which case any extra keyword arguments to this method are passed on to the trigger's constructor.
2. An instance of a trigger class (TriggerType).

PARAMETER DESCRIPTION
fn

Callable (or a textual reference to one) to run at the given time.

TYPE: Optional[Any] DEFAULT: None

trigger

Trigger instance that determines when fn is called.

TYPE: Optional[Union[TriggerType, str]] DEFAULT: None

args

List of positional arguments to call fn with.

TYPE: Optional[Any] DEFAULT: None

kwargs

Dict of keyword arguments to call fn with.

TYPE: Optional[Any] DEFAULT: None

id

Explicit identifier for the task (for modifying it later).

TYPE: Optional[str] DEFAULT: None

name

Textual description of the task.

TYPE: Optional[str] DEFAULT: None

mistrigger_grace_time

Seconds after the designated runtime that the task is still allowed to be run (or None to allow the task to run no matter how late it is).

coalesce

Run once instead of many times if the scheduler determines that the task should be run more than once in succession.

TYPE: Union[bool, Undefined] DEFAULT: undefined

max_instances

Maximum number of concurrently running instances allowed for this task.

TYPE: Union[int, Undefined, None] DEFAULT: undefined

next_run_time

When to first run the task, regardless of the trigger (pass None to add the task as paused).

TYPE: Union[datetime, str, Undefined, None] DEFAULT: undefined

store

Alias of the task store to store the task in.

TYPE: Union[str, Undefined, None] DEFAULT: None

executor

Alias of the executor to run the task with.

TYPE: Union[str, Undefined, None] DEFAULT: None

replace_existing

True to replace an existing task with the same id (but retain the number of runs from the existing one).

TYPE: bool DEFAULT: False

Source code in asyncz/schedulers/base.py
def add_task(
    self,
    fn_or_task: Optional[Union[Callable[..., Any], TaskType]] = None,
    trigger: Optional[Union[TriggerType, str]] = None,
    args: Optional[Any] = None,
    kwargs: Optional[Any] = None,
    id: Optional[str] = None,
    name: Optional[str] = None,
    mistrigger_grace_time: Union[int, Undefined, None] = undefined,
    coalesce: Union[bool, Undefined] = undefined,
    max_instances: Union[int, Undefined, None] = undefined,
    next_run_time: Union[datetime, str, Undefined, None] = undefined,
    store: Union[str, Undefined, None] = None,
    executor: Union[str, Undefined, None] = None,
    replace_existing: bool = False,
    # old name
    fn: Optional[Any] = None,
    **trigger_args: Any,
) -> TaskType:
    """
    Adds the given task to the task list and wakes up the scheduler if it's already running.

    Any option that defaults to undefined will be replaced with the corresponding default
    value when the task is scheduled (which happens when the scheduler is started, or
    immediately if the scheduler is already running).

    The fn argument can be given either as a callable object or a textual reference in
    the package.module:some.object format, where the first half (separated by :) is an
    importable module and the second half is a reference to the callable object, relative to
    the module.

    The trigger argument can either be:
      . The alias name of the trigger (e.g. date, interval or cron), in which case
        any extra keyword arguments to this method are passed on to the trigger's constructor.
      . An instance of a trigger class (TriggerType).


    Args:
        fn: Callable (or a textual reference to one) to run at the given time.
        trigger: Trigger instance that determines when fn is called.
        args: List of positional arguments to call fn with.
        kwargs: Dict of keyword arguments to call fn with.
        id: Explicit identifier for the task (for modifying it later).
        name: Textual description of the task.
        mistrigger_grace_time: Seconds after the designated runtime that the task is still
            allowed to be run (or None to allow the task to run no matter how late it is).
        coalesce: Run once instead of many times if the scheduler determines that the
            task should be run more than once in succession.
        max_instances: Maximum number of concurrently running instances allowed for this task.
        next_run_time: When to first run the task, regardless of the trigger (pass
            None to add the task as paused).
        store: Alias of the task store to store the task in.
        executor: Alias of the executor to run the task with.
        replace_existing: True to replace an existing task with the same id
                          (but retain the number of runs from the existing one).
    """
    # Local import — presumably to avoid a circular import at module load
    # time; TODO confirm.
    from asyncz.tasks import Task

    if fn is not None:
        warnings.warn(
            "The parameter 'fn' was renamed to 'fn_or_task'",
            DeprecationWarning,
            stacklevel=2,
        )
        fn_or_task = fn
    if isinstance(fn_or_task, TaskType):
        assert fn_or_task.submitted is False, "Can submit tasks only once"
        fn_or_task.submitted = True
        # tweak task before submitting
        # WARNING: in contrast to the decorator mode this really updates the task
        # by providing an id (e.g. autogenerated) you can make a real task from decorator type task while submitting
        task_update_kwargs: dict[str, Any] = {
            "scheduler": self,
            "args": tuple(args) if args is not None else undefined,
            "kwargs": dict(kwargs) if kwargs is not None else undefined,
            "id": id or undefined,
            "name": name or undefined,
            "mistrigger_grace_time": mistrigger_grace_time,
            "coalesce": coalesce,
            "max_instances": max_instances,
            "next_run_time": next_run_time,
            "executor": executor if executor is not None else undefined,
            "store_alias": store if store is not None else undefined,
        }
        if trigger:
            task_update_kwargs["trigger"] = self.create_trigger(trigger, trigger_args)
            # WARNING: when submitting a task object allow_mistrigger_by_default has no effect
        # Drop all entries left at the `undefined` sentinel so only explicit
        # options overwrite the task's existing values.
        task_update_kwargs = {
            key: value for key, value in task_update_kwargs.items() if value is not undefined
        }
        fn_or_task.update_task(**task_update_kwargs)
        # fallback if still not set. Set manually executor and store_alias to default.
        if fn_or_task.executor is None:
            fn_or_task.executor = "default"
        if fn_or_task.store_alias is None:
            fn_or_task.store_alias = "default"
        assert fn_or_task.trigger is not None, "Cannot submit a task without a trigger."
        assert fn_or_task.id is not None, "Cannot submit a decorator type task."
        with self.store_lock:
            if self.state == SchedulerState.STATE_STOPPED:
                # Stores are not started yet: park the task until start().
                self.pending_tasks.append(
                    (
                        fn_or_task,
                        replace_existing,
                        next_run_time is not None,
                    )
                )
                self.loggers[self.logger_name].info(
                    "Adding task tentatively. It will be properly scheduled when the scheduler starts."
                )
            else:
                self.real_add_task(
                    fn_or_task, replace_existing, next_run_time is not None, None
                )
        return fn_or_task
    # Callable (or textual-reference) path: build a brand-new Task.
    task_kwargs: dict[str, Any] = {
        "scheduler": self,
        "trigger": self.create_trigger(trigger, trigger_args),
        "fn": fn_or_task,
        "args": tuple(args) if args is not None else (),
        "kwargs": dict(kwargs) if kwargs is not None else {},
        "id": id,
        "name": name,
        "mistrigger_grace_time": mistrigger_grace_time,
        "coalesce": coalesce,
        "max_instances": max_instances,
        "next_run_time": next_run_time,
        "executor": executor if executor is not None else undefined,
        "store_alias": store if store is not None else undefined,
    }
    task_kwargs = {key: value for key, value in task_kwargs.items() if value is not undefined}
    if task_kwargs["trigger"].allow_mistrigger_by_default:
        # we want to be able, to just use add_task, and the task is scheduled
        task_kwargs.setdefault("mistrigger_grace_time", None)
    # Fill remaining gaps from the scheduler-wide task defaults.
    for key, value in self.task_defaults.model_dump(exclude_none=True).items():
        task_kwargs.setdefault(key, value)

    task = Task(**task_kwargs)
    if task.fn is not None:
        # Re-enter through the TaskType branch above to actually submit it.
        return self.add_task(
            task, replace_existing=replace_existing, next_run_time=next_run_time
        )
    return task

update_task

update_task(task_id, store=None, **updates)

Modifies the properties of a single task.

Modifications are passed to this method as extra keyword arguments.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the store that contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def update_task(
    self, task_id: Union[TaskType, str], store: Optional[str] = None, **updates: Any
) -> TaskType:
    """
    Modifies the properties of a single task.

    Modifications are passed to this method as extra keyword arguments.

    Args:
        task_id: The identifier of the task.
        store: Alias of the store that contains the task.
    """
    if isinstance(task_id, TaskType):
        assert task_id.id, "Cannot update a decorator type Task"
        # A Task object was given: use its current state as the baseline
        # and overlay the explicit updates on top.
        new_updates = task_id.model_dump()
        new_updates.update(**updates)
        task_id = task_id.id
    else:
        new_updates = updates

    with self.store_lock:
        task, store = self.lookup_task(task_id, store)
        task.update_task(scheduler=self, **new_updates)

        # Persist the modified task back into its store, if it lives in one.
        if store:
            self.lookup_store(store).update_task(task)

    self.dispatch_event(TaskEvent(code=TASK_MODIFIED, task_id=task_id, store=store))

    # A running scheduler may need to re-plan around the new schedule.
    if self.state == SchedulerState.STATE_RUNNING:
        self.wakeup()
    return task

reschedule_task

reschedule_task(
    task_id, store=None, trigger=None, **trigger_args
)

Constructs a new trigger for a task and updates its next run time.

Extra keyword arguments are passed directly to the trigger's constructor.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the task store that contains the task.

TYPE: Optional[str] DEFAULT: None

trigger

Alias of the trigger type or a trigger instance.

TYPE: Optional[Union[str, TriggerType]] DEFAULT: None

Source code in asyncz/schedulers/base.py
def reschedule_task(
    self,
    task_id: Union[TaskType, str],
    store: Optional[str] = None,
    trigger: Optional[Union[str, TriggerType]] = None,
    **trigger_args: Any,
) -> TaskType:
    """
    Build a fresh trigger for a task and recompute its next run time.

    Extra keyword arguments are passed directly to the trigger's constructor.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that contains the task.
        trigger: Alias of the trigger type or a trigger instance.
    """
    new_trigger = self.create_trigger(trigger, trigger_args)
    first_fire = new_trigger.get_next_trigger_time(
        self.timezone, None, datetime.now(self.timezone)
    )
    return self.update_task(task_id, store, trigger=new_trigger, next_run_time=first_fire)

pause_task

pause_task(task_id, store=None)

Causes the given task not to be executed until it is explicitly resumed.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the task store that contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def pause_task(self, task_id: Union[TaskType, str], store: Optional[str] = None) -> TaskType:
    """
    Suspend the given task so it will not run until explicitly resumed.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that contains the task.
    """
    # Clearing next_run_time is how the scheduler marks a task as paused.
    return self.update_task(task_id, store, next_run_time=None)

resume_task

resume_task(task_id, store=None)

Resumes the schedule of the given task, or removes the task if its schedule is finished.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the task store that contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def resume_task(
    self, task_id: Union[TaskType, str], store: Optional[str] = None
) -> Union[TaskType, None]:
    """
    Resume a paused task, or remove it when its schedule has finished.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that contains the task.

    Returns:
        The updated task, or None when the trigger produced no further
        run time and the task was therefore deleted.
    """
    if isinstance(task_id, TaskType):
        assert task_id.id, "Cannot resume decorator style Task"
        task_id = task_id.id
    with self.store_lock:
        task, store = self.lookup_task(task_id, store)
        upcoming = task.trigger.get_next_trigger_time(  # type: ignore
            self.timezone, None, datetime.now(self.timezone)
        )

        if not upcoming:
            # The trigger is exhausted: this task can never fire again.
            self.delete_task(task.id, store)
            return None
        return self.update_task(task_id, store, next_run_time=upcoming)

run_task

run_task(
    task_id, store=None, *, force=True, remove_finished=True
)

Submit a task to its configured executor immediately.

This method centralizes the "run now" behavior that operational surfaces such as the CLI and dashboard need. It reuses the task's configured executor, dispatches the same submission-related events that the main scheduler loop emits, and persists the task's updated schedule state.

PARAMETER DESCRIPTION
task_id

The task instance or identifier to execute immediately.

TYPE: Union[TaskType, str]

store

Optional preferred store alias when resolving the task.

TYPE: Optional[str] DEFAULT: None

force

When True and the trigger has no run times due yet, submit a one-off immediate execution at now.

TYPE: bool DEFAULT: True

remove_finished

When True and the trigger yields no further run times after this execution, delete the task. When False, preserve the task in a paused state.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Union[TaskType, None]

The updated task when it remains scheduled or paused, or None if

Union[TaskType, None]

it was removed because the schedule finished and remove_finished

Union[TaskType, None]

was requested.

RAISES DESCRIPTION
SchedulerNotRunningError

If the scheduler has not been started yet.

MaximumInstancesError

If the task is already at its executor's concurrency limit.

TaskLookupError

If the task cannot be found.

KeyError

If the configured executor cannot be found.

Source code in asyncz/schedulers/base.py
def run_task(
    self,
    task_id: Union[TaskType, str],
    store: Optional[str] = None,
    *,
    force: bool = True,
    remove_finished: bool = True,
) -> Union[TaskType, None]:
    """
    Submit a task to its configured executor immediately.

    This method centralizes the "run now" behavior that operational surfaces
    such as the CLI and dashboard need. It reuses the task's configured
    executor, dispatches the same submission-related events that the main
    scheduler loop emits, and persists the task's updated schedule state.

    Args:
        task_id: The task instance or identifier to execute immediately.
        store: Optional preferred store alias when resolving the task.
        force: When ``True`` and the trigger has no run times due yet, submit
            a one-off immediate execution at ``now``.
        remove_finished: When ``True`` and the trigger yields no further run
            times after this execution, delete the task. When ``False``,
            preserve the task in a paused state.

    Returns:
        The updated task when it remains scheduled or paused, or ``None`` if
        it was removed because the schedule finished and ``remove_finished``
        was requested.

    Raises:
        SchedulerNotRunningError: If the scheduler has not been started yet.
        MaximumInstancesError: If the task is already at its executor's
            concurrency limit.
        TaskLookupError: If the task cannot be found.
        KeyError: If the configured executor cannot be found.
    """

    if self.state == SchedulerState.STATE_STOPPED:
        raise SchedulerNotRunningError()

    if isinstance(task_id, TaskType):
        assert task_id.id, "Cannot run a decorator style Task."
        task_id = task_id.id

    # Resolve the task and determine which run times are currently due.
    task, store_alias = self.lookup_task(task_id, store)
    now = datetime.now(self.timezone)
    run_times = task.get_run_times(self.timezone, now)
    if not run_times:
        if not force:
            # Nothing due and no forcing requested: leave the task untouched.
            return task
        run_times = [now]

    assert task.executor is not None, "Task has no executor configured."
    executor = self.lookup_executor(task.executor)

    try:
        executor.send_task(task, run_times)
    except MaximumInstancesError:
        self.loggers[self.logger_name].warning(
            "Execution of task '%s' skipped: maximum running instances reached (%s).",
            task,
            task.max_instances,
        )
        self.dispatch_event(
            TaskSubmissionEvent(
                code=TASK_MAX_INSTANCES,
                task_id=task_id,
                store=store_alias,
                scheduled_run_times=run_times,
            )
        )
        raise
    except Exception:
        self.loggers[self.logger_name].exception(
            "Error submitting task '%s' to executor '%s'.",
            task,
            task.executor,
        )
        raise
    else:
        self.dispatch_event(
            TaskSubmissionEvent(
                code=TASK_SUBMITTED,
                task_id=task_id,
                store=store_alias,
                scheduled_run_times=run_times,
            )
        )

    # Advance the schedule past the run times we just submitted.
    next_run_time = task.trigger.get_next_trigger_time(  # type: ignore[union-attr]
        self.timezone,
        run_times[-1],
        now,
    )

    if next_run_time is not None and next_run_time <= now:
        # Manual runs must always advance strictly forward so admin actions do
        # not create zero-delay feedback loops in tests or dashboards.
        next_run_time = now + timedelta(milliseconds=1)

    if next_run_time is None and remove_finished:
        self.delete_task(task_id, store_alias)
        return None

    task.update_task(next_run_time=next_run_time)
    # Persist the new schedule state so a restart does not re-run the task.
    if store_alias is not None:
        with self.store_lock:
            self.lookup_store(store_alias).update_task(task)

    return task

get_tasks

get_tasks(store=None)

Returns a list of pending tasks (if the scheduler hasn't been started yet) and scheduled tasks, either from a specific task store or from all of them.

If the scheduler has not been started yet, only pending tasks can be returned because the task stores haven't been started yet either.

PARAMETER DESCRIPTION
store

alias of the task store.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def get_tasks(self, store: Optional[str] = None) -> list[TaskType]:
    """
    Collects the currently known tasks, optionally limited to a single store.

    While the scheduler is stopped only the pending (not yet committed) tasks
    exist, so those are returned; once running, tasks are read from the
    started task stores instead.

    Args:
        store: alias of the task store.
    """
    with self.store_lock:
        if self.state == SchedulerState.STATE_STOPPED:
            # Stores are not started yet: only pending tasks are visible.
            return [
                pending_task
                for pending_task, _, _ in self.pending_tasks
                if store is None or pending_task.store_alias == store
            ]
        collected: list[TaskType] = []
        for alias, candidate_store in self.stores.items():
            if store is None or alias == store:
                collected.extend(candidate_store.get_all_tasks())
        return collected

get_task

get_task(task_id, store=None)

Returns the Task that matches the given task_id, or None if no such task exists.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: str

store

Alias of the task store that most likely contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def get_task(self, task_id: str, store: Optional[str] = None) -> Union[TaskType, None]:
    """
    Returns the task matching the given task_id, or None when no such task exists.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that most likely contains the task.
    """
    with self.store_lock:
        try:
            task, _ = self.lookup_task(task_id, store)
        except TaskLookupError:
            return None
        return task

get_task_info

get_task_info(task_id, store=None)

Return an immutable inspection snapshot for a single task.

The returned snapshot is safe to expose in operational tooling because it is detached from the underlying task object and captures only observable scheduler metadata.

Source code in asyncz/schedulers/base.py
def get_task_info(self, task_id: str, store: Optional[str] = None) -> Union[TaskInfo, None]:
    """
    Return an immutable inspection snapshot for a single task.

    The snapshot is detached from the live task object, so it can safely be
    handed to operational tooling without exposing mutable scheduler state.
    """
    found = self.get_task(task_id, store)
    if found is None:
        return None
    return found.snapshot()

get_task_infos

get_task_infos(
    store=None,
    *,
    schedule_state=None,
    executor=None,
    trigger=None,
    q=None,
    sort_by="next_run_time",
    descending=False,
)

Return task snapshots with scheduler-native filtering and sorting.

This API is meant for read-oriented operational tooling. Instead of every consumer re-implementing its own serialization, filtering, and ordering logic, the scheduler exposes a single consistent view of task metadata.

PARAMETER DESCRIPTION
store

Restrict results to one store alias.

TYPE: Optional[str] DEFAULT: None

schedule_state

Optional state filter. Accepts a TaskScheduleState member or its string value.

TYPE: TaskScheduleState | str | None DEFAULT: None

executor

Restrict results to one executor alias.

TYPE: str | None DEFAULT: None

trigger

Restrict results to one trigger alias or trigger class name.

TYPE: str | None DEFAULT: None

q

Case-insensitive free-text search across task identifiers, names, callable information, trigger metadata, executor, and state.

TYPE: str | None DEFAULT: None

sort_by

Field to sort on. Supported values are id, name, next_run_time, schedule_state, executor, store, and trigger.

TYPE: str DEFAULT: 'next_run_time'

descending

Reverse the final sort order.

TYPE: bool DEFAULT: False

Source code in asyncz/schedulers/base.py
def get_task_infos(
    self,
    store: Optional[str] = None,
    *,
    schedule_state: TaskScheduleState | str | None = None,
    executor: str | None = None,
    trigger: str | None = None,
    q: str | None = None,
    sort_by: str = "next_run_time",
    descending: bool = False,
) -> list[TaskInfo]:
    """
    Return task snapshots with scheduler-native filtering and sorting.

    This is the single read-oriented view of task metadata, so operational
    tooling does not have to re-implement serialization, filtering, and
    ordering on its own.

    Args:
        store: Restrict results to one store alias.
        schedule_state: Optional state filter. Accepts a
            ``TaskScheduleState`` member or its string value.
        executor: Restrict results to one executor alias.
        trigger: Restrict results to one trigger alias or trigger class name.
        q: Case-insensitive free-text search across task identifiers, names,
            callable information, trigger metadata, executor, and state.
        sort_by: Field to sort on. Supported values are ``id``, ``name``,
            ``next_run_time``, ``schedule_state``, ``executor``, ``store``,
            and ``trigger``.
        descending: Reverse the final sort order.

    Raises:
        ValueError: If sort_by is not one of the supported fields.
    """
    wanted_state: TaskScheduleState | None = None
    if schedule_state is not None:
        if isinstance(schedule_state, TaskScheduleState):
            wanted_state = schedule_state
        else:
            wanted_state = TaskScheduleState(str(schedule_state).strip().lower())

    snapshots = [item.snapshot() for item in self.get_tasks(store)]

    if wanted_state is not None:
        snapshots = [snap for snap in snapshots if snap.schedule_state is wanted_state]

    if executor:
        snapshots = [snap for snap in snapshots if snap.executor == executor]

    if trigger:
        wanted_trigger = trigger.strip().lower()
        snapshots = [
            snap
            for snap in snapshots
            if wanted_trigger
            in ((snap.trigger_alias or "").lower(), (snap.trigger_name or "").lower())
        ]

    if q:
        needle = q.strip().lower()

        def _haystacks(snap: TaskInfo) -> tuple[str, ...]:
            # Every searchable text field, lower-cased; None becomes "".
            return (
                (snap.id or "").lower(),
                (snap.name or "").lower(),
                (snap.callable_name or "").lower(),
                (snap.callable_reference or "").lower(),
                (snap.trigger_alias or "").lower(),
                (snap.trigger_name or "").lower(),
                (snap.trigger_description or "").lower(),
                (snap.executor or "").lower(),
                (snap.store_alias or "").lower(),
                snap.schedule_state.value,
            )

        snapshots = [
            snap for snap in snapshots if any(needle in hay for hay in _haystacks(snap))
        ]

    # Tasks without a next_run_time sort last; id breaks remaining ties.
    far_future = datetime.max.replace(tzinfo=self.timezone)

    def _lower_id(snap: TaskInfo) -> str:
        return (snap.id or "").lower()

    sort_keys: dict[str, Callable[[TaskInfo], Any]] = {
        "id": _lower_id,
        "name": lambda snap: ((snap.name or "").lower(), _lower_id(snap)),
        "next_run_time": lambda snap: (
            snap.next_run_time is None,
            snap.next_run_time or far_future,
            (snap.name or "").lower(),
            _lower_id(snap),
        ),
        "schedule_state": lambda snap: (
            snap.schedule_state.value,
            snap.next_run_time is None,
            snap.next_run_time or far_future,
            _lower_id(snap),
        ),
        "executor": lambda snap: ((snap.executor or "").lower(), _lower_id(snap)),
        "store": lambda snap: ((snap.store_alias or "").lower(), _lower_id(snap)),
        "trigger": lambda snap: (
            (snap.trigger_alias or snap.trigger_name or "").lower(),
            _lower_id(snap),
        ),
    }
    try:
        key_fn = sort_keys[sort_by]
    except KeyError as exc:
        raise ValueError(
            "sort_by must be one of: id, name, next_run_time, schedule_state, "
            "executor, store, trigger."
        ) from exc

    return sorted(snapshots, key=key_fn, reverse=descending)

delete_task

delete_task(task_id, store=None)

Removes a task, preventing it from being run anymore.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str, None]

store

Alias of the task store that most likely contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def delete_task(
    self, task_id: Union[TaskType, str, None], store: Optional[str] = None
) -> None:
    """
    Removes a task, preventing it from being run anymore.

    Args:
        task_id: The identifier of the task (or the task object itself).
        store: Alias of the task store that most likely contains the task.

    Raises:
        TaskLookupError: If no matching task could be found anywhere.
    """
    if isinstance(task_id, TaskType):
        task_id = task_id.id
    if not task_id:
        return

    found_alias: Optional[str] = None
    with self.store_lock:
        if self.state == SchedulerState.STATE_STOPPED:
            # Not started yet: the task can only live in the pending list.
            for position, (pending_task, _, _) in enumerate(self.pending_tasks):
                if pending_task.id == task_id and store in (None, pending_task.store_alias):
                    found_alias = pending_task.store_alias
                    del self.pending_tasks[position]
                    break
        else:
            for alias, candidate in self.stores.items():
                if store not in (None, alias):
                    continue
                try:
                    candidate.delete_task(task_id)
                except TaskLookupError:
                    continue
                found_alias = alias
                break

    if found_alias is None:
        raise TaskLookupError(task_id)

    self.dispatch_event(TaskEvent(code=TASK_REMOVED, task_id=task_id, store=found_alias))
    self.loggers[self.logger_name].info(f"Removed task {task_id}.")

remove_all_tasks

remove_all_tasks(store)

Removes all tasks from the specified task store, or all task stores if none is given.

Source code in asyncz/schedulers/base.py
def remove_all_tasks(self, store: Optional[str]) -> None:
    """
    Removes all tasks from the specified task store, or all task stores if none is given.
    """
    with self.store_lock:
        if self.state != SchedulerState.STATE_STOPPED:
            for alias, candidate in self.stores.items():
                if store in (None, alias):
                    candidate.remove_all_tasks()
        elif store:
            # Still stopped: drop only the pending tasks aimed at that store.
            self.pending_tasks = [
                entry for entry in self.pending_tasks if entry[0].store_alias != store
            ]
        else:
            self.pending_tasks = []
    self.dispatch_event(SchedulerEvent(code=ALL_TASKS_REMOVED, alias=store))

create_default_store

create_default_store()

Creates a default store, specific to the particular scheduler type.

Source code in asyncz/schedulers/base.py
def create_default_store(self) -> StoreType:
    """Returns the fallback task store used when no store was configured explicitly."""
    # An in-memory store is the only assumption-free default.
    return MemoryStore()

lookup_executor

lookup_executor(alias)

Returns the executor instance by the given name from the list of executors that were added to this scheduler.

PARAMETER DESCRIPTION
alias

The alias for the instance.

TYPE: str

Source code in asyncz/schedulers/base.py
def lookup_executor(self, alias: str) -> ExecutorType:
    """
    Returns the executor registered on this scheduler under the given alias.

    Args:
        alias: The alias for the instance.

    Raises:
        KeyError: If no executor was registered under that alias.
    """
    if alias in self.executors:
        return self.executors[alias]
    raise KeyError(f"No such executor: {alias}.")

lookup_store

lookup_store(alias)

Returns the task store instance by the given name from the list of task stores that were added to this scheduler.

PARAMETER DESCRIPTION
alias

The alias for the instance.

TYPE: str

Source code in asyncz/schedulers/base.py
def lookup_store(self, alias: str) -> StoreType:
    """
    Returns the task store registered on this scheduler under the given alias.

    Args:
        alias: The alias for the instance.

    Raises:
        KeyError: If no store was registered under that alias.
    """
    if alias in self.stores:
        return self.stores[alias]
    raise KeyError(f"No such store: {alias}.")

lookup_task

lookup_task(task_id, store_alias)

Finds a task by its ID.

PARAMETER DESCRIPTION
task_id

The id of the task to lookup.

TYPE: str

store_alias

Alias of a task store to look in (None searches all stores).

Source code in asyncz/schedulers/base.py
def lookup_task(
    self, task_id: str, store_alias: Optional[str]
) -> tuple[TaskType, Optional[str]]:
    """
    Finds a task by its ID.

    Args:
        task_id: The id of the task to lookup.
        store_alias: Alias of a task store to look in (None searches all).

    Raises:
        TaskLookupError: If no matching task could be found.
    """
    if self.state != SchedulerState.STATE_STOPPED:
        # Running: ask every (matching) started store.
        for alias, candidate in self.stores.items():
            if store_alias in (None, alias):
                found = candidate.lookup_task(task_id)
                if found is not None:
                    return found, alias
    else:
        # Stopped: tasks only exist in the pending list; no store alias yet.
        for pending_task, _, _ in self.pending_tasks:
            if pending_task.id == task_id and store_alias in (None, pending_task.store_alias):
                return pending_task, None

    raise TaskLookupError(task_id)

dispatch_event

dispatch_event(event)

Dispatches the given event to interested listeners.

PARAMETER DESCRIPTION
event

The SchedulerEvent to be sent.

TYPE: SchedulerEvent

Source code in asyncz/schedulers/base.py
def dispatch_event(self, event: SchedulerEvent) -> None:
    """
    Dispatches the given event to interested listeners.

    Args:
        event: The SchedulerEvent to be sent.
    """
    # Snapshot under the lock so listeners may (un)subscribe from a callback.
    with self.listeners_lock:
        subscribers = tuple(self.listeners)

    for callback, mask in subscribers:
        if not (event.code & mask):
            continue
        try:
            callback(event)
        except BaseException:
            # A misbehaving listener must never break event delivery.
            self.loggers[self.logger_name].exception("Error notifying listener.")

create_lock

create_lock()

Creates a reentrant lock object.

Source code in asyncz/schedulers/base.py
def create_lock(self) -> RLock:
    """Returns a new reentrant lock used to guard scheduler internals."""
    return RLock()

process_tasks

process_tasks()

Iterates through tasks in every store, starts tasks that are due and figures out how long to wait for the next round.

If the get_due_tasks() call raises an exception, a new wakeup is scheduled in at least store_retry_interval seconds.

Source code in asyncz/schedulers/base.py
def process_tasks(self) -> Optional[float]:
    """
    Iterates through tasks in every store, starts tasks that are due and figures out how long
    to wait for the next round.

    If the get_due_tasks() call raises an exception, a new wakeup is scheduled in at least
    store_retry_interval seconds.

    Returns:
        Seconds to sleep until the next wakeup, or None when paused or when
        there is nothing to wait for.
    """
    if self.state == SchedulerState.STATE_PAUSED:
        self.loggers[self.logger_name].debug("Scheduler is paused. Not processing tasks.")
        return None

    self.loggers[self.logger_name].debug("Looking for tasks to run.")
    now = datetime.now(self.timezone)
    next_wakeup_time: Optional[datetime] = None
    events: list[SchedulerEvent] = []

    # Non-blocking attempt on the processing lock: if another thread is
    # already processing, skip this round and retry later instead of waiting.
    with self.store_processing_lock.protected(blocking=False) as blocking_success:
        if blocking_success:
            # threading lock guarding the stores mapping itself
            with self.store_lock:
                for store_alias, store in self.stores.items():
                    next_wakeup_time = self._process_tasks_of_store(
                        now, next_wakeup_time, store_alias, store, events
                    )
        else:
            # Lock was held elsewhere; come back after the retry interval.
            retry_wakeup_time = now + timedelta(seconds=self.store_retry_interval)
            if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                next_wakeup_time = retry_wakeup_time

    # Events are dispatched after the locks are released so listener
    # callbacks cannot re-enter locked sections.
    for event in events:
        self.dispatch_event(event)

    wait_seconds: Optional[float] = None
    if self.state == SchedulerState.STATE_PAUSED:
        self.loggers[self.logger_name].debug(
            "Scheduler is paused. Waiting until resume() is called."
        )
    elif next_wakeup_time is None:
        if self.lock_path:
            # NOTE(review): lock_path suggests another process may add tasks
            # behind our back, hence the periodic recheck — confirm.
            wait_seconds = self.store_retry_interval
            self.loggers[self.logger_name].debug(f"No tasks found. Recheck in {wait_seconds}.")
        else:
            self.loggers[self.logger_name].debug("No tasks. Waiting until task is added.")

    else:
        # Clamp: never negative, never beyond what the platform timer supports.
        wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
        self.loggers[self.logger_name].debug(
            f"Next wakeup is due at {next_wakeup_time} (in {wait_seconds} seconds)."
        )
    return wait_seconds

setup

setup(global_config=None, prefix='asyncz.', **options)

Reconfigures the scheduler with the given options. Can only be done when the scheduler isn't running.

PARAMETER DESCRIPTION
global_config

a "global" configuration dictionary whose values can be overridden by keyword arguments to this method.

TYPE: Optional[dict[str, Any]] DEFAULT: None

prefix: pick only those keys from global_config that are prefixed with this string (pass an empty string or None to use all keys).

Source code in asyncz/schedulers/base.py
def setup(
    self,
    global_config: Optional[dict[str, Any]] = None,
    prefix: Optional[str] = "asyncz.",
    **options: Any,
) -> None:
    """
    Reconfigures the scheduler with the given options.
    Can only be done when the scheduler isn't running.

    Args:
        global_config: a "global" configuration dictionary whose values can be
            overridden by keyword arguments to this method.
        prefix: pick only those keys from global_config that are prefixed with
            this string (pass an empty string or None to use all keys).

    Raises:
        SchedulerAlreadyRunningError: If the scheduler was already started.
    """
    flat_config = dict(global_config or {})

    if self.state != SchedulerState.STATE_STOPPED:
        raise SchedulerAlreadyRunningError()

    if prefix:
        # Keep only the namespaced keys and strip the namespace prefix.
        flat_config = {
            key[len(prefix):]: value
            for key, value in flat_config.items()
            if key.startswith(prefix)
        }

    # Expand dotted keys ("stores.default.type") into nested dictionaries.
    config: dict[str, Any] = {}
    for dotted_key, value in flat_config.items():
        *parents, leaf = dotted_key.split(".")
        target = config
        for part in parents:
            target = target.setdefault(part, {})
        target[leaf] = value

    # Explicit keyword options win over the global configuration.
    config.update(options)
    self._setup(config)

inc_refcount

inc_refcount()
Source code in asyncz/schedulers/base.py
def inc_refcount(self) -> bool:
    """
    Increments the start reference counter.

    Returns:
        True only for the very first reference (counter went 0 -> 1), i.e.
        for the caller that should actually start the scheduler.
    """
    with self.ref_lock:
        self.ref_counter += 1
        return self.ref_counter == 1

decr_refcount

decr_refcount()
Source code in asyncz/schedulers/base.py
def decr_refcount(self) -> bool:
    """
    Decrements the start reference counter.

    Returns:
        True only when the last reference was released (counter at or below
        zero), i.e. for the caller that should actually shut down.
    """
    with self.ref_lock:
        self.ref_counter -= 1
        return self.ref_counter <= 0

handle_shutdown_coros

handle_shutdown_coros(coros)
Source code in asyncz/schedulers/base.py
def handle_shutdown_coros(self, coros: Sequence[Any]) -> None:
    """Schedules the given shutdown coroutines on the scheduler's event loop."""
    if not coros:
        return
    # call_soon_threadsafe: shutdown may be triggered from another thread.
    self.event_loop.call_soon_threadsafe(asyncio.gather, *coros)

asgi

asgi(
    app: None,
    handle_lifespan: bool = False,
    wait: bool = False,
) -> Callable[[ASGIApp], ASGIApp]
asgi(
    app: ASGIApp,
    handle_lifespan: bool = False,
    wait: bool = False,
) -> ASGIApp
asgi(app=None, handle_lifespan=False, wait=False)

Return wrapper for asgi integration.

Source code in asyncz/schedulers/base.py
def asgi(
    self,
    app: Optional[ASGIApp] = None,
    handle_lifespan: bool = False,
    wait: bool = False,
) -> Union[ASGIApp, Callable[[ASGIApp], ASGIApp]]:
    """
    Returns an ASGI wrapper that starts the scheduler on lifespan startup
    and shuts it down again on lifespan shutdown.
    """

    async def _stop() -> None:
        # shutdown() may be sync or async depending on the scheduler flavour.
        outcome: Any = self.shutdown(wait)
        if isawaitable(outcome):
            await outcome

    async def _boot() -> contextlib.AsyncExitStack:
        stack = contextlib.AsyncExitStack()
        outcome: Any = self.start()
        if isawaitable(outcome):
            await outcome
        stack.push_async_callback(_stop)
        return stack

    return LifespanHook(app, setup=_boot, do_forward=not handle_lifespan)

check_uwsgi

check_uwsgi()

Check if we are running under uWSGI with threads disabled.

Source code in asyncz/schedulers/base.py
def check_uwsgi(self) -> None:
    """
    Check if we are running under uWSGI with threads disabled.

    Raises:
        RuntimeError: When uWSGI is loaded but started without thread support.
    """
    uwsgi = sys.modules.get("uwsgi")
    if getattr(uwsgi, "has_threads", True):
        return
    raise RuntimeError(
        "The scheduler seems to be running under uWSGI, but threads have "
        "been disabled. You must run uWSGI with the --enable-threads "
        "option for the scheduler to work."
    )

real_add_task

real_add_task(
    task, replace_existing, start_task, min_dtime=None
)

Adds the task.

PARAMETER DESCRIPTION
task

Task instance.

TYPE: TaskType

start_task

Whether the task should be started: when set, a missing next run time is computed and the scheduler is woken up after the task is stored.

replace_existing

The flag indicating the replacement of the task.

TYPE: bool

Source code in asyncz/schedulers/base.py
def real_add_task(
    self,
    task: TaskType,
    replace_existing: bool,
    start_task: bool,
    min_dtime: datetime | None = None,
) -> None:
    """
    Adds the task to its target store and notifies the scheduler.

    Args:
        task: Task instance. Must already carry a trigger, store alias and
            executor.
        replace_existing: When the store already holds a task with the same
            id, update it instead of raising ConflictIdError.
        start_task: When True, compute a missing next run time and wake the
            scheduler up once the task has been stored.
        min_dtime: Optional lower bound applied to the next run time, so the
            task is never scheduled before this moment.
    """
    assert task.trigger is not None, "Submitted task has no trigger set."
    assert task.store_alias is not None, "Submitted task has no store_alias set."
    assert task.executor is not None, "Submitted task has no executor set."
    replacements: dict[str, Any] = {}

    # Calculate the next run time if there is none defined.
    if task.next_run_time is None and start_task:
        now = datetime.now(self.timezone)
        replacements["next_run_time"] = task.trigger.get_next_trigger_time(
            self.timezone, None, now
        )
        if min_dtime is not None and isinstance(replacements["next_run_time"], datetime):
            replacements["next_run_time"] = max(min_dtime, replacements["next_run_time"])
    elif isinstance(task.next_run_time, datetime) and min_dtime is not None:
        replacements["next_run_time"] = max(min_dtime, task.next_run_time)

    # Apply replacements
    task.update_task(**replacements)
    # Add the task to the given store
    store = self.lookup_store(task.store_alias)
    try:
        store.add_task(task)
    except ConflictIdError:
        if replace_existing:
            try:
                store.update_task(task)
            except TaskLookupError:
                # The task was executed and removed in the meantime.
                return
        else:
            # Re-raise the original conflict for the caller to handle.
            raise
    task.pending = False

    event = TaskEvent(code=TASK_ADDED, task_id=task.id, alias=task.store_alias)
    self.dispatch_event(event)

    self.loggers[self.logger_name].info(
        f"Added task '{task.name}' to store '{task.store_alias}'."
    )

    # Notify the scheduler about the new task.
    if start_task and self.state == SchedulerState.STATE_RUNNING:
        self.wakeup()

resolve_load_plugin classmethod

resolve_load_plugin(module_name)

Resolve the plugin from its module and attrs.

Source code in asyncz/schedulers/base.py
@classmethod
def resolve_load_plugin(cls, module_name: str) -> Any:
    """
    Resolves a plugin class from a "module.path:ClassName" reference string.

    Raises:
        ImportError: When the reference is malformed or the attribute is missing.
    """
    module_path, separator, class_name = module_name.rpartition(":")
    if not separator:
        raise ImportError(f"{module_name} doesn't look like a module path")

    module = import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(str(exc)) from exc

create_plugin_instance

create_plugin_instance(_type, alias, constructor_args)

Creates an instance of the given plugin type, loading the plugin first if necessary.

Source code in asyncz/schedulers/base.py
def create_plugin_instance(self, _type: str, alias: str, constructor_args: Any) -> Any:
    """
    Creates an instance of the given plugin type, loading the plugin first if necessary.

    Args:
        _type: One of "trigger", "store" or "executor".
        alias: The alias under which the plugin was registered.
        constructor_args: Keyword arguments forwarded to the plugin constructor.

    Raises:
        TypeError: If the loaded plugin is not a subclass of the expected base class.
        LookupError: If no plugin is registered under the given alias.
    """
    plugin_container, class_container, base_class = {
        "trigger": (self.trigger_plugins, self.trigger_classes, TriggerType),
        "store": (self.store_plugins, self.store_classes, StoreType),
        "executor": (self.executor_plugins, self.executor_classes, ExecutorType),
    }[_type]

    try:
        plugin_cls = class_container[alias]  # type: ignore
    except KeyError:
        if alias in plugin_container:
            # Lazily resolve the plugin class and cache it for later lookups.
            plugin_cls = class_container[alias] = self.resolve_load_plugin(  # type: ignore
                plugin_container[alias]
            )
            if not issubclass(plugin_cls, base_class):
                raise TypeError(
                    f"The {_type} entry point does not point to a {_type} class."
                ) from None
        else:
            raise LookupError(f"No {_type} by the name '{alias}' was found.") from None

    return plugin_cls(**constructor_args)

create_trigger

create_trigger(trigger, trigger_args)

Creates a trigger.

Source code in asyncz/schedulers/base.py
def create_trigger(
    self, trigger: Union[TriggerType, str, None], trigger_args: Any
) -> TriggerType:
    """
    Returns a trigger instance, resolving string aliases via the plugin machinery.

    Args:
        trigger: An existing trigger instance, a registered alias, or None
            (which falls back to the "date" trigger).
        trigger_args: Keyword arguments for the trigger constructor.

    Raises:
        TypeError: If trigger is neither an instance, a string, nor None.
    """
    if isinstance(trigger, TriggerType):
        return trigger
    if trigger is not None and not isinstance(trigger, str):
        raise TypeError(
            f"Expected a trigger instance or string, got '{trigger.__class__.__name__}' instead."
        )

    alias = "date" if trigger is None else trigger
    # Triggers are timezone-sensitive; default to the scheduler's timezone.
    trigger_args.setdefault("timezone", self.timezone)
    return cast("TriggerType", self.create_plugin_instance("trigger", alias, trigger_args))

create_processing_lock

create_processing_lock()

Creates a non-reentrant lock object used to distribute between threads for processing.

Source code in asyncz/schedulers/base.py
def create_processing_lock(self) -> LockProtectedProtocol:
    """Returns the non-reentrant lock that serializes task processing between threads."""
    return RLockProtected()

asyncz.schedulers.asyncio.NativeAsyncIOScheduler

NativeAsyncIOScheduler(global_config=None, **kwargs)

Bases: AsyncIOScheduler

A scheduler that runs on an existing asyncio event loop.

This scheduler is typically meant to run with asyncio, which means that any ASGI framework can also use it internally if needed — for example, Ravyn and Starlette.

PARAMETER DESCRIPTION
isolated_event_loop

Use a fresh, isolated event_loop instead of the existing one.

Source code in asyncz/schedulers/base.py
def __init__(
    self,
    global_config: Optional[Any] = None,
    **kwargs: Any,
) -> None:
    """
    Initializes the scheduler's internal state and applies the configuration.

    Args:
        global_config: Optional "global" configuration mapping handed to setup().
        **kwargs: Additional configuration options forwarded to setup().
    """
    super().__init__()
    # Plugin registries: alias -> import string, resolved lazily into classes
    # which are then cached in the corresponding *_classes dictionaries.
    self.trigger_plugins = dict(defaults.triggers.items())
    self.trigger_classes: dict[str, type[TriggerType]] = {}
    self.executor_plugins: dict[str, str] = dict(defaults.executors.items())
    self.executor_classes: dict[str, type[ExecutorType]] = {}
    self.store_plugins: dict[str, str] = dict(defaults.stores.items())
    self.store_classes: dict[str, type[StoreType]] = {}
    # Live executor/store instances plus the locks that guard them.
    self.executors: dict[str, ExecutorType] = {}
    self.executor_lock: RLock = self.create_lock()
    self.stores: dict[str, StoreType] = {}
    self.store_processing_lock: LockProtectedProtocol = self.create_processing_lock()
    self.store_lock: RLock = self.create_lock()
    # Registered (callback, event-mask) listener pairs used by dispatch_event.
    self.listeners: list[Any] = []
    self.listeners_lock: RLock = self.create_lock()
    # Tasks added before start() are parked here until the stores exist.
    self.pending_tasks: list[tuple[TaskType, bool, bool]] = []
    self.state: Union[SchedulerState, Any] = SchedulerState.STATE_STOPPED

    # Reference counter so paired start()/shutdown() calls nest correctly.
    self.ref_counter: int = 0
    self.ref_lock: Lock = Lock()
    self.instances: dict[str, int] = defaultdict(lambda: 0)
    self.setup(global_config, **kwargs)

event_loop class-attribute instance-attribute

event_loop = None

loggers instance-attribute

loggers

instances instance-attribute

instances = defaultdict(lambda: 0)

running property

running

Return True if the scheduler has been started. This is a shortcut for scheduler.state != SchedulerState.STATE_STOPPED.

trigger_plugins instance-attribute

trigger_plugins = dict(items())

trigger_classes instance-attribute

trigger_classes = {}

executor_plugins instance-attribute

executor_plugins = dict(items())

executor_classes instance-attribute

executor_classes = {}

store_plugins instance-attribute

store_plugins = dict(items())

store_classes instance-attribute

store_classes = {}

executors instance-attribute

executors = {}

executor_lock instance-attribute

executor_lock = create_lock()

stores instance-attribute

stores = {}

store_processing_lock instance-attribute

store_processing_lock = create_processing_lock()

store_lock instance-attribute

store_lock = create_lock()

listeners instance-attribute

listeners = []

listeners_lock instance-attribute

listeners_lock = create_lock()

pending_tasks instance-attribute

pending_tasks = []

state instance-attribute

state = STATE_STOPPED

ref_counter instance-attribute

ref_counter = 0

ref_lock instance-attribute

ref_lock = Lock()

isolated_event_loop class-attribute instance-attribute

isolated_event_loop = False

event_loop_thread class-attribute instance-attribute

event_loop_thread = None

timer class-attribute instance-attribute

timer = None

start async

start(paused=False)
Source code in asyncz/schedulers/asyncio.py
async def start(self, paused: bool = False) -> bool:  # type: ignore
    """
    Binds the scheduler to an event loop and starts it.

    With isolated_event_loop set, a dedicated loop is spun up in a daemon
    thread first; otherwise the currently running loop is used.
    """
    if not self.isolated_event_loop:
        self.event_loop = asyncio.get_running_loop()
    else:
        ready = Event()
        self.event_loop_thread = Thread(target=self._init_new_loop, args=[ready], daemon=True)
        self.event_loop_thread.start()
        # Wait in a worker thread (not the running loop) until the new loop is up.
        await asyncio.to_thread(ready.wait)

    # Intentionally skip AsyncIOScheduler.start and call the base implementation.
    return super(AsyncIOScheduler, self).start(paused)

handle_shutdown_coros

handle_shutdown_coros(coros)
Source code in asyncz/schedulers/asyncio.py
def handle_shutdown_coros(self, coros: Sequence[Any]) -> None:
    """Aggregates pending shutdown coroutines so shutdown() can await them later."""
    self._shutdown_handle = asyncio.gather(*coros) if coros else None

shutdown async

shutdown(wait=True)
Source code in asyncz/schedulers/asyncio.py
async def shutdown(self, wait: bool = True) -> bool:  # type: ignore
    """
    Shuts the scheduler down, awaiting any collected shutdown coroutines.

    Returns:
        True while other references to the scheduler remain active (the
        reference counter was above 1 before this call).
    """
    # not decremented yet so +1
    result = self.ref_counter
    if super(AsyncIOScheduler, self).shutdown(wait):
        # Await whatever handle_shutdown_coros() collected, exactly once:
        # the attribute is cleared before awaiting.
        handle = self._shutdown_handle
        if handle is not None:
            self._shutdown_handle = None
            await handle
        self.stop_timer()
        thread = self.event_loop_thread
        if thread:
            # Isolated-loop mode: stop the private loop and join its thread
            # from a worker thread so the current loop is not blocked.
            self.event_loop.stop()
            self.event_loop = self.event_loop_thread = None
            await asyncio.to_thread(thread.join)
    return result > 1

pause

pause()

Pause task processing in the scheduler.

This will prevent the scheduler from waking up to do task processing until resume is called. It will not however stop any already running task processing.

Source code in asyncz/schedulers/base.py
def pause(self) -> None:
    """
    Pause task processing in the scheduler.

    This will prevent the scheduler from waking up to do task processing until
    resume is called. It will not however stop any already running task processing.

    Raises:
        SchedulerNotRunningError: If the scheduler has not been started.
    """
    if self.state == SchedulerState.STATE_STOPPED:
        raise SchedulerNotRunningError()
    if self.state == SchedulerState.STATE_RUNNING:
        # Pausing an already paused scheduler is a silent no-op.
        self.state = SchedulerState.STATE_PAUSED
        self.loggers[self.logger_name].info("Paused scheduler task processing.")
        self.dispatch_event(SchedulerEvent(code=SCHEDULER_PAUSED))

resume

resume()

Resume task processing in the scheduler.

Source code in asyncz/schedulers/base.py
def resume(self) -> None:
    """
    Resume task processing in the scheduler after a pause.
    """
    if self.state == SchedulerState.STATE_STOPPED:
        raise SchedulerNotRunningError
    if self.state != SchedulerState.STATE_PAUSED:
        return
    self.state = SchedulerState.STATE_RUNNING
    self.loggers[self.logger_name].info("Resumed scheduler task processing.")
    self.dispatch_event(SchedulerEvent(code=SCHEDULER_RESUMED))
    self.wakeup()

add_executor

add_executor(executor, alias='default', **executor_options)
Source code in asyncz/schedulers/base.py
def add_executor(
    self,
    executor: Union[ExecutorType, str],
    alias: str = "default",
    **executor_options: Any,
) -> None:
    """
    Adds an executor to this scheduler under the given alias.

    Args:
        executor: An executor instance, or the name of an executor plugin in
            which case ``executor_options`` are passed to its constructor.
        alias: Alias to register the executor under.

    Raises:
        ValueError: If an executor is already registered under ``alias``.
        TypeError: If ``executor`` is neither an ExecutorType nor a string.
    """
    with self.executor_lock:
        if alias in self.executors:
            raise ValueError(
                f"This scheduler already has an executor by the alias of '{alias}'."
            )

        if isinstance(executor, ExecutorType):
            self.executors[alias] = executor
        elif isinstance(executor, str):
            # Resolve the plugin name into a concrete executor instance;
            # rebinding `executor` lets the start() call below handle both cases.
            self.executors[alias] = executor = self.create_plugin_instance(
                PluginInstance.EXECUTOR, executor, executor_options
            )
        else:
            raise TypeError(
                f"Expected an executor instance or a string, got {executor.__class__.__name__} instead."
            )
        # Executors added to a live (running/paused) scheduler start immediately.
        if self.state != SchedulerState.STATE_STOPPED:
            cast("ExecutorType", executor).start(self, alias)

    self.dispatch_event(SchedulerEvent(code=EXECUTOR_ADDED, alias=alias))

remove_executor

remove_executor(alias, shutdown=True)

Removes the executor by the given alias from this scheduler.

Source code in asyncz/schedulers/base.py
def remove_executor(self, alias: str, shutdown: bool = True) -> None:
    """
    Removes the executor by the given alias from this scheduler.

    Args:
        alias: Alias the executor was registered under.
        shutdown: Shut the executor down after detaching it.
    """
    with self.executor_lock:
        # Resolve the executor before detaching it from the registry.
        detached = self.lookup_executor(alias)
        del self.executors[alias]
    if shutdown:
        detached.shutdown()
    self.dispatch_event(SchedulerEvent(code=EXECUTOR_REMOVED, alias=alias))

add_store

add_store(store, alias='default', **store_options)

Adds a task store to this scheduler.

Any extra keyword arguments will be passed to the task store plugin's constructor, assuming that the first argument is the name of a task store plugin.

Source code in asyncz/schedulers/base.py
def add_store(
    self, store: Union[StoreType, str], alias: str = "default", **store_options: Any
) -> None:
    """
    Adds a task store to this scheduler.

    Any extra keyword arguments will be passed to the task store plugin's constructor, assuming
    that the first argument is the name of a task store plugin.

    Raises:
        ValueError: If a store is already registered under ``alias``.
        TypeError: If ``store`` is neither a StoreType nor a string.
    """
    with self.store_lock:
        if alias in self.stores:
            raise ValueError(
                f"This scheduler already has a task store by the alias of '{alias}'."
            )

        if isinstance(store, StoreType):
            self.stores[alias] = store
        elif isinstance(store, str):
            # Resolve the plugin name into a concrete store instance; rebinding
            # `store` lets the start() call below handle both cases.
            self.stores[alias] = store = self.create_plugin_instance(
                PluginInstance.STORE, store, store_options
            )
        else:
            raise TypeError(
                f"Expected a task store instance or a string, got {store.__class__.__name__} instead."
            )

        # Stores added to a live (running/paused) scheduler start immediately.
        if self.state != SchedulerState.STATE_STOPPED:
            cast("StoreType", store).start(self, alias)

    self.dispatch_event(SchedulerEvent(code=STORE_ADDED, alias=alias))

    # Wake the loop so it notices any tasks already present in the new store.
    if self.state == SchedulerState.STATE_RUNNING:
        self.wakeup()

remove_store

remove_store(alias, shutdown=True)

Removes the task store by the given alias from this scheduler.

Source code in asyncz/schedulers/base.py
def remove_store(self, alias: str, shutdown: bool = True) -> None:
    """
    Removes the task store by the given alias from this scheduler.

    Args:
        alias: Alias the store was registered under.
        shutdown: Shut the store down after detaching it.
    """
    with self.store_lock:
        # Resolve the store before detaching it from the registry.
        detached = self.lookup_store(alias)
        del self.stores[alias]
    if shutdown:
        detached.shutdown()
    self.dispatch_event(SchedulerEvent(code=STORE_REMOVED, alias=alias))

add_listener

add_listener(callback, mask=ALL_EVENTS)

add_listener(callback, mask=EVENT_ALL)

Adds a listener for scheduler events.

When a matching event occurs, callback is executed with the event object as its sole argument. If the mask parameter is not provided, the callback will receive events of all types.

PARAMETER DESCRIPTION
callback

any callable that takes one argument.

TYPE: Any

mask

bitmask that indicates which events should be listened to.

TYPE: Union[int, str] DEFAULT: ALL_EVENTS

Source code in asyncz/schedulers/base.py
def add_listener(self, callback: Any, mask: Union[int, str] = ALL_EVENTS) -> None:
    """
    Registers a listener for scheduler events.

    Whenever an event matching ``mask`` occurs, ``callback`` is invoked with the
    event object as its only argument. Without an explicit mask, events of every
    type are delivered.

    Args:
        callback: any callable that takes one argument.
        mask: bitmask that indicates which events should be listened to.
    """
    entry = (callback, mask)
    with self.listeners_lock:
        self.listeners.append(entry)

remove_listener

remove_listener(callback)

Removes a previously added event listener.

Source code in asyncz/schedulers/base.py
def remove_listener(self, callback: Any) -> bool:
    """
    Removes a previously added event listener.

    Returns:
        ``True`` when a matching listener was found and removed, ``False`` otherwise.
    """
    with self.listeners_lock:
        for position, entry in enumerate(self.listeners):
            if entry[0] == callback:
                del self.listeners[position]
                return True
    return False

add_task

add_task(
    fn_or_task=None,
    trigger=None,
    args=None,
    kwargs=None,
    id=None,
    name=None,
    mistrigger_grace_time=undefined,
    coalesce=undefined,
    max_instances=undefined,
    next_run_time=undefined,
    store=None,
    executor=None,
    replace_existing=False,
    fn=None,
    **trigger_args,
)

Adds the given task to the task list and wakes up the scheduler if it's already running.

Any option that defaults to undefined will be replaced with the corresponding default value when the task is scheduled (which happens when the scheduler is started, or immediately if the scheduler is already running).

The fn argument can be given either as a callable object or a textual reference in the package.module:some.object format, where the first half (separated by :) is an importable module and the second half is a reference to the callable object, relative to the module.

The trigger argument can either be

1. The alias name of the trigger (e.g. date, interval or cron), in which case any extra keyword arguments to this method are passed on to the trigger's constructor.
2. An instance of a trigger class (TriggerType).

PARAMETER DESCRIPTION
fn

Callable (or a textual reference to one) to run at the given time.

TYPE: Optional[Any] DEFAULT: None

trigger

Trigger instance that determines when fn is called.

TYPE: Optional[Union[TriggerType, str]] DEFAULT: None

args

List of positional arguments to call fn with.

TYPE: Optional[Any] DEFAULT: None

kwargs

Dict of keyword arguments to call fn with.

TYPE: Optional[Any] DEFAULT: None

id

Explicit identifier for the task (for modifying it later).

TYPE: Optional[str] DEFAULT: None

name

Textual description of the task.

TYPE: Optional[str] DEFAULT: None

mistrigger_grace_time

Seconds after the designated runtime that the task is still allowed to be run (or None to allow the task to run no matter how late it is).

coalesce

Run once instead of many times if the scheduler determines that the task should be run more than once in succession.

TYPE: Union[bool, Undefined] DEFAULT: undefined

max_instances

Maximum number of concurrently running instances allowed for this task.

TYPE: Union[int, Undefined, None] DEFAULT: undefined

next_run_time

When to first run the task, regardless of the trigger (pass None to add the task as paused).

TYPE: Union[datetime, str, Undefined, None] DEFAULT: undefined

store

Alias of the task store to store the task in.

TYPE: Union[str, Undefined, None] DEFAULT: None

executor

Alias of the executor to run the task with.

TYPE: Union[str, Undefined, None] DEFAULT: None

replace_existing

True to replace an existing task with the same id (but retain the number of runs from the existing one).

TYPE: bool DEFAULT: False

Source code in asyncz/schedulers/base.py
def add_task(
    self,
    fn_or_task: Optional[Union[Callable[..., Any], TaskType]] = None,
    trigger: Optional[Union[TriggerType, str]] = None,
    args: Optional[Any] = None,
    kwargs: Optional[Any] = None,
    id: Optional[str] = None,
    name: Optional[str] = None,
    mistrigger_grace_time: Union[int, Undefined, None] = undefined,
    coalesce: Union[bool, Undefined] = undefined,
    max_instances: Union[int, Undefined, None] = undefined,
    next_run_time: Union[datetime, str, Undefined, None] = undefined,
    store: Union[str, Undefined, None] = None,
    executor: Union[str, Undefined, None] = None,
    replace_existing: bool = False,
    # old name, kept for backwards compatibility (see deprecation warning below)
    fn: Optional[Any] = None,
    **trigger_args: Any,
) -> TaskType:
    """
    Adds the given task to the task list and wakes up the scheduler if it's already running.

    Any option that defaults to undefined will be replaced with the corresponding default
    value when the task is scheduled (which happens when the scheduler is started, or
    immediately if the scheduler is already running).

    The fn argument can be given either as a callable object or a textual reference in
    the package.module:some.object format, where the first half (separated by :) is an
    importable module and the second half is a reference to the callable object, relative to
    the module.

    The trigger argument can either be:
      . The alias name of the trigger (e.g. date, interval or cron), in which case
        any extra keyword arguments to this method are passed on to the trigger's constructor.
      . An instance of a trigger class (TriggerType).


    Args:
        fn: Callable (or a textual reference to one) to run at the given time.
        trigger: Trigger instance that determines when fn is called.
        args: List of positional arguments to call fn with.
        kwargs: Dict of keyword arguments to call fn with.
        id: Explicit identifier for the task (for modifying it later).
        name: Textual description of the task.
        mistrigger_grace_time: Seconds after the designated runtime that the task is still
            allowed to be run (or None to allow the task to run no matter how late it is).
        coalesce: Run once instead of many times if the scheduler determines that the
            task should be run more than once in succession.
        max_instances: Maximum number of concurrently running instances allowed for this task.
        next_run_time: When to first run the task, regardless of the trigger (pass
            None to add the task as paused).
        store: Alias of the task store to store the task in.
        executor: Alias of the executor to run the task with.
        replace_existing: True to replace an existing task with the same id
                          (but retain the number of runs from the existing one).
    """
    # Imported lazily — presumably to avoid a circular import at module load; confirm.
    from asyncz.tasks import Task

    if fn is not None:
        warnings.warn(
            "The parameter 'fn' was renamed to 'fn_or_task'",
            DeprecationWarning,
            stacklevel=2,
        )
        fn_or_task = fn
    if isinstance(fn_or_task, TaskType):
        assert fn_or_task.submitted is False, "Can submit tasks only once"
        fn_or_task.submitted = True
        # tweak task before submitting
        # WARNING: in contrast to the decorator mode this really updates the task
        # by providing an id (e.g. autogenerated) you can make a real task from decorator type task while submitting
        task_update_kwargs: dict[str, Any] = {
            "scheduler": self,
            "args": tuple(args) if args is not None else undefined,
            "kwargs": dict(kwargs) if kwargs is not None else undefined,
            "id": id or undefined,
            "name": name or undefined,
            "mistrigger_grace_time": mistrigger_grace_time,
            "coalesce": coalesce,
            "max_instances": max_instances,
            "next_run_time": next_run_time,
            "executor": executor if executor is not None else undefined,
            "store_alias": store if store is not None else undefined,
        }
        if trigger:
            task_update_kwargs["trigger"] = self.create_trigger(trigger, trigger_args)
            # WARNING: when submitting a task object allow_mistrigger_by_default has no effect
        # Only apply options that were actually given; `undefined` means "leave as is".
        task_update_kwargs = {
            key: value for key, value in task_update_kwargs.items() if value is not undefined
        }
        fn_or_task.update_task(**task_update_kwargs)
        # fallback if still not set. Set manually executor and store_alias to default.
        if fn_or_task.executor is None:
            fn_or_task.executor = "default"
        if fn_or_task.store_alias is None:
            fn_or_task.store_alias = "default"
        assert fn_or_task.trigger is not None, "Cannot submit a task without a trigger."
        assert fn_or_task.id is not None, "Cannot submit a decorator type task."
        with self.store_lock:
            if self.state == SchedulerState.STATE_STOPPED:
                # Stores are not started yet: queue for real scheduling at start().
                self.pending_tasks.append(
                    (
                        fn_or_task,
                        replace_existing,
                        next_run_time is not None,
                    )
                )
                self.loggers[self.logger_name].info(
                    "Adding task tentatively. It will be properly scheduled when the scheduler starts."
                )
            else:
                self.real_add_task(
                    fn_or_task, replace_existing, next_run_time is not None, None
                )
        return fn_or_task
    # Callable (or textual-reference) mode: build a fresh Task from the arguments.
    task_kwargs: dict[str, Any] = {
        "scheduler": self,
        "trigger": self.create_trigger(trigger, trigger_args),
        "fn": fn_or_task,
        "args": tuple(args) if args is not None else (),
        "kwargs": dict(kwargs) if kwargs is not None else {},
        "id": id,
        "name": name,
        "mistrigger_grace_time": mistrigger_grace_time,
        "coalesce": coalesce,
        "max_instances": max_instances,
        "next_run_time": next_run_time,
        "executor": executor if executor is not None else undefined,
        "store_alias": store if store is not None else undefined,
    }
    task_kwargs = {key: value for key, value in task_kwargs.items() if value is not undefined}
    if task_kwargs["trigger"].allow_mistrigger_by_default:
        # we want to be able, to just use add_task, and the task is scheduled:
        # such triggers default to an unlimited grace time unless one was given.
        task_kwargs.setdefault("mistrigger_grace_time", None)
    # Fill any remaining unset options from the scheduler-wide task defaults.
    for key, value in self.task_defaults.model_dump(exclude_none=True).items():
        task_kwargs.setdefault(key, value)

    task = Task(**task_kwargs)
    if task.fn is not None:
        # Re-enter in TaskType mode to perform the actual submission.
        return self.add_task(
            task, replace_existing=replace_existing, next_run_time=next_run_time
        )
    # No fn yet: decorator-style usage — return the unsubmitted task.
    return task

update_task

update_task(task_id, store=None, **updates)

Modifies the properties of a single task.

Modifications are passed to this method as extra keyword arguments.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the store that contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def update_task(
    self, task_id: Union[TaskType, str], store: Optional[str] = None, **updates: Any
) -> TaskType:
    """
    Modifies the properties of a single task.

    Modifications are passed to this method as extra keyword arguments.

    Args:
        task_id: The identifier of the task (or the task instance itself).
        store: Alias of the store that contains the task.

    Returns:
        The updated task.
    """
    if isinstance(task_id, TaskType):
        # A Task instance was given: start from its current field values,
        # overlay the explicit updates, then continue with just its id.
        assert task_id.id, "Cannot update a decorator type Task"
        new_updates = task_id.model_dump()
        new_updates.update(**updates)
        task_id = task_id.id
    else:
        new_updates = updates

    with self.store_lock:
        task, store = self.lookup_task(task_id, store)
        task.update_task(scheduler=self, **new_updates)

        if store:
            # Persist the modified task back into its store.
            self.lookup_store(store).update_task(task)

    self.dispatch_event(TaskEvent(code=TASK_MODIFIED, task_id=task_id, store=store))

    # Let the running loop re-evaluate its wakeup timing with the new data.
    if self.state == SchedulerState.STATE_RUNNING:
        self.wakeup()
    return task

reschedule_task

reschedule_task(
    task_id, store=None, trigger=None, **trigger_args
)

Constructs a new trigger for a task and updates its next run time.

Extra keyword arguments are passed directly to the trigger's constructor.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the task store that contains the task.

TYPE: Optional[str] DEFAULT: None

trigger

Alias of the trigger type or a trigger instance.

TYPE: Optional[Union[str, TriggerType]] DEFAULT: None

Source code in asyncz/schedulers/base.py
def reschedule_task(
    self,
    task_id: Union[TaskType, str],
    store: Optional[str] = None,
    trigger: Optional[Union[str, TriggerType]] = None,
    **trigger_args: Any,
) -> TaskType:
    """
    Constructs a new trigger for a task and updates its next run time.

    Extra keyword arguments are passed directly to the trigger's constructor.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that contains the task.
        trigger: Alias of the trigger type or a trigger instance.
    """
    new_trigger = self.create_trigger(trigger, trigger_args)
    current_time = datetime.now(self.timezone)
    first_fire = new_trigger.get_next_trigger_time(self.timezone, None, current_time)
    return self.update_task(task_id, store, trigger=new_trigger, next_run_time=first_fire)

pause_task

pause_task(task_id, store=None)

Causes the given task not to be executed until it is explicitly resumed.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the task store that contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def pause_task(self, task_id: Union[TaskType, str], store: Optional[str] = None) -> TaskType:
    """
    Causes the given task not to be executed until it is explicitly resumed.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that contains the task.
    """
    # Clearing next_run_time is how a task is marked as paused.
    return self.update_task(task_id, store, next_run_time=None)

resume_task

resume_task(task_id, store=None)

Resumes the schedule of the given task, or removes the task if its schedule is finished.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str]

store

Alias of the task store that contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def resume_task(
    self, task_id: Union[TaskType, str], store: Optional[str] = None
) -> Union[TaskType, None]:
    """
    Resumes the schedule of the given task, or removes the task if its schedule is finished.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that contains the task.

    Returns:
        The updated task, or ``None`` when the task was removed because its
        trigger yields no further run times.
    """
    if isinstance(task_id, TaskType):
        assert task_id.id, "Cannot resume decorator style Task"
        task_id = task_id.id
    with self.store_lock:
        task, store = self.lookup_task(task_id, store)
        current_time = datetime.now(self.timezone)
        upcoming = task.trigger.get_next_trigger_time(self.timezone, None, current_time)  # type: ignore
        if not upcoming:
            # The trigger is exhausted: drop the task instead of resuming it.
            self.delete_task(task.id, store)
            return None
        return self.update_task(task_id, store, next_run_time=upcoming)

run_task

run_task(
    task_id, store=None, *, force=True, remove_finished=True
)

Submit a task to its configured executor immediately.

This method centralizes the "run now" behavior that operational surfaces such as the CLI and dashboard need. It reuses the task's configured executor, dispatches the same submission-related events that the main scheduler loop emits, and persists the task's updated schedule state.

PARAMETER DESCRIPTION
task_id

The task instance or identifier to execute immediately.

TYPE: Union[TaskType, str]

store

Optional preferred store alias when resolving the task.

TYPE: Optional[str] DEFAULT: None

force

When True and the trigger has no run times due yet, submit a one-off immediate execution at now.

TYPE: bool DEFAULT: True

remove_finished

When True and the trigger yields no further run times after this execution, delete the task. When False, preserve the task in a paused state.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Union[TaskType, None]

The updated task when it remains scheduled or paused, or None if

Union[TaskType, None]

it was removed because the schedule finished and remove_finished

Union[TaskType, None]

was requested.

RAISES DESCRIPTION
SchedulerNotRunningError

If the scheduler has not been started yet.

MaximumInstancesError

If the task is already at its executor's concurrency limit.

TaskLookupError

If the task cannot be found.

KeyError

If the configured executor cannot be found.

Source code in asyncz/schedulers/base.py
def run_task(
    self,
    task_id: Union[TaskType, str],
    store: Optional[str] = None,
    *,
    force: bool = True,
    remove_finished: bool = True,
) -> Union[TaskType, None]:
    """
    Submit a task to its configured executor immediately.

    This method centralizes the "run now" behavior that operational surfaces
    such as the CLI and dashboard need. It reuses the task's configured
    executor, dispatches the same submission-related events that the main
    scheduler loop emits, and persists the task's updated schedule state.

    Args:
        task_id: The task instance or identifier to execute immediately.
        store: Optional preferred store alias when resolving the task.
        force: When ``True`` and the trigger has no run times due yet, submit
            a one-off immediate execution at ``now``.
        remove_finished: When ``True`` and the trigger yields no further run
            times after this execution, delete the task. When ``False``,
            preserve the task in a paused state.

    Returns:
        The updated task when it remains scheduled or paused, or ``None`` if
        it was removed because the schedule finished and ``remove_finished``
        was requested.

    Raises:
        SchedulerNotRunningError: If the scheduler has not been started yet.
        MaximumInstancesError: If the task is already at its executor's
            concurrency limit.
        TaskLookupError: If the task cannot be found.
        KeyError: If the configured executor cannot be found.
    """

    if self.state == SchedulerState.STATE_STOPPED:
        raise SchedulerNotRunningError()

    if isinstance(task_id, TaskType):
        assert task_id.id, "Cannot run a decorator style Task."
        task_id = task_id.id

    task, store_alias = self.lookup_task(task_id, store)
    now = datetime.now(self.timezone)
    run_times = task.get_run_times(self.timezone, now)
    if not run_times:
        if not force:
            # Nothing is due and forcing was declined: leave the task untouched.
            return task
        # Forced run: execute once right now even though nothing is due.
        run_times = [now]

    assert task.executor is not None, "Task has no executor configured."
    executor = self.lookup_executor(task.executor)

    try:
        executor.send_task(task, run_times)
    except MaximumInstancesError:
        # Mirror the scheduler loop: log, dispatch the max-instances event,
        # and propagate so the caller can surface the condition.
        self.loggers[self.logger_name].warning(
            "Execution of task '%s' skipped: maximum running instances reached (%s).",
            task,
            task.max_instances,
        )
        self.dispatch_event(
            TaskSubmissionEvent(
                code=TASK_MAX_INSTANCES,
                task_id=task_id,
                store=store_alias,
                scheduled_run_times=run_times,
            )
        )
        raise
    except Exception:
        self.loggers[self.logger_name].exception(
            "Error submitting task '%s' to executor '%s'.",
            task,
            task.executor,
        )
        raise
    else:
        self.dispatch_event(
            TaskSubmissionEvent(
                code=TASK_SUBMITTED,
                task_id=task_id,
                store=store_alias,
                scheduled_run_times=run_times,
            )
        )

    # Compute the follow-up run from the last run time just submitted.
    next_run_time = task.trigger.get_next_trigger_time(  # type: ignore[union-attr]
        self.timezone,
        run_times[-1],
        now,
    )

    if next_run_time is not None and next_run_time <= now:
        # Manual runs must always advance strictly forward so admin actions do
        # not create zero-delay feedback loops in tests or dashboards.
        next_run_time = now + timedelta(milliseconds=1)

    if next_run_time is None and remove_finished:
        self.delete_task(task_id, store_alias)
        return None

    # Persist the new schedule state (next_run_time=None leaves the task paused).
    task.update_task(next_run_time=next_run_time)
    if store_alias is not None:
        with self.store_lock:
            self.lookup_store(store_alias).update_task(task)

    return task

get_tasks

get_tasks(store=None)

Returns a list of pending tasks (if the scheduler hasn't been started yet) and scheduled tasks, either from a specific task store or from all of them.

If the scheduler has not been started yet, only pending tasks can be returned because the task stores haven't been started yet either.

PARAMETER DESCRIPTION
store

alias of the task store.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def get_tasks(self, store: Optional[str] = None) -> list[TaskType]:
    """
    Returns a list of pending tasks (if the scheduler hasn't been started yet) and scheduled
    tasks, either from a specific task store or from all of them.

    If the scheduler has not been started yet, only pending tasks can be returned because the
    task stores haven't been started yet either.

    Args:
        store: alias of the task store.
    """
    with self.store_lock:
        if self.state == SchedulerState.STATE_STOPPED:
            # Stores are not running yet; only queued pending tasks are known.
            return [
                pending_task
                for pending_task, _, _ in self.pending_tasks
                if store is None or pending_task.store_alias == store
            ]
        collected: list[TaskType] = []
        for alias, candidate in self.stores.items():
            if store is None or alias == store:
                collected.extend(candidate.get_all_tasks())
        return collected

get_task

get_task(task_id, store=None)

Returns the Task that matches the given task_id.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: str

store

Alias of the task store that most likely contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def get_task(self, task_id: str, store: Optional[str] = None) -> Union[TaskType, None]:
    """
    Returns the Task that matches the given task_id, or ``None`` when it cannot be found.

    Args:
        task_id: The identifier of the task.
        store: Alias of the task store that most likely contains the task.
    """
    with self.store_lock:
        try:
            found, _ = self.lookup_task(task_id, store)
        except TaskLookupError:
            return None
        return found

get_task_info

get_task_info(task_id, store=None)

Return an immutable inspection snapshot for a single task.

The returned snapshot is safe to expose in operational tooling because it is detached from the underlying task object and captures only observable scheduler metadata.

Source code in asyncz/schedulers/base.py
def get_task_info(self, task_id: str, store: Optional[str] = None) -> Union[TaskInfo, None]:
    """
    Return an immutable inspection snapshot for a single task.

    The snapshot is detached from the live task object, capturing only
    observable scheduler metadata, so it can be exposed safely in
    operational tooling.
    """
    found = self.get_task(task_id, store)
    if found is None:
        return None
    return found.snapshot()

get_task_infos

get_task_infos(
    store=None,
    *,
    schedule_state=None,
    executor=None,
    trigger=None,
    q=None,
    sort_by="next_run_time",
    descending=False,
)

Return task snapshots with scheduler-native filtering and sorting.

This API is meant for read-oriented operational tooling. Instead of every consumer re-implementing its own serialization, filtering, and ordering logic, the scheduler exposes a single consistent view of task metadata.

PARAMETER DESCRIPTION
store

Restrict results to one store alias.

TYPE: Optional[str] DEFAULT: None

schedule_state

Optional state filter. Accepts a TaskScheduleState member or its string value.

TYPE: TaskScheduleState | str | None DEFAULT: None

executor

Restrict results to one executor alias.

TYPE: str | None DEFAULT: None

trigger

Restrict results to one trigger alias or trigger class name.

TYPE: str | None DEFAULT: None

q

Case-insensitive free-text search across task identifiers, names, callable information, trigger metadata, executor, and state.

TYPE: str | None DEFAULT: None

sort_by

Field to sort on. Supported values are id, name, next_run_time, schedule_state, executor, store, and trigger.

TYPE: str DEFAULT: 'next_run_time'

descending

Reverse the final sort order.

TYPE: bool DEFAULT: False

Source code in asyncz/schedulers/base.py
def get_task_infos(
    self,
    store: Optional[str] = None,
    *,
    schedule_state: TaskScheduleState | str | None = None,
    executor: str | None = None,
    trigger: str | None = None,
    q: str | None = None,
    sort_by: str = "next_run_time",
    descending: bool = False,
) -> list[TaskInfo]:
    """
    Return task snapshots with scheduler-native filtering and sorting.

    This API is meant for read-oriented operational tooling. Instead of every
    consumer re-implementing its own serialization, filtering, and ordering
    logic, the scheduler exposes a single consistent view of task metadata.

    Args:
        store: Restrict results to one store alias.
        schedule_state: Optional state filter. Accepts a
            ``TaskScheduleState`` member or its string value.
        executor: Restrict results to one executor alias.
        trigger: Restrict results to one trigger alias or trigger class name.
        q: Case-insensitive free-text search across task identifiers, names,
            callable information, trigger metadata, executor, and state.
        sort_by: Field to sort on. Supported values are ``id``, ``name``,
            ``next_run_time``, ``schedule_state``, ``executor``, ``store``,
            and ``trigger``.
        descending: Reverse the final sort order.

    Raises:
        ValueError: If ``sort_by`` is not one of the supported fields.
    """

    # Normalize a string filter into the enum; enum members pass through as-is.
    resolved_state: TaskScheduleState | None = None
    if schedule_state is not None:
        resolved_state = (
            schedule_state
            if isinstance(schedule_state, TaskScheduleState)
            else TaskScheduleState(str(schedule_state).strip().lower())
        )

    infos = [task.snapshot() for task in self.get_tasks(store)]
    if resolved_state is not None:
        infos = [info for info in infos if info.schedule_state is resolved_state]

    if executor:
        infos = [info for info in infos if info.executor == executor]

    if trigger:
        # The trigger filter matches either the alias or the class name.
        trigger_value = trigger.strip().lower()
        infos = [
            info
            for info in infos
            if (info.trigger_alias or "").lower() == trigger_value
            or (info.trigger_name or "").lower() == trigger_value
        ]

    if q:
        # Free-text search: a snapshot matches when the needle appears in any
        # of its textual metadata fields.
        needle = q.strip().lower()
        infos = [
            info
            for info in infos
            if needle in (info.id or "").lower()
            or needle in (info.name or "").lower()
            or needle in (info.callable_name or "").lower()
            or needle in (info.callable_reference or "").lower()
            or needle in (info.trigger_alias or "").lower()
            or needle in (info.trigger_name or "").lower()
            or needle in (info.trigger_description or "").lower()
            or needle in (info.executor or "").lower()
            or needle in (info.store_alias or "").lower()
            or needle in info.schedule_state.value
        ]

    # Each sorter yields a tuple so ties fall back to stable secondary keys;
    # a leading `is None` bool pushes unscheduled tasks to the end.
    sorters: dict[str, Callable[[TaskInfo], Any]] = {
        "id": lambda info: (info.id or "").lower(),
        "name": lambda info: ((info.name or "").lower(), (info.id or "").lower()),
        "next_run_time": lambda info: (
            info.next_run_time is None,
            info.next_run_time or datetime.max.replace(tzinfo=self.timezone),
            (info.name or "").lower(),
            (info.id or "").lower(),
        ),
        "schedule_state": lambda info: (
            info.schedule_state.value,
            info.next_run_time is None,
            info.next_run_time or datetime.max.replace(tzinfo=self.timezone),
            (info.id or "").lower(),
        ),
        "executor": lambda info: ((info.executor or "").lower(), (info.id or "").lower()),
        "store": lambda info: ((info.store_alias or "").lower(), (info.id or "").lower()),
        "trigger": lambda info: (
            (info.trigger_alias or info.trigger_name or "").lower(),
            (info.id or "").lower(),
        ),
    }
    try:
        key = sorters[sort_by]
    except KeyError as exc:
        raise ValueError(
            "sort_by must be one of: id, name, next_run_time, schedule_state, "
            "executor, store, trigger."
        ) from exc

    return sorted(infos, key=key, reverse=descending)

delete_task

delete_task(task_id, store=None)

Removes a task, preventing it from being run anymore.

PARAMETER DESCRIPTION
task_id

The identifier of the task.

TYPE: Union[TaskType, str, None]

store

Alias of the task store that most likely contains the task.

TYPE: Optional[str] DEFAULT: None

Source code in asyncz/schedulers/base.py
def delete_task(
    self, task_id: Union[TaskType, str, None], store: Optional[str] = None
) -> None:
    """
    Removes a task, preventing it from being run anymore.

    Args:
        task_id: The identifier of the task (or the task object itself).
        store: Alias of the task store that most likely contains the task.

    Raises:
        TaskLookupError: If the task cannot be found in any matching store.
    """
    if isinstance(task_id, TaskType):
        task_id = task_id.id
    if not task_id:
        return

    found_alias: Optional[str] = None
    with self.store_lock:
        if self.state == SchedulerState.STATE_STOPPED:
            # Scheduler is stopped: the task can only live in the pending list.
            for position, (pending, _, _) in enumerate(self.pending_tasks):
                if pending.id == task_id and store in (None, pending.store_alias):
                    found_alias = pending.store_alias
                    del self.pending_tasks[position]
                    break
        else:
            # Ask each candidate store to drop the task until one succeeds.
            for alias, candidate in self.stores.items():
                if store not in (None, alias):
                    continue
                try:
                    candidate.delete_task(task_id)
                except TaskLookupError:
                    continue
                found_alias = alias
                break

    if found_alias is None:
        raise TaskLookupError(task_id)

    self.dispatch_event(TaskEvent(code=TASK_REMOVED, task_id=task_id, store=found_alias))
    self.loggers[self.logger_name].info(f"Removed task {task_id}.")

remove_all_tasks

remove_all_tasks(store)

Removes all tasks from the specified task store, or all task stores if none is given.

Source code in asyncz/schedulers/base.py
def remove_all_tasks(self, store: Optional[str]) -> None:
    """
    Removes all tasks from the specified task store, or from every task store
    when no alias is given.
    """
    with self.store_lock:
        if self.state != SchedulerState.STATE_STOPPED:
            # Running: clear the matching (or all) live stores.
            for alias, target in self.stores.items():
                if store in (None, alias):
                    target.remove_all_tasks()
        elif store:
            # Stopped: drop only pending entries destined for this store.
            self.pending_tasks = [
                entry for entry in self.pending_tasks if entry[0].store_alias != store
            ]
        else:
            # Stopped and no alias: everything pending goes away.
            self.pending_tasks = []
    self.dispatch_event(SchedulerEvent(code=ALL_TASKS_REMOVED, alias=store))

wakeup

wakeup()
Source code in asyncz/schedulers/asyncio.py
@run_in_event_loop
def wakeup(self) -> None:
    # Re-arm the scheduler: cancel any pending timer, process everything
    # currently due, then schedule the next wakeup for when the following
    # task becomes due (process_tasks returns the wait in seconds, or None).
    self.stop_timer()
    wait_seconds = self.process_tasks()
    self.start_timer(wait_seconds)

create_default_executor

create_default_executor()
Source code in asyncz/schedulers/asyncio.py
def create_default_executor(self) -> BaseExecutor:
    # The asyncio scheduler runs tasks on the event loop by default.
    return AsyncIOExecutor()

create_default_store

create_default_store()

Creates a default store, specific to the particular scheduler type.

Source code in asyncz/schedulers/base.py
def create_default_store(self) -> StoreType:
    """
    Creates a default store, specific to the particular scheduler type.
    """
    # Defaults to the in-memory store.
    return MemoryStore()

lookup_executor

lookup_executor(alias)

Returns the executor instance by the given name from the list of executors that were added to this scheduler.

PARAMETER DESCRIPTION
alias

The alias for the instance.

TYPE: str

Source code in asyncz/schedulers/base.py
def lookup_executor(self, alias: str) -> ExecutorType:
    """
    Returns the executor instance by the given name from the list of executors that were added
    to this scheduler.

    Args:
        alias: The alias for the instance.
    """
    try:
        return self.executors[alias]
    except KeyError:
        raise KeyError(f"No such executor: {alias}.") from None

lookup_store

lookup_store(alias)

Returns the task store instance by the given name from the list of task stores that were added to this scheduler.

PARAMETER DESCRIPTION
alias

The alias for the instance.

TYPE: str

Source code in asyncz/schedulers/base.py
def lookup_store(self, alias: str) -> StoreType:
    """
    Returns the task store instance by the given name from the list of task stores that were
    added to this scheduler.

    Args:
        alias: The alias for the instance.
    """
    try:
        return self.stores[alias]
    except KeyError:
        raise KeyError(f"No such store: {alias}.") from None

lookup_task

lookup_task(task_id, store_alias)

Finds a task by its ID.

PARAMETER DESCRIPTION
task_id

The id of the task to lookup.

TYPE: str

store_alias

Alias of a task store to look in.

Source code in asyncz/schedulers/base.py
def lookup_task(
    self, task_id: str, store_alias: Optional[str]
) -> tuple[TaskType, Optional[str]]:
    """
    Finds a task by its ID.

    Args:
        task_id: The id of the task to lookup.
        store_alias: Alias of a task store to look in (None searches all).

    Returns:
        A tuple of the task and the alias of the store it was found in
        (None for tasks still pending scheduler start).

    Raises:
        TaskLookupError: If no matching task exists.
    """
    if self.state != SchedulerState.STATE_STOPPED:
        # Running: search the live stores.
        for alias, store in self.stores.items():
            if store_alias in (None, alias):
                found = store.lookup_task(task_id)
                if found is not None:
                    return found, alias
    else:
        # Stopped: tasks only exist in the pending list and have no store yet.
        for pending, _, _ in self.pending_tasks:
            if pending.id == task_id and store_alias in (None, pending.store_alias):
                return pending, None

    raise TaskLookupError(task_id)

dispatch_event

dispatch_event(event)

Dispatches the given event to interested listeners.

PARAMETER DESCRIPTION
event

The SchedulerEvent to be sent.

TYPE: SchedulerEvent

Source code in asyncz/schedulers/base.py
def dispatch_event(self, event: SchedulerEvent) -> None:
    """
    Dispatches the given event to interested listeners.

    Args:
        event: The SchedulerEvent to be sent.
    """
    with self.listeners_lock:
        listeners = tuple(self.listeners)

    for callback, mask in listeners:
        if event.code & mask:
            try:
                callback(event)
            except BaseException:
                self.loggers[self.logger_name].exception("Error notifying listener.")

create_lock

create_lock()

Creates a reentrant lock object.

Source code in asyncz/schedulers/base.py
def create_lock(self) -> RLock:
    """
    Creates a reentrant lock object.
    """
    # Reentrant: the owning thread may acquire it again without deadlocking.
    return RLock()

process_tasks

process_tasks()

Iterates through tasks in every store, starts tasks that are due and figures out how long to wait for the next round.

If the get_due_tasks() call raises an exception, a new wakeup is scheduled in at least store_retry_interval seconds.

Source code in asyncz/schedulers/base.py
def process_tasks(self) -> Optional[float]:
    """
    Iterates through tasks in every store, starts tasks that are due and figures out how long
    to wait for the next round.

    If the get_due_tasks() call raises an exception, a new wakeup is scheduled in at least
    store_retry_interval seconds.

    Returns:
        The number of seconds to wait until the next wakeup, or None when no
        wakeup should be scheduled (paused, or nothing due and no lock_path).
    """
    if self.state == SchedulerState.STATE_PAUSED:
        self.loggers[self.logger_name].debug("Scheduler is paused. Not processing tasks.")
        return None

    self.loggers[self.logger_name].debug("Looking for tasks to run.")
    now = datetime.now(self.timezone)
    next_wakeup_time: Optional[datetime] = None
    events: list[SchedulerEvent] = []

    # Non-blocking acquire: if another thread/process is already processing
    # the stores, skip this round instead of waiting for it.
    with self.store_processing_lock.protected(blocking=False) as blocking_success:
        if blocking_success:
            # We own the processing lock; mutate stores under the thread lock.
            with self.store_lock:
                for store_alias, store in self.stores.items():
                    next_wakeup_time = self._process_tasks_of_store(
                        now, next_wakeup_time, store_alias, store, events
                    )
        else:
            # Someone else is processing; retry after store_retry_interval.
            retry_wakeup_time = now + timedelta(seconds=self.store_retry_interval)
            if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                next_wakeup_time = retry_wakeup_time

    # Dispatch outside the locks so listeners cannot deadlock processing.
    for event in events:
        self.dispatch_event(event)

    wait_seconds: Optional[float] = None
    if self.state == SchedulerState.STATE_PAUSED:
        self.loggers[self.logger_name].debug(
            "Scheduler is paused. Waiting until resume() is called."
        )
    elif next_wakeup_time is None:
        if self.lock_path:
            # With a shared lock file another process may add tasks; poll.
            wait_seconds = self.store_retry_interval
            self.loggers[self.logger_name].debug(f"No tasks found. Recheck in {wait_seconds}.")
        else:
            self.loggers[self.logger_name].debug("No tasks. Waiting until task is added.")

    else:
        # Clamp to [0, TIMEOUT_MAX] so overdue times wake immediately.
        wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
        self.loggers[self.logger_name].debug(
            f"Next wakeup is due at {next_wakeup_time} (in {wait_seconds} seconds)."
        )
    return wait_seconds

setup

setup(global_config=None, prefix='asyncz.', **options)

Reconfigures the scheduler with the given options. Can only be done when the scheduler isn't running.

PARAMETER DESCRIPTION
global_config

a "global" configuration dictionary whose values can be overridden by keyword arguments to this method.

TYPE: Optional[dict[str, Any]] DEFAULT: None

prefix: pick only those keys from global_config that are prefixed with this string (pass an empty string or None to use all keys).

Source code in asyncz/schedulers/base.py
def setup(
    self,
    global_config: Optional[dict[str, Any]] = None,
    prefix: Optional[str] = "asyncz.",
    **options: Any,
) -> None:
    """
    Reconfigures the scheduler with the given options.
    Can only be done when the scheduler isn't running.

    Args:
        global_config: A "global" configuration dictionary whose values can be
            overridden by keyword arguments to this method.
        prefix: Pick only those keys from global_config that are prefixed with
            this string (pass an empty string or None to use all keys).

    Raises:
        SchedulerAlreadyRunningError: If called while the scheduler runs.
    """
    if self.state != SchedulerState.STATE_STOPPED:
        raise SchedulerAlreadyRunningError()

    flat: dict[str, Any] = dict(global_config or {})
    if prefix:
        # Keep only the prefixed keys, with the prefix stripped off.
        flat = {
            key[len(prefix):]: value
            for key, value in flat.items()
            if key.startswith(prefix)
        }

    # Expand dotted keys ("stores.default.type") into nested dictionaries.
    config: dict[str, Any] = {}
    for dotted, value in flat.items():
        *parents, leaf = dotted.split(".")
        node = config
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value

    # Explicit keyword options override the global configuration.
    config.update(options)
    self._setup(config)

inc_refcount

inc_refcount()
Source code in asyncz/schedulers/base.py
def inc_refcount(self) -> bool:
    """
    Increments the start reference counter.

    Returns True only for the first reference, i.e. when the caller is the
    one responsible for actually starting the scheduler.
    """
    with self.ref_lock:
        self.ref_counter += 1
        already_started = self.ref_counter > 1
    return not already_started

decr_refcount

decr_refcount()
Source code in asyncz/schedulers/base.py
def decr_refcount(self) -> bool:
    """
    Decrements the start reference counter.

    Returns True only when no references remain, i.e. when the caller is the
    one responsible for shutting the scheduler down.
    """
    with self.ref_lock:
        self.ref_counter -= 1
        still_used = self.ref_counter > 0
    return not still_used

asgi

asgi(
    app: None,
    handle_lifespan: bool = False,
    wait: bool = False,
) -> Callable[[ASGIApp], ASGIApp]
asgi(
    app: ASGIApp,
    handle_lifespan: bool = False,
    wait: bool = False,
) -> ASGIApp
asgi(app=None, handle_lifespan=False, wait=False)

Return wrapper for asgi integration.

Source code in asyncz/schedulers/base.py
def asgi(
    self,
    app: Optional[ASGIApp] = None,
    handle_lifespan: bool = False,
    wait: bool = False,
) -> Union[ASGIApp, Callable[[ASGIApp], ASGIApp]]:
    """Return wrapper for asgi integration."""

    async def _stop() -> None:
        # shutdown() may be sync or async depending on the scheduler flavor.
        outcome: Any = self.shutdown(wait)
        if isawaitable(outcome):
            await outcome

    async def _start() -> contextlib.AsyncExitStack:
        stack = contextlib.AsyncExitStack()
        outcome: Any = self.start()
        if isawaitable(outcome):
            await outcome
        # Ensure the scheduler is stopped again when the lifespan ends.
        stack.push_async_callback(_stop)
        return stack

    return LifespanHook(app, setup=_start, do_forward=not handle_lifespan)

check_uwsgi

check_uwsgi()

Check if we are running under uWSGI with threads disabled.

Source code in asyncz/schedulers/base.py
def check_uwsgi(self) -> None:
    """
    Check if we are running under uWSGI with threads disabled.

    Raises:
        RuntimeError: When the uwsgi module is loaded but reports that
            thread support is off, which the scheduler requires.
    """
    uwsgi = sys.modules.get("uwsgi")
    # getattr default True: not running under uWSGI at all is fine.
    if getattr(uwsgi, "has_threads", True):
        return
    raise RuntimeError(
        "The scheduler seems to be running under uWSGI, but threads have "
        "been disabled. You must run uWSGI with the --enable-threads "
        "option for the scheduler to work."
    )

real_add_task

real_add_task(
    task, replace_existing, start_task, min_dtime=None
)

Adds the task.

PARAMETER DESCRIPTION
task

Task instance.

TYPE: TaskType

store_alias

The alias of the store to add the task to.

replace_existing

The flag indicating the replacement of the task.

TYPE: bool

Source code in asyncz/schedulers/base.py
def real_add_task(
    self,
    task: TaskType,
    replace_existing: bool,
    start_task: bool,
    min_dtime: datetime | None = None,
) -> None:
    """
    Adds the task to its configured store and notifies the scheduler.

    Args:
        task: Task instance; its trigger, store_alias and executor must
            already be resolved.
        replace_existing: The flag indicating the replacement of the task
            when one with the same id already exists in the store.
        start_task: Whether the task should be scheduled to run (its first
            next_run_time is computed and the scheduler is woken if running).
        min_dtime: Optional lower bound applied to the computed next_run_time.
    """
    assert task.trigger is not None, "Submitted task has no trigger set."
    assert task.store_alias is not None, "Submitted task has no store_alias set."
    assert task.executor is not None, "Submitted task has no executor set."
    replacements: dict[str, Any] = {}

    # Calculate the next run time if there is none defined
    if task.next_run_time is None and start_task:
        now = datetime.now(self.timezone)
        replacements["next_run_time"] = task.trigger.get_next_trigger_time(
            self.timezone, None, now
        )
        # Never schedule earlier than the caller-supplied lower bound.
        if min_dtime is not None and isinstance(replacements["next_run_time"], datetime):
            replacements["next_run_time"] = max(min_dtime, replacements["next_run_time"])
    elif isinstance(task.next_run_time, datetime) and min_dtime is not None:
        replacements["next_run_time"] = max(min_dtime, task.next_run_time)

    # Apply replacements
    task.update_task(**replacements)
    # Add the task to the given store
    store = self.lookup_store(task.store_alias)
    try:
        store.add_task(task)
    except ConflictIdError as exc:
        if replace_existing:
            try:
                store.update_task(task)
            except TaskLookupError:
                # was executed and is now gone
                return
        else:
            raise exc
    task.pending = False

    event = TaskEvent(code=TASK_ADDED, task_id=task.id, alias=task.store_alias)
    self.dispatch_event(event)

    self.loggers[self.logger_name].info(
        f"Added task '{task.name}' to store '{task.store_alias}'."
    )

    # Notify the scheduler about the new task.
    if start_task and self.state == SchedulerState.STATE_RUNNING:
        self.wakeup()

resolve_load_plugin classmethod

resolve_load_plugin(module_name)

Resolve the plugin from its module and attrs.

Source code in asyncz/schedulers/base.py
@classmethod
def resolve_load_plugin(cls, module_name: str) -> Any:
    """
    Resolve the plugin from its module and attrs.

    Args:
        module_name: A "module.path:attribute" style reference.

    Raises:
        ImportError: If the reference is malformed, the module cannot be
            imported, or the attribute is missing from the module.
    """
    module_path, sep, attribute = module_name.rpartition(":")
    if not sep:
        raise ImportError(f"{module_name} doesn't look like a module path")

    module = import_module(module_path)

    try:
        return getattr(module, attribute)
    except AttributeError as exc:
        raise ImportError(str(exc)) from exc

create_plugin_instance

create_plugin_instance(_type, alias, constructor_args)

Creates an instance of the given plugin type, loading the plugin first if necessary.

Source code in asyncz/schedulers/base.py
def create_plugin_instance(self, _type: str, alias: str, constructor_args: Any) -> Any:
    """
    Creates an instance of the given plugin type, loading the plugin first if necessary.

    Args:
        _type: The plugin category: "trigger", "store" or "executor".
        alias: The registered alias of the plugin to instantiate.
        constructor_args: Keyword arguments forwarded to the plugin class.

    Raises:
        TypeError: If the resolved entry point is not a subclass of the
            expected base class for the category.
        LookupError: If no plugin is registered under the given alias.
    """
    plugin_container, class_container, base_class = {
        "trigger": (self.trigger_plugins, self.trigger_classes, TriggerType),
        "store": (self.store_plugins, self.store_classes, StoreType),
        "executor": (self.executor_plugins, self.executor_classes, ExecutorType),
    }[_type]

    try:
        plugin_cls = class_container[alias]  # type: ignore
    except KeyError:
        if alias not in plugin_container:
            raise LookupError(f"No {_type} by the name '{alias}' was found.") from None
        # Resolve lazily and cache the class for subsequent lookups.
        plugin_cls = class_container[alias] = self.resolve_load_plugin(  # type: ignore
            plugin_container[alias]
        )
        if not issubclass(plugin_cls, base_class):
            # NOTE: message bytes unchanged; the original wrapped _type in a
            # redundant format() call inside the f-string.
            raise TypeError(
                f"The {_type} entry point does not point to a {_type} class."
            ) from None

    return plugin_cls(**constructor_args)

create_trigger

create_trigger(trigger, trigger_args)

Creates a trigger.

Source code in asyncz/schedulers/base.py
def create_trigger(
    self, trigger: Union[TriggerType, str, None], trigger_args: Any
) -> TriggerType:
    """
    Creates a trigger.

    Args:
        trigger: An existing trigger instance, a plugin alias, or None
            (which falls back to the "date" trigger).
        trigger_args: Constructor arguments for the trigger plugin.

    Raises:
        TypeError: If trigger is neither an instance, a string nor None.
    """
    if isinstance(trigger, TriggerType):
        return trigger

    if trigger is None:
        alias = "date"
    elif isinstance(trigger, str):
        alias = trigger
    else:
        raise TypeError(
            f"Expected a trigger instance or string, got '{trigger.__class__.__name__}' instead."
        )

    # Every trigger needs a timezone; default to the scheduler's own.
    trigger_args.setdefault("timezone", self.timezone)

    return cast("TriggerType", self.create_plugin_instance("trigger", alias, trigger_args))

create_processing_lock

create_processing_lock()

Creates a non-reentrant lock object used to distribute between threads for processing.

Source code in asyncz/schedulers/base.py
def create_processing_lock(self) -> LockProtectedProtocol:
    """
    Creates a non-reentrant lock object used to distribute between threads for processing.
    """
    # NOTE(review): the class name RLockProtected suggests a reentrant lock,
    # while the docstring says non-reentrant -- confirm which is intended.
    return RLockProtected()

start_timer

start_timer(wait_seconds=None)
Source code in asyncz/schedulers/asyncio.py
def start_timer(self, wait_seconds: Optional[float] = None) -> None:
    # Replace any existing timer; None means "no wakeup scheduled" (the
    # scheduler will be woken explicitly, e.g. when a task is added).
    self.stop_timer()
    if wait_seconds is not None:
        self.timer = self.event_loop.call_later(wait_seconds, self.wakeup)

stop_timer

stop_timer()
Source code in asyncz/schedulers/asyncio.py
def stop_timer(self) -> None:
    """Cancel the pending wakeup timer, if one is armed."""
    timer = getattr(self, "timer", None)
    if timer:
        timer.cancel()
        self.timer = None

Tasks

asyncz.tasks.base.Task

Task(fn=None, *, id=None, **kwargs)

Bases: BaseState, TaskType

Contains the options given when scheduling callables and its current schedule and other state. This class should never be instantiated by the user.

Args:

id: The unique identifier of this task.
name: The description of this task.
fn: The callable to execute.
args: Positional arguments to the callable.
kwargs: Keyword arguments to the callable.
coalesce: Whether to only run the task once when several run times are due.
trigger: The trigger object that controls the schedule of this task.
executor: The name of the executor that will run this task.
mistrigger_grace_time: The time (in seconds) how much this task's execution is allowed to
    be late (`None` means "allow the task to run no matter how late it is").
max_instances: The maximum number of concurrently executing instances allowed for this
    task.
next_run_time: The next scheduled run time of this task.
Source code in asyncz/tasks/base.py
def __init__(
    self,
    fn: Union[Callable[..., Any], str, None] = None,
    *,
    id: Optional[str] = None,
    **kwargs: Any,
):
    """Initialize the task state and apply the initial options."""
    # Auto-generate an id as soon as a callable is supplied without one.
    if fn is not None and id is None:
        id = uuid4().hex
    super().__init__(id=id, **kwargs)
    self.update_task(fn=fn, **kwargs)

fn_reference class-attribute instance-attribute

fn_reference = None

args class-attribute instance-attribute

args = ()

kwargs class-attribute instance-attribute

kwargs = Field(default_factory=dict)

mistrigger_grace_time class-attribute instance-attribute

mistrigger_grace_time = 1

coalesce class-attribute instance-attribute

coalesce = True

max_instances class-attribute instance-attribute

max_instances = 1

id class-attribute instance-attribute

id = None

name class-attribute instance-attribute

name = None

next_run_time class-attribute instance-attribute

next_run_time = None

fn class-attribute instance-attribute

fn = None

store_alias class-attribute instance-attribute

store_alias = None

executor class-attribute instance-attribute

executor = None

trigger class-attribute instance-attribute

trigger = None

scheduler class-attribute instance-attribute

scheduler = None

pending class-attribute instance-attribute

pending = True

submitted class-attribute instance-attribute

submitted = False

schedule_state property

schedule_state

Return the scheduler-facing state of this task.

The state is derived entirely from the task metadata so it works for both live tasks and tasks reconstructed from stores:

  • pending tasks have not yet been committed to a running store
  • paused tasks have no next_run_time
  • scheduled tasks have a future or due next_run_time

paused property

paused

Convenience flag used by management APIs and the dashboard.

This intentionally distinguishes paused tasks from pending tasks so the caller can tell whether a task is merely unscheduled or still waiting to be committed at scheduler start time.

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

get_run_times

get_run_times(timezone, now)

Computes the scheduled run times between next_run_time and now, inclusive.

Source code in asyncz/tasks/base.py
def get_run_times(self, timezone: tzinfo, now: datetime) -> list[datetime]:
    """
    Computes the scheduled run times between `next_run_time` and `now`,
    inclusive.
    """
    assert self.trigger
    due: list[datetime] = []
    candidate = self.next_run_time
    # Walk the trigger forward, collecting every fire time up to `now`.
    while candidate is not None and candidate <= now:
        due.append(candidate)
        candidate = self.trigger.get_next_trigger_time(timezone, candidate, now)
    return due

update_task

update_task(*, fn=None, scheduler=None, **updates)

Validates the updates to the Task and makes the modifications if and only if all of them validate.

Source code in asyncz/tasks/base.py
def update_task(  # type: ignore
    self,
    *,
    fn: Union[Callable[..., Any], str, None] = None,
    scheduler: Optional[SchedulerType] = None,
    **updates: Any,
) -> None:
    """
    Validates the updates to the Task and makes the modifications if and only if all of them
    validate.

    Raises:
        ValueError: If the scheduler or the task id would be changed.
        TypeError: If any update value has an invalid type or value.
        AttributeError: If an unknown attribute is passed in updates.
    """
    # Stage everything in `approved` first; attributes are only assigned at
    # the very end, so a failing validation leaves the task untouched.
    approved: dict[str, Any] = {}
    if scheduler is not None:
        if self.scheduler is not None and self.scheduler is not scheduler:
            raise ValueError("The task scheduler may not be changed.")
        approved["scheduler"] = scheduler
    else:
        scheduler = self.scheduler

    if "id" in updates:
        raise ValueError("The task ID may not be changed.")

    # fn/args/kwargs are validated together because check_callable_args
    # needs the final combination of all three.
    if fn or "args" in updates or "kwargs" in updates:
        if not fn:
            fn = self.fn
        args = updates.pop("args") if "args" in updates else self.args
        kwargs = updates.pop("kwargs") if "kwargs" in updates else self.kwargs

        if fn is None:
            fn_reference = None
        elif isinstance(fn, str):
            # A textual "module:callable" reference; resolve it to the object.
            fn_reference = fn
            fn = ref_to_obj(fn)
        elif callable(fn):
            try:
                fn_reference = obj_to_ref(fn)
            except ValueError:
                # Not all callables are referenceable (e.g. lambdas/closures).
                fn_reference = None
        else:
            raise TypeError("fn must be a callable or a textual reference to a callable.")

        # Derive a human-readable name from the callable when none was given.
        if fn is not None and not getattr(self, "name", None) and updates.get("name") is None:
            updates["name"] = get_callable_name(cast(Callable[..., Any], fn))

        if isinstance(args, str) or not isinstance(args, Iterable):
            raise TypeError("args must be a non-string iterable.")
        if isinstance(kwargs, str) or not isinstance(kwargs, Mapping):
            raise TypeError("kwargs must be a dict-like object.")

        if fn is not None:
            check_callable_args(cast(Callable[..., Any], fn), args, kwargs)

        approved["fn"] = fn
        approved["fn_reference"] = fn_reference
        approved["args"] = args
        approved["kwargs"] = kwargs

    if updates.get("name") is not None:
        name = updates.pop("name")
        if not name or not isinstance(name, str):
            raise TypeError("name must be a non empty string.")
        approved["name"] = name
    else:
        # Drop an explicit name=None so it does not trip the
        # unknown-attribute check at the bottom.
        updates.pop("name", None)

    if "mistrigger_grace_time" in updates:
        mistrigger_grace_time = updates.pop("mistrigger_grace_time")
        if mistrigger_grace_time is not None and (
            not isinstance(mistrigger_grace_time, (float, int)) or mistrigger_grace_time <= 0
        ):
            raise TypeError(
                "mistrigger_grace_time must be either None or a positive float/integer."
            )
        approved["mistrigger_grace_time"] = mistrigger_grace_time

    if "coalesce" in updates:
        coalesce = bool(updates.pop("coalesce"))
        approved["coalesce"] = coalesce

    if "store_alias" in updates:
        store_alias = updates.pop("store_alias")
        if not isinstance(store_alias, str):
            raise TypeError("store_alias must be a string.")
        approved["store_alias"] = store_alias

    if "max_instances" in updates:
        max_instances = updates.pop("max_instances")
        if not isinstance(max_instances, int) or max_instances <= 0:
            raise TypeError("max_instances must be a positive integer.")
        approved["max_instances"] = max_instances

    if "trigger" in updates:
        trigger = updates.pop("trigger")
        if not isinstance(trigger, TriggerType):
            raise TypeError(
                f"Expected a trigger instance, got {trigger.__class__.__name__} instead."
            )
        approved["trigger"] = trigger

    if "executor" in updates:
        executor = updates.pop("executor")
        if not isinstance(executor, str):
            raise TypeError("executor must be a string.")
        approved["executor"] = executor

    if "next_run_time" in updates:
        # Normalizing next_run_time requires the scheduler's timezone.
        if not isinstance(scheduler, SchedulerType):
            raise TypeError("Cannot set next_run_time without scheduler.")

        next_run_time = updates.pop("next_run_time")
        approved["next_run_time"] = to_datetime(
            next_run_time, scheduler.timezone, "next_run_time"
        )

    if updates:
        # Anything left over was not consumed by a validator above.
        raise AttributeError(
            f"The following are not modifiable attributes of Task: {', '.join(updates)}."
        )

    # All validations passed: commit in one sweep.
    for key, value in approved.items():
        setattr(self, key, value)

update

update(**updates)

Makes the given updates to this task and saves it in the associated store. Accepted keyword args are the same as the class variables.

Source code in asyncz/tasks/types.py
def update(self, **updates: Any) -> TaskType:
    """
    Makes the given updates to this json and save it in the associated store.
    Accepted keyword args are the same as the class variables.
    """
    scheduler = self.scheduler
    task_id = self.id
    if scheduler is not None and task_id is not None:
        scheduler.update_task(task_id, self.store_alias, **updates)
    return self

reschedule

reschedule(trigger, **trigger_args)

Shortcut for switching the trigger on this task.

Source code in asyncz/tasks/types.py
def reschedule(self, trigger: TriggerType, **trigger_args: Any) -> TaskType:
    """
    Shortcut for switching the trigger on this task.
    """
    scheduler = self.scheduler
    task_id = self.id
    if scheduler is not None and task_id is not None:
        scheduler.reschedule_task(task_id, self.store_alias, trigger, **trigger_args)
    return self

pause

pause()

Temporarily suspends the execution of a given task.

Source code in asyncz/tasks/types.py
def pause(self) -> TaskType:
    """
    Temporarily suspenses the execution of a given task.
    """
    scheduler = self.scheduler
    task_id = self.id
    if scheduler is not None and task_id is not None:
        scheduler.pause_task(task_id, self.store_alias)
    return self

resume

resume()

Resume the schedule of this task if previously paused.

Source code in asyncz/tasks/types.py
def resume(self) -> TaskType:
    """
    Resume the schedule of this task if previously paused.
    """
    scheduler = self.scheduler
    task_id = self.id
    if scheduler is not None and task_id is not None:
        scheduler.resume_task(task_id, self.store_alias)
    return self

delete

delete()

Unschedules this task and removes it from its associated store.

Source code in asyncz/tasks/types.py
def delete(self) -> TaskType:
    """
    Unschedules this task and removes it from its associated store.
    """
    scheduler = self.scheduler
    task_id = self.id
    if scheduler is not None and task_id is not None:
        scheduler.delete_task(task_id, self.store_alias)
    return self

snapshot

snapshot()

Build an immutable inspection snapshot for this task.

The snapshot is the preferred representation for presentation-oriented code because it captures the task's observable state without exposing the live mutable task object.

Source code in asyncz/tasks/types.py
def snapshot(self) -> TaskInfo:
    """
    Build an immutable inspection snapshot for this task.

    The snapshot is the preferred representation for presentation-oriented
    code because it captures the task's observable state without exposing
    the live mutable task object.
    """

    from asyncz.utils import get_callable_name

    reference = getattr(self, "fn_reference", None)

    # Prefer the introspected callable name; fall back to the last segment
    # of the textual reference when the callable cannot be named.
    display_name: str | None = None
    if self.fn is not None:
        try:
            display_name = get_callable_name(self.fn)
        except TypeError:
            display_name = None
    if display_name is None and reference:
        display_name = reference.split(":")[-1]

    trig = self.trigger
    return TaskInfo(
        id=self.id,
        name=self.name,
        callable_name=display_name,
        callable_reference=reference,
        trigger_alias=getattr(trig, "alias", None),
        trigger_name=type(trig).__name__ if trig is not None else None,
        trigger_description=str(trig) if trig is not None else None,
        executor=self.executor,
        store_alias=self.store_alias,
        schedule_state=self.schedule_state,
        next_run_time=self.next_run_time,
        coalesce=self.coalesce,
        max_instances=self.max_instances,
        mistrigger_grace_time=self.mistrigger_grace_time,
        pending=self.pending,
        paused=self.paused,
        submitted=self.submitted,
    )

Triggers

asyncz.triggers.date.DateTrigger

DateTrigger(run_at=None, timezone=None, **kwargs)

Bases: BaseTrigger

Triggers once on the given datetime. If run_at is left empty then the current time is used.

PARAMETER DESCRIPTION
run_at

The date/time to run the task at.

TYPE: Optional[Union[datetime, str]] DEFAULT: None

timezone

The time zone for the run_at if it does not have one already.

TYPE: Optional[Union[tzinfo, str]] DEFAULT: None

Source code in asyncz/triggers/date.py
def __init__(
    self,
    run_at: Optional[Union[datetime, str]] = None,
    timezone: Optional[Union[tzinfo, str]] = None,
    **kwargs: Any,
):
    """Normalize run_at (defaulting to "now") before initializing the trigger."""
    tz = to_timezone(timezone)
    if run_at is None:
        # No explicit time: fire immediately and tolerate mistriggers.
        kwargs["run_at"] = datetime.now(tz)
        kwargs["allow_mistrigger_by_default"] = True
    else:
        kwargs["run_at"] = to_datetime(run_at, tz, "run_at")
    super().__init__(**kwargs)

alias class-attribute

alias = 'date'

run_at instance-attribute

run_at

jitter class-attribute instance-attribute

jitter = None

allow_mistrigger_by_default class-attribute instance-attribute

allow_mistrigger_by_default = False

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

get_next_trigger_time

get_next_trigger_time(timezone, previous_time, now=None)
Source code in asyncz/triggers/date.py
def get_next_trigger_time(
    self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
) -> Union[datetime, None]:
    """Return run_at (converted to the given timezone) on the first call; None afterwards."""
    if previous_time is not None:
        # Already fired once; a date trigger never repeats.
        return None
    return self.run_at.astimezone(timezone)

apply_jitter

apply_jitter(
    next_trigger_time: datetime,
    jitter: Optional[int],
    now: datetime,
) -> datetime
apply_jitter(
    next_trigger_time: None,
    jitter: Optional[int],
    now: datetime,
) -> None
apply_jitter(next_trigger_time, jitter, now)

Makes the next trigger time random by adding a random value (jitter).

PARAMETER DESCRIPTION
next_trigger_time

The next trigger time without the jitter.

TYPE: Optional[datetime]

jitter

The maximum number of seconds to add to the next_trigger_time.

TYPE: Optional[int]

now

The current date/time.

TYPE: datetime

Source code in asyncz/triggers/base.py
def apply_jitter(
    self, next_trigger_time: Optional[datetime], jitter: Optional[int], now: datetime
) -> Union[datetime, None]:
    """
    Makes the next trigger time random by adding a random value (jitter).

    Args:
        next_trigger_time: The next trigger time without the jitter.
        jitter: The maximum number of seconds to add to the next_trigger_time.
        now: The current date/time (accepted for interface consistency; unused here).
    """
    # Nothing to shift, or jitter disabled (None or 0): return the input unchanged.
    if next_trigger_time is None or not jitter:
        return next_trigger_time
    return next_trigger_time + timedelta(seconds=random.uniform(0, jitter))

asyncz.triggers.interval.IntervalTrigger

IntervalTrigger(
    weeks=0,
    days=0,
    hours=0,
    minutes=0,
    seconds=0,
    start_at=None,
    end_at=None,
    timezone=None,
    jitter=None,
    **kwargs,
)

Bases: BaseTrigger

Triggers at specific intervals, starting at start_at if specified, or at datetime.now() + interval otherwise.

PARAMETER DESCRIPTION
weeks

Number of weeks to wait.

TYPE: int DEFAULT: 0

days

Number of days to wait.

TYPE: int DEFAULT: 0

hours

Number of hours to wait.

TYPE: int DEFAULT: 0

minutes

Number of minutes to wait.

TYPE: int DEFAULT: 0

seconds

Number of seconds to wait.

TYPE: int DEFAULT: 0

start_at

Starting point for the interval calculation.

TYPE: Optional[Union[datetime, str]] DEFAULT: None

end_at

Latest possible date/time to trigger on.

TYPE: Optional[Union[datetime, str]] DEFAULT: None

timezone

Time zone to use for the date/time calculations.

TYPE: Optional[Union[tzinfo, str]] DEFAULT: None

jitter

Delay the task execution by jitter seconds at most.

TYPE: Optional[int] DEFAULT: None

Source code in asyncz/triggers/interval.py
def __init__(
    self,
    weeks: int = 0,
    days: int = 0,
    hours: int = 0,
    minutes: int = 0,
    seconds: int = 0,
    start_at: Optional[Union[datetime, str]] = None,
    end_at: Optional[Union[datetime, str]] = None,
    timezone: Optional[Union[tzinfo, str]] = None,
    jitter: Optional[int] = None,
    **kwargs: Any,
):
    """
    Triggers at fixed intervals, starting at start_at if given, otherwise at
    datetime.now() + interval.

    Args:
        weeks, days, hours, minutes, seconds: Components of the interval length.
        start_at: Starting point for the interval calculation.
        end_at: Latest possible date/time to trigger on.
        timezone: Time zone to use for the date/time calculations.
        jitter: Delay the task execution by at most jitter seconds.
    """
    super().__init__(jitter=jitter, **kwargs)
    self.interval = timedelta(
        weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds
    )
    self.interval_size = timedelta_seconds(self.interval)

    # A zero-length interval would fire continuously; fall back to one second.
    if self.interval_size == 0:
        self.interval = timedelta(seconds=1)
        self.interval_size = 1

    # Prefer an explicit timezone, then whichever aware boundary provides one.
    if timezone:
        self.timezone = to_timezone(timezone)
    elif isinstance(start_at, datetime) and start_at.tzinfo:
        self.timezone = start_at.tzinfo
    elif isinstance(end_at, datetime) and end_at.tzinfo:
        self.timezone = end_at.tzinfo
    else:
        self.timezone = None

    start_at = start_at or (datetime.now(self.timezone) + self.interval)
    self.start_at = to_datetime(start_at, self.timezone, "start_at", require_tz=False)
    self.end_at = to_datetime(end_at, self.timezone, "end_at", require_tz=False)

alias class-attribute

alias = 'interval'

timezone class-attribute instance-attribute

timezone = None

interval instance-attribute

interval = timedelta(
    weeks=weeks,
    days=days,
    hours=hours,
    minutes=minutes,
    seconds=seconds,
)

interval_size instance-attribute

interval_size = timedelta_seconds(interval)

start_at instance-attribute

start_at = to_datetime(
    start_at, timezone, "start_at", require_tz=False
)

end_at instance-attribute

end_at = to_datetime(
    end_at, timezone, "end_at", require_tz=False
)

jitter class-attribute instance-attribute

jitter = None

allow_mistrigger_by_default class-attribute instance-attribute

allow_mistrigger_by_default = False

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

get_next_trigger_time

get_next_trigger_time(timezone, previous_time, now=None)
Source code in asyncz/triggers/interval.py
def get_next_trigger_time(
    self,
    timezone: tzinfo,
    previous_time: Optional[datetime],
    now: Union[datetime, None] = None,
) -> Union[datetime, None]:
    """
    Compute the next fire time for this interval trigger.

    Args:
        timezone: Fallback time zone when the trigger has none of its own.
        previous_time: The last time the trigger fired, if any.
        now: Reference "current time"; defaults to datetime.now(timezone).

    Returns:
        The next trigger time, or None once it would fall after end_at.
    """
    timezone = self.timezone or timezone
    if now is None:
        now = datetime.now(timezone)
    next_trigger_time: Optional[datetime]
    # A naive start_at is interpreted in the effective time zone.
    start_at = (
        self.start_at.replace(tzinfo=timezone)
        if self.start_at and self.start_at.tzinfo is None
        else self.start_at
    )
    if previous_time:
        next_trigger_time = previous_time + self.interval
    elif start_at > now:
        next_trigger_time = start_at
    else:
        # Already past start_at: jump to the next interval boundary after now.
        time_difference_seconds = timedelta_seconds(now - start_at)
        next_interval_number = int(ceil(time_difference_seconds / self.interval_size))
        next_trigger_time = start_at + self.interval * next_interval_number

    if self.jitter is not None:
        next_trigger_time = self.apply_jitter(
            next_trigger_time=next_trigger_time, jitter=self.jitter, now=now
        )
    # A naive end_at is likewise interpreted in the effective time zone.
    end_at = (
        self.end_at.replace(tzinfo=timezone)
        if self.end_at and self.end_at.tzinfo is None
        else self.end_at
    )

    # NOTE(review): jitter is applied before the end_at check, so a jittered time
    # past end_at ends the schedule here, while CronTrigger clamps to end_at
    # instead — confirm this asymmetry is intended.
    if not end_at or next_trigger_time <= end_at:
        return normalize(value=next_trigger_time)
    return None

apply_jitter

apply_jitter(
    next_trigger_time: datetime,
    jitter: Optional[int],
    now: datetime,
) -> datetime
apply_jitter(
    next_trigger_time: None,
    jitter: Optional[int],
    now: datetime,
) -> None
apply_jitter(next_trigger_time, jitter, now)

Makes the next trigger time random by adding a random value (jitter).

PARAMETER DESCRIPTION
next_trigger_time

The next trigger time without the jitter.

TYPE: Optional[datetime]

jitter

The maximum number of seconds to add to the next_trigger_time.

TYPE: Optional[int]

now

The current date/time.

TYPE: datetime

Source code in asyncz/triggers/base.py
def apply_jitter(
    self, next_trigger_time: Optional[datetime], jitter: Optional[int], now: datetime
) -> Union[datetime, None]:
    """
    Makes the next trigger time random by adding a random value (jitter).

    Args:
        next_trigger_time: The next trigger time without the jitter.
        jitter: The maximum number of seconds to add to the next_trigger_time.
        now: The current date/time (accepted for interface consistency; unused here).
    """
    # Nothing to shift, or jitter disabled (None or 0): return the input unchanged.
    if next_trigger_time is None or not jitter:
        return next_trigger_time
    return next_trigger_time + timedelta(seconds=random.uniform(0, jitter))

asyncz.triggers.cron.trigger.CronTrigger

CronTrigger(
    year=None,
    month=None,
    day=None,
    week=None,
    day_of_week=None,
    hour=None,
    minute=None,
    second=None,
    start_at=None,
    end_at=None,
    timezone=None,
    jitter=None,
    **kwargs,
)

Bases: BaseTrigger

Triggers when the current time matches all specified time constraints. Very similar to the way the UNIX cron scheduler works.

┌───────────── minute (0 - 59) │ ┌───────────── hour (0 - 23) │ │ ┌───────────── day of the month (1 - 31) │ │ │ ┌───────────── month (1 - 12) │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday; │ │ │ │ │ 7 is also Sunday on some systems) │ │ │ │ │ │ │ │ │ │ * * * * *

PARAMETER DESCRIPTION
year

4-digit value.

TYPE: Optional[Union[int, str]] DEFAULT: None

month

Month (1-12).

TYPE: Optional[Union[int, str]] DEFAULT: None

day

Day of the month (1-31).

TYPE: Optional[Union[int, str]] DEFAULT: None

week

ISO week (1-53).

TYPE: Optional[Union[int, str]] DEFAULT: None

day_of_week

Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun).

TYPE: Optional[Union[int, str]] DEFAULT: None

hour

Hour (0-23).

TYPE: Optional[Union[int, str]] DEFAULT: None

minute

Minute (0-59).

TYPE: Optional[Union[int, str]] DEFAULT: None

second

Second (0-59).

TYPE: Optional[Union[int, str]] DEFAULT: None

start_at

Earliest possible date/time to trigger on (inclusive).

TYPE: Optional[Union[datetime, str]] DEFAULT: None

end_at

Latest possible date/time to trigger on (inclusive).

TYPE: Optional[Union[datetime, str]] DEFAULT: None

timezone

Time zone to use for the date/time calculations (defaults to scheduler timezone).

TYPE: Optional[Union[tzinfo, str]] DEFAULT: None

jitter

Delay the task executions by jitter seconds at most.

TYPE: Optional[int] DEFAULT: None

The first day of the week is always monday.

Source code in asyncz/triggers/cron/trigger.py
def __init__(
    self,
    year: Optional[Union[int, str]] = None,
    month: Optional[Union[int, str]] = None,
    day: Optional[Union[int, str]] = None,
    week: Optional[Union[int, str]] = None,
    day_of_week: Optional[Union[int, str]] = None,
    hour: Optional[Union[int, str]] = None,
    minute: Optional[Union[int, str]] = None,
    second: Optional[Union[int, str]] = None,
    start_at: Optional[Union[datetime, str]] = None,
    end_at: Optional[Union[datetime, str]] = None,
    timezone: Optional[Union[tzinfo, str]] = None,
    jitter: Optional[int] = None,
    **kwargs: Any,
):
    """
    Triggers when the current time matches all of the specified time
    constraints, similarly to how the UNIX cron scheduler works.

    Args:
        year: 4-digit value.
        month: Month (1-12).
        day: Day of the month (1-31).
        week: ISO week (1-53).
        day_of_week: Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun).
        hour: Hour (0-23).
        minute: Minute (0-59).
        second: Second (0-59).
        start_at: Earliest possible date/time to trigger on (inclusive).
        end_at: Latest possible date/time to trigger on (inclusive).
        timezone: Time zone for the date/time calculations (defaults to the
            scheduler timezone).
        jitter: Delay the task execution by at most jitter seconds.

    The first day of the week is always Monday.
    """
    super().__init__(**kwargs)
    self.year = year
    self.month = month
    self.day = day
    self.week = week
    self.day_of_week = day_of_week
    self.hour = hour
    self.minute = minute  # fix: the original assigned minute twice (redundant duplicate removed)
    self.second = second
    self.field_names = (
        "year",
        "month",
        "day",
        "week",
        "day_of_week",
        "hour",
        "minute",
        "second",
    )
    self.fields_map = {
        "year": BaseField,
        "month": MonthField,
        "week": WeekField,
        "day": DayOfMonthField,
        "day_of_week": DayOfWeekField,
        "hour": BaseField,
        "minute": BaseField,
        "second": BaseField,
    }

    # Prefer an explicit timezone, then whichever aware boundary provides one.
    if timezone:
        self.timezone = to_timezone(timezone)
    elif isinstance(start_at, datetime) and start_at.tzinfo:
        self.timezone = start_at.tzinfo
    elif isinstance(end_at, datetime) and end_at.tzinfo:
        self.timezone = end_at.tzinfo
    else:
        self.timezone = None

    self.start_at = to_datetime(start_at, self.timezone, "start_at", require_tz=False)
    self.end_at = to_datetime(end_at, self.timezone, "end_at", require_tz=False)
    self.jitter = jitter

    # NOTE: the outermost iterable of a comprehension is evaluated in the
    # enclosing scope, so locals() here refers to this __init__'s locals —
    # i.e. the year..second parameters.
    values = {
        key: value
        for key, value in iter(locals().items())
        if key in self.field_names and value is not None
    }

    self.fields = []
    assign_defaults = False

    for field_name in self.field_names:
        if field_name in values:
            expressions = values.pop(field_name)
            is_default = False
            # Once every explicitly-provided field has been consumed, the
            # remaining (less significant) fields fall back to DEFAULT_VALUES.
            assign_defaults = not values
        elif assign_defaults:
            expressions = DEFAULT_VALUES[field_name]
            is_default = True
        else:
            # Fields more significant than the first explicit one match anything.
            expressions = "*"
            is_default = True

        field_class = self.fields_map[field_name]
        field = field_class(field_name, expressions, is_default)
        self.fields.append(field)

alias class-attribute

alias = 'cron'

timezone class-attribute instance-attribute

timezone = None

year instance-attribute

year = year

month instance-attribute

month = month

day instance-attribute

day = day

week instance-attribute

week = week

day_of_week instance-attribute

day_of_week = day_of_week

hour instance-attribute

hour = hour

second instance-attribute

second = second

minute instance-attribute

minute = minute

field_names instance-attribute

field_names = (
    "year",
    "month",
    "day",
    "week",
    "day_of_week",
    "hour",
    "minute",
    "second",
)

fields_map instance-attribute

fields_map = {
    "year": BaseField,
    "month": MonthField,
    "week": WeekField,
    "day": DayOfMonthField,
    "day_of_week": DayOfWeekField,
    "hour": BaseField,
    "minute": BaseField,
    "second": BaseField,
}

start_at instance-attribute

start_at = to_datetime(
    start_at, timezone, "start_at", require_tz=False
)

end_at instance-attribute

end_at = to_datetime(
    end_at, timezone, "end_at", require_tz=False
)

jitter instance-attribute

jitter = jitter

fields instance-attribute

fields = []

allow_mistrigger_by_default class-attribute instance-attribute

allow_mistrigger_by_default = False

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

from_crontab classmethod

from_crontab(expression, timezone=None)

Creates a CronTrigger from a standard crontab expression. See https://en.wikipedia.org/wiki/Cron for more information on the format accepted here.

Source code in asyncz/triggers/cron/trigger.py
@classmethod
def from_crontab(
    cls,
    expression: Union[str, Any],
    timezone: Optional[Union[str, tzinfo]] = None,
) -> "CronTrigger":
    """
    Creates a CronTrigger from a standard crontab expression.
    See https://en.wikipedia.org/wiki/Cron for more information on the format accepted here.

    Args:
        expression: Five whitespace-separated fields, in order: minute, hour,
            day of month, month, day of week.
        timezone: Time zone to use for the date/time calculations. Defaults to
            the scheduler timezone.

    Raises:
        ValueError: If the expression does not contain exactly five fields.
    """
    values = expression.split()
    if len(values) != 5:
        raise ValueError(f"Wrong number of fields. Got {len(values)}, expected 5.")

    return cls(
        minute=values[0],
        hour=values[1],
        day=values[2],
        month=values[3],
        day_of_week=values[4],
        timezone=timezone,
    )

increment_field_value

increment_field_value(dateval, field_number)

Increments the designated field and resets all significant fields to their minimum values

Source code in asyncz/triggers/cron/trigger.py
def increment_field_value(self, dateval: datetime, field_number: int) -> tuple[datetime, int]:
    """
    Increments the designated field and resets all less significant fields to
    their minimum values.

    Returns:
        A (new datetime, field number) tuple; the field number may point to a
        more significant field if the target was already at its maximum and the
        increment had to carry.
    """
    values = {}
    count = 0

    while count < len(self.fields):
        field = self.fields[count]

        # Non-real fields (ones that do not map directly onto datetime()
        # keyword arguments) are skipped; if the target itself is non-real,
        # retarget the increment at the previous field.
        if not field.real:
            if count == field_number:
                field_number -= 1
                count -= 1
            else:
                count += 1
            continue

        if count < field_number:
            # More significant than the target: keep the current value.
            values[field.name] = field.get_value(dateval)
            count += 1
        elif count > field_number:
            # Less significant than the target: reset to the minimum.
            values[field.name] = field.get_min(dateval)
            count += 1
        else:
            value = field.get_value(dateval)
            max_value = field.get_max(dateval)
            if value == max_value:
                # Target already at its maximum: carry into the next more
                # significant field and reprocess from there.
                field_number -= 1
                count -= 1
            else:
                values[field.name] = value + 1
                count += 1

    # Apply the computed delta to the original (possibly tz-aware) value.
    difference = datetime(**values) - dateval.replace(tzinfo=None)
    return normalize(dateval + difference), field_number

set_field_value

set_field_value(dateval, field_number, new_value, timezone)
Source code in asyncz/triggers/cron/trigger.py
def set_field_value(
    self, dateval: datetime, field_number: int, new_value: Any, timezone: tzinfo
) -> datetime:
    """Build a localized datetime where the target field becomes new_value,
    more significant fields keep their current values, and less significant
    fields are reset to their minimums. Non-real fields are skipped."""
    values = {}
    for index, field in enumerate(self.fields):
        if not field.real:
            continue
        if index < field_number:
            values[field.name] = field.get_value(dateval)
        elif index == field_number:
            values[field.name] = new_value
        else:
            values[field.name] = field.get_min(dateval)

    return localize(datetime(**values), timezone)

get_next_trigger_time

get_next_trigger_time(timezone, previous_time, now=None)
Source code in asyncz/triggers/cron/trigger.py
def get_next_trigger_time(
    self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
) -> Union[datetime, None]:
    """
    Compute the next time matching every cron field, or None when the
    schedule is exhausted (the candidate falls after end_at).
    """
    timezone = self.timezone or timezone
    if now is None:
        now = datetime.now(timezone)
    # Naive boundaries are interpreted in the effective time zone.
    start_at_field = (
        self.start_at.replace(tzinfo=timezone)
        if self.start_at and self.start_at.tzinfo is None
        else self.start_at
    )
    end_at_field = (
        self.end_at.replace(tzinfo=timezone)
        if self.end_at and self.end_at.tzinfo is None
        else self.end_at
    )
    if previous_time:
        # Search strictly after the previous fire time.
        start_at = min(now, previous_time + timedelta(microseconds=1))
        if start_at == previous_time:
            start_at += timedelta(microseconds=1)
    else:
        start_at = max(now, start_at_field) if start_at_field else now
    if start_at.tzinfo != timezone:
        start_at = start_at.astimezone(timezone)

    fieldnum = 0
    next_date = datetime_ceil(start_at)

    # Walk the fields from most to least significant, bumping values until
    # every field matches. A negative fieldnum means the search carried past
    # the most significant field (no match exists).
    while 0 <= fieldnum < len(self.fields):
        field = self.fields[fieldnum]
        curr_value = field.get_value(next_date)
        next_value = field.get_next_value(next_date)

        if next_value is None:
            # No valid value at this level: carry into the previous field.
            next_date, fieldnum = self.increment_field_value(next_date, fieldnum - 1)
        elif next_value > curr_value:
            if field.real:
                next_date = self.set_field_value(next_date, fieldnum, next_value, timezone)
                fieldnum += 1
            else:
                next_date, fieldnum = self.increment_field_value(next_date, fieldnum)
        else:
            fieldnum += 1

        if end_at_field and next_date > end_at_field:
            return None

    if fieldnum >= 0:
        next_date = self.apply_jitter(next_date, self.jitter, now)
        # Clamp a jittered time so it never exceeds end_at.
        return min(next_date, end_at_field) if end_at_field else next_date
    return None

apply_jitter

apply_jitter(
    next_trigger_time: datetime,
    jitter: Optional[int],
    now: datetime,
) -> datetime
apply_jitter(
    next_trigger_time: None,
    jitter: Optional[int],
    now: datetime,
) -> None
apply_jitter(next_trigger_time, jitter, now)

Makes the next trigger time random by adding a random value (jitter).

PARAMETER DESCRIPTION
next_trigger_time

The next trigger time without the jitter.

TYPE: Optional[datetime]

jitter

The maximum number of seconds to add to the next_trigger_time.

TYPE: Optional[int]

now

The current date/time.

TYPE: datetime

Source code in asyncz/triggers/base.py
def apply_jitter(
    self, next_trigger_time: Optional[datetime], jitter: Optional[int], now: datetime
) -> Union[datetime, None]:
    """
    Makes the next trigger time random by adding a random value (jitter).

    Args:
        next_trigger_time: The next trigger time without the jitter.
        jitter: The maximum number of seconds to add to the next_trigger_time.
        now: The current date/time (accepted for interface consistency; unused here).
    """
    # Nothing to shift, or jitter disabled (None or 0): return the input unchanged.
    if next_trigger_time is None or not jitter:
        return next_trigger_time
    return next_trigger_time + timedelta(seconds=random.uniform(0, jitter))

asyncz.triggers.combination.AndTrigger

AndTrigger(triggers, jitter=None, **kwargs)

Bases: BaseCombinationTrigger

Always returns the earliest next trigger time that all the passed triggers agree on. The trigger is considered to be finished when any of the given triggers has finished its schedule.

PARAMETER DESCRIPTION
triggers

List of triggers to combine.

TYPE: list[TriggerType]

jitter

Delay the task execution by the jitter seconds at most.

TYPE: Optional[int] DEFAULT: None

Source code in asyncz/triggers/base.py
def __init__(
    self,
    triggers: list[TriggerType],
    jitter: Optional[int] = None,
    **kwargs: Any,
) -> None:
    """
    Args:
        triggers: List of triggers to combine.
        jitter: Delay the task execution by at most jitter seconds.
    """
    # Route triggers through kwargs so the base initializer validates/stores it.
    kwargs["triggers"] = triggers
    super().__init__(jitter=jitter, **kwargs)

alias class-attribute

alias = 'and'

jitter class-attribute instance-attribute

jitter = None

allow_mistrigger_by_default class-attribute instance-attribute

allow_mistrigger_by_default = False

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

triggers instance-attribute

triggers

get_next_trigger_time

get_next_trigger_time(timezone, previous_time, now=None)
Source code in asyncz/triggers/combination.py
def get_next_trigger_time(
    self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
) -> Union[datetime, None]:
    """Earliest time all child triggers agree on; None once any child is done."""
    if now is None:
        now = datetime.now(timezone)
    while True:
        candidates: list[datetime] = []
        for child in self.triggers:
            candidate = child.get_next_trigger_time(timezone, previous_time, now)
            if candidate is None:
                # One child has finished its schedule -> the AND trigger is done.
                return None
            candidates.append(candidate)
        latest = max(candidates)
        if latest == min(candidates):
            # All children proposed the same instant: that is our answer.
            return self.apply_jitter(candidates[0], self.jitter, now)
        # Not unanimous yet: retry using the latest proposal as the new "now".
        now = latest

apply_jitter

apply_jitter(
    next_trigger_time: datetime,
    jitter: Optional[int],
    now: datetime,
) -> datetime
apply_jitter(
    next_trigger_time: None,
    jitter: Optional[int],
    now: datetime,
) -> None
apply_jitter(next_trigger_time, jitter, now)

Makes the next trigger time random by adding a random value (jitter).

PARAMETER DESCRIPTION
next_trigger_time

The next trigger time without the jitter.

TYPE: Optional[datetime]

jitter

The maximum number of seconds to add to the next_trigger_time.

TYPE: Optional[int]

now

The current date/time.

TYPE: datetime

Source code in asyncz/triggers/base.py
def apply_jitter(
    self, next_trigger_time: Optional[datetime], jitter: Optional[int], now: datetime
) -> Union[datetime, None]:
    """
    Makes the next trigger time random by adding a random value (jitter).

    Args:
        next_trigger_time: The next trigger time without the jitter.
        jitter: The maximum number of seconds to add to the next_trigger_time.
        now: The current date/time (accepted for interface consistency; unused here).
    """
    # Nothing to shift, or jitter disabled (None or 0): return the input unchanged.
    if next_trigger_time is None or not jitter:
        return next_trigger_time
    return next_trigger_time + timedelta(seconds=random.uniform(0, jitter))

asyncz.triggers.combination.OrTrigger

OrTrigger(triggers, jitter=None, **kwargs)

Bases: BaseCombinationTrigger

Always returns the earliest next trigger time produced by any of the given triggers. The trigger is considered finished when all the given triggers have finished their schedules.

PARAMETER DESCRIPTION
triggers

List of triggers to combine.

TYPE: list[TriggerType]

jitter

Delay the task execution by the jitter seconds at most.

TYPE: Optional[int] DEFAULT: None

Source code in asyncz/triggers/base.py
def __init__(
    self,
    triggers: list[TriggerType],
    jitter: Optional[int] = None,
    **kwargs: Any,
) -> None:
    """
    Args:
        triggers: List of triggers to combine.
        jitter: Delay the task execution by at most jitter seconds.
    """
    # Route triggers through kwargs so the base initializer validates/stores it.
    kwargs["triggers"] = triggers
    super().__init__(jitter=jitter, **kwargs)

alias class-attribute

alias = 'or'

jitter class-attribute instance-attribute

jitter = None

allow_mistrigger_by_default class-attribute instance-attribute

allow_mistrigger_by_default = False

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

triggers instance-attribute

triggers

get_next_trigger_time

get_next_trigger_time(timezone, previous_time, now=None)
Source code in asyncz/triggers/combination.py
def get_next_trigger_time(
    self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
) -> Union[datetime, None]:
    """Earliest time produced by any child trigger; None when all are finished."""
    if now is None:
        now = datetime.now(timezone)
    # Collect the non-None proposals from every child trigger.
    proposals = [
        candidate
        for candidate in (
            child.get_next_trigger_time(timezone, previous_time, now)
            for child in self.triggers
        )
        if candidate is not None
    ]
    if not proposals:
        return None
    return self.apply_jitter(min(proposals), self.jitter, now)

apply_jitter

apply_jitter(
    next_trigger_time: datetime,
    jitter: Optional[int],
    now: datetime,
) -> datetime
apply_jitter(
    next_trigger_time: None,
    jitter: Optional[int],
    now: datetime,
) -> None
apply_jitter(next_trigger_time, jitter, now)

Makes the next trigger time random by adding a random value (jitter).

PARAMETER DESCRIPTION
next_trigger_time

The next trigger time without the jitter.

TYPE: Optional[datetime]

jitter

The maximum number of seconds to add to the next_trigger_time.

TYPE: Optional[int]

now

The current date/time.

TYPE: datetime

Source code in asyncz/triggers/base.py
def apply_jitter(
    self, next_trigger_time: Optional[datetime], jitter: Optional[int], now: datetime
) -> Union[datetime, None]:
    """
    Makes the next trigger time random by adding a random value (jitter).

    Args:
        next_trigger_time: The next trigger time without the jitter.
        jitter: The maximum number of seconds to add to the next_trigger_time.
        now: The current date/time (accepted for interface consistency; unused here).
    """
    # Nothing to shift, or jitter disabled (None or 0): return the input unchanged.
    if next_trigger_time is None or not jitter:
        return next_trigger_time
    return next_trigger_time + timedelta(seconds=random.uniform(0, jitter))

asyncz.triggers.shutdown.ShutdownTrigger

Bases: BaseTrigger

alias class-attribute

alias = 'shutdown'

jitter class-attribute instance-attribute

jitter = None

allow_mistrigger_by_default class-attribute instance-attribute

allow_mistrigger_by_default = False

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

get_next_trigger_time

get_next_trigger_time(timezone, previous_time, now=None)
Source code in asyncz/triggers/shutdown.py
def get_next_trigger_time(
    self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
) -> None:
    """A shutdown trigger never fires on a schedule: always report no next time."""
    return None

apply_jitter

apply_jitter(
    next_trigger_time: datetime,
    jitter: Optional[int],
    now: datetime,
) -> datetime
apply_jitter(
    next_trigger_time: None,
    jitter: Optional[int],
    now: datetime,
) -> None
apply_jitter(next_trigger_time, jitter, now)

Makes the next trigger time random by adding a random value (jitter).

PARAMETER DESCRIPTION
next_trigger_time

The next trigger time without the jitter.

TYPE: Optional[datetime]

jitter

The maximum number of seconds to add to the next_trigger_time.

TYPE: Optional[int]

now

The current date/time.

TYPE: datetime

Source code in asyncz/triggers/base.py
def apply_jitter(
    self, next_trigger_time: Optional[datetime], jitter: Optional[int], now: datetime
) -> Union[datetime, None]:
    """
    Makes the next trigger time random by adding a random value (jitter).

    Args:
        next_trigger_time: The next trigger time without the jitter.
        jitter: The maximum number of seconds to add to the next_trigger_time.
        now: The current date/time (accepted for interface consistency; unused here).
    """
    # Nothing to shift, or jitter disabled (None or 0): return the input unchanged.
    if next_trigger_time is None or not jitter:
        return next_trigger_time
    return next_trigger_time + timedelta(seconds=random.uniform(0, jitter))

Executors

asyncz.executors.asyncio.AsyncIOExecutor

Bases: BaseExecutor

Executor used for AsyncIO, typically can also be plugged into any ASGI framework as well, for example, Ravyn, Starlette, or FastAPI.

Runs the task in the default executor event loop.

If the task function is a native coroutine function, it is scheduled to be run directly in the event loop as soon as possible. All other functions are run in the event loop's default executor which is usually a thread pool.

logger instance-attribute

logger

instances property

instances

start

start(scheduler, alias)
Source code in asyncz/executors/asyncio.py
def start(self, scheduler: Any, alias: str) -> None:
    """Start the executor: capture the scheduler's event loop and initialize
    the set used to track futures that are still in flight."""
    super().start(scheduler, alias)
    self.event_loop = scheduler.event_loop
    self.pending_futures: set[Any] = set()

shutdown

shutdown(wait=True)
Source code in asyncz/executors/asyncio.py
def shutdown(self, wait: bool = True) -> None:
    for f in self.pending_futures:
        if not f.done():
            f.cancel()

    self.pending_futures.clear()

do_send_task

do_send_task(task, run_times)
Source code in asyncz/executors/asyncio.py
def do_send_task(self, task: "TaskType", run_times: list[datetime]) -> None:
    """
    Submit the task onto the event loop: coroutine functions run directly on
    the loop, everything else in the loop's default executor.
    """
    task_id = task.id
    assert task_id is not None, "Cannot send decorator type task"
    assert self.logger is not None, "logger is None"

    def callback(fn: Any) -> None:
        # Runs when the future completes; stop tracking it and report outcome.
        self.pending_futures.discard(fn)
        try:
            events = fn.result()
        except BaseException:
            self.run_task_error(task_id)
        else:
            self.run_task_success(task_id, events)

    if inspect.iscoroutinefunction(task.fn):
        coroutine = run_coroutine_task(
            task, cast(str, task.store_alias), run_times, self.logger
        )
        fn = self.event_loop.create_task(coroutine)
    else:
        fn = self.event_loop.run_in_executor(
            None, run_task, task, cast(str, task.store_alias), run_times, self.logger
        )

    fn.add_done_callback(callback)
    # Track the future so shutdown() can cancel anything still pending.
    self.pending_futures.add(fn)

send_task

send_task(task, run_times)

Sends the task for execution.

PARAMETER DESCRIPTION
task

A Task instance to execute.

TYPE: TaskType

run_times

A list of datetimes specifying when the task should have been run.

TYPE: list[datetime]

Source code in asyncz/executors/base.py
def send_task(self, task: "TaskType", run_times: list[datetime]) -> None:
    """
    Sends the task for execution.

    Args:
        task: A Task instance to execute.
        run_times: A list of datetimes specifying when the task should have been run.

    Raises:
        MaximumInstancesError: If the task is already running max_instances times.
    """
    assert self.lock is not None, "This executor has not been started yet."
    assert task.id is not None, "The task is in decorator mode."
    with self.lock:
        # Enforce the per-task concurrency cap before submitting.
        if self.instances[task.id] >= task.max_instances:
            raise MaximumInstancesError(task.id, task.max_instances)

        self.do_send_task(task, run_times)
        self.instances[task.id] += 1

run_task_success

run_task_success(task_id, events)

Called by the executor with the list of generated events when the function run_task has been successfully executed.

Source code in asyncz/executors/base.py
def run_task_success(self, task_id: str, events: list[TaskExecutionEvent]) -> None:
    """
    Invoked by the executor once run_task finished successfully: releases the
    task's running-instance slot and forwards the generated events to the
    scheduler.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining == 0:
            # Last running instance gone: drop the counter entry entirely.
            del self.instances[task_id]
        else:
            self.instances[task_id] = remaining

    if events:
        for event in events:
            self.scheduler.dispatch_event(event)

run_task_error

run_task_error(task_id)

Called by the executor with the exception if there is an error calling the run_task.

Source code in asyncz/executors/base.py
def run_task_error(self, task_id: str) -> None:
    """
    Invoked by the executor when run_task raised: releases the task's
    running-instance slot and logs the failure with its traceback.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining == 0:
            # Last running instance gone: drop the counter entry entirely.
            del self.instances[task_id]
        else:
            self.instances[task_id] = remaining

    self.scheduler.loggers[self.logger_name].error(
        f"Error running task {task_id}", exc_info=True
    )

asyncz.executors.pool.ThreadPoolExecutor

ThreadPoolExecutor(
    max_workers=10, pool_kwargs=None, **kwargs
)

Bases: BasePoolExecutor

An executor that runs tasks in a concurrent.futures thread pool.

PARAMETER DESCRIPTION
max_workers

The maximum number of spawned threads.

TYPE: int DEFAULT: 10

pool_kwargs

Dict of keyword arguments to pass to the underlying ThreadPoolExecutor constructor.

TYPE: Optional[Any] DEFAULT: None

Source code in asyncz/executors/pool.py
def __init__(self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **kwargs: Any):
    """Create a thread-pool backed executor.

    Args:
        max_workers: Maximum number of worker threads to spawn.
        pool_kwargs: Extra keyword arguments forwarded to the underlying
            concurrent.futures.ThreadPoolExecutor constructor.
        **kwargs: Forwarded to the BasePoolExecutor initializer.
    """
    pool_kwargs = pool_kwargs or {}
    # int() guards against max_workers arriving as a string (e.g. from config).
    pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
    super().__init__(pool, **kwargs)

logger instance-attribute

logger

instances property

instances

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow",
    arbitrary_types_allowed=True,
    populate_by_name=True,
)

cancel_futures class-attribute instance-attribute

cancel_futures = False

overwrite_wait class-attribute instance-attribute

overwrite_wait = None

pool instance-attribute

pool = pool

start

start(scheduler, alias)

Called by the scheduler when the scheduler is being started or when the executor is being added to an already running scheduler.

Source code in asyncz/executors/base.py
def start(self, scheduler: Any, alias: str) -> None:
    """
    Bind this executor to a scheduler.

    Invoked when the scheduler starts, or when the executor is added to a
    scheduler that is already running.

    Args:
        scheduler: The scheduler that owns this executor.
        alias: The alias this executor was registered under.
    """
    self.scheduler = scheduler
    self.logger_name = f"asyncz.executors.{alias}"
    self.lock: "RLock" = scheduler.create_lock()
    # This logger is handed to tasks so their output is routed per-executor.
    self.logger = self.scheduler.loggers[self.logger_name]

shutdown

shutdown(wait=True)
Source code in asyncz/executors/pool.py
def shutdown(self, wait: bool = True) -> None:
    """
    Shuts down the underlying pool.

    overwrite_wait, when set, takes precedence over the wait argument;
    cancel_futures is forwarded to the pool's shutdown.
    """
    effective_wait = wait if self.overwrite_wait is None else self.overwrite_wait
    self.pool.shutdown(effective_wait, cancel_futures=self.cancel_futures)

send_task

send_task(task, run_times)

Sends the task for execution.

PARAMETER DESCRIPTION
task

A Task instance to execute.

TYPE: TaskType

run_times

A list of datetimes specifying when the task should have been run.

TYPE: list[datetime]

Source code in asyncz/executors/base.py
def send_task(self, task: "TaskType", run_times: "list[datetime]") -> None:
    """
    Submits the task for execution, enforcing the per-task instance limit.

    Args:
        task: The task instance to execute.
        run_times: The datetimes at which the task should have fired.

    Raises:
        MaximumInstancesError: If the task already has max_instances
            concurrent executions in flight.
    """
    assert self.lock is not None, "This executor has not been started yet."
    assert task.id is not None, "The task is in decorator mode."
    with self.lock:
        running = self.instances[task.id]
        if running >= task.max_instances:
            raise MaximumInstancesError(task.id, task.max_instances)
        self.do_send_task(task, run_times)
        # Only count the instance once submission succeeded.
        self.instances[task.id] = running + 1

do_send_task

do_send_task(task, run_times)
Source code in asyncz/executors/pool.py
def do_send_task(self, task: "TaskType", run_times: list[datetime]) -> Any:
    """
    Submits run_task for the given task to the worker pool.

    If the pool is broken (e.g. a killed worker process), it is replaced
    with a fresh pool of the same class and size and the task is
    resubmitted once.

    Args:
        task: The task instance to execute.
        run_times: The datetimes at which the task should have fired.
    """
    task_id = task.id
    assert task_id is not None, "Cannot send decorator type task"

    def callback(fn: Any) -> None:
        # Fetch the exception exactly once instead of calling fn.exception()
        # twice as before.
        if hasattr(fn, "exception_info"):
            exc, _ = fn.exception_info()
        else:
            exc = fn.exception()
        if exc:
            self.run_task_error(task_id)
        else:
            self.run_task_success(task_id, fn.result())

    try:
        fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger)
    except (BrokenProcessPool, TypeError):
        self.scheduler.loggers[self.logger_name].warning(
            "Process pool is broken. Replacing pool with a new instance."
        )
        # BUGFIX: concurrent.futures pools only expose the worker count via
        # the private _max_workers attribute; the previous public
        # `self.pool.max_workers` lookup raised AttributeError here and
        # masked the recovery path. Fall back to the pool default (None)
        # if neither attribute exists.
        workers = getattr(self.pool, "_max_workers", None) or getattr(
            self.pool, "max_workers", None
        )
        self.pool = self.pool.__class__(workers)
        fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger)

    fn.add_done_callback(callback)

run_task_success

run_task_success(task_id, events)

Called by the executor with the list of generated events when the function run_task has been successfully executed.

Source code in asyncz/executors/base.py
def run_task_success(self, task_id: str, events: "list[TaskExecutionEvent]") -> None:
    """
    Executor callback invoked after run_task finished without raising.

    Decrements the running-instance counter for the task and forwards
    every generated event to the scheduler.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining:
            self.instances[task_id] = remaining
        else:
            # Drop the entry so the mapping only tracks tasks still running.
            del self.instances[task_id]

    # Dispatch outside the lock; event handlers may be slow.
    if events:
        for event in events:
            self.scheduler.dispatch_event(event)

run_task_error

run_task_error(task_id)

Called by the executor with the exception if there is an error calling the run_task.

Source code in asyncz/executors/base.py
def run_task_error(self, task_id: str) -> None:
    """
    Executor callback invoked when run_task raised.

    Decrements the running-instance counter for the task and logs the
    failure (with traceback) through the executor's scheduler logger.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining:
            self.instances[task_id] = remaining
        else:
            del self.instances[task_id]

    self.scheduler.loggers[self.logger_name].error(
        f"Error running task {task_id}", exc_info=True
    )

asyncz.executors.process_pool.ProcessPoolExecutor

ProcessPoolExecutor(
    max_workers=10, pool_kwargs=None, **kwargs
)

Bases: BasePoolExecutor

An executor that runs tasks in a concurrent.futures process pool.

PARAMETER DESCRIPTION
max_workers

The maximum number of spawned processes.

TYPE: int DEFAULT: 10

pool_kwargs

Dict of keyword arguments to pass to the underlying ProcessPoolExecutor constructor.

TYPE: Optional[Any] DEFAULT: None

Source code in asyncz/executors/process_pool.py
def __init__(
    self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **kwargs: Any
) -> None:
    """Create a process-pool backed executor.

    A one-way multiprocessing Pipe is created up front so worker-process
    log records can be shipped back to the parent (wired up in start()).

    Args:
        max_workers: Maximum number of worker processes to spawn.
        pool_kwargs: Extra keyword arguments forwarded to the underlying
            concurrent.futures.ProcessPoolExecutor constructor.
        **kwargs: Forwarded to the BasePoolExecutor initializer.
    """
    self.receive_pipe, self.send_pipe = Pipe(False)
    pool_kwargs = pool_kwargs or {}
    # int() guards against max_workers arriving as a string (e.g. from config).
    pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
    super().__init__(pool, **kwargs)

logger instance-attribute

logger

instances property

instances

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow",
    arbitrary_types_allowed=True,
    populate_by_name=True,
)

cancel_futures class-attribute instance-attribute

cancel_futures = False

overwrite_wait class-attribute instance-attribute

overwrite_wait = None

pool instance-attribute

pool = pool

start

start(scheduler, alias)
Source code in asyncz/executors/process_pool.py
def start(self, scheduler: "SchedulerType", alias: str) -> None:
    """Bind to the scheduler and reroute task logging through the pipe.

    Worker processes cannot use the parent's in-process logger, so the
    real logger is wrapped in a ProcessPoolReceiver reading from the
    pipe, and a pipe-writing ProcessPoolLoggerSender is handed to tasks
    instead.
    """
    super().start(scheduler, alias)
    assert self.logger is not None, "logger is None"
    # move the old logger to logger_receiver
    self.logger_receiver = ProcessPoolReceiver(self.receive_pipe, self.logger)
    # and send the process logger instead
    self.logger = cast("logging.Logger", ProcessPoolLoggerSender(self.send_pipe))

shutdown

shutdown(wait=True)
Source code in asyncz/executors/process_pool.py
def shutdown(self, wait: bool = True) -> None:
    """Shut down the pool, then tear down the logging pipe.

    Closing the send pipe signals EOF to the receiver, which is then
    joined so no pending log records are lost.
    """
    super().shutdown(wait=wait)
    self.send_pipe.close()
    self.logger_receiver.join()

send_task

send_task(task, run_times)

Sends the task for execution.

PARAMETER DESCRIPTION
task

A Task instance to execute.

TYPE: TaskType

run_times

A list of datetimes specifying when the task should have been run.

TYPE: list[datetime]

Source code in asyncz/executors/base.py
def send_task(self, task: "TaskType", run_times: "list[datetime]") -> None:
    """
    Submits the task for execution, enforcing the per-task instance limit.

    Args:
        task: The task instance to execute.
        run_times: The datetimes at which the task should have fired.

    Raises:
        MaximumInstancesError: If the task already has max_instances
            concurrent executions in flight.
    """
    assert self.lock is not None, "This executor has not been started yet."
    assert task.id is not None, "The task is in decorator mode."
    with self.lock:
        running = self.instances[task.id]
        if running >= task.max_instances:
            raise MaximumInstancesError(task.id, task.max_instances)
        self.do_send_task(task, run_times)
        # Only count the instance once submission succeeded.
        self.instances[task.id] = running + 1

do_send_task

do_send_task(task, run_times)
Source code in asyncz/executors/pool.py
def do_send_task(self, task: "TaskType", run_times: list[datetime]) -> Any:
    """
    Submits run_task for the given task to the worker pool.

    If the pool is broken (e.g. a killed worker process), it is replaced
    with a fresh pool of the same class and size and the task is
    resubmitted once.

    Args:
        task: The task instance to execute.
        run_times: The datetimes at which the task should have fired.
    """
    task_id = task.id
    assert task_id is not None, "Cannot send decorator type task"

    def callback(fn: Any) -> None:
        # Fetch the exception exactly once instead of calling fn.exception()
        # twice as before.
        if hasattr(fn, "exception_info"):
            exc, _ = fn.exception_info()
        else:
            exc = fn.exception()
        if exc:
            self.run_task_error(task_id)
        else:
            self.run_task_success(task_id, fn.result())

    try:
        fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger)
    except (BrokenProcessPool, TypeError):
        self.scheduler.loggers[self.logger_name].warning(
            "Process pool is broken. Replacing pool with a new instance."
        )
        # BUGFIX: concurrent.futures pools only expose the worker count via
        # the private _max_workers attribute; the previous public
        # `self.pool.max_workers` lookup raised AttributeError here and
        # masked the recovery path. Fall back to the pool default (None)
        # if neither attribute exists.
        workers = getattr(self.pool, "_max_workers", None) or getattr(
            self.pool, "max_workers", None
        )
        self.pool = self.pool.__class__(workers)
        fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger)

    fn.add_done_callback(callback)

run_task_success

run_task_success(task_id, events)

Called by the executor with the list of generated events when the function run_task has been successfully executed.

Source code in asyncz/executors/base.py
def run_task_success(self, task_id: str, events: "list[TaskExecutionEvent]") -> None:
    """
    Executor callback invoked after run_task finished without raising.

    Decrements the running-instance counter for the task and forwards
    every generated event to the scheduler.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining:
            self.instances[task_id] = remaining
        else:
            # Drop the entry so the mapping only tracks tasks still running.
            del self.instances[task_id]

    # Dispatch outside the lock; event handlers may be slow.
    if events:
        for event in events:
            self.scheduler.dispatch_event(event)

run_task_error

run_task_error(task_id)

Called by the executor with the exception if there is an error calling the run_task.

Source code in asyncz/executors/base.py
def run_task_error(self, task_id: str) -> None:
    """
    Executor callback invoked when run_task raised.

    Decrements the running-instance counter for the task and logs the
    failure (with traceback) through the executor's scheduler logger.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining:
            self.instances[task_id] = remaining
        else:
            del self.instances[task_id]

    self.scheduler.loggers[self.logger_name].error(
        f"Error running task {task_id}", exc_info=True
    )

asyncz.executors.debug.DebugExecutor

Bases: BaseExecutor

A special executor that executes the target callable directly instead of deferring it to a thread or process.

logger instance-attribute

logger

instances property

instances

do_send_task

do_send_task(task, run_times)
Source code in asyncz/executors/debug.py
def do_send_task(
    self,
    task: "TaskType",
    run_times: "list[datetime]",
) -> None:
    """
    Runs the task synchronously in the calling thread and reports the
    outcome through the success/error callbacks.
    """
    assert task.id is not None, "Cannot send decorator type task"
    assert self.logger is not None, "logger is None"
    task_id = task.id
    try:
        events = run_task(task, cast(str, task.store_alias), run_times, self.logger)
    except Exception:
        self.run_task_error(task_id)
        return
    self.run_task_success(task_id, events)

start

start(scheduler, alias)

Called by the scheduler when the scheduler is being started or when the executor is being added to an already running scheduler.

Source code in asyncz/executors/base.py
def start(self, scheduler: Any, alias: str) -> None:
    """
    Bind this executor to a scheduler.

    Invoked when the scheduler starts, or when the executor is added to a
    scheduler that is already running.

    Args:
        scheduler: The scheduler that owns this executor.
        alias: The alias this executor was registered under.
    """
    self.scheduler = scheduler
    self.logger_name = f"asyncz.executors.{alias}"
    self.lock: "RLock" = scheduler.create_lock()
    # This logger is handed to tasks so their output is routed per-executor.
    self.logger = self.scheduler.loggers[self.logger_name]

shutdown

shutdown(wait=True)

Shuts down the executor.

Source code in asyncz/executors/base.py
def shutdown(self, wait: bool = True) -> None:
    """
    Shuts down the executor.

    The base implementation has nothing to release, so it is a no-op;
    pool-backed executors override it.

    Args:
        wait: Whether to block until all submitted tasks have executed.
    """

send_task

send_task(task, run_times)

Sends the task for execution.

PARAMETER DESCRIPTION
task

A Task instance to execute.

TYPE: TaskType

run_times

A list of datetimes specifying when the task should have been run.

TYPE: list[datetime]

Source code in asyncz/executors/base.py
def send_task(self, task: "TaskType", run_times: "list[datetime]") -> None:
    """
    Submits the task for execution, enforcing the per-task instance limit.

    Args:
        task: The task instance to execute.
        run_times: The datetimes at which the task should have fired.

    Raises:
        MaximumInstancesError: If the task already has max_instances
            concurrent executions in flight.
    """
    assert self.lock is not None, "This executor has not been started yet."
    assert task.id is not None, "The task is in decorator mode."
    with self.lock:
        running = self.instances[task.id]
        if running >= task.max_instances:
            raise MaximumInstancesError(task.id, task.max_instances)
        self.do_send_task(task, run_times)
        # Only count the instance once submission succeeded.
        self.instances[task.id] = running + 1

run_task_success

run_task_success(task_id, events)

Called by the executor with the list of generated events when the function run_task has been successfully executed.

Source code in asyncz/executors/base.py
def run_task_success(self, task_id: str, events: "list[TaskExecutionEvent]") -> None:
    """
    Executor callback invoked after run_task finished without raising.

    Decrements the running-instance counter for the task and forwards
    every generated event to the scheduler.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining:
            self.instances[task_id] = remaining
        else:
            # Drop the entry so the mapping only tracks tasks still running.
            del self.instances[task_id]

    # Dispatch outside the lock; event handlers may be slow.
    if events:
        for event in events:
            self.scheduler.dispatch_event(event)

run_task_error

run_task_error(task_id)

Called by the executor with the exception if there is an error calling the run_task.

Source code in asyncz/executors/base.py
def run_task_error(self, task_id: str) -> None:
    """
    Executor callback invoked when run_task raised.

    Decrements the running-instance counter for the task and logs the
    failure (with traceback) through the executor's scheduler logger.
    """
    with self.lock:
        remaining = self.instances[task_id] - 1
        if remaining:
            self.instances[task_id] = remaining
        else:
            del self.instances[task_id]

    self.scheduler.loggers[self.logger_name].error(
        f"Error running task {task_id}", exc_info=True
    )

Stores

asyncz.stores.memory.MemoryStore

MemoryStore(**kwargs)

Bases: BaseStore

Stores tasks in an array in RAM. Provides no persistence support.

Source code in asyncz/stores/memory.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize the in-memory store.

    tasks keeps (task, run-timestamp) pairs ordered by next run time;
    tasks_index maps a task id to the same pair for O(1) lookup.
    """
    super().__init__(**kwargs)
    self.tasks: list[tuple[TaskType, Optional[float]]] = []
    self.tasks_index: dict[str, tuple[TaskType, Optional[float]]] = {}

tasks instance-attribute

tasks = []

tasks_index instance-attribute

tasks_index = {}

alias class-attribute instance-attribute

alias = None

lock class-attribute instance-attribute

lock = NullLockProtected()

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

scheduler instance-attribute

scheduler = None

encryption_key instance-attribute

encryption_key = None

lookup_task

lookup_task(task_id)
Source code in asyncz/stores/memory.py
def lookup_task(self, task_id: str) -> Optional["TaskType"]:
    """Return the task with the given id, or None if it is unknown."""
    entry = self.tasks_index.get(task_id)
    return entry[0] if entry is not None else None

get_task

get_task(task_id)

Return the task by id or raise TaskLookupError if it's missing.

This mirrors the expected BaseStore API used by dashboard helpers that probe stores for task membership.

Source code in asyncz/stores/memory.py
def get_task(self, task_id: str) -> "TaskType":
    """Return the task by id, raising TaskLookupError when it is absent.

    Mirrors the BaseStore API used by dashboard helpers that probe
    stores for task membership.
    """
    found = self.lookup_task(task_id)
    if found is None:
        raise TaskLookupError(task_id)
    return found

get_due_tasks

get_due_tasks(now)
Source code in asyncz/stores/memory.py
def get_due_tasks(self, now: datetime) -> list["TaskType"]:
    """Return tasks whose run timestamp is at or before now.

    Relies on self.tasks being sorted by timestamp with paused tasks
    (timestamp None) at the end, so iteration can stop early.
    """
    cutoff = datetime_to_utc_timestamp(now)
    due = []
    for candidate, run_timestamp in self.tasks:
        if run_timestamp is None or run_timestamp > cutoff:
            break
        due.append(candidate)
    return due

get_next_run_time

get_next_run_time()
Source code in asyncz/stores/memory.py
def get_next_run_time(self) -> Optional[datetime]:
    """Return the earliest next_run_time among stored tasks, if any."""
    if not self.tasks:
        return None
    first_task = self.tasks[0][0]
    return first_task.next_run_time or None

get_all_tasks

get_all_tasks()
Source code in asyncz/stores/memory.py
def get_all_tasks(self) -> list["TaskType"]:
    """Return every stored task in schedule order."""
    return [entry[0] for entry in self.tasks]

add_task

add_task(task)
Source code in asyncz/stores/memory.py
def add_task(self, task: "TaskType") -> None:
    """Insert a new task at its sorted position.

    Raises:
        ConflictIdError: If a task with the same id is already stored.
    """
    assert task.id is not None, "The task is in decorator mode."
    if task.id in self.tasks_index:
        raise ConflictIdError(task.id)

    timestamp = datetime_to_utc_timestamp(task.next_run_time or None)
    entry = (task, timestamp)
    self.tasks.insert(self.get_task_index(timestamp, task.id), entry)
    self.tasks_index[task.id] = entry

update_task

update_task(task)
Source code in asyncz/stores/memory.py
def update_task(self, task: "TaskType") -> None:
    """Replace a stored task, re-sorting it when its run time changed.

    Unknown tasks are inserted instead of raising, so dashboard flows
    may update a task before its first commit.
    """
    assert task.id is not None, "The task is in decorator mode."
    new_timestamp = datetime_to_utc_timestamp(task.next_run_time or None)
    current = self.tasks_index.get(task.id)

    if current is None or current[0] is None:
        # Not present yet: be tolerant and insert at the sorted position.
        slot = self.get_task_index(new_timestamp, task.id)
        self.tasks.insert(slot, (task, new_timestamp))
        self.tasks_index[task.id] = (task, new_timestamp)
        return

    old_task, old_timestamp = current
    old_index = self.get_task_index(old_timestamp, old_task.id)
    if old_timestamp == new_timestamp:
        # Same schedule slot: just swap the tuple in place.
        self.tasks[old_index] = (task, new_timestamp)
    else:
        # Schedule moved: drop the old slot and re-insert sorted.
        del self.tasks[old_index]
        self.tasks.insert(
            self.get_task_index(new_timestamp, task.id), (task, new_timestamp)
        )

    # Always refresh the index mapping.
    self.tasks_index[task.id] = (task, new_timestamp)

delete_task

delete_task(task_id)
Source code in asyncz/stores/memory.py
def delete_task(self, task_id: str) -> None:
    """Remove a task by id; raise TaskLookupError if it is unknown."""
    entry = self.tasks_index.get(task_id)
    if entry is None or entry[0] is None:
        raise TaskLookupError(task_id)

    position = self.get_task_index(entry[1], task_id)
    del self.tasks[position]
    del self.tasks_index[task_id]

remove_all_tasks

remove_all_tasks()
Source code in asyncz/stores/memory.py
def remove_all_tasks(self) -> None:
    """Drop every stored task and reset the index."""
    self.tasks, self.tasks_index = [], {}

shutdown

shutdown()
Source code in asyncz/stores/memory.py
def shutdown(self) -> None:
    """Discard all in-memory tasks, then run the base-class shutdown."""
    self.remove_all_tasks()
    super().shutdown()

get_task_index

get_task_index(timestamp, task_id)

Returns the index of the given task, or if it's not found, the index where the task should be inserted based on the given timestamp.

Source code in asyncz/stores/memory.py
def get_task_index(self, timestamp: Union[int, float, None], task_id: str) -> int:
    """
    Binary-search self.tasks for (timestamp, task_id).

    Returns the exact index when the task is present, otherwise the
    insertion point that keeps the list ordered by (timestamp, id).
    A None timestamp sorts last (treated as +inf).
    """
    key = (float("inf") if timestamp is None else timestamp, task_id)
    lo, hi = 0, len(self.tasks)
    while lo < hi:
        mid = (lo + hi) // 2
        candidate, candidate_ts = self.tasks[mid]
        mid_key = (
            float("inf") if candidate_ts is None else candidate_ts,
            candidate.id,
        )
        if mid_key == key:
            return mid
        if mid_key > key:
            hi = mid
        else:
            lo = mid + 1
    return lo

start

start(scheduler, alias)

Called by the scheduler when the scheduler is being started or when the task store is being added to an already running scheduler.

PARAMETER DESCRIPTION
scheduler

The scheduler that is starting this task store.

TYPE: SchedulerType

alias

Alias of this task store as it was assigned to the scheduler.

TYPE: str

Source code in asyncz/stores/base.py
def start(self, scheduler: "SchedulerType", alias: str) -> None:
    """
    Bind this task store to a scheduler.

    Invoked when the scheduler starts, or when the store is added to a
    scheduler that is already running. Also derives an optional AES-CCM
    encryption key from the ASYNCZ_STORE_ENCRYPTION_KEY env var.

    Args:
        scheduler: The scheduler that owns this task store.
        alias: The alias this store was registered under.
    """
    self.scheduler = scheduler
    self.alias = alias
    self.logger_name = f"asyncz.stores.{alias}"
    self.lock = self.create_lock()
    raw_key = os.environ.get("ASYNCZ_STORE_ENCRYPTION_KEY")
    if raw_key:
        # Hash the secret so tokens of any kind and length become a valid key.
        self.encryption_key = AESCCM(hashlib.new("sha256", raw_key.encode()).digest())

create_lock

create_lock()

Creates a lock protector.

Source code in asyncz/stores/base.py
def create_lock(self) -> "LockProtectedProtocol":
    """
    Build the inter-process lock protector for this store.

    Returns a no-op protector unless the owning scheduler configured a
    lock_path; in that case a file lock is created with the {store}
    placeholder replaced by this store's alias.
    """
    scheduler = self.scheduler
    if scheduler and scheduler.lock_path:
        return FileLockProtected(scheduler.lock_path.replace(r"{store}", self.alias))
    return NullLockProtected()

conditional_decrypt

conditional_decrypt(inp)
Source code in asyncz/stores/base.py
def conditional_decrypt(self, inp: bytes) -> bytes:
    """Decrypt inp when an encryption key is configured; otherwise pass through.

    The first 13 bytes are the AES-CCM nonce, the remainder the ciphertext.
    """
    key = self.encryption_key
    if not key:
        return inp
    nonce, ciphertext = inp[:13], inp[13:]
    return key.decrypt(nonce, ciphertext, None)

conditional_encrypt

conditional_encrypt(inp)
Source code in asyncz/stores/base.py
def conditional_encrypt(self, inp: bytes) -> bytes:
    """Encrypt inp when an encryption key is configured; otherwise pass through.

    A fresh 13-byte nonce is generated per call and prefixed to the
    ciphertext so conditional_decrypt can recover it.
    """
    key = self.encryption_key
    if not key:
        return inp
    nonce = os.urandom(13)
    return nonce + key.encrypt(nonce, inp, None)

fix_paused_tasks

fix_paused_tasks(tasks)
Source code in asyncz/stores/base.py
def fix_paused_tasks(self, tasks: "list[TaskType]") -> None:
    """Rotate leading paused tasks (next_run_time None) to the end, in place.

    If every task is paused, or the first task is already scheduled,
    the list is left untouched.
    """
    for position, candidate in enumerate(tasks):
        if candidate.next_run_time is None:
            continue
        if position:
            tasks[:] = tasks[position:] + tasks[:position]
        return

asyncz.stores.file.FileStore

FileStore(
    directory,
    suffix=".task",
    mode=448,
    cleanup_directory=False,
    pickle_protocol=HIGHEST_PROTOCOL,
    **kwargs,
)

Bases: BaseStore

Stores tasks as individual pickled files inside a directory on the local filesystem.

Source code in asyncz/stores/file.py
def __init__(
    self,
    directory: Union[str, os.PathLike],
    suffix: str = ".task",
    mode: int = 0o700,
    cleanup_directory: bool = False,
    pickle_protocol: Optional[int] = pickle.HIGHEST_PROTOCOL,
    **kwargs: Any,
) -> None:
    """Create a store that persists each task as one pickled file on disk.

    Args:
        directory: Directory that holds the task files.
        suffix: Filename suffix identifying task files.
        mode: Permission bits used when creating the directory.
        cleanup_directory: Remove the whole directory on shutdown.
        pickle_protocol: Protocol passed to pickle.dumps.
        **kwargs: Forwarded to the BaseStore initializer.
    """
    super().__init__(**kwargs)
    self.pickle_protocol = pickle_protocol
    self.directory = Path(directory)
    self.mode = mode
    self.cleanup_directory = cleanup_directory
    self.suffix = suffix

forbidden_characters class-attribute instance-attribute

forbidden_characters = {'/', '\\', '\x00', ':'}

pickle_protocol instance-attribute

pickle_protocol = pickle_protocol

directory instance-attribute

directory = Path(directory)

mode instance-attribute

mode = mode

cleanup_directory instance-attribute

cleanup_directory = cleanup_directory

suffix instance-attribute

suffix = suffix

alias class-attribute instance-attribute

alias = None

lock class-attribute instance-attribute

lock = NullLockProtected()

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

scheduler instance-attribute

scheduler = None

encryption_key instance-attribute

encryption_key = None

check_task_id

check_task_id(task_id)
Source code in asyncz/stores/file.py
def check_task_id(self, task_id: str | None) -> None:
    if task_id is None:
        raise RuntimeError("Task id is None")
    if task_id.startswith("."):
        raise RuntimeError(f'Invalid character in task id: "{task_id}".')
    for char in task_id:
        if char in self.forbidden_characters:
            raise RuntimeError(f'Invalid character in task id: "{task_id}".')

start

start(scheduler, alias)

When starting, creates the task directory (if needed) and verifies that the path is a directory.

Source code in asyncz/stores/file.py
def start(self, scheduler: Any, alias: str) -> None:
    """
    Bind the store to a scheduler and ensure the task directory exists.

    Creates the directory with self.mode permissions (parents included)
    and fails fast when the path exists but is not a directory.
    """
    super().start(scheduler, alias)
    self.directory.mkdir(self.mode, parents=True, exist_ok=True)
    if not self.directory.is_dir():
        raise RuntimeError("Not a directory.")

shutdown

shutdown()
Source code in asyncz/stores/file.py
def shutdown(self) -> None:
    """Optionally delete the whole task directory, then run base shutdown."""
    if self.cleanup_directory:
        # Best-effort removal; errors (e.g. missing files) are ignored.
        shutil.rmtree(self.directory, ignore_errors=True)
    super().shutdown()

lookup_task

lookup_task(task_id)
Source code in asyncz/stores/file.py
def lookup_task(self, task_id: str) -> Optional["TaskType"]:
    """Load a single task from disk, returning None when unavailable.

    Any failure (absent, corrupt, or undecryptable file) removes the
    file and yields None, so broken task files cannot wedge the store.
    """
    self.check_task_id(task_id)
    task_path = self.directory / f"{task_id}{self.suffix}"
    try:
        with open(task_path, "rb") as read_ob, with_lock(read_ob, LOCK_SH):
            return self.rebuild_task(read_ob.read())
    except Exception:
        task_path.unlink(missing_ok=True)
        return None

rebuild_task

rebuild_task(state)
Source code in asyncz/stores/file.py
def rebuild_task(self, state: Any) -> TaskType:
    """Reconstruct a Task from its (optionally encrypted) pickled state.

    SECURITY NOTE: pickle.loads executes arbitrary code embedded in the
    payload — the task directory must be trusted, or protected via the
    ASYNCZ_STORE_ENCRYPTION_KEY encryption layer.
    """
    state = pickle.loads(self.conditional_decrypt(state))
    # Bypass __init__ and restore the pickled state directly.
    task = Task.__new__(Task)
    task.__setstate__(state)
    task.scheduler = cast("SchedulerType", self.scheduler)
    task.store_alias = self.alias
    return task

get_due_tasks

get_due_tasks(now)
Source code in asyncz/stores/file.py
def get_due_tasks(self, now: datetime) -> list["TaskType"]:
    """Return every stored task whose next_run_time is at or before now."""
    due = []
    for candidate in self.get_all_tasks():
        run_time = candidate.next_run_time
        if run_time is not None and run_time <= now:
            due.append(candidate)
    return due

get_tasks

get_tasks()
Source code in asyncz/stores/file.py
def get_tasks(self) -> list[TaskType]:
    """Load every task file in the directory, sorted for scheduling.

    Files that cannot be read or unpickled are deleted (best-effort) so
    a corrupt file cannot wedge the store. The result is ordered by
    next_run_time with paused tasks (None) last; creation time breaks
    ties so equal-schedule tasks keep a stable order.
    """
    tasks: list[tuple[TaskType, os.stat_result]] = []
    with os.scandir(self.directory) as scanner:
        for entry in scanner:
            # Skip anything that is not a regular task file.
            if not entry.name.endswith(self.suffix) or not entry.is_file():
                continue
            try:
                with open(entry.path, "rb") as read_ob, with_lock(read_ob, LOCK_SH):
                    task = self.rebuild_task(read_ob.read())
                tasks.append((task, entry.stat()))
            except Exception:
                # Corrupt/unreadable task file: remove it if still present.
                with suppress(FileNotFoundError):
                    os.unlink(entry.path)
        return [
            task
            for task, _ in sorted(
                tasks,
                key=lambda task_stat: (
                    int(task_stat[0].next_run_time is None),
                    task_stat[0].next_run_time,
                    # sort for task creation not update
                    task_stat[1].st_ctime,
                ),
            )
        ]

get_next_run_time

get_next_run_time()
Source code in asyncz/stores/file.py
def get_next_run_time(self) -> Optional[datetime]:
    """Return the earliest next_run_time among stored tasks, or None.

    Stops at the first paused task because get_all_tasks sorts paused
    tasks (next_run_time None) after the scheduled ones.
    """
    earliest = None
    for candidate in self.get_all_tasks():
        candidate_time = candidate.next_run_time
        if candidate_time is None:
            break
        if earliest is None or candidate_time <= earliest:
            earliest = candidate_time
    return earliest

get_all_tasks

get_all_tasks()
Source code in asyncz/stores/file.py
def get_all_tasks(self) -> list["TaskType"]:
    """Alias for get_tasks: every task on disk in schedule order."""
    return self.get_tasks()

add_task

add_task(task)
Source code in asyncz/stores/file.py
def add_task(self, task: "TaskType") -> None:
    """Persist a new task file.

    The "xb" open mode makes creation atomic, so two concurrent adders
    cannot both succeed for the same id.

    Raises:
        ConflictIdError: If a file for this task id already exists.
    """
    self.check_task_id(task.id)
    task_path = self.directory / f"{task.id}{self.suffix}"
    try:
        with task_path.open("xb") as write_ob, with_lock(write_ob, LOCK_EX):
            state_bytes = pickle.dumps(task.__getstate__(), self.pickle_protocol)
            write_ob.write(self.conditional_encrypt(state_bytes))
    except FileExistsError:
        raise ConflictIdError(task.id) from None

update_task

update_task(task)
Source code in asyncz/stores/file.py
def update_task(self, task: "TaskType") -> None:
    """Overwrite an existing task file with the task's current state.

    Raises:
        TaskLookupError: If no file exists for this task id.
    """
    self.check_task_id(task.id)
    task_path = self.directory / f"{task.id}{self.suffix}"
    try:
        with task_path.open("r+b") as write_ob, with_lock(write_ob, LOCK_EX):
            # Opened at position 0, so truncate drops the old payload.
            write_ob.truncate()
            state_bytes = pickle.dumps(task.__getstate__(), self.pickle_protocol)
            write_ob.write(self.conditional_encrypt(state_bytes))
    except FileNotFoundError:
        raise TaskLookupError(task.id) from None

delete_task

delete_task(task_id)
Source code in asyncz/stores/file.py
def delete_task(self, task_id: str) -> None:
    """Delete the task's file.

    Raises:
        TaskLookupError: If no file exists for this task id.
    """
    self.check_task_id(task_id)
    target = self.directory / f"{task_id}{self.suffix}"
    try:
        target.unlink(missing_ok=False)
    except FileNotFoundError:
        raise TaskLookupError(task_id) from None

remove_all_tasks

remove_all_tasks()
Source code in asyncz/stores/file.py
def remove_all_tasks(self) -> None:
    """Delete every file in the directory that carries the task suffix."""
    # glob.escape keeps suffix characters from acting as glob wildcards.
    pattern = f"*{glob.escape(self.suffix)}"
    for stale in self.directory.glob(pattern):
        stale.unlink(missing_ok=True)

create_lock

create_lock()

Creates a lock protector.

Source code in asyncz/stores/base.py
def create_lock(self) -> LockProtectedProtocol:
    """
    Creates a lock protector.
    """
    if not self.scheduler or not self.scheduler.lock_path:
        return NullLockProtected()
    return FileLockProtected(self.scheduler.lock_path.replace(r"{store}", self.alias))

conditional_decrypt

conditional_decrypt(inp)
Source code in asyncz/stores/base.py
def conditional_decrypt(self, inp: bytes) -> bytes:
    if self.encryption_key:
        return self.encryption_key.decrypt(inp[:13], inp[13:], None)
    else:
        return inp

conditional_encrypt

conditional_encrypt(inp)
Source code in asyncz/stores/base.py
def conditional_encrypt(self, inp: bytes) -> bytes:
    if self.encryption_key:
        nonce = os.urandom(13)
        return nonce + self.encryption_key.encrypt(nonce, inp, None)
    else:
        return inp

fix_paused_tasks

fix_paused_tasks(tasks)
Source code in asyncz/stores/base.py
def fix_paused_tasks(self, tasks: list[TaskType]) -> None:
    for index, task in enumerate(tasks):
        if task.next_run_time is not None:
            if index > 0:
                paused_tasks = tasks[:index]
                del tasks[:index]
                tasks.extend(paused_tasks)
            break

asyncz.stores.mongo.MongoDBStore

MongoDBStore(
    database="asyncz",
    collection="tasks",
    client=None,
    pickle_protocol=HIGHEST_PROTOCOL,
    **kwargs,
)

Bases: BaseStore

Stores tasks in a Mongo database instance. Any remaining kwargs are passed directly to the mongo client.

Source code in asyncz/stores/mongo.py
def __init__(
    self,
    database: str = "asyncz",
    collection: str = "tasks",
    client: Optional[MongoClient] = None,
    pickle_protocol: Optional[int] = pickle.HIGHEST_PROTOCOL,
    **kwargs: Any,
) -> None:
    """Create a MongoDB-backed task store.

    Args:
        database: Name of the database holding the task collection.
        collection: Name of the collection tasks are stored in.
        client: An existing MongoClient (or a ref resolvable by maybe_ref);
            when omitted, a new client is built from **kwargs.
        pickle_protocol: Protocol passed to pickle.dumps.
        **kwargs: Forwarded to the BaseStore initializer and, when no
            client is given, also to the MongoClient constructor.
    """
    super().__init__(**kwargs)
    self.pickle_protocol = pickle_protocol

    if not database:
        raise ValueError("database must not be empty or None")

    if not client:
        # Default to acknowledged writes (w=1) unless the caller overrides.
        kwargs.setdefault("w", 1)
        self.client: MongoClient = MongoClient(**kwargs)
    else:
        self.client = maybe_ref(client)

    self.collection = self.client[database][collection]

pickle_protocol instance-attribute

pickle_protocol = pickle_protocol

client instance-attribute

client = MongoClient(**kwargs)

collection instance-attribute

collection = client[database][collection]

alias class-attribute instance-attribute

alias = None

lock class-attribute instance-attribute

lock = NullLockProtected()

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

scheduler instance-attribute

scheduler = None

encryption_key instance-attribute

encryption_key = None

start

start(scheduler, alias)

When starting, creates a sparse index on next_run_time, which omits any documents that lack the next_run_time field.

Source code in asyncz/stores/mongo.py
def start(self, scheduler: Any, alias: str) -> None:
    """
    Starts the store and creates a sparse index on next_run_time, which omits
    documents that lack the field (i.e. paused tasks) from the index.
    """
    super().start(scheduler, alias)
    self.collection.create_index("next_run_time", sparse=True)

lookup_task

lookup_task(task_id)
Source code in asyncz/stores/mongo.py
def lookup_task(self, task_id: str) -> Optional["TaskType"]:
    """Return the task stored under *task_id*, or None when no such document exists."""
    found = self.collection.find_one(task_id, ["state"])
    if not found:
        return None
    return self.rebuild_task(found["state"])

rebuild_task

rebuild_task(state)
Source code in asyncz/stores/mongo.py
def rebuild_task(self, state: Any) -> "TaskType":
    """Deserialize a (possibly encrypted) pickled state blob into a Task bound to this store."""
    # NOTE: pickle.loads on stored data — safe only as long as the store contents are trusted.
    payload = pickle.loads(self.conditional_decrypt(state))
    task = Task.__new__(Task)
    task.__setstate__(payload)
    task.scheduler = cast("SchedulerType", self.scheduler)
    task.store_alias = self.alias
    return task

get_due_tasks

get_due_tasks(now)
Source code in asyncz/stores/mongo.py
def get_due_tasks(self, now: datetime) -> list["TaskType"]:
    """Return all tasks whose next_run_time is at or before *now*."""
    due_before = datetime_to_utc_timestamp(now)
    return self.get_tasks({"next_run_time": {"$lte": due_before}})

get_tasks

get_tasks(conditions)
Source code in asyncz/stores/mongo.py
def get_tasks(self, conditions: DictAny) -> list["TaskType"]:
    """
    Return the tasks matching *conditions*, ordered by next_run_time.

    Documents whose pickled state cannot be restored are logged and then
    deleted from the collection so they cannot break future reads.
    """
    tasks: list[TaskType] = []
    failed_task_ids = []

    # Only _id and state are fetched; next_run_time ordering puts due tasks first.
    for document in self.collection.find(
        conditions, ["_id", "state"], sort=[("next_run_time", ASCENDING)]
    ):
        try:
            tasks.append(self.rebuild_task(document["state"]))
        # NOTE(review): BaseException also swallows KeyboardInterrupt/SystemExit
        # raised mid-restore — confirm that is intended.
        except BaseException:
            doc_id = document["_id"]
            cast("SchedulerType", self.scheduler).loggers[self.logger_name].exception(
                f"Unable to restore task '{doc_id}'. Removing it..."
            )
            failed_task_ids.append(doc_id)

    # Purge all corrupt documents in a single round trip.
    if failed_task_ids:
        self.collection.delete_many({"_id": {"$in": failed_task_ids}})

    return tasks

get_next_run_time

get_next_run_time()
Source code in asyncz/stores/mongo.py
def get_next_run_time(self) -> Optional[datetime]:
    """Return the earliest scheduled run time among non-paused tasks, or None."""
    earliest = self.collection.find_one(
        {"next_run_time": {"$ne": None}},
        projection=["next_run_time"],
        sort=[("next_run_time", ASCENDING)],
    )
    if not earliest:
        return None
    return utc_timestamp_to_datetime(earliest["next_run_time"])

get_all_tasks

get_all_tasks()
Source code in asyncz/stores/mongo.py
def get_all_tasks(self) -> list["TaskType"]:
    """Return every stored task, with paused tasks rotated to the end of the list."""
    all_tasks = self.get_tasks({})
    self.fix_paused_tasks(all_tasks)
    return all_tasks

add_task

add_task(task)
Source code in asyncz/stores/mongo.py
def add_task(self, task: "TaskType") -> None:
    """Insert a new task document; raises ConflictIdError when the id already exists."""
    state_blob = Binary(
        self.conditional_encrypt(pickle.dumps(task.__getstate__(), self.pickle_protocol))
    )
    document = {
        "_id": task.id,
        "next_run_time": datetime_to_utc_timestamp(task.next_run_time or None),
        "state": state_blob,
    }
    try:
        self.collection.insert_one(document)
    except DuplicateKeyError:
        # The _id is taken: surface it as the store-level conflict error.
        raise ConflictIdError(task.id) from None

update_task

update_task(task)
Source code in asyncz/stores/mongo.py
def update_task(self, task: "TaskType") -> None:
    """Overwrite the stored state and next_run_time of an existing task; raises TaskLookupError when missing."""
    serialized = Binary(
        self.conditional_encrypt(pickle.dumps(task.__getstate__(), self.pickle_protocol))
    )
    changes = {
        "next_run_time": datetime_to_utc_timestamp(task.next_run_time or None),
        "state": serialized,
    }
    outcome = self.collection.update_one({"_id": task.id}, {"$set": changes})
    if outcome and outcome.matched_count == 0:
        raise TaskLookupError(task.id)

delete_task

delete_task(task_id)
Source code in asyncz/stores/mongo.py
def delete_task(self, task_id: str) -> None:
    """Delete the task with the given id; raises TaskLookupError when nothing was removed."""
    outcome = self.collection.delete_one({"_id": task_id})
    # deleted_count == 0 means the id was never stored.
    if outcome and outcome.deleted_count == 0:
        raise TaskLookupError(task_id)

remove_all_tasks

remove_all_tasks()
Source code in asyncz/stores/mongo.py
def remove_all_tasks(self) -> None:
    """Delete every task document from the collection."""
    self.collection.delete_many({})

shutdown

shutdown()
Source code in asyncz/stores/mongo.py
def shutdown(self) -> None:
    """Close the Mongo client connection, then run the base store shutdown."""
    self.client.close()
    super().shutdown()

create_lock

create_lock()

Creates a lock protector.

Source code in asyncz/stores/base.py
def create_lock(self) -> LockProtectedProtocol:
    """
    Build the lock protector for this store: a file lock when the scheduler
    configures a lock_path, otherwise a no-op lock.
    """
    owner = self.scheduler
    if owner and owner.lock_path:
        # {store} in the configured path is substituted with this store's alias.
        return FileLockProtected(owner.lock_path.replace(r"{store}", self.alias))
    return NullLockProtected()

conditional_decrypt

conditional_decrypt(inp)
Source code in asyncz/stores/base.py
def conditional_decrypt(self, inp: bytes) -> bytes:
    """Decrypt *inp* when an encryption key is configured; otherwise return it unchanged."""
    if not self.encryption_key:
        return inp
    # Layout: 13-byte nonce prefix followed by the ciphertext.
    nonce, ciphertext = inp[:13], inp[13:]
    return self.encryption_key.decrypt(nonce, ciphertext, None)

conditional_encrypt

conditional_encrypt(inp)
Source code in asyncz/stores/base.py
def conditional_encrypt(self, inp: bytes) -> bytes:
    """Encrypt *inp* when an encryption key is configured, prefixing a fresh 13-byte nonce; otherwise return it unchanged."""
    if not self.encryption_key:
        return inp
    nonce = os.urandom(13)
    return nonce + self.encryption_key.encrypt(nonce, inp, None)

fix_paused_tasks

fix_paused_tasks(tasks)
Source code in asyncz/stores/base.py
def fix_paused_tasks(self, tasks: list[TaskType]) -> None:
    """Rotate leading paused tasks (next_run_time is None) to the end of *tasks* in place."""
    first_active = next(
        (pos for pos, item in enumerate(tasks) if item.next_run_time is not None),
        None,
    )
    # No active task at all, or the first task is already active: nothing to do.
    if first_active:
        tasks[:] = tasks[first_active:] + tasks[:first_active]

asyncz.stores.redis.RedisStore

RedisStore(
    database=0,
    tasks_key="asyncz.tasks",
    run_times_key="asyncz.run_times",
    pickle_protocol=HIGHEST_PROTOCOL,
    **kwargs,
)

Bases: BaseStore

Stores tasks in a Redis instance. Any remaining kwargs are passed directly to the redis instance.

Source code in asyncz/stores/redis.py
def __init__(
    self,
    database: int = 0,
    tasks_key: str = "asyncz.tasks",
    run_times_key: str = "asyncz.run_times",
    pickle_protocol: int | None = pickle.HIGHEST_PROTOCOL,
    **kwargs: Any,
):
    """
    Store tasks in a Redis instance.

    Args:
        database: Redis database number; must be coercible to int.
        tasks_key: Hash key that holds the pickled task states.
        run_times_key: Sorted-set key mapping task ids to run timestamps.
        pickle_protocol: Pickle protocol used when serializing task state.
        kwargs: Forwarded to the base store and to the Redis client.

    Raises:
        AsynczException: When *database* cannot be converted to an int.
    """
    super().__init__(**kwargs)
    try:
        self.database = int(database)
    except (TypeError, ValueError):
        # "an int" (was misspelled "and int") — include the offending type for debugging.
        raise AsynczException(
            f"The database value must be an int and got ({type(database)})"
        ) from None

    self.pickle_protocol = pickle_protocol
    self.tasks_key = tasks_key
    self.run_times_key = run_times_key
    self.redis = Redis(db=self.database, **kwargs)

database instance-attribute

database = int(database)

pickle_protocol instance-attribute

pickle_protocol = pickle_protocol

tasks_key instance-attribute

tasks_key = tasks_key

run_times_key instance-attribute

run_times_key = run_times_key

redis instance-attribute

redis = Redis(db=database, **kwargs)

alias class-attribute instance-attribute

alias = None

lock class-attribute instance-attribute

lock = NullLockProtected()

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

scheduler instance-attribute

scheduler = None

encryption_key instance-attribute

encryption_key = None

lookup_task

lookup_task(task_id)
Source code in asyncz/stores/redis.py
def lookup_task(self, task_id: str) -> Optional["TaskType"]:
    """Return the task stored under *task_id*, or None when it is unknown."""
    raw = self.redis.hget(self.tasks_key, task_id)
    if not raw:
        return None
    return self.rebuild_task(raw)

rebuild_task

rebuild_task(state)
Source code in asyncz/stores/redis.py
def rebuild_task(self, state: Any) -> "TaskType":
    """Deserialize a (possibly encrypted) pickled state blob into a Task bound to this store."""
    # NOTE: pickle.loads on stored data — safe only as long as the store contents are trusted.
    payload = pickle.loads(self.conditional_decrypt(state))
    task = Task.__new__(Task)
    task.__setstate__(payload)
    task.scheduler = cast("SchedulerType", self.scheduler)
    task.store_alias = self.alias
    return task

get_due_tasks

get_due_tasks(now)
Source code in asyncz/stores/redis.py
def get_due_tasks(self, now: datetime) -> list["TaskType"]:
    """Return all tasks due to run at or before *now*."""
    cutoff = datetime_to_utc_timestamp(now)
    due_ids: list[str] = self.redis.zrangebyscore(self.run_times_key, 0, cutoff)  # type: ignore
    if not due_ids:
        return []
    raw_states: list[Any] = self.redis.hmget(self.tasks_key, due_ids)  # type: ignore
    return self.rebuild_tasks(zip(due_ids, raw_states, strict=False))

rebuild_tasks

rebuild_tasks(states)
Source code in asyncz/stores/redis.py
def rebuild_tasks(self, states: Iterable[tuple[str, Any]]) -> list["TaskType"]:
    """
    Rebuild Task objects from (task_id, pickled_state) pairs.

    Pairs that fail to deserialize are logged and removed from both the task
    hash and the run-times sorted set.
    """
    tasks = []
    failed_task_ids = []

    for task_id, state in states:
        try:
            tasks.append(self.rebuild_task(state))
        # NOTE(review): BaseException also swallows KeyboardInterrupt/SystemExit
        # raised mid-restore — confirm that is intended.
        except BaseException:
            cast("SchedulerType", self.scheduler).loggers[self.logger_name].exception(
                f"Unable to restore task '{task_id}'. Removing it..."
            )
            failed_task_ids.append(task_id)

    # Remove corrupt entries from both Redis structures in one round trip.
    if failed_task_ids:
        with self.redis.pipeline() as pipe:
            pipe.hdel(self.tasks_key, *failed_task_ids)
            pipe.zrem(self.run_times_key, *failed_task_ids)
            pipe.execute()

    return tasks

get_next_run_time

get_next_run_time()
Source code in asyncz/stores/redis.py
def get_next_run_time(self) -> datetime | None:
    """Return the earliest scheduled run time as a datetime, or None when no task is scheduled."""
    # zrange(…, 0, 0) yields at most the lowest-scored (id, timestamp) pair.
    head: Any = self.redis.zrange(self.run_times_key, 0, 0, withscores=True)
    if not head:
        return None
    return utc_timestamp_to_datetime(cast(float, head[0][1]))

get_all_tasks

get_all_tasks()
Source code in asyncz/stores/redis.py
def get_all_tasks(self) -> list["TaskType"]:
    """Return every stored task ordered by next run time, with paused tasks last."""
    raw: list[tuple[str, Any]] = self.redis.hgetall(self.tasks_key)  # type: ignore
    tasks = self.rebuild_tasks(raw.items())
    # Paused tasks have next_run_time None; this marker sorts them after any real timestamp.
    paused_marker = datetime(9999, 12, 31, tzinfo=tz.utc)
    tasks.sort(key=lambda item: item.next_run_time or paused_marker)
    return tasks

add_task

add_task(task)
Source code in asyncz/stores/redis.py
def add_task(self, task: "TaskType") -> None:
    """Store a new task; raises ConflictIdError when a task with the same id exists."""
    assert task.id
    if self.redis.hexists(self.tasks_key, task.id):
        raise ConflictIdError(task.id)

    payload = self.conditional_encrypt(pickle.dumps(task.__getstate__(), self.pickle_protocol))  # type: ignore
    with self.redis.pipeline() as pipe:
        pipe.multi()
        pipe.hset(self.tasks_key, task.id, payload)
        # Only scheduled (non-paused) tasks enter the run-time index.
        if task.next_run_time:
            pipe.zadd(
                self.run_times_key,
                {task.id: datetime_to_utc_timestamp(task.next_run_time)},
            )
        pipe.execute()

update_task

update_task(task)
Source code in asyncz/stores/redis.py
def update_task(self, task: "TaskType") -> None:
    """Replace the stored state of an existing task; raises TaskLookupError when unknown."""
    assert task.id
    if not self.redis.hexists(self.tasks_key, task.id):
        raise TaskLookupError(task.id)

    payload = self.conditional_encrypt(pickle.dumps(task.__getstate__(), self.pickle_protocol))  # type: ignore
    with self.redis.pipeline() as pipe:
        pipe.hset(self.tasks_key, task.id, payload)
        if task.next_run_time:
            pipe.zadd(
                self.run_times_key,
                {task.id: datetime_to_utc_timestamp(task.next_run_time)},
            )
        else:
            # The task became paused: drop it from the run-time index.
            pipe.zrem(self.run_times_key, task.id)
        pipe.execute()

delete_task

delete_task(task_id)
Source code in asyncz/stores/redis.py
def delete_task(self, task_id: str) -> None:
    """Remove a task from both the state hash and the run-time index; raises TaskLookupError when unknown."""
    if not self.redis.hexists(self.tasks_key, task_id):
        raise TaskLookupError(task_id)

    with self.redis.pipeline() as pipe:
        pipe.hdel(self.tasks_key, task_id)
        pipe.zrem(self.run_times_key, task_id)
        pipe.execute()

remove_all_tasks

remove_all_tasks()
Source code in asyncz/stores/redis.py
def remove_all_tasks(self) -> None:
    """Delete the task hash and the run-time index in one pipelined round trip."""
    with self.redis.pipeline() as pipe:
        pipe.delete(self.tasks_key)
        pipe.delete(self.run_times_key)
        pipe.execute()

shutdown

shutdown()
Source code in asyncz/stores/redis.py
def shutdown(self) -> None:
    """Disconnect the Redis connection pool, then run the base store shutdown."""
    self.redis.connection_pool.disconnect()
    super().shutdown()

start

start(scheduler, alias)

Called by the scheduler when the scheduler is being started or when the task store is being added to an already running scheduler.

PARAMETER DESCRIPTION
scheduler

The scheduler that is starting this task store.

TYPE: SchedulerType

alias

Alias of this task store as it was assigned to the scheduler.

TYPE: str

Source code in asyncz/stores/base.py
def start(self, scheduler: SchedulerType, alias: str) -> None:
    """
    Called by the scheduler when the scheduler is being started or when the task store is being
    added to an already running scheduler.

    Args:
        scheduler: The scheduler that is starting this task store.
        alias: Alias of this task store as it was assigned to the scheduler.
    """
    self.scheduler = scheduler
    self.alias = alias
    self.logger_name = f"asyncz.stores.{alias}"
    self.lock = self.create_lock()
    # Opt-in at-rest encryption: derive a fixed-size AES-CCM key from the env secret.
    encryption_key = os.environ.get("ASYNCZ_STORE_ENCRYPTION_KEY")
    if encryption_key:
        # we simply use a hash. This way all kinds of tokens, lengths and co are supported
        self.encryption_key = AESCCM(hashlib.new("sha256", encryption_key.encode()).digest())

create_lock

create_lock()

Creates a lock protector.

Source code in asyncz/stores/base.py
def create_lock(self) -> LockProtectedProtocol:
    """
    Build the lock protector for this store: a file lock when the scheduler
    configures a lock_path, otherwise a no-op lock.
    """
    owner = self.scheduler
    if owner and owner.lock_path:
        # {store} in the configured path is substituted with this store's alias.
        return FileLockProtected(owner.lock_path.replace(r"{store}", self.alias))
    return NullLockProtected()

conditional_decrypt

conditional_decrypt(inp)
Source code in asyncz/stores/base.py
def conditional_decrypt(self, inp: bytes) -> bytes:
    """Decrypt *inp* when an encryption key is configured; otherwise return it unchanged."""
    if not self.encryption_key:
        return inp
    # Layout: 13-byte nonce prefix followed by the ciphertext.
    nonce, ciphertext = inp[:13], inp[13:]
    return self.encryption_key.decrypt(nonce, ciphertext, None)

conditional_encrypt

conditional_encrypt(inp)
Source code in asyncz/stores/base.py
def conditional_encrypt(self, inp: bytes) -> bytes:
    """Encrypt *inp* when an encryption key is configured, prefixing a fresh 13-byte nonce; otherwise return it unchanged."""
    if not self.encryption_key:
        return inp
    nonce = os.urandom(13)
    return nonce + self.encryption_key.encrypt(nonce, inp, None)

fix_paused_tasks

fix_paused_tasks(tasks)
Source code in asyncz/stores/base.py
def fix_paused_tasks(self, tasks: list[TaskType]) -> None:
    """Rotate leading paused tasks (next_run_time is None) to the end of *tasks* in place."""
    first_active = next(
        (pos for pos, item in enumerate(tasks) if item.next_run_time is not None),
        None,
    )
    # No active task at all, or the first task is already active: nothing to do.
    if first_active:
        tasks[:] = tasks[first_active:] + tasks[:first_active]

asyncz.stores.sqlalchemy.SQLAlchemyStore

SQLAlchemyStore(
    database,
    tablename="asyncz_store",
    pickle_protocol=HIGHEST_PROTOCOL,
    **kwargs,
)

Bases: BaseStore

Stores tasks via sqlalchemy in a database.

Source code in asyncz/stores/sqlalchemy.py
def __init__(
    self,
    database: Union[str, sqlalchemy.Engine],
    tablename: str = "asyncz_store",
    pickle_protocol: Optional[int] = pickle.HIGHEST_PROTOCOL,
    **kwargs: Any,
) -> None:
    """
    Store tasks via SQLAlchemy in a database.

    Args:
        database: A connection URL string or an already constructed Engine.
        tablename: Name of the table used to persist tasks.
        pickle_protocol: Pickle protocol used when serializing task state.
        kwargs: Forwarded to the base store and, when *database* is a URL,
            to create_engine as well.

    Raises:
        ValueError: When *database* is empty or None.
    """
    super().__init__(**kwargs)
    self.pickle_protocol = pickle_protocol
    # Validate before touching SQLAlchemy: previously an empty URL string
    # reached create_engine and raised an opaque error instead of this one.
    if not database:
        raise ValueError("database must not be empty or None")
    if isinstance(database, str):
        database = sqlalchemy.create_engine(database, **kwargs)
    self.engine: sqlalchemy.Engine = database
    self.metadata: sqlalchemy.MetaData = sqlalchemy.MetaData()
    self.table: sqlalchemy.Table = sqlalchemy.Table(
        tablename,
        self.metadata,
        # id is capped at 255 chars so it can be a primary key on MySQL too.
        sqlalchemy.Column(
            "id", sqlalchemy.String(length=255), primary_key=True, nullable=False
        ),
        # next_run_time is stored as a UTC timestamp; NULL marks a paused task.
        sqlalchemy.Column("next_run_time", sqlalchemy.BigInteger(), nullable=True),
        sqlalchemy.Column("state", sqlalchemy.LargeBinary(), nullable=False),
    )

pickle_protocol instance-attribute

pickle_protocol = pickle_protocol

engine instance-attribute

engine = database

metadata instance-attribute

metadata = MetaData()

table instance-attribute

table = Table(
    tablename,
    metadata,
    Column(
        "id",
        String(length=255),
        primary_key=True,
        nullable=False,
    ),
    Column("next_run_time", BigInteger(), nullable=True),
    Column("state", LargeBinary(), nullable=False),
)

alias class-attribute instance-attribute

alias = None

lock class-attribute instance-attribute

lock = NullLockProtected()

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="allow", arbitrary_types_allowed=True
)

scheduler instance-attribute

scheduler = None

encryption_key instance-attribute

encryption_key = None

start

start(scheduler, alias)

When starting, creates the task table in the database if it does not already exist.

Source code in asyncz/stores/sqlalchemy.py
def start(self, scheduler: Any, alias: str) -> None:
    """
    Starts the store and creates the task table in the database if it
    does not already exist.
    """
    super().start(scheduler, alias)
    self.metadata.create_all(self.engine)

shutdown

shutdown()
Source code in asyncz/stores/sqlalchemy.py
def shutdown(self) -> None:
    """Dispose of the engine's connection pool, then run the base store shutdown."""
    self.engine.dispose()
    super().shutdown()

lookup_task

lookup_task(task_id)
Source code in asyncz/stores/sqlalchemy.py
def lookup_task(self, task_id: str) -> Optional[TaskType]:
    """Return the task with the given id, or None when it is not stored."""
    matches = self.get_tasks(self.table.c.id == task_id, limit=1)
    if not matches:
        return None
    return matches[0]

rebuild_task

rebuild_task(state)
Source code in asyncz/stores/sqlalchemy.py
def rebuild_task(self, state: Any) -> TaskType:
    """Deserialize a (possibly encrypted) pickled state blob into a Task bound to this store."""
    # NOTE: pickle.loads on stored data — safe only as long as the store contents are trusted.
    payload = pickle.loads(self.conditional_decrypt(state))
    task = Task.__new__(Task)
    task.__setstate__(payload)
    task.scheduler = cast("SchedulerType", self.scheduler)
    task.store_alias = self.alias
    return task

get_due_tasks

get_due_tasks(now)
Source code in asyncz/stores/sqlalchemy.py
def get_due_tasks(self, now: datetime) -> list[TaskType]:
    """Return all tasks whose next_run_time is at or before *now*."""
    due_before = datetime_to_utc_timestamp(now)
    return self.get_tasks(self.table.c.next_run_time <= due_before)

get_tasks

get_tasks(conditions=None, limit=0)
Source code in asyncz/stores/sqlalchemy.py
def get_tasks(self, conditions: Any = None, limit: int = 0) -> list[TaskType]:
    """
    Return tasks ordered by next_run_time (paused tasks last via NULLS LAST).

    Rows whose pickled state cannot be restored are logged and then deleted
    so they cannot break future reads.

    Args:
        conditions: Optional SQLAlchemy filter expression.
        limit: Maximum number of rows to fetch; 0 means no limit.
    """
    tasks: list[TaskType] = []
    failed_task_ids = []
    stmt = self.table.select().order_by(self.table.c.next_run_time.asc().nullslast())
    if conditions is not None:
        stmt = stmt.where(conditions)

    if limit > 0:
        stmt = stmt.limit(limit)

    with self.engine.connect() as conn:
        for row in conn.execute(stmt):
            try:
                tasks.append(self.rebuild_task(row.state))
            except Exception:
                task_id = row.id
                cast("SchedulerType", self.scheduler).loggers[self.logger_name].exception(
                    f"Unable to restore task '{task_id}'. Removing it..."
                )
                failed_task_ids.append(task_id)

        # Purge corrupt rows on the same connection and commit explicitly.
        if failed_task_ids:
            stmt2 = self.table.delete().where(self.table.c.id.in_(failed_task_ids))
            conn.execute(stmt2)
            conn.commit()
    return tasks

get_next_run_time

get_next_run_time()
Source code in asyncz/stores/sqlalchemy.py
def get_next_run_time(self) -> Optional[datetime]:
    """Return the earliest next_run_time among scheduled (non-paused) tasks, or None."""
    stmt = (
        sqlalchemy.select(self.table.c["next_run_time"])
        # is_not(None) renders "IS NOT NULL" — same SQL as the previous
        # `!= None` comparison, without needing a noqa for E711.
        .where(self.table.c.next_run_time.is_not(None))
        .order_by(self.table.c.next_run_time.asc())
        # Only the first row is ever read; let the database stop early.
        .limit(1)
    )

    with self.engine.connect() as conn:
        row = conn.execute(stmt).first()

    return utc_timestamp_to_datetime(row.next_run_time) if row else None

get_all_tasks

get_all_tasks()
Source code in asyncz/stores/sqlalchemy.py
def get_all_tasks(self) -> list[TaskType]:
    """Return every stored task; ordering (paused tasks last) is handled by get_tasks."""
    return self.get_tasks()

add_task

add_task(task)
Source code in asyncz/stores/sqlalchemy.py
def add_task(self, task: TaskType) -> None:
    """Insert a new task row; raises ConflictIdError when the id is already present."""
    row = {
        "id": task.id,
        "next_run_time": datetime_to_utc_timestamp(task.next_run_time or None),
        "state": self.conditional_encrypt(
            pickle.dumps(task.__getstate__(), self.pickle_protocol)
        ),
    }
    try:
        with self.engine.begin() as conn:
            conn.execute(self.table.insert().values(**row))
    except IntegrityError:
        # Primary-key violation: surface it as the store-level conflict error.
        raise ConflictIdError(task.id) from None

update_task

update_task(task)
Source code in asyncz/stores/sqlalchemy.py
def update_task(self, task: TaskType) -> None:
    """Rewrite an existing task row; raises TaskLookupError when the id is unknown."""
    new_values = {
        "next_run_time": datetime_to_utc_timestamp(task.next_run_time or None),
        "state": self.conditional_encrypt(
            pickle.dumps(task.__getstate__(), self.pickle_protocol)
        ),
    }
    matched = True

    with self.engine.begin() as conn:
        outcome = conn.execute(
            self.table.update().values(**new_values).where(self.table.c.id == task.id)
        )
        if outcome and outcome.rowcount == 0:
            matched = False
    # Raise outside the transaction block so the commit/rollback completes first.
    if not matched:
        raise TaskLookupError(task.id)

delete_task

delete_task(task_id)
Source code in asyncz/stores/sqlalchemy.py
def delete_task(self, task_id: str) -> None:
    """Delete the task row with the given id; raises TaskLookupError when nothing was removed."""
    matched = True
    with self.engine.begin() as conn:
        outcome = conn.execute(self.table.delete().where(self.table.c.id == task_id))
        if outcome and outcome.rowcount == 0:
            matched = False
    # Raise outside the transaction block so the commit/rollback completes first.
    if not matched:
        raise TaskLookupError(task_id)

remove_all_tasks

remove_all_tasks()
Source code in asyncz/stores/sqlalchemy.py
def remove_all_tasks(self) -> None:
    """Delete every row from the task table in a single transaction."""
    with self.engine.begin() as conn:
        conn.execute(self.table.delete())

create_lock

create_lock()

Creates a lock protector.

Source code in asyncz/stores/base.py
def create_lock(self) -> LockProtectedProtocol:
    """
    Creates a lock protector.
    """
    if not self.scheduler or not self.scheduler.lock_path:
        return NullLockProtected()
    return FileLockProtected(self.scheduler.lock_path.replace(r"{store}", self.alias))

conditional_decrypt

conditional_decrypt(inp)
Source code in asyncz/stores/base.py
def conditional_decrypt(self, inp: bytes) -> bytes:
    if self.encryption_key:
        return self.encryption_key.decrypt(inp[:13], inp[13:], None)
    else:
        return inp

conditional_encrypt

conditional_encrypt(inp)
Source code in asyncz/stores/base.py
def conditional_encrypt(self, inp: bytes) -> bytes:
    if self.encryption_key:
        nonce = os.urandom(13)
        return nonce + self.encryption_key.encrypt(nonce, inp, None)
    else:
        return inp

fix_paused_tasks

fix_paused_tasks(tasks)
Source code in asyncz/stores/base.py
def fix_paused_tasks(self, tasks: list[TaskType]) -> None:
    for index, task in enumerate(tasks):
        if task.next_run_time is not None:
            if index > 0:
                paused_tasks = tasks[:index]
                del tasks[:index]
                tasks.extend(paused_tasks)
            break

Events

asyncz.events.base.SchedulerEvent

Bases: BaseModel

The event itself.

PARAMETER DESCRIPTION
code

The code type for the event

alias

The alias given to store or executor.

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

code instance-attribute

code

alias class-attribute instance-attribute

alias = None

asyncz.events.base.TaskEvent

Bases: SchedulerEvent

The events for a specific task.

PARAMETER DESCRIPTION
task_id

The identifier given to a task.

store

The alias given to a store.

task_id instance-attribute

task_id

store class-attribute instance-attribute

store = None

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

code instance-attribute

code

alias class-attribute instance-attribute

alias = None

asyncz.events.base.TaskSubmissionEvent

Bases: TaskEvent

Event related to the submission of a task.

PARAMETER DESCRIPTION
scheduled_run_times

List of datetimes when the task is supposed to run.

scheduled_run_times instance-attribute

scheduled_run_times

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

code instance-attribute

code

alias class-attribute instance-attribute

alias = None

task_id instance-attribute

task_id

store class-attribute instance-attribute

store = None

asyncz.events.base.TaskExecutionEvent

Bases: TaskEvent

Event related to the running of a task within the executor.

PARAMETER DESCRIPTION
scheduled_run_times

The time when the task was scheduled to be run.

return_value

The return value of the task successfully executed.

exception

The exception raised by the task.

traceback

A formatted traceback for the exception.

scheduled_run_time instance-attribute

scheduled_run_time

return_value class-attribute instance-attribute

return_value = None

exception class-attribute instance-attribute

exception = None

traceback class-attribute instance-attribute

traceback = None

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

code instance-attribute

code

alias class-attribute instance-attribute

alias = None

task_id instance-attribute

task_id

store class-attribute instance-attribute

store = None