Standard Template Plugin

Intermediate · ~320 lines · 1-2 hours

DVP CMS is a truth distillation system for AI-generated content. Plugins are evidence suppliers—they verify facts, pull live data, and make content more trustworthy over time. Learn more →

← Back to Plugin Source Code
Plugin ID: standard-template
Version: 1.0.0
Dependencies: httpx
Hooks: ContentLifecycleHooks

This template represents a production-ready plugin that demonstrates the patterns most plugins need: configuration handling, HTTP API integration, response caching, error handling, and evidence submission. Use this template when building real plugins. For exhaustive documentation of every pattern, see the Advanced Template.

Features

Configuration Handling: Parse and validate plugin configuration
HTTP Client Management: Async HTTP with proper lifecycle
Response Caching: TTL-based cache with eviction
Error Handling: Graceful handling of API failures
Evidence Submission: Submit data to authority system
Resource Cleanup: Proper shutdown and cleanup

Source Code

plugin.py
"""
Standard Template Plugin for DVP CMS.

This template represents a production-ready plugin that demonstrates the
patterns most plugins need:
- Configuration handling
- HTTP API integration
- Response caching
- Error handling
- Evidence submission

Use this template when building real plugins. For exhaustive documentation
of every pattern, see `_template-canonical/`.

Example:
    >>> from dvp_cms.kernel.event_bus import EventBus
    >>>
    >>> event_bus = EventBus()
    >>> plugin = StandardTemplatePlugin(event_bus)
    >>> await plugin.initialize({
    ...     "api_key": "your-api-key",
    ...     "cache_ttl": 3600,
    ... })
"""

from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta, UTC
from typing import TYPE_CHECKING

import httpx

from dvp_cms.kernel.plugin import Plugin
from dvp_cms.kernel.event import Event, EventType
from dvp_cms.plugins.hookspec import ContentLifecycleHooks

if TYPE_CHECKING:
    from dvp_cms.kernel.event_bus import EventBus


# -----------------------------------------------------------------------------
# Data Classes
# -----------------------------------------------------------------------------


@dataclass(frozen=True, slots=True)
class APIResponse:
    """Response from the external API.

    Attributes:
        data: The response data dictionary.
        confidence: Confidence score (0.0 to 1.0).
        source: Data source identifier.
        fetched_at: When the data was fetched.
    """

    data: dict[str, object]
    confidence: float
    source: str
    fetched_at: datetime = field(default_factory=lambda: datetime.now(UTC))


@dataclass(slots=True)
class CacheEntry:
    """A cached API response with expiration tracking.

    Attributes:
        response: The cached API response.
        cached_at: When this entry was cached.
    """

    response: APIResponse
    cached_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    def is_expired(self, ttl_seconds: int) -> bool:
        """Check if this cache entry has expired."""
        age = datetime.now(UTC) - self.cached_at
        return age > timedelta(seconds=ttl_seconds)


# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------


class APIConfig:
    """Fixed settings for talking to the external API."""

    # Root URL of the external service (placeholder host in this template).
    BASE_URL: str = "https://api.example.com/v1"
    # Timeout, in seconds, passed to the HTTP client.
    TIMEOUT_SECONDS: float = 30.0


class CacheConfig:
    """Fixed settings for the in-memory response cache."""

    # Time-to-live used when the configuration supplies none (one hour).
    DEFAULT_TTL_SECONDS: int = 60 * 60
    # Entry count at which the cache starts evicting.
    MAX_SIZE: int = 500
    # Number of oldest entries dropped in one eviction pass.
    EVICTION_BATCH_SIZE: int = 50


class ContentFields:
    """Names of the content dictionary keys the plugin reads."""

    # Keys checked, in this order, when collecting text from content.
    TEXT_FIELDS: tuple[str, ...] = (
        "body",
        "content",
        "text",
        "description",
    )


# -----------------------------------------------------------------------------
# Plugin Implementation
# -----------------------------------------------------------------------------


class StandardTemplatePlugin(Plugin, ContentLifecycleHooks):
    """A production-ready template plugin with API integration.

    The plugin subscribes to ``CONTENT_CREATED`` events, extracts text from
    the created content, obtains data for that text (from an in-memory TTL
    cache when possible) and emits the result as an ``EVIDENCE_ADDED`` event.

    This plugin demonstrates the standard patterns for:
    - Configuration handling with validation
    - HTTP client management
    - Response caching with TTL
    - Error handling
    - Evidence submission

    Configuration:
        api_key: API key for the external service (optional; mock data is
            used when it is missing, ``None`` or blank).
        cache_ttl: Cache time-to-live in seconds. Default: 3600. A value that
            is not a non-negative integer falls back to the default.

    Example:
        >>> plugin = StandardTemplatePlugin(event_bus)
        >>> await plugin.initialize({"api_key": "your-key"})
    """

    plugin_id = "standard-template"
    plugin_version = "1.0.0"
    plugin_name = "Standard Template"
    plugin_description = "Production-ready template with API integration"
    plugin_author = "DVP CMS Team"
    plugin_capabilities = ["template", "api_integration"]

    def __init__(self, event_bus: EventBus) -> None:
        """Create the plugin with default configuration and empty state.

        Args:
            event_bus: Event bus handed to the ``Plugin`` base class.
        """
        super().__init__(event_bus)

        # Configuration (overwritten by initialize()).
        self._api_key: str = ""
        self._cache_ttl: int = CacheConfig.DEFAULT_TTL_SECONDS

        # Runtime state.
        self._cache: dict[str, CacheEntry] = {}
        self._http_client: httpx.AsyncClient | None = None

    async def initialize(self, config: dict[str, object]) -> None:
        """Initialize the plugin with configuration.

        Reads ``api_key`` and ``cache_ttl`` from ``config``, creates the HTTP
        client and subscribes to content-creation events.

        Args:
            config: Plugin configuration dictionary.
        """
        self._api_key = self._parse_api_key(config.get("api_key"))
        self._cache_ttl = self._parse_cache_ttl(config.get("cache_ttl"))

        # Close a client left over from an earlier initialize() call so that
        # re-initialization does not leak it.
        if self._http_client is not None:
            await self._http_client.aclose()
        self._http_client = httpx.AsyncClient(timeout=APIConfig.TIMEOUT_SECONDS)

        await self.subscribe(EventType.CONTENT_CREATED, self._handle_content_created)

        if not self._api_key:
            self._logger.warning("No API key configured; using mock data")

        self._logger.info("Initialized with cache_ttl=%ds", self._cache_ttl)

    async def shutdown(self) -> None:
        """Shut down the plugin and release resources.

        The cache is cleared and subscriptions are removed even when closing
        the HTTP client raises; the exception then propagates to the caller.
        """
        # Detach the client first so a failing aclose() cannot leave a
        # half-closed client reachable through the plugin.
        client, self._http_client = self._http_client, None
        try:
            if client is not None:
                await client.aclose()
        finally:
            self._cache.clear()
            await self.unsubscribe_all()

        self._logger.info("Shutdown complete")

    # -------------------------------------------------------------------------
    # Configuration Parsing
    # -------------------------------------------------------------------------

    @staticmethod
    def _parse_api_key(raw: object) -> str:
        """Normalize the configured API key; ``None`` or blank means no key."""
        # str(None) would yield the truthy string "None", so handle it first.
        if raw is None:
            return ""
        return str(raw).strip()

    def _parse_cache_ttl(self, raw: object) -> int:
        """Return a non-negative TTL in seconds, or the default if invalid."""
        default_ttl = CacheConfig.DEFAULT_TTL_SECONDS
        if raw is None:
            return default_ttl

        try:
            ttl = int(raw)
        except (TypeError, ValueError, OverflowError):
            ttl = -1  # routed into the invalid-value branch below

        if ttl < 0:
            self._logger.warning(
                "Invalid cache_ttl %r; using default %ds", raw, default_ttl
            )
            return default_ttl
        return ttl

    # -------------------------------------------------------------------------
    # Event Handlers
    # -------------------------------------------------------------------------

    async def _handle_content_created(self, event: Event) -> None:
        """Handle content creation events.

        Does nothing when the event carries no ``content`` dictionary, when
        the content has no usable text, or when no data could be fetched.

        Args:
            event: The content-creation event.
        """
        content = event.data.get("content")
        if not isinstance(content, dict):
            return

        text = self._extract_text(content)
        if not text:
            return

        response = await self._fetch_data(text)
        if response is not None:
            await self._submit_evidence(event.aggregate_id, response)

    # -------------------------------------------------------------------------
    # API Integration
    # -------------------------------------------------------------------------

    async def _fetch_data(self, text: str) -> APIResponse | None:
        """Fetch data for ``text``, serving from the cache when possible.

        Args:
            text: Text to look up; its SHA-256 digest is the cache key.

        Returns:
            The cached or freshly fetched response, or ``None`` when the
            fetch failed.
        """
        cache_key = hashlib.sha256(text.encode()).hexdigest()

        cached = self._cache.get(cache_key)
        if cached is not None:
            if not cached.is_expired(self._cache_ttl):
                return cached.response
            # Drop the stale entry now so it does not linger if the refresh
            # below fails.
            self._cache.pop(cache_key, None)

        # Fetch fresh data; without an API key the plugin runs on mock data.
        if not self._api_key:
            response: APIResponse | None = self._create_mock_response()
        else:
            response = await self._call_api(text)

        if response is not None:
            self._set_cached(cache_key, response)

        return response

    async def _call_api(self, text: str) -> APIResponse | None:
        """Make API call with error handling.

        Args:
            text: Text to send to the API.

        Returns:
            The API response, or ``None`` when the HTTP client is missing or
            the request failed.
        """
        if self._http_client is None:
            self._logger.warning("HTTP client not initialized; skipping API call")
            return None

        try:
            # In a real plugin, make the actual API call here
            self._logger.debug("Would call API with text length: %d", len(text))
            return self._create_mock_response()

        except httpx.TimeoutException:
            self._logger.warning("API timeout")
            return None
        except httpx.HTTPStatusError as e:
            self._logger.warning("API HTTP error %d", e.response.status_code)
            return None
        except httpx.RequestError as e:
            self._logger.warning("API request error: %s", e)
            return None

    def _create_mock_response(self) -> APIResponse:
        """Create mock response for testing."""
        return APIResponse(
            data={"result": "mock_data", "score": 0.75},
            confidence=0.80,
            source="Mock Data",
        )

    # -------------------------------------------------------------------------
    # Evidence Submission
    # -------------------------------------------------------------------------

    async def _submit_evidence(
        self,
        content_id: str,
        response: APIResponse,
    ) -> None:
        """Submit API response as evidence.

        Args:
            content_id: Identifier of the content the evidence belongs to.
            response: The response whose data, source, timestamp and
                confidence are emitted.
        """
        await self.emit_event(
            EventType.EVIDENCE_ADDED,
            {
                "content_id": content_id,
                "claim": f"API data for {content_id}",
                "source_type": "EXTERNAL_API",
                "supports_claim": True,
                "confidence": response.confidence,
                "evidence_data": {
                    "api_data": response.data,
                    "source": response.source,
                    "fetched_at": response.fetched_at.isoformat(),
                },
            },
            {"source_plugin": self.plugin_id},
        )

        self._logger.info(
            "Submitted evidence for %s (confidence: %.2f)",
            content_id,
            response.confidence,
        )

    # -------------------------------------------------------------------------
    # Helper Methods
    # -------------------------------------------------------------------------

    def _extract_text(self, content: dict[str, object]) -> str:
        """Extract text from content fields.

        Args:
            content: Content dictionary to read from.

        Returns:
            The stripped, non-blank string values of the known text fields,
            joined by single spaces in field order; ``""`` when there are
            none.
        """
        candidates = (content.get(name) for name in ContentFields.TEXT_FIELDS)
        return " ".join(
            value.strip()
            for value in candidates
            if isinstance(value, str) and value.strip()
        )

    def _set_cached(self, key: str, response: APIResponse) -> None:
        """Store response in cache with eviction if needed.

        Args:
            key: Cache key for the response.
            response: Response to store.
        """
        # Overwriting an existing key does not grow the cache, so only a
        # genuinely new key can trigger eviction.
        is_new_key = key not in self._cache
        if is_new_key and len(self._cache) >= CacheConfig.MAX_SIZE:
            self._evict_oldest(CacheConfig.EVICTION_BATCH_SIZE)
        self._cache[key] = CacheEntry(response=response)

    def _evict_oldest(self, count: int) -> None:
        """Evict oldest cache entries.

        Args:
            count: Maximum number of entries to remove; values below one
                remove nothing.
        """
        # A negative count would otherwise slice from the wrong end and
        # evict almost everything.
        if count <= 0:
            return

        oldest_first = sorted(
            self._cache,
            key=lambda k: self._cache[k].cached_at,
        )
        for key in oldest_first[:count]:
            del self._cache[key]

    # -------------------------------------------------------------------------
    # Hook Implementations
    # -------------------------------------------------------------------------

    def dvp_before_content_create(
        self,
        content: dict[str, object],
        metadata: dict[str, object] | None = None,
    ) -> dict[str, object]:
        """Process content before creation.

        This hook passes through content unchanged. Processing happens
        asynchronously via event subscription.

        Args:
            content: Content about to be created.
            metadata: Optional metadata; unused by this hook.

        Returns:
            The ``content`` argument, unmodified.
        """
        return content