Scheduled Tasks

Intermediate Background Jobs

DVP CMS is a truth distillation system for AI-generated content. Plugins are evidence suppliers—they verify facts, pull live data, and make content more trustworthy over time. Learn more →

Build plugins that run background tasks on a schedule using ScheduledTaskHooks.

Overview

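Scheduled tasks allow plugins to run background work on a cron schedule or at a fixed interval, report the outcome of each run as a `TaskResult`, and control how failed runs are retried.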

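Execution model:

- A plugin declares its tasks in `dvp_register_scheduled_tasks`, which is called at startup and when plugins load.
- The scheduler calls `dvp_execute_scheduled_task` when a task is triggered.
- When a task fails, `dvp_task_failed` can decide whether and when it is retried.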

Quick Start

from typing import TYPE_CHECKING, Any
from dvp_cms.plugins.base import Plugin
from dvp_cms.plugins.hookspec import ScheduledTaskHooks

if TYPE_CHECKING:
    from dvp_cms.plugins.context import HookContext
    from dvp_cms.plugins.hooks.scheduled_tasks import (
        ScheduledTaskDefinition,
        TaskSchedule,
        TaskResult,
        TaskExecutionContext,
    )


class CleanupPlugin(Plugin, ScheduledTaskHooks):
    """Example plugin that schedules and runs a nightly temp-file cleanup.

    It registers a single cron-scheduled task, ``cleanup.temp-files``, and
    executes it when the scheduler triggers it. Any other task id is reported
    back as a failed ``TaskResult`` instead of raising.

    NOTE: ``_cleanup_temp_files`` is not defined in this example. The plugin
    must supply it as an async method that takes a tenant id. Its return value
    is reported as ``data["deleted"]``.
    """

    name = "cleanup"
    version = "1.0.0"

    def dvp_register_scheduled_tasks(
        self,
        ctx: "HookContext",
    ) -> list["ScheduledTaskDefinition"]:
        """Return the scheduled tasks this plugin contributes."""
        # Runtime import: the module-level import is TYPE_CHECKING-only.
        from dvp_cms.plugins.hooks.scheduled_tasks import (
            ScheduledTaskDefinition,
            TaskSchedule,
        )

        nightly = TaskSchedule(cron_expression="0 3 * * *")  # 03:00 every day
        temp_file_cleanup = ScheduledTaskDefinition(
            task_id="cleanup.temp-files",
            name="Clean Temp Files",
            schedule=nightly,
            description="Remove temp files older than 7 days",
            max_retries=3,
            timeout_seconds=600,
        )
        return [temp_file_cleanup]

    async def dvp_execute_scheduled_task(
        self,
        task: "ScheduledTaskDefinition",
        ctx: "HookContext",
        execution_ctx: "TaskExecutionContext",
    ) -> "TaskResult":
        """Run the task named by ``task.task_id`` and report how it went."""
        from dvp_cms.plugins.hooks.scheduled_tasks import TaskResult

        # Guard clause: this plugin only knows how to run one task.
        if task.task_id != "cleanup.temp-files":
            return TaskResult(success=False, error_message="Unknown task")

        deleted = await self._cleanup_temp_files(ctx.tenant_id)
        return TaskResult(success=True, data={"deleted": deleted})

Hooks Reference

dvp_register_scheduled_tasks

Register tasks from your plugin. Called at startup and when plugins load.

def dvp_register_scheduled_tasks(
    self,
    ctx: "HookContext",
) -> list["ScheduledTaskDefinition"]:
    """
    Register the scheduled tasks this plugin provides.

    Type: Collect hook
    Returns: List of task definitions
    """
    # These classes are needed at runtime, not just for type checking,
    # so import them here. Without this the snippet raises NameError.
    from dvp_cms.plugins.hooks.scheduled_tasks import (
        ScheduledTaskDefinition,
        TaskSchedule,
    )

    return [
        ScheduledTaskDefinition(
            task_id="my-plugin.hourly-sync",  # plugin-prefixed for uniqueness
            name="Hourly Data Sync",
            schedule=TaskSchedule(interval_seconds=3600),  # every hour
            description="Sync data from external API",
            max_retries=3,
            timeout_seconds=300,
        ),
    ]

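Task ID naming: prefix each task ID with your plugin name (for example `my-plugin.hourly-sync` or `cleanup.temp-files`) so IDs stay unique across plugins.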

dvp_execute_scheduled_task

Execute task logic when triggered.

async def dvp_execute_scheduled_task(
    self,
    task: "ScheduledTaskDefinition",
    ctx: "HookContext",
    execution_ctx: "TaskExecutionContext",
) -> "TaskResult":
    """
    Run the plugin's logic for ``task`` and report the outcome.

    Type: Execute hook
    Returns: TaskResult with success/failure
    """
    # TaskResult is constructed at runtime, so it must be imported here.
    # Without this the snippet raises NameError.
    from dvp_cms.plugins.hooks.scheduled_tasks import TaskResult

    if task.task_id == "my-plugin.hourly-sync":
        # Keep the try body to the call that can fail.
        # RateLimitError is a placeholder for your API client's rate-limit
        # exception; import the real one from that client.
        try:
            count = await self._sync_data(ctx.tenant_id)
        except RateLimitError as e:
            # Expected, recoverable failure: report it instead of raising.
            return TaskResult(
                success=False,
                error_message=f"Rate limited: {e}",
            )
        return TaskResult(
            success=True,
            data={"synced_records": count},
        )

    # Any task ID not handled above.
    return TaskResult(success=False, error_message="Unknown task")

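Key points:

- Dispatch on `task.task_id`.
- Return a `TaskResult` for both success and expected failures (such as rate limiting) instead of raising.
- Return a failed `TaskResult` for task IDs your plugin does not handle.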

dvp_task_failed

Control retry behavior when tasks fail.

def dvp_task_failed(
    self,
    ctx: "HookContext",
    task: "ScheduledTaskDefinition",
    error: Exception,
    execution_ctx: "TaskExecutionContext",
) -> "TaskRetryDecision | None":
    """
    Decide whether and how a failed scheduled task is retried.

    Type: First-wins hook
    Returns: TaskRetryDecision or None for default behavior

    The whole return annotation is a single string on purpose. Writing
    ``"TaskRetryDecision" | None`` evaluates ``str | None`` when the function
    is defined and raises TypeError, unless the module uses
    ``from __future__ import annotations``.
    """
    from dvp_cms.plugins.hooks.scheduled_tasks import TaskRetryDecision

    # Rate limiting is transient: retry, but back off longer than usual.
    # Matched on the message text, so it works whatever the exception type.
    if "rate limit" in str(error).lower():
        return TaskRetryDecision(
            should_retry=True,
            delay_seconds=300,  # Wait 5 minutes
            reason="Rate limited - extending backoff",
        )

    # Don't retry auth errors; they need manual intervention.
    # AuthenticationError is a placeholder for your client's auth exception.
    if isinstance(error, AuthenticationError):
        return TaskRetryDecision(
            should_retry=False,
            abort_remaining=True,
            reason="Auth failed - manual intervention needed",
        )

    # Anything else: fall back to the default retry behavior.
    return None

Scheduling Options

Cron Expression

Run at specific times using cron syntax:

TaskSchedule(cron_expression="0 3 * * *")   # 03:00 every day
TaskSchedule(cron_expression="0 */4 * * *") # On the hour every 4 hours (00:00, 04:00, 08:00, ...)
TaskSchedule(cron_expression="0 0 * * 0")   # Midnight every Sunday (day-of-week 0)
TaskSchedule(cron_expression="0 9 * * 1-5") # 09:00 Monday through Friday

Cron format (five space-separated fields): minute hour day-of-month month day-of-week

Interval

Run at fixed intervals:

TaskSchedule(interval_seconds=3600)    # Every hour (60 * 60 s)
TaskSchedule(interval_seconds=86400)   # Every 24 hours (24 * 60 * 60 s)
TaskSchedule(interval_seconds=300)     # Every 5 minutes (5 * 60 s)

Data Classes

TaskResult

@dataclass
class TaskResult:
    """Outcome of one scheduled-task run, as returned by dvp_execute_scheduled_task."""

    success: bool                         # True if the task completed successfully
    data: dict[str, Any] | None = None   # Task-specific result payload (e.g. record counts)
    error_message: str | None = None     # Failure reason, set when success is False
    duration_ms: int | None = None       # Execution time; the `_ms` name suggests milliseconds — confirm who fills it in

# Examples
TaskResult(success=True, data={"processed": 100})
TaskResult(success=False, error_message="API timeout")

TaskRetryDecision

@dataclass
class TaskRetryDecision:
    """Retry instruction returned by the dvp_task_failed hook (returning None there keeps default behavior)."""

    should_retry: bool              # Whether the failed task should be retried
    delay_seconds: int = 0          # Seconds to wait before the retry
    abort_remaining: bool = False   # Cancel any remaining retries
    reason: str = ""                # Free-text explanation of the decision

# Examples
TaskRetryDecision(should_retry=True, delay_seconds=60)
TaskRetryDecision(should_retry=False, reason="Permanent failure")

Best Practices

  1. Use plugin prefix for task IDs - Ensures uniqueness
  2. Set appropriate timeouts - Prevent runaway tasks
  3. Return TaskResult, don't raise - Cleaner error handling
  4. Log task execution - Aid debugging
  5. Handle retries gracefully - Use execution_ctx.attempt
  6. Test task registration and execution - Verify behavior

See Also