Skip to content

Part 6: Testing the tasks

Task tests verify the logic, metadata, and locking behaviour of each task type. The database is mocked — its own tests (Part 5) cover the query layer, so here we focus on what the task does with its dependencies.

Full test file

gubbins-tasks/tests/test_my_pilot_tasks.py
"""Tests for my_pilot task definitions."""

from __future__ import annotations

from unittest.mock import AsyncMock

import pytest
from diracx.tasks.plumbing.enums import Priority, Size
from diracx.tasks.plumbing.lock_registry import LockedObjectType
from diracx.tasks.plumbing.locks import MutexLock
from diracx.tasks.plumbing.retry_policies import NoRetry

from gubbins.logic.my_pilots import PilotSubmissionError
from gubbins.tasks.my_pilot_lock_types import MY_PILOT
from gubbins.tasks.my_pilots import (
    MyCheckPilotsTask,
    MyPilotReportTask,
    MyPilotTask,
    MySubmitPilotsTask,
)

# ---------------------------------------------------------------------------
# Lock type registration
# ---------------------------------------------------------------------------


def test_my_pilot_lock_type_registered():
    """The MY_PILOT lock type should be usable in LockedObjectType."""
    obj = LockedObjectType(MY_PILOT)
    assert obj == "my_pilot"


# ---------------------------------------------------------------------------
# MyPilotTask
# ---------------------------------------------------------------------------


def test_my_pilot_task_serialize():
    task = MyPilotTask(ce_name="reliable-ce.example.org")
    assert task.serialize() == ("reliable-ce.example.org",)


def test_my_pilot_task_properties():
    assert MyPilotTask.priority == Priority.NORMAL
    assert MyPilotTask.size == Size.SMALL
    assert isinstance(MyPilotTask.retry_policy, NoRetry)
    assert MyPilotTask.dlq_eligible is False


def test_my_pilot_task_locks():
    task = MyPilotTask(ce_name="reliable-ce.example.org")
    locks = task.execution_locks
    assert len(locks) == 1
    assert isinstance(locks[0], MutexLock)
    assert "my_pilot" in locks[0].redis_key
    assert "reliable-ce.example.org" in locks[0].redis_key


def test_my_pilot_task_different_ces_different_locks():
    lock_a = MyPilotTask(ce_name="ce-a").execution_locks[0]
    lock_b = MyPilotTask(ce_name="ce-b").execution_locks[0]
    assert lock_a.redis_key != lock_b.redis_key


async def test_my_pilot_task_execute_success():
    """With success_rate=1.0, submission always succeeds."""
    task = MyPilotTask(ce_name="reliable-ce.example.org")
    mock_db = AsyncMock()
    mock_db.get_ce_success_rate = AsyncMock(return_value=1.0)
    mock_db.submit_pilot = AsyncMock(return_value=42)

    result = await task.execute(my_pilot_db=mock_db)

    assert result == 42
    mock_db.submit_pilot.assert_called_once_with("reliable-ce.example.org")


async def test_my_pilot_task_execute_failure():
    """With success_rate=0.0, submission always fails."""
    task = MyPilotTask(ce_name="flaky-ce.example.org")
    mock_db = AsyncMock()
    mock_db.get_ce_success_rate = AsyncMock(return_value=0.0)

    with pytest.raises(PilotSubmissionError, match="failed"):
        await task.execute(my_pilot_db=mock_db)

    mock_db.submit_pilot.assert_not_called()


# ---------------------------------------------------------------------------
# MyPilotReportTask
# ---------------------------------------------------------------------------


def test_my_pilot_report_task_schedule():
    assert MyPilotReportTask.default_schedule.expression == "0 * * * *"


def test_my_pilot_report_task_locks():
    task = MyPilotReportTask()
    locks = task.execution_locks
    assert len(locks) == 1
    assert isinstance(locks[0], MutexLock)


async def test_my_pilot_report_task_execute():
    task = MyPilotReportTask()
    mock_db = AsyncMock()
    expected = {"submitted": 5, "running": 3}
    mock_db.get_pilot_summary = AsyncMock(return_value=expected)

    result = await task.execute(my_pilot_db=mock_db)

    assert result == expected
    mock_db.get_pilot_summary.assert_called_once()


# ---------------------------------------------------------------------------
# MyCheckPilotsTask (VO-aware periodic)
# ---------------------------------------------------------------------------


def test_my_check_pilots_task_schedule():
    assert MyCheckPilotsTask.default_schedule.seconds == 30


def test_my_check_pilots_task_serialize():
    task = MyCheckPilotsTask(vo="lhcb")
    assert task.serialize() == ("lhcb",)


def test_my_check_pilots_task_locks_include_vo():
    task = MyCheckPilotsTask(vo="lhcb")
    locks = task.execution_locks
    assert len(locks) == 1
    assert isinstance(locks[0], MutexLock)
    assert "lhcb" in locks[0].redis_key


def test_my_check_pilots_task_different_vos_different_locks():
    lock_a = MyCheckPilotsTask(vo="lhcb").execution_locks[0]
    lock_b = MyCheckPilotsTask(vo="atlas").execution_locks[0]
    assert lock_a.redis_key != lock_b.redis_key


# ---------------------------------------------------------------------------
# MySubmitPilotsTask (VO-aware periodic, spawns children)
# ---------------------------------------------------------------------------


def test_my_submit_pilots_task_schedule():
    assert MySubmitPilotsTask.default_schedule.seconds == 60


def test_my_submit_pilots_task_serialize():
    task = MySubmitPilotsTask(vo="lhcb")
    assert task.serialize() == ("lhcb",)


def test_my_submit_pilots_task_locks_include_vo():
    task = MySubmitPilotsTask(vo="lhcb")
    locks = task.execution_locks
    assert len(locks) == 1
    assert isinstance(locks[0], MutexLock)
    assert "lhcb" in locks[0].redis_key

Why mock the database?

Task tests use AsyncMock() for the database instead of a real connection. This is a deliberate choice: the database layer has its own tests (Part 5), so task tests focus purely on the task's logic — branching, error handling, and which DB methods get called. This keeps the tests fast and the failure messages precise.

Lock type registration

test_my_pilot_lock_type_registered verifies that the MY_PILOT constant was registered via register_locked_object_type and can be used to construct a LockedObjectType. If the entry point is missing or the string doesn't match, this test catches it immediately.

MyPilotTask — one-shot submission

Six tests cover the one-shot task progressively:

Serialization and properties

test_my_pilot_task_serialize checks that the task round-trips through serialize() — this is what the broker stores when the task is enqueued.

test_my_pilot_task_properties verifies the class-level metadata: Priority.NORMAL, Size.SMALL, NoRetry policy, and dlq_eligible=False. These control how the broker prioritises, routes, and handles failures for this task.

Locks

test_my_pilot_task_locks verifies that a task for "reliable-ce.example.org" produces a single MutexLock whose redis_key contains both "my_pilot" (the lock type) and the CE name. This is how the broker prevents two submissions to the same CE from running simultaneously.

test_my_pilot_task_different_ces_different_locks confirms that different CE names produce different lock keys — so submissions to different CEs can run in parallel.

Execution

test_my_pilot_task_execute_success wires up an AsyncMock database with success_rate=1.0 and checks that execute() calls submit_pilot and returns the pilot ID.

test_my_pilot_task_execute_failure uses success_rate=0.0 — the random check always fails, so execute() should raise RuntimeError and never call submit_pilot.

Deterministic randomness

The tests use success_rate=1.0 and success_rate=0.0 to make random.random() comparisons deterministic. With success_rate=1.0, random.random() >= 1.0 is always False (success). With success_rate=0.0, it's always True (failure). No need to mock random — the math does the work.

MyPilotReportTask — periodic report

Three tests cover the non-VO-aware periodic task:

  • test_my_pilot_report_task_schedule — Verifies the cron expression is "0 * * * *" (top of every hour).
  • test_my_pilot_report_task_locks — Checks the default class-level mutex lock (one report at a time globally).
  • test_my_pilot_report_task_execute — Mocks the DB, calls execute(), and asserts get_pilot_summary was called and the result passed through.

MyCheckPilotsTask — VO-aware periodic

Four tests for the VO-aware state-transition task:

  • test_my_check_pilots_task_schedule — Interval is 30 seconds.
  • test_my_check_pilots_task_serialize — vo="lhcb" serializes to ("lhcb",).
  • test_my_check_pilots_task_locks_include_vo — The lock key includes the VO name, so lhcb and atlas can run concurrently.
  • test_my_check_pilots_task_different_vos_different_locks — Confirms different VOs produce different lock keys.

MySubmitPilotsTask — VO-aware periodic, spawns children

Three tests mirror the pattern above:

  • test_my_submit_pilots_task_schedule — Interval is 60 seconds.
  • test_my_submit_pilots_task_serialize — vo="lhcb" serializes correctly.
  • test_my_submit_pilots_task_locks_include_vo — Lock key includes the VO name.

These tests verify the task definition — the execution logic (spawning MyPilotTask children) requires a broker context and is tested at the integration level.

Run the tests

pixi run pytest-gubbins-tasks -- -k my_pilot