Build plugins that run background tasks on a schedule using ScheduledTaskHooks.
Overview
Scheduled tasks allow plugins to:
- Run recurring jobs (daily cleanup, hourly sync, etc.)
- Execute tasks at specific times (cron expressions)
- Handle task failures with retry logic
- Track task completion and metrics
Execution Model:
- Development: asyncio-based internal scheduler
- Production: API endpoint triggered by cron/k8s CronJob
Quick Start
from typing import TYPE_CHECKING, Any
from dvp_cms.plugins.base import Plugin
from dvp_cms.plugins.hookspec import ScheduledTaskHooks
if TYPE_CHECKING:
from dvp_cms.plugins.context import HookContext
from dvp_cms.plugins.hooks.scheduled_tasks import (
ScheduledTaskDefinition,
TaskSchedule,
TaskResult,
TaskExecutionContext,
)
class CleanupPlugin(Plugin, ScheduledTaskHooks):
    """Example plugin that contributes one daily temp-file cleanup task."""

    name = "cleanup"
    version = "1.0.0"

    def dvp_register_scheduled_tasks(
        self,
        ctx: "HookContext",
    ) -> list["ScheduledTaskDefinition"]:
        """Return the scheduled task definitions this plugin contributes."""
        # Runtime import: at module level these names are only imported
        # for type checking.
        from dvp_cms.plugins.hooks.scheduled_tasks import (
            ScheduledTaskDefinition,
            TaskSchedule,
        )

        nightly = TaskSchedule(cron_expression="0 3 * * *")  # 3 AM daily
        temp_file_cleanup = ScheduledTaskDefinition(
            task_id="cleanup.temp-files",
            name="Clean Temp Files",
            schedule=nightly,
            description="Remove temp files older than 7 days",
            max_retries=3,
            timeout_seconds=600,
        )
        return [temp_file_cleanup]

    async def dvp_execute_scheduled_task(
        self,
        task: "ScheduledTaskDefinition",
        ctx: "HookContext",
        execution_ctx: "TaskExecutionContext",
    ) -> "TaskResult":
        """Run the requested task and report the outcome as a TaskResult."""
        from dvp_cms.plugins.hooks.scheduled_tasks import TaskResult

        # Guard clause: anything other than our own task ID is rejected.
        if task.task_id != "cleanup.temp-files":
            return TaskResult(success=False, error_message="Unknown task")

        deleted = await self._cleanup_temp_files(ctx.tenant_id)
        return TaskResult(success=True, data={"deleted": deleted})
Hooks Reference
dvp_register_scheduled_tasks
Register tasks from your plugin. Called at startup and when plugins load.
def dvp_register_scheduled_tasks(
    self,
    ctx: "HookContext",
) -> list["ScheduledTaskDefinition"]:
    """
    Register the scheduled tasks provided by this plugin.

    Type: Collect hook
    Returns: List of task definitions
    """
    # Import at call time so the example runs as written: the annotations
    # above are plain strings and do not bring these names into scope.
    from dvp_cms.plugins.hooks.scheduled_tasks import (
        ScheduledTaskDefinition,
        TaskSchedule,
    )

    return [
        ScheduledTaskDefinition(
            task_id="my-plugin.hourly-sync",
            name="Hourly Data Sync",
            schedule=TaskSchedule(interval_seconds=3600),
            description="Sync data from external API",
            max_retries=3,
            timeout_seconds=300,
        ),
    ]
Task ID Naming:
- Use plugin name prefix: my-plugin.task-name
- Must be globally unique
- Lowercase with hyphens
dvp_execute_scheduled_task
Execute task logic when triggered.
async def dvp_execute_scheduled_task(
    self,
    task: "ScheduledTaskDefinition",
    ctx: "HookContext",
    execution_ctx: "TaskExecutionContext",
) -> "TaskResult":
    """
    Execute the task logic when the scheduler triggers a task.

    Type: Execute hook
    Returns: TaskResult with success/failure
    """
    # Import at call time so the example runs as written: the return
    # annotation is a plain string and does not bring TaskResult into scope.
    from dvp_cms.plugins.hooks.scheduled_tasks import TaskResult

    # Only handle tasks this plugin registered.
    if task.task_id != "my-plugin.hourly-sync":
        return TaskResult(success=False, error_message="Unknown task")

    try:
        count = await self._sync_data(ctx.tenant_id)
    # NOTE(review): RateLimitError is a placeholder for the exception your
    # API client raises; it must be defined or imported in the plugin module.
    except RateLimitError as e:
        # Report the failure through the result instead of raising.
        return TaskResult(
            success=False,
            error_message=f"Rate limited: {e}",
        )
    return TaskResult(
        success=True,
        data={"synced_records": count},
    )
Key Points:
- Only handle tasks YOUR plugin registered
- Return TaskResult even on failure (don't raise exceptions)
- Use execution_ctx.attempt for retry-specific logic
- Scheduler enforces timeout
dvp_task_failed
Control retry behavior when tasks fail.
def dvp_task_failed(
    self,
    ctx: "HookContext",
    task: "ScheduledTaskDefinition",
    error: Exception,
    execution_ctx: "TaskExecutionContext",
) -> "TaskRetryDecision | None":
    """
    Decide how a failed task run should be retried.

    Type: First-wins hook
    Returns: TaskRetryDecision or None for default behavior
    """
    # The return annotation is one string on purpose: `"X" | None` is
    # evaluated when the function is defined and raises TypeError (str | None).
    from dvp_cms.plugins.hooks.scheduled_tasks import TaskRetryDecision

    # Extended backoff for rate limiting
    if "rate limit" in str(error).lower():
        return TaskRetryDecision(
            should_retry=True,
            delay_seconds=300,  # Wait 5 minutes
            reason="Rate limited - extending backoff",
        )

    # Don't retry auth errors
    # NOTE(review): AuthenticationError is a placeholder for your own
    # exception type; it must be defined or imported in the plugin module.
    if isinstance(error, AuthenticationError):
        return TaskRetryDecision(
            should_retry=False,
            abort_remaining=True,
            reason="Auth failed - manual intervention needed",
        )

    # Use default retry behavior
    return None
Scheduling Options
Cron Expression
Run at specific times using cron syntax:
TaskSchedule(cron_expression="0 3 * * *")  # minute 0, hour 3: 3 AM daily
TaskSchedule(cron_expression="0 */4 * * *")  # minute 0 of every 4th hour: every 4 hours
TaskSchedule(cron_expression="0 0 * * 0")  # weekday 0 at 00:00: Sunday midnight
TaskSchedule(cron_expression="0 9 * * 1-5")  # weekdays 1-5 (Mon-Fri) at 09:00: 9 AM weekdays
Cron Format: minute hour day month weekday
Interval
Run at fixed intervals:
TaskSchedule(interval_seconds=3600)  # 60 * 60 seconds: every hour
TaskSchedule(interval_seconds=86400)  # 24 * 3600 seconds: every 24 hours
TaskSchedule(interval_seconds=300)  # 5 * 60 seconds: every 5 minutes
Data Classes
TaskResult
@dataclass
class TaskResult:
success: bool # Did task succeed?
data: dict[str, Any] | None = None # Result data
error_message: str | None = None # Error details
duration_ms: int | None = None # Execution time
# Examples: a successful run carrying result data, then a failed run
# carrying an error message.
TaskResult(success=True, data={"processed": 100})
TaskResult(success=False, error_message="API timeout")
TaskRetryDecision
@dataclass
class TaskRetryDecision:
    """Retry instruction returned by the task-failed hook."""

    # Whether to retry the failed task.
    should_retry: bool
    # Seconds to wait before the retry.
    delay_seconds: int = 0
    # Cancel the remaining retries.
    abort_remaining: bool = False
    # Explanation for the decision.
    reason: str = ""
# Examples: retry after a 60-second delay, then decline to retry and
# record the reason.
TaskRetryDecision(should_retry=True, delay_seconds=60)
TaskRetryDecision(should_retry=False, reason="Permanent failure")
Best Practices
- Use plugin prefix for task IDs - Ensures uniqueness
- Set appropriate timeouts - Prevent runaway tasks
- Return TaskResult, don't raise - Cleaner error handling
- Log task execution - Aid debugging
- Handle retries gracefully - Use execution_ctx.attempt
- Test task registration and execution - Verify behavior