Schema Management Developer Guide¶
This guide provides developers with practical instructions for implementing database schemas using AICO's decorator-based schema registry system.
Quick Start¶
Basic Schema Registration¶
# 1. Register your schema with a decorator
from aico.data import register_schema, SchemaVersion
@register_schema("my_module", "core", priority=5)
MY_MODULE_SCHEMA = {
1: SchemaVersion(
version=1,
name="My Module Schema",
description="Core tables for my module functionality",
sql_statements=[
"""
CREATE TABLE my_module_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
data_field TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"CREATE INDEX idx_my_module_user_id ON my_module_data(user_id)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_my_module_user_id",
"DROP TABLE IF EXISTS my_module_data"
]
)
}
# 2. Schema is automatically applied during startup - no manual intervention needed!
# Just import the module containing the decorated schema
Schema Registration Patterns¶
Core Module Schema¶
Core AICO modules register schemas with priority ordering:
# aico/core/conversations/schema.py
from aico.data import register_schema, SchemaVersion
@register_schema("conversations", "core", priority=1)
CONVERSATION_SCHEMA = {
1: SchemaVersion(
version=1,
name="Conversation System",
description="Core conversation storage and retrieval",
sql_statements=[
"""
CREATE TABLE conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
message TEXT NOT NULL,
is_ai_response BOOLEAN DEFAULT FALSE,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
context_id TEXT,
metadata JSON
)
""",
"""
CREATE TABLE conversation_threads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
thread_name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"CREATE INDEX idx_conversations_user_id ON conversations(user_id)",
"CREATE INDEX idx_conversations_timestamp ON conversations(timestamp)",
"CREATE INDEX idx_conversations_context ON conversations(context_id)",
"CREATE INDEX idx_threads_user_id ON conversation_threads(user_id)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_threads_user_id",
"DROP INDEX IF EXISTS idx_conversations_context",
"DROP INDEX IF EXISTS idx_conversations_timestamp",
"DROP INDEX IF EXISTS idx_conversations_user_id",
"DROP TABLE IF EXISTS conversation_threads",
"DROP TABLE IF EXISTS conversations"
]
),
2: SchemaVersion(
version=2,
name="Add Conversation Analytics",
description="Add analytics and metrics tracking",
sql_statements=[
"""
CREATE TABLE conversation_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id INTEGER NOT NULL,
response_time_ms INTEGER,
user_satisfaction INTEGER CHECK(user_satisfaction BETWEEN 1 AND 5),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (conversation_id) REFERENCES conversations (id)
)
""",
"CREATE INDEX idx_metrics_conversation_id ON conversation_metrics(conversation_id)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_metrics_conversation_id",
"DROP TABLE IF EXISTS conversation_metrics"
]
)
}
Plugin Schema Pattern¶
Plugins register schemas that are automatically managed during plugin lifecycle:
# aico/plugins/calendar/schema.py
from aico.data import register_schema, SchemaVersion
@register_schema("calendar", "plugin")
CALENDAR_SCHEMA = {
1: SchemaVersion(
version=1,
name="Calendar Plugin",
description="Calendar events and scheduling",
sql_statements=[
"CREATE TABLE calendar_events (...)",
"CREATE TABLE calendar_reminders (...)",
"CREATE INDEX idx_calendar_events_user_id ON calendar_events(user_id)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_calendar_events_user_id",
"DROP TABLE IF EXISTS calendar_reminders",
"DROP TABLE IF EXISTS calendar_events"
]
)
}
# No manual activation/deactivation code needed!
# Plugin system automatically handles schema application
Schema Evolution¶
Adding New Versions¶
When evolving your schema, always add new versions incrementally:
# Original schema
MY_SCHEMA = {
1: SchemaVersion(
version=1,
name="Initial Schema",
sql_statements=["CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"],
rollback_statements=["DROP TABLE IF EXISTS users"]
)
}
# Evolution - add new version, don't modify existing ones
MY_SCHEMA = {
1: SchemaVersion(
version=1,
name="Initial Schema",
sql_statements=["CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"],
rollback_statements=["DROP TABLE IF EXISTS users"]
),
2: SchemaVersion( # New version
version=2,
name="Add User Email",
description="Add email field to users table",
sql_statements=[
"ALTER TABLE users ADD COLUMN email TEXT",
"CREATE INDEX idx_users_email ON users(email)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_users_email",
# Note: SQLite doesn't support DROP COLUMN
# Consider table recreation for complex rollbacks
]
)
}
Complex Schema Changes¶
For complex changes that require data migration:
COMPLEX_SCHEMA = {
3: SchemaVersion(
version=3,
name="Restructure User Data",
description="Split user data into separate tables",
sql_statements=[
# Create new table
"""
CREATE TABLE user_profiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
first_name TEXT,
last_name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""",
# Migrate existing data
"""
INSERT INTO user_profiles (user_id, first_name, last_name)
SELECT id,
SUBSTR(name, 1, INSTR(name || ' ', ' ') - 1) as first_name,
SUBSTR(name, INSTR(name || ' ', ' ') + 1) as last_name
FROM users
WHERE name IS NOT NULL
""",
# Create indexes
"CREATE INDEX idx_user_profiles_user_id ON user_profiles(user_id)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_user_profiles_user_id",
"DROP TABLE IF EXISTS user_profiles"
]
)
}
Automated Integration¶
Application Startup¶
# backend/main.py - Fully automated schema application
from aico.data import EncryptedLibSQLConnection, SchemaRegistry
# Import all modules with registered schemas
import aico.core.conversations.schema # Registers conversation schema
import aico.core.users.schema # Registers user schema
import aico.core.memory.schema # Registers memory schema
def initialize_database():
connection = EncryptedLibSQLConnection(
db_path="~/.aico/user.db",
master_password=get_master_password()
)
# Automatically applies all registered core schemas in priority order
applied_versions = SchemaRegistry.apply_core_schemas(connection)
logger.info(f"Applied {len(applied_versions)} core schemas: {applied_versions}")
return connection
Plugin System Integration¶
# aico/plugins/manager.py - Automated plugin schema management
from aico.data import SchemaRegistry
class PluginManager:
def __init__(self, connection):
self.connection = connection
self.active_plugins = set()
def activate_plugin(self, plugin_name: str):
"""Activate a plugin - schema is automatically applied"""
# Import plugin module to register its schema
__import__(f"aico.plugins.{plugin_name}.schema")
# Automatically apply registered plugin schema
version = SchemaRegistry.apply_plugin_schema(plugin_name, self.connection)
self.active_plugins.add(plugin_name)
logger.info(f"Plugin {plugin_name} activated at schema version {version}")
def deactivate_plugin(self, plugin_name: str, remove_data: bool = False):
"""Deactivate a plugin - schema automatically removed if requested"""
if remove_data:
SchemaRegistry.remove_plugin_schema(plugin_name, self.connection)
logger.info(f"Plugin {plugin_name} schema removed")
self.active_plugins.discard(plugin_name)
logger.info(f"Plugin {plugin_name} deactivated")
def get_plugin_info(self, plugin_name: str) -> dict:
"""Get plugin schema information"""
schema_info = SchemaRegistry.get_schema_info(self.connection)
return schema_info.get("plugin_schemas", {}).get(plugin_name, {"status": "inactive"})
Testing Schema Changes¶
Unit Testing¶
# tests/test_schema.py
import pytest
import tempfile
from pathlib import Path
from aico.data import EncryptedLibSQLConnection, SchemaManager
@pytest.fixture
def temp_db():
"""Create a temporary encrypted database for testing"""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
db_path = Path(f.name)
connection = EncryptedLibSQLConnection(
db_path=db_path,
master_password="test_password"
)
yield connection
# Cleanup
connection.disconnect()
db_path.unlink(missing_ok=True)
db_path.with_suffix('.db.salt').unlink(missing_ok=True)
def test_schema_migration(temp_db):
"""Test basic schema migration"""
test_schema = {
1: SchemaVersion(
version=1,
name="Test Schema",
sql_statements=["CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)"],
rollback_statements=["DROP TABLE IF EXISTS test_table"]
)
}
# Apply schema
schema_manager = SchemaManager(temp_db, test_schema)
schema_manager.migrate_to_latest()
# Verify schema applied
assert schema_manager.get_current_version() == 1
assert temp_db.table_exists("test_table")
# Test rollback
schema_manager.rollback_to_version(0)
assert schema_manager.get_current_version() == 0
assert not temp_db.table_exists("test_table")
def test_schema_evolution(temp_db):
"""Test schema evolution with multiple versions"""
evolving_schema = {
1: SchemaVersion(
version=1,
name="Initial",
sql_statements=["CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"],
rollback_statements=["DROP TABLE IF EXISTS users"]
),
2: SchemaVersion(
version=2,
name="Add Email",
sql_statements=[
"ALTER TABLE users ADD COLUMN email TEXT",
"CREATE INDEX idx_users_email ON users(email)"
],
rollback_statements=[
"DROP INDEX IF EXISTS idx_users_email"
# Note: Can't drop column in SQLite
]
)
}
schema_manager = SchemaManager(temp_db, evolving_schema)
# Apply version 1
schema_manager.migrate_to_version(1)
assert schema_manager.get_current_version() == 1
# Apply version 2
schema_manager.migrate_to_version(2)
assert schema_manager.get_current_version() == 2
# Verify table structure
table_info = temp_db.get_table_info("users")
column_names = [col['name'] for col in table_info]
assert 'email' in column_names
@pytest.mark.asyncio
async def test_plugin_schema_isolation(temp_db):
"""Test that plugin schemas are properly isolated"""
plugin_a_schema = {
1: SchemaVersion(
version=1,
name="Plugin A Schema",
sql_statements=["CREATE TABLE plugin_a_data (id INTEGER PRIMARY KEY, data TEXT)"],
rollback_statements=["DROP TABLE IF EXISTS plugin_a_data"]
)
}
plugin_b_schema = {
1: SchemaVersion(
version=1,
name="Plugin B Schema",
sql_statements=["CREATE TABLE plugin_b_data (id INTEGER PRIMARY KEY, info TEXT)"],
rollback_statements=["DROP TABLE IF EXISTS plugin_b_data"]
)
}
# Apply both plugin schemas
manager_a = SchemaManager(temp_db, plugin_a_schema)
manager_b = SchemaManager(temp_db, plugin_b_schema)
manager_a.migrate_to_latest()
manager_b.migrate_to_latest()
# Verify both tables exist
assert temp_db.table_exists("plugin_a_data")
assert temp_db.table_exists("plugin_b_data")
# Remove plugin A schema
manager_a.rollback_to_version(0)
# Verify plugin A table removed, plugin B table remains
assert not temp_db.table_exists("plugin_a_data")
assert temp_db.table_exists("plugin_b_data")
Integration Testing¶
# tests/integration/test_schema_integration.py
import pytest
from aico.plugins.manager import PluginSchemaManager
@pytest.mark.asyncio
async def test_full_plugin_lifecycle(temp_db):
"""Test complete plugin activation/deactivation lifecycle"""
plugin_manager = PluginSchemaManager(temp_db)
# Activate plugin
await plugin_manager.activate_plugin("calendar")
# Verify plugin is active
info = await plugin_manager.get_plugin_info("calendar")
assert info["status"] == "active"
assert info["schema_info"]["current_version"] > 0
# Deactivate plugin without removing data
await plugin_manager.deactivate_plugin("calendar", remove_data=False)
# Verify plugin is inactive but tables still exist
info = await plugin_manager.get_plugin_info("calendar")
assert info["status"] == "inactive"
assert temp_db.table_exists("calendar_events")
# Reactivate and then deactivate with data removal
await plugin_manager.activate_plugin("calendar")
await plugin_manager.deactivate_plugin("calendar", remove_data=True)
# Verify tables are removed
assert not temp_db.table_exists("calendar_events")
Best Practices¶
Schema Design Guidelines¶
- Use Incremental Versions: Each version should represent a single logical change
- Provide Complete Rollbacks: Always include rollback statements for every change
- Index Strategy: Create indexes for commonly queried columns
- Foreign Key Constraints: Use foreign keys to maintain referential integrity
- Data Types: Use appropriate SQLite data types (INTEGER, TEXT, REAL, BLOB)
Security Considerations¶
- No User Data in Schema: Schema definitions should contain structure only
- Validate SQL: Validate SQL statements don't contain malicious code
- Plugin Isolation: Use table prefixes to isolate plugin data
- Encryption Integration: Always use encrypted connections for sensitive data
Performance Optimization¶
- Index Planning: Create indexes for WHERE, ORDER BY, and JOIN columns
- Query Optimization: Design schema to support efficient queries
- Batch Operations: Use transactions for multiple schema operations
- Connection Reuse: Reuse database connections across schema operations
Error Handling¶
def safe_schema_migration(connection, schema_definitions):
"""Safely apply schema migration with proper error handling"""
schema_manager = SchemaManager(connection, schema_definitions)
try:
# Backup current state
backup_info = schema_manager.get_schema_info()
# Apply migration
schema_manager.migrate_to_latest()
# Validate result
validation = schema_manager.validate_schema()
if not validation['valid']:
raise SchemaValidationError(f"Schema validation failed: {validation}")
logger.info("Schema migration completed successfully")
return True
except Exception as e:
logger.error(f"Schema migration failed: {e}")
# Attempt rollback
try:
if backup_info['current_version'] > 0:
schema_manager.rollback_to_version(backup_info['current_version'])
logger.info("Successfully rolled back to previous schema version")
except Exception as rollback_error:
logger.critical(f"Rollback failed: {rollback_error}")
raise SchemaRollbackError("Migration failed and rollback unsuccessful")
raise
Troubleshooting¶
Common Issues¶
- Migration Already Applied: Check migration history before applying
- Rollback Limitations: SQLite doesn't support DROP COLUMN - plan accordingly
- Foreign Key Violations: Ensure proper order when creating/dropping tables
- Index Conflicts: Use IF NOT EXISTS for index creation
- Transaction Deadlocks: Keep transactions short and avoid nested transactions
Debugging Tools¶
def debug_schema_state(connection, schema_definitions):
"""Debug current schema state"""
schema_manager = SchemaManager(connection, schema_definitions)
print("=== Schema Debug Information ===")
print(f"Current Version: {schema_manager.get_current_version()}")
print(f"Available Versions: {list(schema_definitions.keys())}")
# Migration history
history = schema_manager.get_migration_history()
print(f"Migration History: {len(history)} entries")
for entry in history[-5:]: # Last 5 migrations
print(f" - Version {entry['version']} applied at {entry['applied_at']}")
# Schema validation
validation = schema_manager.validate_schema()
print(f"Schema Valid: {validation['valid']}")
if not validation['valid']:
print(f"Validation Issues: {validation}")
# Table information
tables = connection.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
print(f"Tables: {[t[0] for t in tables]}")
This developer guide provides comprehensive coverage of AICO's schema management system, enabling developers to effectively implement and maintain database schemas in the local-first, plugin-extensible architecture.