
Part 2b: Logic

Business logic lives in the logic layer, separate from both the database (which handles persistence) and the tasks (which handle orchestration and scheduling). This separation means the same logic can be called from tasks, routers, or tests without coupling to any specific execution context.

Why a separate logic layer?

DiracX has three layers that can contain "code that does things": routers, tasks, and logic. Each has a distinct role:

| Layer | Owns | Should NOT contain |
|---|---|---|
| Router | HTTP concerns: authentication, serialisation, request/response mapping | Domain rules, DB orchestration |
| Task | Scheduling, retries, locks, child spawning | Domain rules, DB orchestration |
| Logic | Domain rules: validations, state machines, multi-step DB orchestration | HTTP or scheduling concerns |

The key insight is that the same business operation often needs to run from multiple entry points. For example, submitting a pilot might be triggered by a periodic task and by an HTTP endpoint for manual use. If the submission logic lives inside a task's execute method, the router must either duplicate it or awkwardly call the task synchronously. Putting it in the logic layer makes it callable from both without coupling.

In practice, task execute methods should be thin orchestration wrappers — they know when and how often to run, but delegate the what to the logic layer. Similarly, router endpoints handle HTTP concerns and delegate to logic for the actual work.
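The layering described above can be sketched in a few lines. This is a minimal illustration, not DiracX code: `FakePilotDB`, `SubmitPilotTask`, and `submit_pilot_endpoint` are hypothetical names, and the in-memory database stands in for a real one. A real task or router would use the framework's own base classes and dependency injection.

```python
import asyncio


class FakePilotDB:
    """Hypothetical in-memory stand-in for a database class."""

    def __init__(self) -> None:
        self.pilots: list[str] = []

    async def submit_pilot(self, ce_name: str) -> int:
        self.pilots.append(ce_name)
        return len(self.pilots)  # ID of the newly recorded pilot


# Logic layer: the "what". No HTTP or scheduling concerns.
async def submit_pilot(db: FakePilotDB, ce_name: str) -> int:
    if not ce_name:
        raise ValueError("ce_name must not be empty")
    return await db.submit_pilot(ce_name)


# Task layer: the "when". A thin wrapper that delegates to logic.
class SubmitPilotTask:
    def __init__(self, ce_name: str) -> None:
        self.ce_name = ce_name

    async def execute(self, db: FakePilotDB) -> int:
        return await submit_pilot(db, self.ce_name)


# Router layer: maps a request body to a response, using the same logic.
async def submit_pilot_endpoint(db: FakePilotDB, body: dict) -> dict:
    pilot_id = await submit_pilot(db, body["ce_name"])
    return {"pilot_id": pilot_id}


async def main() -> tuple[int, dict]:
    db = FakePilotDB()
    from_task = await SubmitPilotTask("ce-a").execute(db)
    from_router = await submit_pilot_endpoint(db, {"ce_name": "ce-b"})
    return from_task, from_router


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both entry points call the same `submit_pilot` function, so a change to the validation rule is made once and applies to the task and the endpoint alike.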

Implementation

Create the pilot business logic module:

gubbins-logic/src/gubbins/logic/my_pilots.py
from __future__ import annotations

import random

from gubbins.db.sql.my_pilot_db.db import MyPilotDB
from gubbins.db.sql.my_pilot_db.schema import MyPilotStatus


class PilotSubmissionError(Exception):
    def __init__(self, ce_name: str, success_rate: float):
        super().__init__(
            f"Pilot submission to {ce_name} failed (success_rate={success_rate})"
        )


async def submit_pilot(my_pilot_db: MyPilotDB, ce_name: str) -> int:
    """Submit a single pilot to a CE, raising on probabilistic failure."""
    success_rate = await my_pilot_db.get_ce_success_rate(ce_name)
    if random.random() >= success_rate:
        raise PilotSubmissionError(ce_name, success_rate)
    return await my_pilot_db.submit_pilot(ce_name)


async def transition_pilot_states(my_pilot_db: MyPilotDB) -> None:
    """Move pilots through their lifecycle based on CE success rates."""
    # Every SUBMITTED pilot is promoted to RUNNING unconditionally.
    submitted = await my_pilot_db.get_pilots_by_status(MyPilotStatus.SUBMITTED)
    for pilot in submitted:
        await my_pilot_db.update_pilot_status(pilot["pilot_id"], MyPilotStatus.RUNNING)

    # Each RUNNING pilot ends as DONE with probability success_rate,
    # otherwise as FAILED.
    running = await my_pilot_db.get_pilots_by_status(MyPilotStatus.RUNNING)
    for pilot in running:
        success_rate = await my_pilot_db.get_ce_success_rate(pilot["ce_name"])
        new_status = (
            MyPilotStatus.DONE
            if random.random() < success_rate
            else MyPilotStatus.FAILED
        )
        await my_pilot_db.update_pilot_status(pilot["pilot_id"], new_status)


async def get_available_ces(my_pilot_db: MyPilotDB) -> list[dict]:
    """Return CEs with available pilot slots."""
    return await my_pilot_db.get_available_ces()


async def get_pilot_summary(my_pilot_db: MyPilotDB) -> dict[str, int]:
    """Return aggregate pilot statistics across all VOs."""
    return await my_pilot_db.get_pilot_summary()

Key points:

  • Pure async functions — Each function takes a database instance and any required parameters. No framework dependencies, no scheduling concerns.
  • Custom exception — PilotSubmissionError replaces a generic RuntimeError, making error handling more precise in callers.
  • DB type hints — Functions use the raw MyPilotDB class (not the DI-annotated type from depends.py). At runtime, the injected instance is the same object either way.
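Because the functions only need an object with the right async methods, they can be exercised without a real database. The sketch below is one way to do that, under stated assumptions: `StubPilotDB` is a hypothetical stub, and `submit_pilot` and `PilotSubmissionError` are repeated from the module above with the `MyPilotDB` annotation dropped so the example runs without gubbins installed. Success rates of 1.0 and 0.0 make the random check deterministic.

```python
import asyncio
import random


class PilotSubmissionError(Exception):
    def __init__(self, ce_name: str, success_rate: float):
        super().__init__(
            f"Pilot submission to {ce_name} failed (success_rate={success_rate})"
        )


async def submit_pilot(my_pilot_db, ce_name: str) -> int:
    # Same body as the tutorial function, minus the MyPilotDB type hint.
    success_rate = await my_pilot_db.get_ce_success_rate(ce_name)
    if random.random() >= success_rate:
        raise PilotSubmissionError(ce_name, success_rate)
    return await my_pilot_db.submit_pilot(ce_name)


class StubPilotDB:
    """Hypothetical stub with only the two methods submit_pilot calls."""

    def __init__(self, success_rates: dict[str, float]) -> None:
        self.success_rates = success_rates
        self.submitted: list[str] = []

    async def get_ce_success_rate(self, ce_name: str) -> float:
        return self.success_rates[ce_name]

    async def submit_pilot(self, ce_name: str) -> int:
        self.submitted.append(ce_name)
        return len(self.submitted)


async def check() -> tuple[int, str]:
    db = StubPilotDB({"reliable-ce": 1.0, "broken-ce": 0.0})
    # random.random() is always < 1.0, so a rate of 1.0 never fails.
    pilot_id = await submit_pilot(db, "reliable-ce")
    # random.random() is always >= 0.0, so a rate of 0.0 always fails.
    try:
        await submit_pilot(db, "broken-ce")
    except PilotSubmissionError as exc:
        message = str(exc)
    else:
        message = "no error raised"
    return pilot_id, message


if __name__ == "__main__":
    print(asyncio.run(check()))
```

Catching `PilotSubmissionError` specifically lets a caller tell a failed submission apart from an unrelated bug, which a bare `RuntimeError` would not allow.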

Update dependencies

Add gubbins-db to gubbins-logic's dependencies so the import of MyPilotDB resolves:

gubbins-logic/pyproject.toml
dependencies = [
    "diracx-logic",
    "gubbins-db",
]

Checkpoint

At this point the logic module is a standalone library with no task or router dependencies. In the next part we'll wire it into the task layer.