Cross-plugin coordination, capability discovery, and inter-plugin communication.
Overview
Advanced plugin patterns enable:
- Loose coupling: Plugins interact without direct dependencies
- Capability discovery: Find plugins by what they do, not what they're named
- Message passing: Communicate through a mediated bus
- Dependency injection: Access shared services safely
Capability-Based Discovery
Find plugins by capability rather than name, enabling loose coupling.
Registering Capabilities
class SemanticSearchPlugin(Plugin, PluginLifecycleHooks):
    """Example provider that advertises the ``semantic_search`` capability."""

    name = "semantic-search"
    version = "1.0.0"

    async def initialize(self) -> None:
        """Run the base initialization, then register this plugin's capability."""
        await super().initialize()

        # Metadata published alongside the capability registration.
        embedding_info = {"model": "text-embedding-ada-002"}

        # Capabilities are registered with a priority (lower = preferred).
        await self.discovery.register_capability(
            plugin_name=self.name,
            capability="semantic_search",
            priority=100,
            metadata=embedding_info,
        )

    async def search(self, query: str, limit: int = 10) -> list[dict]:
        """Perform a semantic search for ``query``.

        Placeholder only: the example leaves the implementation to the reader.
        """
        # Implementation here
        ...
Discovering Capabilities
class ContentPlugin(Plugin):
    """Example consumer that locates a semantic search provider by capability."""

    async def enhance_content(self, content: dict) -> dict:
        """Attach related items to ``content`` when a search provider exists.

        On success the search results are stored under
        ``content["related_content"]``. When no provider can be found, a
        warning is logged and ``content`` is returned without that key being
        set by this method.

        Args:
            content: Content record; its ``"title"`` value is the search query.

        Returns:
            The same ``content`` dict that was passed in.
        """
        try:
            # Ask discovery for the best provider, accepting only healthy ones.
            best_name = self.discovery.get_best_provider(
                "semantic_search",
                require_healthy=True,
            )
            # Resolve the name to a plugin instance and query it by title.
            search_plugin = self.registry.get(best_name)
            content["related_content"] = await search_plugin.search(
                query=content["title"],
                limit=5,
            )
        except CapabilityNotFoundError:
            self.logger.warning("No semantic search provider available")
        return content
Inter-Plugin Messaging
Communicate between plugins through a mediated bus.
Communication Patterns
- Broadcast: Fire-and-forget to all plugins
- Targeted: Send to a specific plugin, no response
- Request-Response: Send and wait for response
- Broadcast-Collect: Send to all, collect responses
Request-Response
class QueryPlugin(Plugin):
    """Example plugin that sends a request over the bus and awaits the reply."""

    async def get_analytics(self, content_id: str) -> dict:
        """Request metrics for ``content_id`` from ``analytics-plugin``.

        Args:
            content_id: Identifier sent in the request payload.

        Returns:
            The reply's payload, or an empty dict when the request times out.
        """
        # Fallback result, used if the request times out.
        metrics: dict = {}
        try:
            reply = await self.bus.request(
                sender=self.name,
                target="analytics-plugin",
                message_type="get_metrics",
                payload={"content_id": content_id},
                timeout_ms=5000,  # give up after 5 seconds
            )
            metrics = reply.payload
        except MessageTimeoutError:
            self.logger.warning("Analytics request timed out")
        return metrics
Saga Pattern
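For multi-step operations that may need rollback, record each step as it completes so that a failure can trigger compensating actions for the steps already done: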
class PublishSagaPlugin(Plugin):
    """Coordinates a multi-step publishing process with rollback on failure."""

    # Upper bound for each step's bus request. Without it a stalled
    # participant would block the saga indefinitely and compensation would
    # never run. Tune per deployment; 5 seconds mirrors the other examples.
    _STEP_TIMEOUT_MS = 5000

    # Saga steps in execution order, as (target plugin, message type). The
    # message type doubles as the step name recorded for compensation.
    _SAGA_STEPS = (
        ("validator-plugin", "validate"),  # Step 1: Validate content
        ("image-optimizer", "optimize"),  # Step 2: Optimize images
        ("cdn-deployer", "deploy"),  # Step 3: Deploy to CDN
    )

    async def execute_publish_saga(self, content_id: str, ctx: HookContext) -> bool:
        """Run every publishing step in order, compensating if one fails.

        Each step is a bus request bounded by ``_STEP_TIMEOUT_MS``. The name
        of every step that finishes is recorded so that, on failure, only the
        completed steps are handed to ``_compensate``.

        Args:
            content_id: Identifier sent in each step's payload.
            ctx: Hook context supplied by the caller; not used by this method.

        Returns:
            True when all steps succeed, False when any step raised and
            compensation was attempted.
        """
        completed_steps = []
        try:
            for target, message_type in self._SAGA_STEPS:
                await self.bus.request(
                    sender=self.name,
                    target=target,
                    message_type=message_type,
                    payload={"content_id": content_id},
                    timeout_ms=self._STEP_TIMEOUT_MS,
                )
                completed_steps.append(message_type)
            return True
        except Exception as e:
            # Saga boundary: any failure (including a step timeout) aborts the
            # remaining steps and is reported as False rather than re-raised.
            self.logger.error(f"Saga failed: {e}")
            # Compensate in reverse order. NOTE(review): _compensate is not
            # defined in this snippet; it is expected to undo the listed steps
            # last-to-first - confirm against its implementation.
            await self._compensate(content_id, completed_steps)
            return False
Best Practices
1. Prefer Capabilities Over Direct References
# Wrong - tight coupling: the caller hard-codes one specific plugin's name
deployer = self.registry.get("netlify-deployer")
# Right - loose coupling via capability: ask discovery which plugin provides
# "static_deployment", then resolve that name through the registry
deployer_name = self.discovery.get_best_provider("static_deployment")
deployer = self.registry.get(deployer_name)
2. Use Timeouts for All Remote Calls
# Wrong - no timeout_ms is passed, so the request may hang forever
response = await self.bus.request(sender, target, msg_type, payload)
# Right - explicit timeout: stop waiting after 5000 ms
response = await self.bus.request(
sender, target, msg_type, payload,
timeout_ms=5000,
)
3. Handle Missing Capabilities Gracefully
# Wrong - crashes if capability unavailable (CapabilityNotFoundError propagates)
provider = self.discovery.get_best_provider("optional_feature")
# Right - graceful degradation: log it and continue with provider = None
try:
    provider = self.discovery.get_best_provider("optional_feature")
except CapabilityNotFoundError:
    self.logger.info("Optional feature not available")
    provider = None