Architecture Overview
Core Concepts
DVP CMS plugins are Python classes that:
- Extend the Plugin base class - Provides registration, configuration, and secrets management
- Implement hook protocols - Define which events your plugin responds to
- Are tenant-isolated - Each tenant has their own configuration and secrets
Available Hook Protocols
| Protocol | Purpose | When Called |
|---|---|---|
ContentLifecycleHooks |
React to content events | On publish, update, delete |
ScheduledTaskHooks |
Run periodic tasks | On configured schedule (cron/interval) |
HealthMonitorHooks |
Monitor external services | During health check cycles |
Plugin Lifecycle
- Registration - Plugin registered with PluginRegistry
- Enable - Plugin enabled for tenant
- Configuration - Tenant-specific config loaded
- Hook Execution - Plugin methods called on events
- Disable - Plugin disabled (config preserved)
Getting Started
Project Structure
my_plugin/
├── __init__.py # Package exports
├── config.py # Configuration dataclass
├── client.py # External API client (if needed)
└── plugin.py # Main plugin class
Minimal Plugin Example
config.py
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class MyPluginConfig:
"""Immutable configuration for MyPlugin."""
api_endpoint: str
timeout_seconds: int = 30
def __post_init__(self) -> None:
if not self.api_endpoint:
raise ValueError("api_endpoint is required")
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MyPluginConfig":
return cls(
api_endpoint=data.get("api_endpoint", ""),
timeout_seconds=data.get("timeout_seconds", 30),
)
plugin.py
from typing import Any
from dvp_cms.plugins.base import Plugin
from dvp_cms.plugins.hookspec import ContentLifecycleHooks
from dvp_cms.plugins.context import HookContext
from dvp_cms.kernel.logging import get_logger
from .config import MyPluginConfig
logger = get_logger(__name__)
class MyPlugin(Plugin, ContentLifecycleHooks):
"""Example plugin that reacts to content publishing."""
name = "my-plugin"
version = "1.0.0"
description = "Does something useful when content is published"
author = "Your Name"
def __init__(self) -> None:
super().__init__()
self._configs: dict[str, MyPluginConfig] = {}
def _get_config(self, tenant_id: str) -> MyPluginConfig:
"""Get tenant-specific configuration."""
if tenant_id not in self._configs:
config_dict = self.get_config(tenant_id)
self._configs[tenant_id] = MyPluginConfig.from_dict(config_dict)
return self._configs[tenant_id]
async def dvp_content_published(
self,
ctx: HookContext,
content_id: str,
content: dict[str, Any],
metadata: dict[str, Any] | None = None,
) -> None:
"""Called when content is published."""
tenant_id = str(ctx.tenant_id)
try:
config = self._get_config(tenant_id)
# Your logic here
logger.info(
f"Content published: {content_id}",
extra={"tenant_id": tenant_id, "domain": content.get("domain")},
)
except Exception as e:
# Action hooks should not propagate exceptions
logger.exception(f"Error in my-plugin: {e}")
__init__.py
from .config import MyPluginConfig
from .plugin import MyPlugin
__all__ = ["MyPlugin", "MyPluginConfig"]
Hook Protocols Reference
ContentLifecycleHooks
React to content publishing, updating, and deletion.
class ContentLifecycleHooks(Protocol):
async def dvp_content_published(
self,
ctx: HookContext,
content_id: str,
content: dict[str, Any],
metadata: dict[str, Any] | None = None,
) -> None:
"""Called after content is successfully published."""
...
Content Dictionary Fields
content_id- Unique identifierdomain- Site domain (e.g., "example.com")page_type- Content type (e.g., "article", "product")rendered_htmlorhtml- The HTML contentquality_score- Content quality score (0-100)
Use Cases
- Trigger static site deployments (Vercel, Netlify, S3)
- Send notifications
- Update external caches
- Sync to external systems
ScheduledTaskHooks
Run periodic tasks on a schedule.
class ScheduledTaskHooks(Protocol):
def dvp_register_scheduled_tasks(
self,
ctx: HookContext,
) -> list[ScheduledTaskDefinition]:
"""Register tasks to be scheduled."""
...
async def dvp_execute_scheduled_task(
self,
task: ScheduledTaskDefinition,
ctx: HookContext,
execution_ctx: TaskExecutionContext,
) -> TaskResult:
"""Execute a scheduled task."""
...
Task Definition
from dvp_cms.plugins.hooks.scheduled_tasks import (
ScheduledTaskDefinition,
TaskSchedule,
TaskResult,
)
# Register a daily task at 6 AM
ScheduledTaskDefinition(
task_id="my-plugin.daily-sync",
name="Daily Data Sync",
schedule=TaskSchedule(cron_expression="0 6 * * *"),
description="Syncs data from external API daily",
max_retries=3,
timeout_seconds=600,
)
# Or use interval scheduling (every 5 minutes)
TaskSchedule(interval_seconds=300)
Return TaskResult
# Success
return TaskResult(
success=True,
data={"items_processed": 150},
)
# Failure
return TaskResult(
success=False,
error_message="API rate limit exceeded",
)
Use Cases
- Fetch data from external APIs (Google Search Console, Ahrefs, etc.)
- Generate reports
- Clean up old data
- Sync with external systems
HealthMonitorHooks
Monitor external services and report health status.
class HealthMonitorHooks(Protocol):
async def dvp_execute_health_check(
self,
ctx: HookContext,
check_type: str,
target: str,
) -> HealthCheckResult:
"""Execute a health check."""
...
Use Cases
- SSL certificate monitoring
- Uptime monitoring
- API availability checks
- DNS verification
Configuration & Secrets
Plugin Configuration
Configuration is stored per-tenant and accessed via self.get_config(tenant_id):
def _get_config(self, tenant_id: str) -> MyPluginConfig:
config_dict = self.get_config(tenant_id) # Returns dict
return MyPluginConfig.from_dict(config_dict)
Configuration is public - suitable for:
- API endpoints
- Feature flags
- Thresholds and limits
- Domain lists
Plugin Secrets
Secrets are stored per-tenant and accessed via self.get_secrets(tenant_id):
secrets = self.get_secrets(tenant_id)
# Required secret (raises KeyError if missing)
api_key = secrets.require("api_key")
# Optional secret with default
api_key = secrets.get("api_key", default="")
Secrets are encrypted - suitable for:
- API tokens and keys
- OAuth credentials
- Passwords
- Private keys
Building an API Client
For plugins that integrate with external services, create a dedicated client class:
# client.py
import asyncio
from typing import Any
import httpx
from dvp_cms.kernel.logging import get_logger
logger = get_logger(__name__)
class MyAPIClient:
"""Async client for MyService API."""
def __init__(self, api_key: str, base_url: str = "https://api.example.com"):
self._api_key = api_key
self._base_url = base_url
async def fetch_data(self, resource_id: str) -> dict[str, Any]:
"""Fetch data from the API with retry logic."""
headers = {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
}
last_error: Exception | None = None
for attempt in range(3): # 3 retry attempts
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{self._base_url}/resources/{resource_id}",
headers=headers,
)
if response.status_code == 200:
return {"success": True, "data": response.json()}
if response.status_code == 429: # Rate limited
delay = min(2 ** attempt * 5, 60)
logger.warning(f"Rate limited, retrying in {delay}s")
await asyncio.sleep(delay)
continue
if response.status_code == 401:
return {"success": False, "error": "Authentication failed"}
return {
"success": False,
"error": f"HTTP {response.status_code}",
}
except Exception as e:
last_error = e
if attempt < 2:
await asyncio.sleep(2 ** attempt)
continue
return {"success": False, "error": f"Max retries exceeded: {last_error}"}
- Always use async HTTP clients (
httpx.AsyncClient) - Implement retry with exponential backoff
- Handle rate limits gracefully (429 responses)
- Return structured results (success/error dicts)
- Log errors with context
HookContext Reference
Every hook receives a HookContext with tenant and tracing information:
from uuid import UUID
from datetime import datetime
from dataclasses import dataclass
from typing import Any
from dvp_cms.plugins.context import HookContext
@dataclass(frozen=True)
class HookContext:
# Required - always present
tenant_id: UUID # Tenant's unique identifier
tenant_slug: str # Human-readable tenant name (e.g., "acme-corp")
tenant_plan: str # Subscription plan: "free", "pro", "enterprise"
# Distributed tracing - may be None
correlation_id: UUID | None # Links related operations across services
causation_id: UUID | None # ID of the event that caused this hook
# Metadata
timestamp: datetime # When the hook was invoked (UTC)
metadata: dict[str, Any] # Additional context from the system
Accessing Context in Hooks
def dvp_before_content_create(
self,
ctx: HookContext,
content: dict[str, Any],
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""All hooks: ctx is always first. Filter hooks must return value.
All DVP CMS hooks receive HookContext as the first parameter
for consistency and predictability.
"""
# Check tenant plan for feature gating
if ctx.tenant_plan == "free" and content.get("word_count", 0) > 5000:
raise PluginError("Free plan limited to 5000 words")
# Use correlation_id for logging
self.logger.info(
"Processing content",
extra={
"tenant": ctx.tenant_slug,
"correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
},
)
return content # Filter hooks MUST return a value
Scheduled Tasks
TaskExecutionContext Reference
When your scheduled task executes, it receives a TaskExecutionContext:
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID
from dvp_cms.plugins.hooks.scheduled_tasks import TaskExecutionContext
@dataclass(frozen=True)
class TaskExecutionContext:
task_id: str # ID of the task being executed
execution_id: UUID # Unique ID for this execution attempt
scheduled_time: datetime # When the task was scheduled to run
actual_time: datetime # When execution actually started
attempt: int # Current attempt number (1 = first try)
tenant_id: UUID | None # Tenant context for the task
correlation_id: UUID | None # For distributed tracing
Usage in Your Plugin
async def dvp_execute_scheduled_task(
self,
task: ScheduledTaskDefinition,
ctx: HookContext,
execution_ctx: TaskExecutionContext,
) -> TaskResult:
logger.info(
f"Executing {task.task_id}",
extra={
"execution_id": str(execution_ctx.execution_id),
"attempt": execution_ctx.attempt,
"scheduled_for": execution_ctx.scheduled_time.isoformat(),
},
)
if execution_ctx.attempt > 1:
logger.warning(f"Retry attempt {execution_ctx.attempt}")
# ... your logic ...
return TaskResult(success=True)
Testing Your Plugin
Test Structure
tests/
└── plugins/
└── test_my_plugin.py
Test Example
"""Tests for MyPlugin."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from my_plugin import MyPlugin, MyPluginConfig
class TestMyPluginConfig:
"""Test configuration validation."""
def test_api_endpoint_required(self):
"""api_endpoint is required."""
with pytest.raises(ValueError, match="api_endpoint"):
MyPluginConfig(api_endpoint="")
def test_valid_config_accepted(self):
"""Valid configuration is accepted."""
config = MyPluginConfig(api_endpoint="https://api.example.com")
assert config.api_endpoint == "https://api.example.com"
assert config.timeout_seconds == 30 # default
def test_from_dict(self):
"""from_dict creates valid config."""
config = MyPluginConfig.from_dict({
"api_endpoint": "https://api.example.com",
"timeout_seconds": 60,
})
assert config.timeout_seconds == 60
class TestMyPluginRegistration:
"""Test plugin metadata."""
def test_plugin_has_correct_name(self):
plugin = MyPlugin()
assert plugin.name == "my-plugin"
def test_plugin_has_version(self):
plugin = MyPlugin()
assert plugin.version is not None
class TestMyPluginContentPublished:
"""Test dvp_content_published hook."""
@pytest.fixture
def hook_context(self):
from dvp_cms.plugins.context import HookContext
return HookContext(
tenant_id="11111111-1111-1111-1111-111111111111",
tenant_slug="test-tenant",
tenant_plan="pro",
)
@pytest.fixture
def sample_content(self):
return {
"content_id": "content-123",
"domain": "example.com",
"page_type": "article",
"rendered_html": "<html><body>Test</body></html>",
}
@pytest.mark.asyncio
async def test_handles_content_published(self, hook_context, sample_content):
"""Plugin handles content published event."""
plugin = MyPlugin()
# Mock configuration
with patch.object(plugin, "_get_config") as mock_config:
mock_config.return_value = MagicMock(
api_endpoint="https://api.example.com",
)
# Should not raise
await plugin.dvp_content_published(
ctx=hook_context,
content_id="content-123",
content=sample_content,
metadata=None,
)
Running Tests
# Run plugin tests
pytest tests/plugins/test_my_plugin.py -v
# Run with coverage
pytest tests/plugins/test_my_plugin.py -v --cov=my_plugin
Error Handling
Action Hooks (ContentLifecycleHooks)
Action hooks should never propagate exceptions - they run alongside other plugins and shouldn't break the publishing flow:
async def dvp_content_published(self, ctx, content_id, content, metadata):
try:
# Your logic
await self._do_something()
except Exception as e:
# Log but don't raise
logger.exception(f"Error in plugin: {e}")
Scheduled Tasks
Scheduled tasks return TaskResult to indicate success/failure:
async def dvp_execute_scheduled_task(self, task, ctx, execution_ctx):
try:
result = await self._fetch_data()
if result["success"]:
return TaskResult(success=True, data=result)
else:
return TaskResult(success=False, error_message=result["error"])
except Exception as e:
return TaskResult(success=False, error_message=str(e))
Configuration UI
Tenants configure plugins through the DVP CMS Admin Dashboard:
Tenant Admin → Settings → Plugins → [Your Plugin]
The UI is auto-generated from your plugin's configuration schema.
Defining Configuration Fields
Schemas use JSON Schema (draft-07), validated by the jsonschema library. UI components are auto-generated from schema properties.
class MyPlugin(Plugin):
# JSON Schema (draft-07) - validated by jsonschema library
# UI components auto-generated from schema properties
CONFIG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"api_endpoint": {
"type": "string",
"title": "API Endpoint",
"description": "Your service's API URL",
"format": "uri", # Renders as URL input
},
"batch_size": {
"type": "integer",
"title": "Batch Size",
"description": "Records per sync",
"default": 100,
"minimum": 1,
"maximum": 1000,
},
"enabled_features": {
"type": "array",
"title": "Enabled Features",
"items": {
"type": "string",
"enum": ["keywords", "backlinks", "rankings"]
},
},
},
"required": ["api_endpoint"],
}
# Secrets are stored encrypted and masked in UI
SECRETS_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"api_token": {
"type": "string",
"title": "API Token",
"description": "Your service API token",
},
},
"required": ["api_token"],
}
Local Development
Initial Setup
# Clone the CMS and your plugin
git clone https://github.com/your-org/dvp-cms.git
cd dvp-cms
# Install in development mode
uv sync
# Create your plugin directory
mkdir -p src/dvp_cms/plugins/integrations/my_plugin
Development Cycle
# 1. Write tests first
pytest tests/plugins/integrations/test_my_plugin.py -v
# 2. Run tests in watch mode during development
pytest tests/plugins/integrations/test_my_plugin.py -v --tb=short -x
# 3. Run full suite before committing
pytest tests/ -v --cov=src --cov-report=term-missing
# 4. Type checking
mypy src/dvp_cms/plugins/integrations/my_plugin/
Hot Reload
DVP CMS does not have automatic hot-reload for plugins. However:
- Test files rerun automatically with pytest-watch
- For manual testing, restart the development server
- Integration tests use fresh plugin instances per test
# Install pytest-watch for auto-rerun
uv add pytest-watch --dev
# Run tests on file changes
ptw tests/plugins/integrations/test_my_plugin.py
Plugin Dependencies
Plugins declare dependencies in their pyproject.toml:
[project]
name = "my-dvp-plugin"
version = "1.0.0"
dependencies = [
"dvp-cms>=1.0.0", # Core CMS (always required)
]
[project.optional-dependencies]
# Group extra dependencies by feature
aws = ["boto3>=1.28.0"]
analytics = ["pandas>=2.0.0", "numpy>=1.24.0"]
all = ["boto3>=1.28.0", "pandas>=2.0.0", "numpy>=1.24.0"]
In your plugin code, handle missing optional dependencies:
class S3DeployerPlugin(Plugin, ContentLifecycleHooks):
def __init__(self) -> None:
super().__init__()
try:
import boto3
self._boto3 = boto3
except ImportError:
raise PluginError(
"S3 Deployer requires boto3. "
"Install with: pip install my-dvp-plugin[aws]"
)
Installation by CMS administrators:
# Install plugin with all dependencies
uv add my-dvp-plugin[all]
# Or specific feature sets
uv add my-dvp-plugin[aws]
Production Checklist
Before publishing your plugin:
Code Quality
- All functions have type hints
- All public methods have docstrings
- Configuration dataclass is frozen (immutable)
- Configuration validation in
__post_init__ from_dictclass method for deserialization
Error Handling
- All external API calls have retry logic
- Rate limits are handled gracefully
- Action hooks catch exceptions (don't propagate)
- Errors are logged with context
Testing
- Configuration validation tests
- Plugin registration tests
- Hook execution tests (mocked external calls)
- Error path tests
- All tests passing
Documentation
- Clear description in plugin class
- Required secrets documented
- Configuration options documented
Version Requirements
Python Version: 3.12+ (uses PEP 695 type syntax and | union syntax)
DVP CMS Version Compatibility
# In your plugin's pyproject.toml
[project]
dependencies = [
"dvp-cms>=1.0.0,<2.0.0", # Compatible with CMS 1.x
]
Hook Protocol Stability
- Hook protocols are versioned with the CMS
- Breaking changes only in major versions (1.x → 2.x)
- New hooks may be added in minor versions (1.0 → 1.1)
- Deprecated hooks will warn for one minor version before removal
Example Plugins
The following production plugins are available as references:
| Plugin | Hook Protocol | Purpose |
|---|---|---|
| s3-static-deployer | ContentLifecycleHooks | Upload to S3 + CloudFront invalidation |
| vercel-deployer | ContentLifecycleHooks | Trigger Vercel deployments |
| netlify-deployer | ContentLifecycleHooks | Trigger Netlify builds |
| gsc-integration | ScheduledTaskHooks | Fetch Google Search Console data |
| ahrefs-integration | ScheduledTaskHooks | Fetch Ahrefs SEO data |
| domain-health-monitor | HealthMonitorHooks | SSL and uptime monitoring |
Source code: src/dvp_cms/plugins/integrations/
Plugin Registration
Automatic Discovery
Place your plugin in src/dvp_cms/plugins/integrations/ for automatic discovery.
Manual Registration
from dvp_cms.plugins.registry import PluginRegistry
from my_plugin import MyPlugin
registry = PluginRegistry()
plugin = MyPlugin()
registry.register(plugin)
registry.enable(plugin.name)
DVP CMS Plugin SDK v1.0 — December 2025