Backend Development Patterns¶
Overview¶
This guide covers current development patterns for AICO's backend service, including plugin development, middleware implementation, and integration with the shared library system.
Plugin Development¶
Plugin Architecture¶
AICO uses a service container architecture with standardized plugin base classes:
from aico.core.logging_context import create_infrastructure_logger
from backend.core.plugin_base import BasePlugin, PluginMetadata, PluginPriority
from backend.core.service_container import ServiceContainer
class MyPlugin(BasePlugin):
def __init__(self, name: str, container: ServiceContainer):
super().__init__(name, container)
self.logger = create_infrastructure_logger("aico.infrastructure.plugin.my_plugin")
@property
def metadata(self):
return PluginMetadata(
name="My Plugin",
version="1.0.0",
description="Example plugin implementation",
priority=PluginPriority.NORMAL
)
async def initialize(self) -> None:
"""Initialize plugin with required services."""
self.config = self.require_service('config')
self.db_connection = self.require_service('database')
async def start(self) -> None:
"""Start plugin services."""
self.logger.info("Starting MyPlugin")
# Plugin startup logic
async def stop(self) -> None:
"""Stop plugin services."""
self.logger.info("Stopping MyPlugin")
# Plugin cleanup logic
Plugin Registration¶
Plugins are registered with the service container during backend startup:
# In BackendLifecycleManager
from backend.api_gateway.plugins.my_plugin import MyPlugin
# Register plugin with service container
container.register_service(
"my_plugin",
lambda: MyPlugin("my_plugin", container),
dependencies=["config", "database"],
priority=100
)
Database Integration¶
Plugins receive a shared encrypted database connection:
class DatabasePlugin(BasePlugin):
def __init__(self, name: str, container: ServiceContainer):
super().__init__(name, container)
@property
def metadata(self):
return PluginMetadata(
name="Database Plugin",
version="1.0.0",
description="Database integration plugin",
priority=PluginPriority.NORMAL
)
async def initialize(self) -> None:
self.db_connection = self.require_service('database')
async def start(self):
# Use shared connection
cursor = self.db_connection.execute("SELECT * FROM logs LIMIT 5")
results = cursor.fetchall()
self.logger.info(f"Found {len(results)} recent logs")
Middleware Development¶
ASGI Middleware Pattern¶
The current architecture uses ASGI middleware for cross-cutting concerns:
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
class CustomMiddleware(BaseHTTPMiddleware):
def __init__(self, app, key_manager):
super().__init__(app)
self.key_manager = key_manager
async def dispatch(self, request: Request, call_next):
# Pre-processing
response = await call_next(request)
# Post-processing
return response
Encryption Middleware Integration¶
The encryption middleware wraps the entire FastAPI app:
from backend.api_gateway.middleware.encryption import EncryptionMiddleware
from aico.security.key_manager import AICOKeyManager
# In main.py
key_manager = AICOKeyManager(config_manager)
app = EncryptionMiddleware(fastapi_app, key_manager)
Message Bus Integration¶
Publishing Messages¶
Use the shared message bus client for inter-module communication:
from aico.core.bus import MessageBusClient
class ServicePlugin(BasePlugin):
async def start(self):
self.bus_client = MessageBusClient()
await self.bus_client.connect()
async def publish_event(self, event_data):
await self.bus_client.publish("events.service", event_data)
Subscribing to Topics¶
Subscribe to message bus topics for reactive processing:
async def handle_message(self, topic: str, message: dict):
self.logger.info(f"Received message on {topic}: {message}")
async def start(self):
await self.bus_client.subscribe("logs.*", self.handle_message)
Shared Library Integration¶
Using Core Utilities¶
Import shared utilities following the namespace pattern:
from aico.core.config import ConfigurationManager
from aico.core.logging import get_logger
from aico.core.paths import AICOPaths
from aico.security import AICOKeyManager
from aico.data.libsql.encrypted import EncryptedLibSQLConnection
Configuration Management¶
Access configuration through the centralized system:
class ConfiguredService:
def __init__(self, config_manager: ConfigurationManager):
self.config = config_manager
# Access nested configuration
api_config = self.config.get("core.api_gateway", {})
port = api_config.get("rest.port", 8771)
# Plugin-specific configuration
plugin_config = self.config.get("plugins.my_plugin", {})
Logging Patterns¶
Use structured logging with ZMQ transport:
from aico.core.logging import get_logger
class LoggingService:
def __init__(self):
self.logger = get_logger("service", "component")
def process_request(self, request_id: str):
self.logger.info(
"Processing request",
extra={
"request_id": request_id,
"event_type": "request_processing",
"component": "service"
}
)
Process Management¶
Graceful Shutdown¶
Implement proper shutdown handling in plugins:
class ManagedPlugin(BasePlugin):
def __init__(self, name: str, container: ServiceContainer):
super().__init__(name, container)
self.running = False
self.background_tasks = set()
async def start(self):
self.running = True
task = asyncio.create_task(self.background_worker())
self.background_tasks.add(task)
async def stop(self):
self.running = False
# Cancel background tasks
for task in self.background_tasks:
if not task.done():
task.cancel()
# Wait for cleanup
await asyncio.gather(*self.background_tasks, return_exceptions=True)
Background Task Management¶
Coordinate background tasks with the main process:
async def background_worker(self):
while self.running:
try:
# Background work
await self.process_queue()
await asyncio.sleep(1.0)
except asyncio.CancelledError:
self.logger.info("Background worker cancelled")
break
except Exception as e:
self.logger.error(f"Background worker error: {e}")
await asyncio.sleep(5.0)
Database Patterns¶
Encrypted Connection Usage¶
Use the shared encrypted database connection:
class DatabaseService:
def __init__(self, db_connection):
self.db = db_connection
def insert_record(self, data):
cursor = self.db.execute(
"INSERT INTO table_name (column1, column2) VALUES (?, ?)",
(data['value1'], data['value2'])
)
self.db.commit()
return cursor.lastrowid
def query_records(self, filter_value):
cursor = self.db.execute(
"SELECT * FROM table_name WHERE column1 = ?",
(filter_value,)
)
return cursor.fetchall()
Transaction Management¶
Handle transactions properly with error recovery:
def transactional_operation(self, operations):
try:
for operation in operations:
self.db.execute(operation['sql'], operation['params'])
self.db.commit()
self.logger.info("Transaction completed successfully")
except Exception as e:
self.db.rollback()
self.logger.error(f"Transaction failed, rolled back: {e}")
raise
Error Handling¶
Plugin Error Recovery¶
Implement robust error handling in plugins:
class ResilientPlugin(BasePlugin):
async def start(self):
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
await self.initialize_service()
break
except Exception as e:
retry_count += 1
self.logger.warning(f"Initialization failed (attempt {retry_count}): {e}")
if retry_count >= max_retries:
self.logger.error("Max retries exceeded, plugin failed to start")
raise
await asyncio.sleep(2 ** retry_count) # Exponential backoff
Middleware Error Handling¶
Handle errors gracefully in middleware:
async def dispatch(self, request: Request, call_next):
try:
response = await call_next(request)
return response
except Exception as e:
self.logger.error(f"Request processing failed: {e}")
return Response(
content={"error": "Internal server error"},
status_code=500,
media_type="application/json"
)
Testing Patterns¶
Plugin Testing¶
Test plugins with mock dependencies:
import pytest
from unittest.mock import Mock, AsyncMock
@pytest.fixture
def mock_config():
config = Mock()
config.get.return_value = {"test": "value"}
return config
@pytest.fixture
def mock_db():
db = Mock()
db.execute.return_value = Mock()
return db
@pytest.mark.asyncio
async def test_plugin_start(mock_config, mock_db):
plugin = MyPlugin(mock_config, db_connection=mock_db)
await plugin.start()
assert plugin.running is True
Integration Testing¶
Test with real shared components:
@pytest.mark.integration
async def test_message_bus_integration():
from aico.core.config import ConfigurationManager
from aico.core.bus import MessageBusClient
config = ConfigurationManager()
config.initialize(lightweight=True)
client = MessageBusClient()
await client.connect()
# Test publish/subscribe
received_messages = []
async def handler(topic, message):
received_messages.append((topic, message))
await client.subscribe("test.*", handler)
await client.publish("test.message", {"data": "test"})
# Allow message processing
await asyncio.sleep(0.1)
assert len(received_messages) == 1
assert received_messages[0][0] == "test.message"
Performance Considerations¶
Async/Await Best Practices¶
Use proper async patterns for non-blocking operations:
class AsyncService:
async def process_batch(self, items):
# Process items concurrently
tasks = [self.process_item(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle results and exceptions
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
self.logger.info(f"Processed {len(successful)} items, {len(failed)} failed")
return successful
Resource Management¶
Manage resources efficiently in long-running services:
class ResourceManagedService:
def __init__(self):
self.connection_pool = {}
self.cache = {}
self.max_cache_size = 1000
async def get_connection(self, key):
if key not in self.connection_pool:
self.connection_pool[key] = await self.create_connection(key)
return self.connection_pool[key]
def cache_result(self, key, value):
if len(self.cache) >= self.max_cache_size:
# Remove oldest entry
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = value
Deployment Patterns¶
Service Configuration¶
Configure services for different deployment environments:
class DeploymentAwareService:
def __init__(self, config_manager):
self.config = config_manager
self.environment = self.config.get("environment", "development")
if self.environment == "production":
self.setup_production_config()
else:
self.setup_development_config()
def setup_production_config(self):
# Production-specific configuration
self.log_level = "INFO"
self.enable_metrics = True
self.connection_timeout = 30
def setup_development_config(self):
# Development-specific configuration
self.log_level = "DEBUG"
self.enable_metrics = False
self.connection_timeout = 5
Health Checks¶
Implement health checks for service monitoring:
class HealthCheckPlugin(BasePlugin):
async def health_check(self):
checks = {
"database": await self.check_database(),
"message_bus": await self.check_message_bus(),
"external_services": await self.check_external_services()
}
all_healthy = all(checks.values())
return {
"status": "healthy" if all_healthy else "unhealthy",
"checks": checks,
"timestamp": datetime.utcnow().isoformat()
}