Part 2b: Logic
Business logic lives in the logic layer, separate from both the database (which handles persistence) and the tasks (which handle orchestration and scheduling). This separation means the same logic can be called from tasks, routers, or tests without coupling to any specific execution context.
Why a separate logic layer?
DiracX has three layers that can contain "code that does things": routers, tasks, and logic. Each has a distinct role:
| Layer | Owns | Should NOT contain |
|---|---|---|
| Router | HTTP concerns: authentication, serialisation, request/response mapping | Domain rules, DB orchestration |
| Task | Scheduling, retries, locks, child spawning | Domain rules, DB orchestration |
| Logic | Domain rules: validations, state machines, multi-step DB orchestration | HTTP or scheduling concerns |
The key insight is that the same business operation often needs to
run from multiple entry points. For example, submitting a pilot
might be triggered by a periodic task and by an HTTP endpoint for
manual use. If the submission logic lives inside a task's execute
method, the router must either duplicate it or awkwardly call the task
synchronously. Putting it in the logic layer makes it callable from
both without coupling.
In practice, task execute methods should be thin orchestration
wrappers — they know when and how often to run, but delegate the
what to the logic layer. Similarly, router endpoints handle HTTP
concerns and delegate to logic for the actual work.
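The delegation described above can be sketched as follows. This is a minimal illustration rather than DiracX code: the dict-based `db`, the toy `submit_pilot` rule, `SubmitPilotsTask`, and `post_pilot_endpoint` are all hypothetical stand-ins chosen to keep the example self-contained.

```python
import asyncio


# Logic layer: a plain async function that owns the domain rule.
# Toy stand-in: the "database" is a dict mapping CE name -> list of statuses.
async def submit_pilot(db: dict, ce_name: str) -> int:
    """Record a pilot for ce_name and return its per-CE sequence number."""
    db.setdefault(ce_name, []).append("SUBMITTED")
    return len(db[ce_name])


# Task layer stand-in: knows which CEs to visit on each run, then delegates.
class SubmitPilotsTask:
    def __init__(self, ce_names: list):
        self.ce_names = ce_names

    async def execute(self, db: dict) -> list:
        return [await submit_pilot(db, ce_name) for ce_name in self.ce_names]


# Router layer stand-in: maps a request body to a response body, then delegates.
async def post_pilot_endpoint(db: dict, body: dict) -> dict:
    pilot_id = await submit_pilot(db, body["ce_name"])
    return {"pilot_id": pilot_id}


async def main():
    db = {}
    task_ids = await SubmitPilotsTask(["ce-a", "ce-b"]).execute(db)
    response = await post_pilot_endpoint(db, {"ce_name": "ce-a"})
    return task_ids, response


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Neither wrapper contains the submission rule itself, so changing that rule touches only the logic function.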
Implementation
Create the pilot business logic module:
from __future__ import annotations

import random

from gubbins.db.sql.my_pilot_db.db import MyPilotDB
from gubbins.db.sql.my_pilot_db.schema import MyPilotStatus


class PilotSubmissionError(Exception):
    def __init__(self, ce_name: str, success_rate: float):
        super().__init__(
            f"Pilot submission to {ce_name} failed (success_rate={success_rate})"
        )


async def submit_pilot(my_pilot_db: MyPilotDB, ce_name: str) -> int:
    """Submit a single pilot to a CE, raising on probabilistic failure."""
    success_rate = await my_pilot_db.get_ce_success_rate(ce_name)
    # random.random() is in [0.0, 1.0): a rate of 1.0 never fails, 0.0 always fails.
    if random.random() >= success_rate:
        raise PilotSubmissionError(ce_name, success_rate)
    return await my_pilot_db.submit_pilot(ce_name)


async def transition_pilot_states(my_pilot_db: MyPilotDB) -> None:
    """Move pilots through their lifecycle based on CE success rates."""
    # Every SUBMITTED pilot starts running.
    submitted = await my_pilot_db.get_pilots_by_status(MyPilotStatus.SUBMITTED)
    for pilot in submitted:
        await my_pilot_db.update_pilot_status(pilot["pilot_id"], MyPilotStatus.RUNNING)

    # Every RUNNING pilot ends as DONE or FAILED, weighted by its CE's success rate.
    running = await my_pilot_db.get_pilots_by_status(MyPilotStatus.RUNNING)
    for pilot in running:
        success_rate = await my_pilot_db.get_ce_success_rate(pilot["ce_name"])
        new_status = (
            MyPilotStatus.DONE
            if random.random() < success_rate
            else MyPilotStatus.FAILED
        )
        await my_pilot_db.update_pilot_status(pilot["pilot_id"], new_status)


async def get_available_ces(my_pilot_db: MyPilotDB) -> list[dict]:
    """Return CEs with available pilot slots."""
    return await my_pilot_db.get_available_ces()


async def get_pilot_summary(my_pilot_db: MyPilotDB) -> dict[str, int]:
    """Return aggregate pilot statistics across all VOs."""
    return await my_pilot_db.get_pilot_summary()
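To see the lifecycle in transition_pilot_states in action, the sketch below runs a copy of that function against an in-memory stand-in. `FakePilotDB` and the local `MyPilotStatus` enum are hypothetical and implement only the three methods the function calls; the real MyPilotDB may behave differently, in particular in whether a query sees updates made earlier in the same call. Success rates of 1.0 and 0.0 make the outcome deterministic.

```python
import asyncio
import random
from enum import Enum


class MyPilotStatus(Enum):  # hypothetical stand-in for the schema enum
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    DONE = "DONE"
    FAILED = "FAILED"


class FakePilotDB:
    """Hypothetical in-memory stand-in for MyPilotDB."""

    def __init__(self, pilots, success_rates):
        self.pilots = pilots  # list of dicts: pilot_id, ce_name, status
        self.success_rates = success_rates  # CE name -> success rate

    async def get_pilots_by_status(self, status):
        # Return copies, like rows fetched from a database.
        return [dict(p) for p in self.pilots if p["status"] == status]

    async def update_pilot_status(self, pilot_id, status):
        for pilot in self.pilots:
            if pilot["pilot_id"] == pilot_id:
                pilot["status"] = status

    async def get_ce_success_rate(self, ce_name):
        return self.success_rates[ce_name]


async def transition_pilot_states(my_pilot_db):
    """Copy of the logic function above, so this sketch runs on its own."""
    submitted = await my_pilot_db.get_pilots_by_status(MyPilotStatus.SUBMITTED)
    for pilot in submitted:
        await my_pilot_db.update_pilot_status(pilot["pilot_id"], MyPilotStatus.RUNNING)
    running = await my_pilot_db.get_pilots_by_status(MyPilotStatus.RUNNING)
    for pilot in running:
        success_rate = await my_pilot_db.get_ce_success_rate(pilot["ce_name"])
        new_status = (
            MyPilotStatus.DONE
            if random.random() < success_rate
            else MyPilotStatus.FAILED
        )
        await my_pilot_db.update_pilot_status(pilot["pilot_id"], new_status)


def demo():
    db = FakePilotDB(
        pilots=[
            {"pilot_id": 1, "ce_name": "good-ce", "status": MyPilotStatus.SUBMITTED},
            {"pilot_id": 2, "ce_name": "bad-ce", "status": MyPilotStatus.RUNNING},
            {"pilot_id": 3, "ce_name": "good-ce", "status": MyPilotStatus.RUNNING},
        ],
        success_rates={"good-ce": 1.0, "bad-ce": 0.0},
    )
    asyncio.run(transition_pilot_states(db))
    return {p["pilot_id"]: p["status"].name for p in db.pilots}
```

With this stand-in, pilot 1 goes from SUBMITTED through RUNNING to DONE within a single call, because the RUNNING query runs after the SUBMITTED pilots have been updated.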
Key points:
- Pure async functions: Each function takes a database instance and any required parameters. No framework dependencies, no scheduling concerns.
- Custom exception: PilotSubmissionError replaces a generic RuntimeError, making error handling more precise in callers.
- DB type hints: Functions use the raw MyPilotDB class (not the DI-annotated type from depends.py). At runtime, the injected instance is the same object either way.
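The first two points can be illustrated with a small sketch: because submit_pilot only needs an object with the right async methods, a caller can pass in a stand-in and catch exactly the failure it cares about. `FakePilotDB` and `submit_to_all` are hypothetical names introduced for this example, and the exception and logic function are compact copies so that the block runs on its own.

```python
import asyncio
import random


class PilotSubmissionError(Exception):
    """Compact copy of the exception from the logic module."""

    def __init__(self, ce_name: str, success_rate: float):
        super().__init__(
            f"Pilot submission to {ce_name} failed (success_rate={success_rate})"
        )


async def submit_pilot(my_pilot_db, ce_name: str) -> int:
    """Compact copy of the logic function from the logic module."""
    success_rate = await my_pilot_db.get_ce_success_rate(ce_name)
    if random.random() >= success_rate:
        raise PilotSubmissionError(ce_name, success_rate)
    return await my_pilot_db.submit_pilot(ce_name)


class FakePilotDB:
    """Hypothetical stand-in exposing only the two methods submit_pilot calls."""

    def __init__(self, success_rates: dict):
        self.success_rates = success_rates
        self.pilots = []

    async def get_ce_success_rate(self, ce_name: str) -> float:
        return self.success_rates[ce_name]

    async def submit_pilot(self, ce_name: str) -> int:
        self.pilots.append(ce_name)
        return len(self.pilots)  # toy pilot ID


async def submit_to_all(db, ce_names):
    """Hypothetical caller: handles submission failures and nothing broader."""
    submitted, failed = {}, []
    for ce_name in ce_names:
        try:
            submitted[ce_name] = await submit_pilot(db, ce_name)
        except PilotSubmissionError:
            failed.append(ce_name)
    return submitted, failed


def demo():
    # Rates of 1.0 and 0.0 make the probabilistic check deterministic.
    db = FakePilotDB({"good-ce": 1.0, "bad-ce": 0.0})
    return asyncio.run(submit_to_all(db, ["good-ce", "bad-ce"]))
```

Catching PilotSubmissionError rather than RuntimeError means an unrelated runtime error raised by the database layer would still propagate instead of being counted as a failed submission.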
Update dependencies
Add gubbins-db to the dependencies of gubbins-logic so that the import of MyPilotDB resolves.
Checkpoint
At this point the logic module is a standalone library with no task or router dependencies. In the next part we'll wire it into the task layer.