Part 3: Tasks
Now we implement the four tasks from our design. Each task is a thin orchestration wrapper that delegates business logic to the logic layer we built in the previous part. We'll build them progressively, from simplest to most complex.
Custom lock type
Before writing any tasks, we need to register a domain-specific lock type. Locks prevent concurrent execution of conflicting operations — for example, two tasks trying to submit a pilot to the same CE simultaneously.
"""Custom locked-object types for my_pilot tasks.
Loaded via the ``diracx.lock_object_types`` entry point so that
``LockedObjectType("my_pilot")`` is valid system-wide.
"""
from __future__ import annotations
from diracx.tasks.plumbing.lock_registry import register_locked_object_type
MY_PILOT = register_locked_object_type("my_pilot")
`register_locked_object_type` adds `"my_pilot"` to the global lock registry. This lets you create locks like `MutexLock(LockedObjectType(MY_PILOT), "some-ce")`, where the first argument identifies the type of thing being locked and the second identifies which instance.
Imports
The tasks module pulls from several parts of the DiracX task framework. Start with the imports — we'll reference these as we build each task:
```python
from __future__ import annotations

import dataclasses
import logging
from typing import Any

from diracx.tasks.plumbing.base_task import (
    BaseTask,
    PeriodicBaseTask,
    PeriodicVoAwareBaseTask,
)
from diracx.tasks.plumbing.enums import Priority, Size
from diracx.tasks.plumbing.lock_registry import LockedObjectType
from diracx.tasks.plumbing.locks import BaseLock, MutexLock
from diracx.tasks.plumbing.retry_policies import NoRetry
from diracx.tasks.plumbing.schedules import CronSchedule, IntervalSeconds

from gubbins.logic.my_pilots import (
    get_available_ces,
    get_pilot_summary,
    submit_pilot,
    transition_pilot_states,
)

from .depends import MyPilotDB
from .my_pilot_lock_types import MY_PILOT

logger = logging.getLogger(__name__)
```
The key imports to note:
- `BaseTask` — For one-shot tasks triggered on demand
- `PeriodicBaseTask` — For tasks that run on a schedule (not VO-aware)
- `PeriodicVoAwareBaseTask` — For tasks that run per-VO on a schedule
- `Priority` / `Size` — Task metadata that helps the broker allocate resources
- `NoRetry` — A retry policy that skips retries entirely
- `CronSchedule` / `IntervalSeconds` — Schedule types for periodic tasks
- `gubbins.logic.my_pilots` — Business logic functions from the logic layer. Tasks delegate to these rather than implementing logic inline.
- `MyPilotDB` (from `depends`) — The dependency-injected database type from Part 2
MyPilotTask — one-shot submission
The simplest task. It receives a CE name and delegates to `submit_pilot()` from the logic layer, which checks the CE's success rate and either submits a pilot or raises `PilotSubmissionError`.
```python
@dataclasses.dataclass
class MyPilotTask(BaseTask):
    """Submit a single pilot to a compute element.

    Delegates to ``gubbins.logic.my_pilots.submit_pilot`` which reads
    the CE's success_rate and determines whether the submission
    succeeds. No retry is configured — the periodic parent will
    naturally resubmit on the next cycle.
    """

    ce_name: str

    priority = Priority.NORMAL
    size = Size.SMALL
    retry_policy = NoRetry()
    dlq_eligible = False

    @property
    def execution_locks(self) -> list[BaseLock]:
        return [MutexLock(LockedObjectType(MY_PILOT), self.ce_name)]

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> int:
        pilot_id = await submit_pilot(my_pilot_db, self.ce_name)
        logger.info("Submitted pilot %d to %s", pilot_id, self.ce_name)
        return pilot_id
```
Several things to notice:
- Dataclass pattern — `BaseTask` is a dataclass. Fields like `ce_name` become the task's payload, serialised when the task is enqueued and deserialised when it executes.
- Custom locks — `execution_locks` returns a `MutexLock` keyed on the CE name. This means two `MyPilotTask` instances for the same CE will be serialised, but submissions to different CEs can run in parallel.
- `dlq_eligible = False` — Failed pilot submissions are simply discarded. Pilots are ephemeral — the periodic parent will spawn replacements on the next cycle. The DLQ is reserved for tasks that correspond to external state which must always be recovered (e.g. failing to optimise a job).
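To make the payload idea concrete, here is a minimal standalone sketch (using a plain dataclass rather than the real `BaseTask`; `dataclasses.asdict` stands in for whatever wire format the broker actually uses):

```python
import dataclasses

# Hypothetical stand-in for a task class; real tasks inherit BaseTask.
@dataclasses.dataclass
class DemoTask:
    ce_name: str

# The fields are the payload: this dict is conceptually what gets
# serialised when the task is enqueued.
payload = dataclasses.asdict(DemoTask(ce_name="ce1.example.org"))
print(payload)  # {'ce_name': 'ce1.example.org'}

# Deserialisation is the reverse: rebuild the task from the payload.
task = DemoTask(**payload)
print(task.ce_name)  # ce1.example.org
```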
Why NoRetry instead of ExponentialBackoff?
The periodic parent (`MySubmitPilotsTask`) already re-evaluates available CEs on every cycle. If a submission fails, the parent will discover the slot is still available and spawn a new task. Retrying at the child level would duplicate this logic and could cause cascading retries under sustained failure.
MyPilotReportTask — periodic, non-VO-aware
A simple periodic task that logs aggregate statistics across all VOs.
```python
class MyPilotReportTask(PeriodicBaseTask):
    """Log global pilot statistics across all VOs.

    Runs hourly via a CronSchedule. Not VO-aware — reports
    aggregate counts.
    """

    default_schedule = CronSchedule("0 * * * *")

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> dict[str, int]:
        summary = await get_pilot_summary(my_pilot_db)
        logger.info("Pilot summary: %s", summary)
        return summary
```
CronSchedule("0 * * * *")— Runs at the top of every hour, using standard cron syntax.- Non-VO-aware — Inherits from
PeriodicBaseTask(notPeriodicVoAwareBaseTask), so only one instance runs globally. This makes sense for aggregate reporting. - Default mutex — With no custom
execution_locks, the base class applies a class-level mutex, ensuring only one report runs at a time.
MyCheckPilotsTask — VO-aware periodic
This task transitions pilot states. It runs per-VO, so each VO's pilots are checked independently.
```python
@dataclasses.dataclass
class MyCheckPilotsTask(PeriodicVoAwareBaseTask):
    """Periodically check and transition pilot states.

    Delegates to ``gubbins.logic.my_pilots.transition_pilot_states``
    which queries pilots in SUBMITTED/RUNNING state and
    probabilistically transitions them based on the CE's success_rate.
    """

    vo: str

    default_schedule = IntervalSeconds(30)

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> None:
        await transition_pilot_states(my_pilot_db)
```
- `vo: str` field — `PeriodicVoAwareBaseTask` is a dataclass with a `vo` field. The scheduler creates one instance per configured VO, each with its `vo` set automatically.
- `IntervalSeconds(30)` — Runs every 30 seconds per VO (as opposed to `CronSchedule` which uses wall-clock times).
- Delegates to logic — Calls `transition_pilot_states()` which moves `SUBMITTED → RUNNING` immediately, then probabilistically transitions `RUNNING → DONE` or `RUNNING → FAILED` based on the CE's success rate.
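The transition rules can be pictured as a tiny state function (a standalone illustration, not the actual logic-layer code; `success_rate` is assumed to be a float in [0, 1]):

```python
import random

def next_state(current: str, success_rate: float, rng: random.Random) -> str:
    """Sketch of the pilot state machine described above."""
    if current == "SUBMITTED":
        # SUBMITTED pilots always advance immediately.
        return "RUNNING"
    if current == "RUNNING":
        # RUNNING pilots finish or fail based on the CE's success rate.
        return "DONE" if rng.random() < success_rate else "FAILED"
    return current  # DONE and FAILED are terminal

rng = random.Random()
print(next_state("SUBMITTED", 0.9, rng))  # RUNNING
print(next_state("RUNNING", 1.0, rng))    # DONE (a success_rate of 1.0 always succeeds)
```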
MySubmitPilotsTask — VO-aware periodic, spawns children
The most complex task. It queries for CEs with available capacity and spawns one-shot `MyPilotTask` instances for each open slot.
```python
@dataclasses.dataclass
class MySubmitPilotsTask(PeriodicVoAwareBaseTask):
    """Periodically submit pilots to available compute elements.

    Queries for CEs with available capacity and spawns a
    ``MyPilotTask`` for each available slot.
    """

    vo: str

    default_schedule = IntervalSeconds(60)

    async def execute(  # type: ignore[override]
        self, my_pilot_db: MyPilotDB, **kwargs: Any
    ) -> int:
        available_ces = await get_available_ces(my_pilot_db)
        spawned = 0
        for ce in available_ces:
            for _ in range(ce["available_slots"]):
                task = MyPilotTask(ce_name=ce["name"])
                await task.schedule()
                spawned += 1
                logger.info("Spawned MyPilotTask for %s", ce["name"])
        logger.info("VO %s: spawned %d pilot tasks", self.vo, spawned)
        return spawned
```
This demonstrates the parent-child task pattern — a periodic task that dynamically creates one-shot tasks. The parent decides what to do (which CEs need pilots), and each child handles one submission.
Broker context required for schedule()
`await task.schedule()` enqueues the task onto the broker. This only works when the current task is running inside a broker context (i.e., executed by the task worker). If you call `schedule()` outside a broker context (e.g., in a test or script), it will raise an error. For testing, mock the `schedule` method or use the task CLI for direct execution.
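For instance, a test can patch the `schedule` method with an `AsyncMock` so the spawning loop runs without a broker. This is a self-contained sketch with a hypothetical stand-in class; in a real test you would patch `MyPilotTask.schedule` instead:

```python
import asyncio
from unittest.mock import AsyncMock, patch

# Hypothetical stand-in for a task class, so the sketch runs anywhere.
class FakePilotTask:
    def __init__(self, ce_name: str):
        self.ce_name = ce_name

    async def schedule(self) -> None:
        raise RuntimeError("no broker context")

async def spawn(ces: list[str]) -> int:
    """Simplified version of the parent's spawning loop."""
    count = 0
    for name in ces:
        task = FakePilotTask(ce_name=name)
        await task.schedule()
        count += 1
    return count

# Patch schedule so the loop runs without raising.
with patch.object(FakePilotTask, "schedule", new=AsyncMock()):
    spawned = asyncio.run(spawn(["ce1", "ce2"]))
print(spawned)  # 2
```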
Register entry points
Finally, register the tasks and lock type so DiracX discovers them at startup.
Task entry points go under `[project.entry-points."diracx.tasks.<group>"]`, where `<group>` organises related tasks:
```toml
[project.entry-points."diracx.tasks.my_pilots"]
MySubmitPilotsTask = "gubbins.tasks.my_pilots:MySubmitPilotsTask"
MyPilotTask = "gubbins.tasks.my_pilots:MyPilotTask"
MyCheckPilotsTask = "gubbins.tasks.my_pilots:MyCheckPilotsTask"
MyPilotReportTask = "gubbins.tasks.my_pilots:MyPilotReportTask"
```
The entry point names (e.g. `MyPilotTask`) become the task's identifier in the broker. The dotted path points to the class.
Lock type entry point (add under `[project.entry-points."diracx.lock_object_types"]`):
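A sketch of what this could look like (the entry-point name `my_pilot` and the module path are assumptions inferred from the import of `MY_PILOT` above; adjust to match your package layout):

```toml
[project.entry-points."diracx.lock_object_types"]
my_pilot = "gubbins.tasks.my_pilot_lock_types"
```

Pointing the entry point at the module (rather than a specific object) is enough here: importing the module runs `register_locked_object_type("my_pilot")` as a side effect.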
For more details on the entry point conventions, see Add a task and the Tasks explanation.
Checkpoint
Verify the tasks are correctly defined: