Plugin Development Guide

Complete reference for building, testing, and deploying DVP CMS plugins

Reference | Python 3.12+ | DVP CMS v1.0+

Architecture Overview

Core Concepts

DVP CMS plugins are Python classes that:

  1. Extend the Plugin base class - Provides registration, configuration, and secrets management
  2. Implement hook protocols - Define which events your plugin responds to
  3. Are tenant-isolated - Each tenant has their own configuration and secrets

Available Hook Protocols

Protocol               | Purpose                   | When Called
ContentLifecycleHooks  | React to content events   | On publish, update, delete
ScheduledTaskHooks     | Run periodic tasks        | On configured schedule (cron/interval)
HealthMonitorHooks     | Monitor external services | During health check cycles

Plugin Lifecycle

  1. Registration - Plugin registered with PluginRegistry
  2. Enable - Plugin enabled for tenant
  3. Configuration - Tenant-specific config loaded
  4. Hook Execution - Plugin methods called on events
  5. Disable - Plugin disabled (config preserved)

Getting Started

Project Structure

my_plugin/
├── __init__.py        # Package exports
├── config.py          # Configuration dataclass
├── client.py          # External API client (if needed)
└── plugin.py          # Main plugin class

Minimal Plugin Example

config.py

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class MyPluginConfig:
    """Immutable configuration for MyPlugin."""

    api_endpoint: str
    timeout_seconds: int = 30

    def __post_init__(self) -> None:
        if not self.api_endpoint:
            raise ValueError("api_endpoint is required")

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MyPluginConfig":
        return cls(
            api_endpoint=data.get("api_endpoint", ""),
            timeout_seconds=data.get("timeout_seconds", 30),
        )

plugin.py

from typing import Any

from dvp_cms.plugins.base import Plugin
from dvp_cms.plugins.hookspec import ContentLifecycleHooks
from dvp_cms.plugins.context import HookContext
from dvp_cms.kernel.logging import get_logger

from .config import MyPluginConfig

logger = get_logger(__name__)


class MyPlugin(Plugin, ContentLifecycleHooks):
    """Example plugin that reacts to content publishing."""

    name = "my-plugin"
    version = "1.0.0"
    description = "Does something useful when content is published"
    author = "Your Name"

    def __init__(self) -> None:
        super().__init__()
        self._configs: dict[str, MyPluginConfig] = {}

    def _get_config(self, tenant_id: str) -> MyPluginConfig:
        """Get tenant-specific configuration."""
        if tenant_id not in self._configs:
            config_dict = self.get_config(tenant_id)
            self._configs[tenant_id] = MyPluginConfig.from_dict(config_dict)
        return self._configs[tenant_id]

    async def dvp_content_published(
        self,
        ctx: HookContext,
        content_id: str,
        content: dict[str, Any],
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Called when content is published."""
        tenant_id = str(ctx.tenant_id)

        try:
            config = self._get_config(tenant_id)

            # Your logic here
            logger.info(
                f"Content published: {content_id}",
                extra={"tenant_id": tenant_id, "domain": content.get("domain")},
            )

        except Exception as e:
            # Action hooks should not propagate exceptions
            logger.exception(f"Error in my-plugin: {e}")

__init__.py

from .config import MyPluginConfig
from .plugin import MyPlugin

__all__ = ["MyPlugin", "MyPluginConfig"]

Hook Protocols Reference

ContentLifecycleHooks

React to content publishing, updating, and deletion.

class ContentLifecycleHooks(Protocol):
    async def dvp_content_published(
        self,
        ctx: HookContext,
        content_id: str,
        content: dict[str, Any],
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Called after content is successfully published."""
        ...

Content Dictionary Fields

Use Cases

ScheduledTaskHooks

Run periodic tasks on a schedule.

class ScheduledTaskHooks(Protocol):
    def dvp_register_scheduled_tasks(
        self,
        ctx: HookContext,
    ) -> list[ScheduledTaskDefinition]:
        """Register tasks to be scheduled."""
        ...

    async def dvp_execute_scheduled_task(
        self,
        task: ScheduledTaskDefinition,
        ctx: HookContext,
        execution_ctx: TaskExecutionContext,
    ) -> TaskResult:
        """Execute a scheduled task."""
        ...

Task Definition

from dvp_cms.plugins.hooks.scheduled_tasks import (
    ScheduledTaskDefinition,
    TaskSchedule,
    TaskResult,
)

# Register a daily task at 6 AM
ScheduledTaskDefinition(
    task_id="my-plugin.daily-sync",
    name="Daily Data Sync",
    schedule=TaskSchedule(cron_expression="0 6 * * *"),
    description="Syncs data from external API daily",
    max_retries=3,
    timeout_seconds=600,
)

# Or use interval scheduling (every 5 minutes)
TaskSchedule(interval_seconds=300)
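
To make the two schedule styles concrete, the following sketch computes the next firing time for a daily schedule equivalent to "0 6 * * *" and for a fixed interval. The functions next_daily_run and next_interval_run are hypothetical helpers, and the use of UTC is an assumption; the guide does not say which timezone the scheduler applies to cron expressions.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int = 6, minute: int = 0) -> datetime:
    """Next firing of a daily schedule such as cron "0 6 * * *"."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # If today's slot has already passed (or is exactly now), use tomorrow's.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def next_interval_run(last_run: datetime, interval_seconds: int) -> datetime:
    """Next firing of an interval schedule such as interval_seconds=300."""
    return last_run + timedelta(seconds=interval_seconds)


if __name__ == "__main__":
    now = datetime(2025, 12, 1, 7, 30, tzinfo=timezone.utc)
    print(next_daily_run(now).isoformat())
    print(next_interval_run(now, 300).isoformat())
```

The practical difference is that a cron schedule is anchored to wall-clock time, while an interval schedule is anchored to the previous run.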

Return TaskResult

# Success
return TaskResult(
    success=True,
    data={"items_processed": 150},
)

# Failure
return TaskResult(
    success=False,
    error_message="API rate limit exceeded",
)

Use Cases

HealthMonitorHooks

Monitor external services and report health status.

class HealthMonitorHooks(Protocol):
    async def dvp_execute_health_check(
        self,
        ctx: HookContext,
        check_type: str,
        target: str,
    ) -> HealthCheckResult:
        """Execute a health check."""
        ...

Use Cases

Configuration & Secrets

Plugin Configuration

Configuration is stored per-tenant and accessed via self.get_config(tenant_id):

def _get_config(self, tenant_id: str) -> MyPluginConfig:
    config_dict = self.get_config(tenant_id)  # Returns dict
    return MyPluginConfig.from_dict(config_dict)

Configuration is public - suitable for:

Plugin Secrets

Secrets are stored per-tenant and accessed via self.get_secrets(tenant_id):

secrets = self.get_secrets(tenant_id)

# Required secret (raises KeyError if missing)
api_key = secrets.require("api_key")

# Optional secret with default
api_key = secrets.get("api_key", default="")
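
For unit tests it can help to have a small in-memory object that behaves like the secrets accessor shown above. The sketch below is one possible test double; FakeSecrets is a hypothetical class that only mirrors the require() and get() calls used in this guide and is not the real secrets implementation.

```python
from collections.abc import Mapping


class FakeSecrets:
    """Hypothetical test double mirroring the require()/get() calls above."""

    def __init__(self, values: Mapping[str, str]) -> None:
        self._values = dict(values)

    def require(self, key: str) -> str:
        # Mirrors the documented behaviour: KeyError when the secret is missing.
        if key not in self._values:
            raise KeyError(key)
        return self._values[key]

    def get(self, key: str, default: str = "") -> str:
        return self._values.get(key, default)

    def __repr__(self) -> str:
        # Show key names only, so secret values cannot leak into logs.
        return f"FakeSecrets(keys={sorted(self._values)})"
```

In a test, patching the plugin's get_secrets to return FakeSecrets({"api_key": "test-key"}) keeps real credentials out of the test suite.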

Secrets are encrypted - suitable for:

Building an API Client

For plugins that integrate with external services, create a dedicated client class:

# client.py
import asyncio
from typing import Any

import httpx
from dvp_cms.kernel.logging import get_logger

logger = get_logger(__name__)


class MyAPIClient:
    """Async client for MyService API."""

    def __init__(self, api_key: str, base_url: str = "https://api.example.com"):
        self._api_key = api_key
        self._base_url = base_url

    async def fetch_data(self, resource_id: str) -> dict[str, Any]:
        """Fetch data from the API with retry logic."""
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        }

        last_error: Exception | None = None

        for attempt in range(3):  # Up to 3 attempts in total
            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.get(
                        f"{self._base_url}/resources/{resource_id}",
                        headers=headers,
                    )

                    if response.status_code == 200:
                        return {"success": True, "data": response.json()}

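                    if response.status_code == 429:  # Rate limited
                        # Record the cause so the final error message is informative
                        last_error = RuntimeError("HTTP 429 (rate limited)")
                        if attempt < 2:  # No point sleeping after the last attempt
                            delay = min(2 ** attempt * 5, 60)
                            logger.warning(f"Rate limited, retrying in {delay}s")
                            await asyncio.sleep(delay)
                        continue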

                    if response.status_code == 401:
                        return {"success": False, "error": "Authentication failed"}

                    return {
                        "success": False,
                        "error": f"HTTP {response.status_code}",
                    }

            except Exception as e:
                last_error = e
                if attempt < 2:
                    await asyncio.sleep(2 ** attempt)
                    continue

        return {"success": False, "error": f"Max retries exceeded: {last_error}"}

Best Practices:
  • Always use async HTTP clients (httpx.AsyncClient)
  • Implement retry with exponential backoff
  • Handle rate limits gracefully (429 responses)
  • Return structured results (success/error dicts)
  • Log errors with context

HookContext Reference

Every hook receives a HookContext with tenant and tracing information:

from uuid import UUID
from datetime import datetime
from dataclasses import dataclass
from typing import Any

# Reference definition of dvp_cms.plugins.context.HookContext
# (import it in plugins; do not redefine it)


@dataclass(frozen=True)
class HookContext:
    # Required - always present
    tenant_id: UUID        # Tenant's unique identifier
    tenant_slug: str       # Human-readable tenant name (e.g., "acme-corp")
    tenant_plan: str       # Subscription plan: "free", "pro", "enterprise"

    # Distributed tracing - may be None
    correlation_id: UUID | None   # Links related operations across services
    causation_id: UUID | None     # ID of the event that caused this hook

    # Metadata
    timestamp: datetime    # When the hook was invoked (UTC)
    metadata: dict[str, Any]  # Additional context from the system

Accessing Context in Hooks

def dvp_before_content_create(
    self,
    ctx: HookContext,
    content: dict[str, Any],
    metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Filter hook: ctx is always the first parameter, and a value must be returned.

    All DVP CMS hooks receive HookContext as the first parameter
    for consistency and predictability.
    """
    # Check tenant plan for feature gating
    if ctx.tenant_plan == "free" and content.get("word_count", 0) > 5000:
        raise PluginError("Free plan limited to 5000 words")

    # Use correlation_id for logging
    self.logger.info(
        "Processing content",
        extra={
            "tenant": ctx.tenant_slug,
            "correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
        },
    )
    return content  # Filter hooks MUST return a value

Scheduled Tasks

TaskExecutionContext Reference

When your scheduled task executes, it receives a TaskExecutionContext:

from dataclasses import dataclass
from datetime import datetime
from uuid import UUID

# Reference definition of dvp_cms.plugins.hooks.scheduled_tasks.TaskExecutionContext
# (import it in plugins; do not redefine it)


@dataclass(frozen=True)
class TaskExecutionContext:
    task_id: str              # ID of the task being executed
    execution_id: UUID        # Unique ID for this execution attempt
    scheduled_time: datetime  # When the task was scheduled to run
    actual_time: datetime     # When execution actually started
    attempt: int              # Current attempt number (1 = first try)
    tenant_id: UUID | None    # Tenant context for the task
    correlation_id: UUID | None  # For distributed tracing

Usage in Your Plugin

async def dvp_execute_scheduled_task(
    self,
    task: ScheduledTaskDefinition,
    ctx: HookContext,
    execution_ctx: TaskExecutionContext,
) -> TaskResult:
    logger.info(
        f"Executing {task.task_id}",
        extra={
            "execution_id": str(execution_ctx.execution_id),
            "attempt": execution_ctx.attempt,
            "scheduled_for": execution_ctx.scheduled_time.isoformat(),
        },
    )

    if execution_ctx.attempt > 1:
        logger.warning(f"Retry attempt {execution_ctx.attempt}")

    # ... your logic ...
    return TaskResult(success=True)
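
The scheduled_time and actual_time fields also make it possible to measure how late a run started. The sketch below shows one way to do that; ExecutionTimes is a hypothetical stand-in carrying just those two fields, and the 10-minute staleness threshold is an arbitrary assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class ExecutionTimes:
    """Hypothetical stand-in for two TaskExecutionContext fields."""

    scheduled_time: datetime
    actual_time: datetime


def schedule_lag(ctx: ExecutionTimes) -> timedelta:
    """How late the task started relative to its scheduled slot."""
    return ctx.actual_time - ctx.scheduled_time


def is_stale(ctx: ExecutionTimes, max_lag: timedelta = timedelta(minutes=10)) -> bool:
    """True when the run started later than the allowed lag."""
    return schedule_lag(ctx) > max_lag


if __name__ == "__main__":
    slot = datetime(2025, 12, 1, 6, 0, tzinfo=timezone.utc)
    late = ExecutionTimes(slot, slot + timedelta(minutes=12))
    print(schedule_lag(late), is_stale(late))
```

A task could log the lag alongside execution_id, or return early when a newer run is about to start anyway.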

Testing Your Plugin

Test Structure

tests/
└── plugins/
    └── test_my_plugin.py

Test Example

"""Tests for MyPlugin."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from my_plugin import MyPlugin, MyPluginConfig


class TestMyPluginConfig:
    """Test configuration validation."""

    def test_api_endpoint_required(self):
        """api_endpoint is required."""
        with pytest.raises(ValueError, match="api_endpoint"):
            MyPluginConfig(api_endpoint="")

    def test_valid_config_accepted(self):
        """Valid configuration is accepted."""
        config = MyPluginConfig(api_endpoint="https://api.example.com")
        assert config.api_endpoint == "https://api.example.com"
        assert config.timeout_seconds == 30  # default

    def test_from_dict(self):
        """from_dict creates valid config."""
        config = MyPluginConfig.from_dict({
            "api_endpoint": "https://api.example.com",
            "timeout_seconds": 60,
        })
        assert config.timeout_seconds == 60


class TestMyPluginRegistration:
    """Test plugin metadata."""

    def test_plugin_has_correct_name(self):
        plugin = MyPlugin()
        assert plugin.name == "my-plugin"

    def test_plugin_has_version(self):
        plugin = MyPlugin()
        assert plugin.version is not None


class TestMyPluginContentPublished:
    """Test dvp_content_published hook."""

    @pytest.fixture
    def hook_context(self):
        from uuid import UUID

        from dvp_cms.plugins.context import HookContext
        return HookContext(
            tenant_id=UUID("11111111-1111-1111-1111-111111111111"),
            tenant_slug="test-tenant",
            tenant_plan="pro",
        )

    @pytest.fixture
    def sample_content(self):
        return {
            "content_id": "content-123",
            "domain": "example.com",
            "page_type": "article",
            "rendered_html": "<html><body>Test</body></html>",
        }

    @pytest.mark.asyncio
    async def test_handles_content_published(self, hook_context, sample_content):
        """Plugin handles content published event."""
        plugin = MyPlugin()

        # Mock configuration
        with patch.object(plugin, "_get_config") as mock_config:
            mock_config.return_value = MagicMock(
                api_endpoint="https://api.example.com",
            )

            # Should not raise
            await plugin.dvp_content_published(
                ctx=hook_context,
                content_id="content-123",
                content=sample_content,
                metadata=None,
            )

Running Tests

# Run plugin tests
pytest tests/plugins/test_my_plugin.py -v

# Run with coverage
pytest tests/plugins/test_my_plugin.py -v --cov=my_plugin

Error Handling

Action Hooks (ContentLifecycleHooks)

Action hooks should never propagate exceptions - they run alongside other plugins and shouldn't break the publishing flow:

async def dvp_content_published(self, ctx, content_id, content, metadata):
    try:
        # Your logic
        await self._do_something()
    except Exception as e:
        # Log but don't raise
        logger.exception(f"Error in plugin: {e}")
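
When a plugin has several action hooks, the try/except pattern can be factored into a decorator. The sketch below is one way to do that with the standard library only; never_raise and DemoPlugin are hypothetical names, and the standard logging module stands in for dvp_cms.kernel.logging.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)  # stand-in for get_logger(__name__)


def never_raise(hook: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]:
    """Wrap an async action hook so exceptions are logged, not propagated."""

    @functools.wraps(hook)
    async def wrapper(*args: Any, **kwargs: Any) -> None:
        try:
            await hook(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback of the active exception.
            logger.exception("Error in action hook %s", hook.__name__)

    return wrapper


class DemoPlugin:
    """Hypothetical plugin whose hook always fails."""

    @never_raise
    async def dvp_content_published(self, ctx, content_id, content, metadata=None) -> None:
        raise RuntimeError("downstream service unavailable")


async def main() -> None:
    # The failure is logged, and control returns normally to the caller.
    await DemoPlugin().dvp_content_published(None, "content-123", {})


if __name__ == "__main__":
    asyncio.run(main())
```

Catching Exception rather than BaseException leaves asyncio.CancelledError untouched, so task cancellation still works as expected.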

Scheduled Tasks

Scheduled tasks return TaskResult to indicate success/failure:

async def dvp_execute_scheduled_task(self, task, ctx, execution_ctx):
    try:
        result = await self._fetch_data()
        if result["success"]:
            return TaskResult(success=True, data=result)
        else:
            return TaskResult(success=False, error_message=result["error"])
    except Exception as e:
        return TaskResult(success=False, error_message=str(e))

Configuration UI

Tenants configure plugins through the DVP CMS Admin Dashboard:

Tenant Admin → Settings → Plugins → [Your Plugin]

The UI is auto-generated from your plugin's configuration schema.

Defining Configuration Fields

Schemas use JSON Schema (draft-07), validated by the jsonschema library. UI components are auto-generated from schema properties.

class MyPlugin(Plugin):
    # JSON Schema (draft-07) - validated by jsonschema library
    # UI components auto-generated from schema properties
    CONFIG_SCHEMA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "api_endpoint": {
                "type": "string",
                "title": "API Endpoint",
                "description": "Your service's API URL",
                "format": "uri",  # Renders as URL input
            },
            "batch_size": {
                "type": "integer",
                "title": "Batch Size",
                "description": "Records per sync",
                "default": 100,
                "minimum": 1,
                "maximum": 1000,
            },
            "enabled_features": {
                "type": "array",
                "title": "Enabled Features",
                "items": {
                    "type": "string",
                    "enum": ["keywords", "backlinks", "rankings"]
                },
            },
        },
        "required": ["api_endpoint"],
    }

    # Secrets are stored encrypted and masked in UI
    SECRETS_SCHEMA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "api_token": {
                "type": "string",
                "title": "API Token",
                "description": "Your service API token",
            },
        },
        "required": ["api_token"],
    }
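
In JSON Schema, "default" is an annotation, and validators such as jsonschema do not insert default values into the validated data on their own. Whether DVP CMS fills defaults before handing configuration to a plugin is not stated here, so the sketch below shows a defensive approach; apply_schema_defaults and missing_required are hypothetical helpers that only handle top-level properties.

```python
from typing import Any


def apply_schema_defaults(schema: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `config` with top-level schema defaults filled in."""
    merged = dict(config)
    for key, spec in schema.get("properties", {}).items():
        if key not in merged and "default" in spec:
            merged[key] = spec["default"]
    return merged


def missing_required(schema: dict[str, Any], config: dict[str, Any]) -> list[str]:
    """List required keys that are absent from `config`."""
    return [key for key in schema.get("required", []) if key not in config]


# Trimmed version of the CONFIG_SCHEMA shown above.
SCHEMA: dict[str, Any] = {
    "type": "object",
    "properties": {
        "api_endpoint": {"type": "string", "format": "uri"},
        "batch_size": {"type": "integer", "default": 100},
    },
    "required": ["api_endpoint"],
}

if __name__ == "__main__":
    print(apply_schema_defaults(SCHEMA, {"api_endpoint": "https://api.example.com"}))
```

Keeping the same defaults in the configuration dataclass is an alternative, at the cost of declaring them in two places.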

Local Development

Initial Setup

# Clone the CMS repository
git clone https://github.com/your-org/dvp-cms.git
cd dvp-cms

# Install in development mode
uv sync

# Create your plugin directory
mkdir -p src/dvp_cms/plugins/integrations/my_plugin

Development Cycle

# 1. Write tests first, then run them
pytest tests/plugins/integrations/test_my_plugin.py -v

# 2. During development, stop at the first failure and keep tracebacks short
pytest tests/plugins/integrations/test_my_plugin.py -v --tb=short -x

# 3. Run full suite before committing
pytest tests/ -v --cov=src --cov-report=term-missing

# 4. Type checking
mypy src/dvp_cms/plugins/integrations/my_plugin/

Hot Reload

DVP CMS does not have automatic hot-reload for plugins. However, pytest-watch can re-run your tests whenever a file changes:

# Install pytest-watch for auto-rerun
uv add pytest-watch --dev

# Run tests on file changes
ptw tests/plugins/integrations/test_my_plugin.py

Plugin Dependencies

Plugins declare dependencies in their pyproject.toml:

[project]
name = "my-dvp-plugin"
version = "1.0.0"
dependencies = [
    "dvp-cms>=1.0.0",  # Core CMS (always required)
]

[project.optional-dependencies]
# Group extra dependencies by feature
aws = ["boto3>=1.28.0"]
analytics = ["pandas>=2.0.0", "numpy>=1.24.0"]
all = ["boto3>=1.28.0", "pandas>=2.0.0", "numpy>=1.24.0"]

In your plugin code, handle missing optional dependencies:

class S3DeployerPlugin(Plugin, ContentLifecycleHooks):
    def __init__(self) -> None:
        super().__init__()
        try:
            import boto3
            self._boto3 = boto3
        except ImportError as exc:
            # Chain the original error so the missing module is visible in tracebacks
            raise PluginError(
                "S3 Deployer requires boto3. "
                "Install with: pip install my-dvp-plugin[aws]"
            ) from exc
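
An alternative to a try/except import is to check availability up front with importlib, which avoids importing a heavy dependency just to see whether it exists. The sketch below uses only the standard library; missing_modules and install_hint are hypothetical helpers, and the package and extra names are taken from the pyproject.toml example above.

```python
from importlib.util import find_spec


def missing_modules(*module_names: str) -> list[str]:
    """Return the top-level modules that cannot be found, without importing them."""
    return [name for name in module_names if find_spec(name) is None]


def install_hint(package: str, extra: str) -> str:
    """Build the install message shown to administrators."""
    return f"Install with: pip install {package}[{extra}]"


if __name__ == "__main__":
    absent = missing_modules("boto3")
    if absent:
        print(f"Missing modules: {absent}. {install_hint('my-dvp-plugin', 'aws')}")
```

This check works for top-level module names; for dotted names, find_spec imports the parent package first.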

Installation by CMS administrators:

# Install plugin with all dependencies
uv add my-dvp-plugin[all]

# Or specific feature sets
uv add my-dvp-plugin[aws]

Production Checklist

Before publishing your plugin:

Code Quality

Error Handling

Testing

Documentation

Version Requirements

Python Version: 3.12+ (uses PEP 695 type parameter syntax and PEP 604 | union syntax)

DVP CMS Version Compatibility

# In your plugin's pyproject.toml
[project]
dependencies = [
    "dvp-cms>=1.0.0,<2.0.0",  # Compatible with CMS 1.x
]
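
The same range can also be checked at runtime, for example to fail fast with a clear message. The sketch below compares plain X.Y.Z release strings as integer tuples; parse_version and is_supported_cms are hypothetical helpers, and pre-release or local version tags are deliberately not handled.

```python
def parse_version(version: str) -> tuple[int, ...]:
    """Parse a plain 'X.Y.Z' release string, padding short forms like '1.0'."""
    parts = [int(part) for part in version.split(".")]
    parts += [0] * (3 - len(parts))
    return tuple(parts)


def is_supported_cms(version: str) -> bool:
    """True when `version` satisfies >=1.0.0,<2.0.0."""
    return (1, 0, 0) <= parse_version(version) < (2, 0, 0)


if __name__ == "__main__":
    for candidate in ("0.9.9", "1.0", "1.12.3", "2.0.0"):
        print(candidate, is_supported_cms(candidate))
```

The installed version string can be read with importlib.metadata.version("dvp-cms"); for full PEP 440 handling, the third-party packaging library is the more robust choice.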

Hook Protocol Stability

Example Plugins

The following production plugins are available as references:

Plugin                 | Hook Protocol          | Purpose
s3-static-deployer     | ContentLifecycleHooks  | Upload to S3 + CloudFront invalidation
vercel-deployer        | ContentLifecycleHooks  | Trigger Vercel deployments
netlify-deployer       | ContentLifecycleHooks  | Trigger Netlify builds
gsc-integration        | ScheduledTaskHooks     | Fetch Google Search Console data
ahrefs-integration     | ScheduledTaskHooks     | Fetch Ahrefs SEO data
domain-health-monitor  | HealthMonitorHooks     | SSL and uptime monitoring

Source code: src/dvp_cms/plugins/integrations/

Plugin Registration

Automatic Discovery

Place your plugin in src/dvp_cms/plugins/integrations/ for automatic discovery.

Manual Registration

from dvp_cms.plugins.registry import PluginRegistry
from my_plugin import MyPlugin

registry = PluginRegistry()
plugin = MyPlugin()
registry.register(plugin)
registry.enable(plugin.name)

DVP CMS Plugin SDK v1.0 — December 2025