Skip to content

Part 3: Tasks

Now we implement the four tasks from our design. Each task is a thin orchestration wrapper that delegates business logic to the logic layer we built in the previous part. We'll build them progressively, from simplest to most complex.

Custom lock type

Before writing any tasks, we need to register a domain-specific lock type. Locks prevent concurrent execution of conflicting operations — for example, two tasks trying to submit a pilot to the same CE simultaneously.

gubbins-tasks/src/gubbins/tasks/my_pilot_lock_types.py
"""Custom locked-object types for my_pilot tasks.

Loaded via the ``diracx.lock_object_types`` entry point so that
``LockedObjectType("my_pilot")`` is valid system-wide.
"""

from __future__ import annotations

from diracx.tasks.plumbing.lock_registry import register_locked_object_type

MY_PILOT = register_locked_object_type("my_pilot")

register_locked_object_type adds "my_pilot" to the global lock registry. This lets you create locks like MutexLock(LockedObjectType(MY_PILOT), "some-ce"), where the first argument identifies the type of thing being locked and the second identifies which instance.

Imports

The tasks module pulls from several parts of the DiracX task framework. Start with the imports — we'll reference these as we build each task:

gubbins-tasks/src/gubbins/tasks/my_pilots.py
from __future__ import annotations

import dataclasses
import logging
from typing import Any

from diracx.tasks.plumbing.base_task import (
    BaseTask,
    PeriodicBaseTask,
    PeriodicVoAwareBaseTask,
)
from diracx.tasks.plumbing.enums import Priority, Size
from diracx.tasks.plumbing.lock_registry import LockedObjectType
from diracx.tasks.plumbing.locks import BaseLock, MutexLock
from diracx.tasks.plumbing.retry_policies import NoRetry
from diracx.tasks.plumbing.schedules import CronSchedule, IntervalSeconds

from gubbins.logic.my_pilots import (
    get_available_ces,
    get_pilot_summary,
    submit_pilot,
    transition_pilot_states,
)

from .depends import MyPilotDB
from .my_pilot_lock_types import MY_PILOT

logger = logging.getLogger(__name__)

The key imports to note:

  • BaseTask — For one-shot tasks triggered on demand
  • PeriodicBaseTask — For tasks that run on a schedule (not VO-aware)
  • PeriodicVoAwareBaseTask — For tasks that run per-VO on a schedule
  • Priority / Size — Task metadata that helps the broker allocate resources
  • NoRetry — A retry policy that skips retries entirely
  • CronSchedule / IntervalSeconds — Schedule types for periodic tasks
  • gubbins.logic.my_pilots — Business logic functions from the logic layer. Tasks delegate to these rather than implementing logic inline.
  • MyPilotDB (from depends) — The dependency-injected database type from Part 2

MyPilotTask — one-shot submission

The simplest task. It receives a CE name and delegates to submit_pilot() from the logic layer, which checks the CE's success rate and either submits a pilot or raises PilotSubmissionError.

@dataclasses.dataclass
class MyPilotTask(BaseTask):
    """Submit a single pilot to a compute element.

    Delegates to ``gubbins.logic.my_pilots.submit_pilot`` which reads
    the CE's success_rate and determines whether the submission
    succeeds.  No retry is configured — the periodic parent will
    naturally resubmit on the next cycle.
    """

    ce_name: str

    priority = Priority.NORMAL
    size = Size.SMALL
    retry_policy = NoRetry()
    dlq_eligible = False

    @property
    def execution_locks(self) -> list[BaseLock]:
        return [MutexLock(LockedObjectType(MY_PILOT), self.ce_name)]

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> int:
        pilot_id = await submit_pilot(my_pilot_db, self.ce_name)
        logger.info("Submitted pilot %d to %s", pilot_id, self.ce_name)
        return pilot_id

Several things to notice:

  • Dataclass pattern — BaseTask is a dataclass. Fields like ce_name become the task's payload, serialised when the task is enqueued and deserialised when it executes.
  • Custom locks — execution_locks returns a MutexLock keyed on the CE name. This means two MyPilotTask instances for the same CE will be serialised, but submissions to different CEs can run in parallel.
  • dlq_eligible = False — Failed pilot submissions are simply discarded. Pilots are ephemeral — the periodic parent will spawn replacements on the next cycle. The DLQ is reserved for tasks that correspond to external state which must always be recovered (e.g. failing to optimise a job).

Why NoRetry instead of ExponentialBackoff?

The periodic parent (MySubmitPilotsTask) already re-evaluates available CEs on every cycle. If a submission fails, the parent will discover the slot is still available and spawn a new task. Retrying at the child level would duplicate this logic and could cause cascading retries under sustained failure.

MyPilotReportTask — periodic, non-VO-aware

A simple periodic task that logs aggregate statistics across all VOs.

class MyPilotReportTask(PeriodicBaseTask):
    """Log global pilot statistics across all VOs.

    Runs hourly via a CronSchedule.  Not VO-aware — reports
    aggregate counts.
    """

    default_schedule = CronSchedule("0 * * * *")

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> dict[str, int]:
        summary = await get_pilot_summary(my_pilot_db)
        logger.info("Pilot summary: %s", summary)
        return summary
  • CronSchedule("0 * * * *") — Runs at the top of every hour, using standard cron syntax.
  • Non-VO-aware — Inherits from PeriodicBaseTask (not PeriodicVoAwareBaseTask), so only one instance runs globally. This makes sense for aggregate reporting.
  • Default mutex — With no custom execution_locks, the base class applies a class-level mutex, ensuring only one report runs at a time.

MyCheckPilotsTask — VO-aware periodic

This task transitions pilot states. It runs per-VO, so each VO's pilots are checked independently.

@dataclasses.dataclass
class MyCheckPilotsTask(PeriodicVoAwareBaseTask):
    """Periodically check and transition pilot states.

    Delegates to ``gubbins.logic.my_pilots.transition_pilot_states``
    which queries pilots in SUBMITTED/RUNNING state and
    probabilistically transitions them based on the CE's success_rate.
    """

    vo: str

    default_schedule = IntervalSeconds(30)

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> None:
        await transition_pilot_states(my_pilot_db)
  • vo: str field — PeriodicVoAwareBaseTask is a dataclass with a vo field. The scheduler creates one instance per configured VO, each with its vo set automatically.
  • IntervalSeconds(30) — Runs every 30 seconds per VO (as opposed to CronSchedule which uses wall-clock times).
  • Delegates to logic — Calls transition_pilot_states() which moves SUBMITTED → RUNNING immediately, then probabilistically transitions RUNNING → DONE or RUNNING → FAILED based on the CE's success rate.

MySubmitPilotsTask — VO-aware periodic, spawns children

The most complex task. It queries for CEs with available capacity and spawns one-shot MyPilotTask instances for each open slot.

@dataclasses.dataclass
class MySubmitPilotsTask(PeriodicVoAwareBaseTask):
    """Periodically submit pilots to available compute elements.

    Queries for CEs with available capacity and spawns a
    ``MyPilotTask`` for each available slot.
    """

    vo: str

    default_schedule = IntervalSeconds(60)

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> int:
        available_ces = await get_available_ces(my_pilot_db)
        spawned = 0
        for ce in available_ces:
            for _ in range(ce["available_slots"]):
                task = MyPilotTask(ce_name=ce["name"])
                await task.schedule()
                spawned += 1
                logger.info("Spawned MyPilotTask for %s", ce["name"])
        logger.info("VO %s: spawned %d pilot tasks", self.vo, spawned)
        return spawned

This demonstrates the parent-child task pattern — a periodic task that dynamically creates one-shot tasks. The parent decides what to do (which CEs need pilots), and each child handles one submission.

Broker context required for schedule()

await task.schedule() enqueues the task onto the broker. This only works when the current task is running inside a broker context (i.e., executed by the task worker). If you call schedule() outside a broker context (e.g., in a test or script), it will raise an error. For testing, mock the schedule method or use the task CLI for direct execution.

Register entry points

Finally, register the tasks and lock type so DiracX discovers them at startup.

Task entry points go under [project.entry-points."diracx.tasks.<group>"], where <group> organises related tasks:

gubbins-tasks/pyproject.toml
[project.entry-points."diracx.tasks.my_pilots"]
MySubmitPilotsTask = "gubbins.tasks.my_pilots:MySubmitPilotsTask"
MyPilotTask = "gubbins.tasks.my_pilots:MyPilotTask"
MyCheckPilotsTask = "gubbins.tasks.my_pilots:MyCheckPilotsTask"
MyPilotReportTask = "gubbins.tasks.my_pilots:MyPilotReportTask"

The entry point names (e.g. MyPilotTask) become the task's identifier in the broker. The dotted path points to the class.

Lock type entry point (add under [project.entry-points."diracx.lock_object_types"]):

gubbins-tasks/pyproject.toml
my_pilot = "gubbins.tasks.my_pilot_lock_types"

For more details on the entry point conventions, see Add a task and the Tasks explanation.

Checkpoint

Verify the tasks are correctly defined:

pixi run pytest-gubbins-tasks -- -k my_pilot