Tasks reference
Lock types
All locks are Redis-backed with ownership tracking. Each lock instance generates a random owner ID so that release operations verify ownership, preventing one worker from accidentally releasing another's lock.
Locks are split into two categories:
- Structural locks (
MutexLock,ExclusiveRWLock,SharedRWLock) — always acquired, including in interactive mode (diracx-task-run call). - Limiters (
RateLimiter,ConcurrencyLimiter) — skipped in interactive mode. These are subclasses ofBaseLimiter.
MutexLock
Mutual-exclusion lock. At most one owner can hold the lock at a time.
obj:LockedObjectType— the type of object being lockedkey:str | int— identifier for the locked object*extra_keys: additional key segments (e.g. VO name)ttl_ms: lock auto-expires after this many milliseconds (safety net for crashes)- Redis key:
lock:mutex:{obj}:{key}:{extra_keys...} - Supports
extend()for the watchdog pattern
ExclusiveRWLock
Writer side of a readers-writer lock. Acquire succeeds only when there are zero readers and no existing writer.
from diracx.tasks.plumbing.locks import ExclusiveRWLock
ExclusiveRWLock(obj, key, *extra_keys, ttl_ms=30000)
- Redis key:
lock:rw:{obj}:{key}:{extra_keys...} - Pair with
SharedRWLockon the same(obj, key)to allow concurrent readers or a single exclusive writer - Supports
extend()
SharedRWLock
Reader side of a readers-writer lock. Multiple readers can hold the lock concurrently. Acquire succeeds as long as no writer holds the exclusive side.
- Redis key:
lock:rw:{obj}:{key}:{extra_keys...}(same hash asExclusiveRWLock) - No TTL — readers are tracked via an atomic counter, not a per-owner key
- Does not support
extend()
RateLimiter
Sliding-window rate limiter. Limits the number of operations within a fixed time window.
n_items: number of quota units consumed per acquire (default: 1)limit: class variable — maximum operations per window (None= disabled)window_seconds: class variable — window duration (None= disabled)- Redis key:
limiter:rate:{obj}:{key}:{extra_keys...} release()is a no-op — consumed quota is not returned- Skipped in interactive mode
ConcurrencyLimiter
Semaphore-style concurrency cap backed by a Redis sorted set.
from diracx.tasks.plumbing.locks import ConcurrencyLimiter
ConcurrencyLimiter(obj, key, *extra_keys, ttl_ms=30000)
limit: class variable — maximum concurrent holders (None= disabled)ttl_ms: per-holder expiry — crashed workers' slots are automatically reclaimed- Redis key:
limiter:conc:{obj}:{key}:{extra_keys...} - Supports
extend()to push back expiry for long-running tasks - Skipped in interactive mode
Default locks
BaseTask returns a disabled RateLimiter and ConcurrencyLimiter by default (both with limit=None). This means limits can be enabled via configuration without code changes.
PeriodicBaseTask overrides this with a MutexLock keyed by the task class name, preventing concurrent execution. PeriodicVoAwareBaseTask adds the VO name to the lock key, so each VO gets its own mutex.
Schedules
Schedules determine when periodic tasks are submitted by the scheduler. All schedules implement next_occurrence() -> datetime.
IntervalSeconds
Fixed-interval schedule.
from diracx.tasks.plumbing.schedules import IntervalSeconds
IntervalSeconds(seconds=3600) # every hour
CronSchedule
Cron-expression schedule using croniter.
from diracx.tasks.plumbing.schedules import CronSchedule
CronSchedule("0 6 * * *") # daily at 06:00
CronSchedule("*/15 * * * *") # every 15 minutes
CronSchedule("0 0 * * 0") # weekly on Sunday at midnight
RRuleSchedule
RFC 2445 recurrence rule using dateutil.
from diracx.tasks.plumbing.schedules import RRuleSchedule
RRuleSchedule("FREQ=WEEKLY;BYDAY=FR") # every Friday
RRuleSchedule("FREQ=HOURLY;INTERVAL=2") # every 2 hours
RRuleSchedule("FREQ=MONTHLY;BYDAY=-1FR") # last Friday of each month
Raises ValueError if the rule has no future occurrences.
Retry policies
Retry policies determine whether and when a failed task is retried. They are set as class variables on task classes.
NoRetry
Never retries. This is the default.
from diracx.tasks.plumbing.retry_policies import NoRetry
class MyTask(BaseTask):
retry_policy = NoRetry()
ExponentialBackoff
Retries with exponentially increasing delays: delay = base_delay_seconds * 2^attempt.
from diracx.tasks.plumbing.retry_policies import ExponentialBackoff
class MyTask(BaseTask):
retry_policy = ExponentialBackoff(base_delay_seconds=10, max_retries=5)
base_delay_seconds: initial delay (default: 10)max_retries: maximum number of retry attempts (default: 5)- After
max_retriesis reached, the task is either dropped or sent to the dead-letter queue (ifdlq_eligible = True)