# Data Layer Architecture
AICO's data layer implements Clean Architecture principles with a multi-database approach optimized for different data access patterns.
## Database Architecture
**Production Databases:**

- ✅ **PostgreSQL 18.1** (Docker): Core application data (users, conversations, knowledge graph, agency)
- ✅ **Loki 2.9** (Docker): Log aggregation with LogQL queries
- ✅ **InfluxDB 2.x** (Pro/Enterprise, Docker): Time-series metrics (performance data, telemetry)
- ✅ **ChromaDB**: Vector embeddings for semantic search
- ✅ **LMDB**: Working memory with 30-day TTL
**Planned:**

- ⏳ **DuckDB**: Analytics engine for conversation analysis
## Architecture Principles
### Clean Architecture Layers
- **Domain Models**: Pure business logic (Pydantic models)
- **Repositories**: Data access abstraction
- **UnitOfWork**: Transaction and session management
- **Database Adapters**: Technology-specific implementations
### Database Specialization
Each database serves a specific purpose optimized for its workload:

- **PostgreSQL**: ACID transactions, referential integrity, relational data
- **Loki**: Purpose-built log aggregation, LogQL queries, label-based indexing
- **InfluxDB**: High-performance time-series metrics, automatic downsampling
- **ChromaDB**: Vector similarity search, semantic retrieval
- **LMDB**: Fast key-value working memory, sub-millisecond access
## PostgreSQL - Core Application Data
**Purpose:** Transactional application data requiring ACID guarantees

**Deployment:** Docker container (`postgres:18.1`) with persistent volume

**Schema Organization:** Tables are grouped under the `aico_core` schema (e.g., `aico_core.user_profiles`).
**Data Stored:**

- **Users & Auth**: `user_profiles`, `auth_sessions`, `auth_user_credentials`
- **Conversations**: `conversations`, `messages`, `conversation_participants`
- **Knowledge Graph**: `kg_nodes`, `kg_edges`, `kg_node_properties`
- **Agency**: `agency_goals`, `agency_plans`, `agency_skill_executions`
- **Memory**: `semantic_memory_metadata`, `user_preferences`, `facts`
**Technology Stack:**

- PostgreSQL 18.1 in Docker container
- `asyncpg` for async operations (backend API)
- `psycopg2` for sync operations (CLI tools)
- SQLAlchemy Core for query building
- Connection pooling via SQLAlchemy Engine
**Backend API Pattern** (Repository + UnitOfWork):

```python
from aico.data.uow import UnitOfWork

async def create_conversation(user_id: str, title: str):
    async with UnitOfWork() as uow:
        conversation = await uow.conversations.create({
            "user_id": user_id,
            "title": title
        })
        await uow.commit()
        return conversation
```
**CLI Tools Pattern** (Direct SQL):

```python
from cli.utils.pg_connection import get_pg_connection

def list_users():
    db = get_pg_connection()
    cursor = db.cursor()
    cursor.execute("SELECT uuid, full_name FROM aico_core.user_profiles")
    return cursor.fetchall()
```
## Loki - Log Aggregation
**Purpose:** Dedicated log storage and querying

**Deployment:** Docker container (`grafana/loki:2.9.0`) with persistent volume

**Data Organization:**

- **Storage**: Filesystem (BoltDB index + filesystem chunks)
- **Retention**: 30 days (configurable)
- **Index**: Label-based (`service`, `level`, `logger_prefix`)
**Data Stored:**

- **Application Logs**: Structured log events from all services
- **Labels**: `service`, `level`, `logger_prefix` for fast filtering
- **Metadata**: JSON-encoded metadata in log lines
**Technology Stack:**

- Loki 2.9 in Docker container
- `requests` library for HTTP push API
- LogQL query language
- Label-based indexing (not full-text)
**Access Pattern:**

```python
import time

import requests

# Push logs to Loki via the HTTP API
loki_url = "http://localhost:3100/loki/api/v1/push"
payload = {
    "streams": [{
        "stream": {
            "service": "backend",
            "level": "INFO",
            "logger_prefix": "backend.api"
        },
        "values": [
            # Loki expects [nanosecond timestamp as string, log line]
            [str(int(time.time() * 1e9)), 'Log message | {"metadata": "json"}']
        ]
    }]
}
requests.post(loki_url, json=payload)

# Query logs with LogQL
query_url = "http://localhost:3100/loki/api/v1/query_range"
params = {
    "query": '{service="backend", level="ERROR"}',
    "limit": 100
}
response = requests.get(query_url, params=params)
```
**CLI Access:**

```bash
# Tail logs
aico logs tail --service backend --level ERROR --lines 50

# List logs
aico logs ls --service modelservice --limit 100
```
## InfluxDB - Time-Series Metrics
**Purpose:** High-performance metrics and telemetry data

**Deployment:** Docker container (`influxdb:2-alpine`) with persistent volume

**Data Organization:** Metrics are written as measurements to the `aico_telemetry` bucket.
**Data Stored:**

- **System Metrics**: CPU, memory, disk usage
- **API Metrics**: Request/response times, error rates
- **Model Metrics**: Inference latency, token counts
- **Performance Data**: Time-series telemetry
**Technology Stack:**

- InfluxDB 2.x in Docker container
- influxdb-client-python
- OpenTelemetry instrumentation
- Flux query language
**Access Pattern:**

```python
from datetime import UTC, datetime

from influxdb_client import InfluxDBClient, Point

# token is loaded from AICO configuration/secrets
client = InfluxDBClient(url="http://localhost:8086", token=token, org="aico")
write_api = client.write_api()

point = (
    Point("api_request")
    .tag("endpoint", "/api/v1/conversation")
    .field("duration_ms", 45.2)
    .time(datetime.now(UTC))
)
write_api.write(bucket="aico_telemetry", record=point)
```
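Reads go through the same client's query API using Flux. A minimal sketch, reusing the `client` above and assuming the `api_request` measurement from the write example:

```python
# Query the last hour of request durations with Flux
query_api = client.query_api()
flux = '''
from(bucket: "aico_telemetry")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "api_request")
  |> filter(fn: (r) => r._field == "duration_ms")
'''
for table in query_api.query(flux, org="aico"):
    for record in table.records:
        print(record.get_time(), record.get_value())
```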
## ChromaDB - Vector Embeddings
**Purpose:** Semantic search and similarity matching

**Collections:**

- `conversation_segments`: Conversation history with embeddings
- `kg_node_embeddings`: Knowledge graph entity embeddings
- `kg_edge_embeddings`: Relationship embeddings
**Technology Stack:**

- ChromaDB (persistent mode)
- Sentence-transformers via modelservice
- Cosine similarity search
- Metadata filtering
**Access Pattern:**

```python
import chromadb

client = chromadb.PersistentClient(path="/path/to/chroma")
collection = client.get_or_create_collection("conversation_segments")

# Store with embeddings
collection.upsert(
    ids=["msg_123"],
    embeddings=[[0.1, 0.2, ...]],  # From modelservice
    documents=["User message text"],
    metadatas=[{"user_id": "uuid"}]
)

# Semantic search
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=10
)
```
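The stack above also lists metadata filtering: a `where` clause scopes the similarity search. A small sketch reusing the collection from the previous example:

```python
# Restrict semantic search to one user's conversation segments
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=10,
    where={"user_id": "uuid"}
)
```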
## LMDB - Working Memory
**Purpose:** High-performance ephemeral storage for active conversation context

**Features:**

- 30-day TTL for conversation continuity
- Sub-millisecond read/write latency
- Memory-mapped for performance
- Named databases for different data types
**Implementation:**

- **Database**: `session_memory`
- **Storage**: `/data/lmdb/`
- **Integration**: `WorkingMemoryStore` in `/shared/aico/ai/memory/working.py`
- **Coordination**: PostgreSQL tracks LMDB sessions via the `session_metadata` table (see the sketch below)
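For illustration, a minimal sketch of this pattern with the `lmdb` Python package. The key layout and read-time expiry check are assumptions for the example, not the actual `WorkingMemoryStore` implementation:

```python
import json
import time

import lmdb

# Open the environment and the named session_memory database
env = lmdb.open("/data/lmdb", max_dbs=4)
session_db = env.open_db(b"session_memory")

def put_context(session_id: str, context: dict) -> None:
    # Stamp each record so entries older than the TTL can be expired
    record = {"data": context, "stored_at": time.time()}
    with env.begin(write=True, db=session_db) as txn:
        txn.put(session_id.encode(), json.dumps(record).encode())

def get_context(session_id: str, ttl_days: int = 30) -> dict | None:
    with env.begin(db=session_db) as txn:
        raw = txn.get(session_id.encode())
    if raw is None:
        return None
    record = json.loads(raw)
    # LMDB has no native TTL, so expiry is enforced on read
    if time.time() - record["stored_at"] > ttl_days * 86400:
        return None
    return record["data"]
```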
## Repository Pattern
The Repository pattern abstracts data access, providing clean interfaces for domain operations.
**Base Repository:**

```python
from sqlalchemy import insert, select

class Repository:
    def __init__(self, connection, table):
        self.connection = connection
        self.table = table

    async def get_by_id(self, id: str):
        query = select(self.table).where(self.table.c.id == id)
        result = await self.connection.execute(query)
        return result.fetchone()

    async def create(self, data: dict):
        query = insert(self.table).values(**data).returning(self.table)
        result = await self.connection.execute(query)
        return result.fetchone()
```
**Specialized Repository:**

```python
class ConversationRepository(Repository):
    async def get_by_user(self, user_id: str):
        query = select(self.table).where(self.table.c.user_id == user_id)
        result = await self.connection.execute(query)
        return result.fetchall()
```
## UnitOfWork Pattern
Manages database transactions and repository lifecycle.
```python
class UnitOfWork:
    def __init__(self):
        self._conversations = None

    async def __aenter__(self):
        self.connection = await get_async_connection()
        self.transaction = await self.connection.begin()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Roll back anything not explicitly committed, then release the connection
        if self.transaction.is_active:
            await self.transaction.rollback()
        await self.connection.close()

    async def commit(self):
        await self.transaction.commit()

    @property
    def conversations(self):
        # Lazily instantiate repositories bound to this unit's connection
        if self._conversations is None:
            self._conversations = ConversationRepository(self.connection)
        return self._conversations
```
**Usage:**

```python
async with UnitOfWork() as uow:
    conversation = await uow.conversations.create(data)
    await uow.commit()
```
## Database Integration Flow
**Write Flow** (new conversation message, sketched below):

1. **LMDB**: Store in working memory (immediate access)
2. **PostgreSQL**: Create metadata record with `conversation_id`, `user_id`, timestamp
3. **Modelservice**: Generate 768-dim embedding via sentence-transformers
4. **ChromaDB**: Store embedding with metadata for semantic search
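Sketched end to end as one orchestration function. The handles here (`working_memory`, `modelservice`, the `segments` collection, and a `uow.messages` repository alongside `uow.conversations`) are assumed names for illustration, not AICO's actual interfaces:

```python
async def persist_message(conversation_id: str, user_id: str, text: str):
    # 1. LMDB: make the message immediately available as working context
    working_memory.append(conversation_id, text)

    # 2. PostgreSQL: durable metadata record
    async with UnitOfWork() as uow:
        message = await uow.messages.create({
            "conversation_id": conversation_id,
            "user_id": user_id,
            "content": text,
        })
        await uow.commit()

    # 3. Modelservice: 768-dim sentence-transformer embedding
    embedding = await modelservice.embed(text)

    # 4. ChromaDB: index the embedding for semantic retrieval
    segments.upsert(
        ids=[str(message.id)],
        embeddings=[embedding],
        documents=[text],
        metadatas=[{"user_id": user_id, "conversation_id": conversation_id}],
    )
```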
**Read Flow** (retrieve relevant context):

1. **Query**: User asks question
2. **Modelservice**: Generate query embedding
3. **ChromaDB**: Semantic search returns top-N similar segments
4. **PostgreSQL**: Fetch full metadata for returned segment IDs
5. **LMDB**: Check working memory for recent context
6. **Fusion**: Combine semantic + recency + relationship scores (sketched below)
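The fusion step could combine the three signals as a weighted sum; the weights, half-life, and field names below are illustrative assumptions:

```python
import math
import time

def fuse_scores(segment: dict, half_life_days: float = 7.0) -> float:
    # Semantic: ChromaDB returns cosine distance; convert to similarity
    semantic = 1.0 - segment["distance"]
    # Recency: exponential decay of age with a configurable half-life
    age_days = (time.time() - segment["created_at"]) / 86400
    recency = math.exp(-math.log(2) * age_days / half_life_days)
    # Relationship: knowledge-graph proximity score, precomputed in [0, 1]
    relationship = segment.get("kg_score", 0.0)
    return 0.6 * semantic + 0.25 * recency + 0.15 * relationship

def rank_candidates(candidates: list[dict]) -> list[dict]:
    # candidates: dicts assembled from ChromaDB results + PostgreSQL metadata
    return sorted(candidates, key=fuse_scores, reverse=True)
```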
**Why This Architecture:**

- **PostgreSQL**: ACID guarantees for metadata, referential integrity for relationships
- **Loki**: Purpose-built for logs, label-based indexing, efficient storage
- **InfluxDB**: Optimized for time-series metrics, automatic retention policies
- **ChromaDB**: Optimized for vector similarity search (cosine distance)
- **LMDB**: Ultra-fast access for active conversations
- **Separation**: Each database handles what it does best
## Performance Characteristics
**Resource Usage:**

| Database | Read/Write Throughput | Use Case | Latency |
|---|---|---|---|
| PostgreSQL | High | Structured queries | ~1-10ms |
| Loki | Very High | Log writes | ~1-5ms |
| InfluxDB | Very High | Metrics writes | ~1-5ms |
| ChromaDB | High | Semantic search | ~10-50ms |
| LMDB | Very High | Working memory | <1ms |
**Characteristics:**

- **PostgreSQL**: ACID transactions, connection pooling
- **Loki**: Label-based indexing, LogQL queries, 70% storage reduction vs InfluxDB
- **InfluxDB**: High-throughput writes, automatic downsampling
- **ChromaDB**: Cosine similarity, hybrid search (BM25 + semantic)
- **LMDB**: Memory-mapped, multi-reader/single-writer
## Best Practices
### Database Selection
Choose the right database for your use case:

- **PostgreSQL**: User data, conversations, knowledge graph, agency data
- **Loki**: Application logs, structured logging, log queries
- **InfluxDB**: Metrics, performance data, time-series telemetry
- **ChromaDB**: Embeddings, semantic search, similarity matching
- **LMDB**: Active session data, conversation context, temporary cache
### Repository Usage
```python
# ✅ Good - Repository pattern in backend (no commit needed for reads)
async with UnitOfWork() as uow:
    user = await uow.users.get_by_id(user_id)

# ✅ Good - Direct SQL in CLI tools
db = get_pg_connection()
cursor = db.cursor()
cursor.execute("SELECT * FROM aico_core.user_profiles")
```
### Transaction Management
```python
# ✅ Good - Explicit commit for writes
async with UnitOfWork() as uow:
    await uow.conversations.create(data)
    await uow.commit()

# ✅ Good - No commit for read-only
async with UnitOfWork() as uow:
    conversations = await uow.conversations.list_all()
    return conversations
```
## Docker Deployment
PostgreSQL, Loki, and InfluxDB run in Docker containers for consistent deployment.
**Deployment Commands:**

```bash
# Deploy PostgreSQL
aico deploy pg

# Deploy Loki (log aggregation)
aico deploy loki

# Deploy InfluxDB (metrics)
aico deploy influx

# Reset databases (⚠️ DANGEROUS - deletes all data)
aico deploy pg --nuke
aico deploy loki --nuke
aico deploy influx --nuke
```
**Container Details:**

- **PostgreSQL**: `postgres:18.1` on port 5432
- **Loki**: `grafana/loki:2.9.0` on port 3100
- **InfluxDB**: `influxdb:2-alpine` on port 8086
- Persistent volumes for data storage
- Docker bridge network for inter-container communication
See Getting Started Guide for detailed deployment instructions.
## Related Documentation
- Architecture Overview - System architecture
- Tech Stack - Technology decisions
- Backend Patterns - Repository/UnitOfWork patterns
- Getting Started - Database deployment