Part 5: Testing the database

Database tests verify your schema and queries in isolation using an in-memory SQLite backend. Each test gets a fresh database, so there's no shared state between tests.

Full test file

gubbins-db/tests/test_my_pilot_db.py
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from gubbins.db.sql.my_pilot_db.db import MyPilotDB
from gubbins.db.sql.my_pilot_db.schema import MyPilotStatus

if TYPE_CHECKING:
    from typing import AsyncGenerator


@pytest.fixture
async def my_pilot_db() -> AsyncGenerator[MyPilotDB, None]:
    my_pilot_db = MyPilotDB("sqlite+aiosqlite:///:memory:")
    async with my_pilot_db.engine_context():
        async with my_pilot_db.engine.begin() as conn:
            await conn.run_sync(my_pilot_db.metadata.create_all)
        yield my_pilot_db


async def test_add_and_get_ces(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("reliable-ce.example.org", capacity=5, success_rate=1.0)
        await db.add_ce("flaky-ce.example.org", capacity=3, success_rate=0.3)

    async with my_pilot_db as db:
        ces = await db.get_available_ces()
        assert len(ces) == 2
        names = {ce["name"] for ce in ces}
        assert names == {"reliable-ce.example.org", "flaky-ce.example.org"}


async def test_disabled_ce_not_available(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("disabled-ce", capacity=5, success_rate=1.0, enabled=False)

    async with my_pilot_db as db:
        ces = await db.get_available_ces()
        assert len(ces) == 0


async def test_submit_pilot(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("test-ce", capacity=5, success_rate=1.0)

    async with my_pilot_db as db:
        pilot_id = await db.submit_pilot("test-ce")
        assert pilot_id is not None

    async with my_pilot_db as db:
        pilots = await db.get_pilots_by_status(MyPilotStatus.SUBMITTED)
        assert len(pilots) == 1
        assert pilots[0]["ce_name"] == "test-ce"


async def test_update_pilot_status(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("test-ce", capacity=5, success_rate=1.0)

    async with my_pilot_db as db:
        pilot_id = await db.submit_pilot("test-ce")

    async with my_pilot_db as db:
        await db.update_pilot_status(pilot_id, MyPilotStatus.RUNNING)

    async with my_pilot_db as db:
        submitted = await db.get_pilots_by_status(MyPilotStatus.SUBMITTED)
        running = await db.get_pilots_by_status(MyPilotStatus.RUNNING)
        assert len(submitted) == 0
        assert len(running) == 1


async def test_capacity_tracking(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("small-ce", capacity=2, success_rate=1.0)

    # Submit 2 pilots to fill capacity
    async with my_pilot_db as db:
        await db.submit_pilot("small-ce")
        await db.submit_pilot("small-ce")

    # CE should no longer be available
    async with my_pilot_db as db:
        ces = await db.get_available_ces()
        assert len(ces) == 0

    # Complete one pilot — frees a slot
    async with my_pilot_db as db:
        pilots = await db.get_pilots_by_status(MyPilotStatus.SUBMITTED)
        await db.update_pilot_status(pilots[0]["pilot_id"], MyPilotStatus.DONE)

    async with my_pilot_db as db:
        ces = await db.get_available_ces()
        assert len(ces) == 1
        assert ces[0]["available_slots"] == 1


async def test_get_ce_success_rate(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("test-ce", capacity=5, success_rate=0.75)

    async with my_pilot_db as db:
        rate = await db.get_ce_success_rate("test-ce")
        assert rate == 0.75


async def test_pilot_summary(my_pilot_db: MyPilotDB):
    async with my_pilot_db as db:
        await db.add_ce("test-ce", capacity=10, success_rate=1.0)

    async with my_pilot_db as db:
        await db.submit_pilot("test-ce")
        await db.submit_pilot("test-ce")
        pilot_id = await db.submit_pilot("test-ce")

    async with my_pilot_db as db:
        await db.update_pilot_status(pilot_id, MyPilotStatus.RUNNING)

    async with my_pilot_db as db:
        summary = await db.get_pilot_summary()
        assert summary[MyPilotStatus.SUBMITTED] == 2
        assert summary[MyPilotStatus.RUNNING] == 1

The fixture

The my_pilot_db fixture creates a throwaway database for each test:

  • In-memory SQLite — MyPilotDB("sqlite+aiosqlite:///:memory:") avoids touching the filesystem. Each test starts clean.
  • Engine lifecycle — engine_context() manages the async engine. Inside it, engine.begin() opens a connection and metadata.create_all creates the tables from your schema.
  • yield — The fixture yields the DB instance to the test. When the test finishes, the context manager tears down the engine.

This pattern is common across DiracX database tests — you'll see the same structure whenever a test needs a real database connection.
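
The "fresh database per test" idea can be seen without any DiracX code. The following is a minimal sketch using only the standard library's sqlite3 module rather than MyPilotDB or SQLAlchemy; the fresh_db helper and the ces table layout are hypothetical and exist only for illustration. It shows that two in-memory connections do not share any rows:

```python
import sqlite3


def fresh_db() -> sqlite3.Connection:
    """Return a brand-new in-memory database with a hypothetical `ces` table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ces (name TEXT PRIMARY KEY, capacity INTEGER)")
    return conn


# Two "tests", each with its own throwaway database.
first = fresh_db()
second = fresh_db()

first.execute("INSERT INTO ces VALUES ('test-ce', 5)")
first.commit()

rows_in_first = first.execute("SELECT COUNT(*) FROM ces").fetchone()[0]
rows_in_second = second.execute("SELECT COUNT(*) FROM ces").fetchone()[0]
print(rows_in_first, rows_in_second)  # prints "1 0"

first.close()
second.close()
```

Because nothing is written to disk, there is also nothing to clean up once a connection is closed.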

Basic CRUD

test_add_and_get_ces is the simplest test: insert two CEs, then query them back. Using two separate async with db: blocks is important: the insert happens in one transaction and the query in another. This verifies that the data was actually committed, not just visible within the same transaction.

test_disabled_ce_not_available checks that enabled=False excludes a CE from get_available_ces(). This tests the filter clause in the query without needing to set up pilots or capacity.

Each async with db: is a separate transaction

This mirrors how production code works: each HTTP request or task execution gets its own transaction. If you insert and query in the same async with db: block, you're only testing that the data is visible within a single transaction — not that it was committed.
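
The difference between "visible inside my transaction" and "committed" can be demonstrated with a small sketch. It uses the standard library's sqlite3 module and a temporary file so that two independent connections can look at the same database; it does not use MyPilotDB, and the ces table and count_ces helper are hypothetical:

```python
import sqlite3
import tempfile
from pathlib import Path


def count_ces(conn: sqlite3.Connection) -> int:
    # fetchall() exhausts the cursor so no read lock lingers on the file.
    return conn.execute("SELECT COUNT(*) FROM ces").fetchall()[0][0]


with tempfile.TemporaryDirectory() as tmp:
    db_file = Path(tmp) / "demo.sqlite"

    writer = sqlite3.connect(db_file)
    writer.execute("CREATE TABLE ces (name TEXT PRIMARY KEY)")
    writer.commit()

    reader = sqlite3.connect(db_file)

    # sqlite3 implicitly opens a transaction before the INSERT.
    writer.execute("INSERT INTO ces VALUES ('test-ce')")

    seen_by_writer = count_ces(writer)      # same, still open, transaction
    seen_before_commit = count_ces(reader)  # a different connection

    writer.commit()
    seen_after_commit = count_ces(reader)

    writer.close()
    reader.close()

print(seen_by_writer, seen_before_commit, seen_after_commit)  # prints "1 0 1"
```

The writer sees its own row immediately, but the second connection only sees it after the commit. A test that inserts and queries inside one block corresponds to the first number only.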

Pilot lifecycle

test_submit_pilot verifies the insert-and-query flow: add a CE, submit a pilot, then check that get_pilots_by_status(SUBMITTED) returns it. Three separate transactions ensure each operation commits independently.

test_update_pilot_status tests status transitions: submit a pilot, move it to RUNNING, then verify that querying SUBMITTED returns nothing and RUNNING returns one result. This exercises the UPDATE query and confirms the status column is actually changing.

Capacity tracking

test_capacity_tracking is the most interesting test. It exercises the subquery from get_available_ces() end-to-end:

  1. Add a CE with capacity=2
  2. Submit 2 pilots — CE should no longer appear in get_available_ces() (capacity full)
  3. Complete one pilot (DONE) — the CE reappears with available_slots == 1

This verifies the coalesce/subquery pattern that counts active pilots and subtracts them from capacity. It also confirms that a pilot in the terminal DONE state no longer counts against capacity. The other terminal state, FAILED, is not exercised by this test.
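
For readers who want to see the shape of such a query, here is a sketch of a coalesce/subquery capacity check written as plain SQL and run through the standard library's sqlite3 module. The table layout, column names, and the choice of SUBMITTED and RUNNING as the "active" states are assumptions made for this example; the real get_available_ces() may be written differently.

```python
import sqlite3

# Hypothetical query: capacity minus the number of active pilots per CE.
# COALESCE turns the NULL produced by the LEFT JOIN (no pilots yet) into 0.
AVAILABLE_CES = """
SELECT ces.name, ces.capacity - COALESCE(active.n, 0) AS available_slots
FROM ces
LEFT JOIN (
    SELECT ce_name, COUNT(*) AS n
    FROM pilots
    WHERE status IN ('SUBMITTED', 'RUNNING')
    GROUP BY ce_name
) AS active ON active.ce_name = ces.name
WHERE ces.enabled = 1
  AND ces.capacity - COALESCE(active.n, 0) > 0
ORDER BY ces.name
"""

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE ces (name TEXT PRIMARY KEY, capacity INTEGER, enabled INTEGER);
    CREATE TABLE pilots (pilot_id INTEGER PRIMARY KEY, ce_name TEXT, status TEXT);
    INSERT INTO ces VALUES ('small-ce', 2, 1), ('idle-ce', 3, 1);
    INSERT INTO pilots (ce_name, status) VALUES
        ('small-ce', 'SUBMITTED'), ('small-ce', 'SUBMITTED');
""")

# small-ce is full, idle-ce has no pilots at all.
full = conn.execute(AVAILABLE_CES).fetchall()

# Moving one pilot to a terminal state frees a slot on small-ce.
conn.execute("UPDATE pilots SET status = 'DONE' WHERE pilot_id = 1")
after_done = conn.execute(AVAILABLE_CES).fetchall()

print(full)
print(after_done)
```

The extra idle-ce row is there to show why COALESCE is needed: a CE with no pilots produces no row in the subquery, and without the fallback to 0 its available_slots would be NULL.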

Aggregates

test_get_ce_success_rate is a simple lookup — add a CE with success_rate=0.75, then verify it reads back correctly.

test_pilot_summary tests the grouping query: submit three pilots, transition one to RUNNING, then check that the summary returns {SUBMITTED: 2, RUNNING: 1}. This exercises GROUP BY with the status enum.
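
A grouping query of this kind can be sketched in a few lines with the standard library's sqlite3 module. The pilots table and its plain-text status column are hypothetical stand-ins, not the tutorial's actual schema or the implementation of get_pilot_summary():

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pilots (pilot_id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO pilots (status) VALUES (?)",
    [("SUBMITTED",), ("SUBMITTED",), ("RUNNING",)],
)

# Each result row is a (status, count) pair, so dict() builds the summary.
summary = dict(conn.execute("SELECT status, COUNT(*) FROM pilots GROUP BY status"))
print(summary)
```

In this sketch, a status with no rows simply does not appear in the result, so looking up a missing state needs a default such as summary.get("DONE", 0).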

Run the tests

pixi run pytest-gubbins-db -- -k my_pilot