Memory System Architecture

Overview

AICO's memory system implements a three-tier architecture designed for local-first operation on consumer hardware. The system balances comprehensive relationship intelligence with performance efficiency, avoiding external dependencies while maintaining state-of-the-art conversational AI capabilities.

V2 Architecture: Queue-Based Semantic Processing

CRITICAL UPDATE: The semantic memory layer has been completely redesigned to address cascade failure issues and implement modern async patterns.

Key Improvements

  • SemanticRequestQueue: Modern async queue with circuit breaker and rate limiting
  • Controlled Concurrency: Semaphore-based request management (max 2 concurrent)
  • Batch Processing: Intelligent batching for 600x performance improvement
  • Circuit Breaker: Automatic failure detection and recovery (3 failures → 15s timeout)
  • Rate Limiting: Token bucket algorithm (3 requests/second) prevents overload
  • Graceful Degradation: Fallback mechanisms maintain user experience
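
The rate limiting and circuit breaking described above can be sketched with the documented parameters (a 3 requests/second token bucket; a breaker that opens after 3 consecutive failures and recovers after 15 s). This is a minimal illustration, not the shipped SemanticRequestQueue, which wraps these mechanisms around the actual embedding requests.

import asyncio
import time


class TokenBucket:
    """Token bucket limiter: refills at `rate` tokens per second up to `capacity`."""

    def __init__(self, rate: float = 3.0, capacity: int = 3):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep just long enough for one token to accumulate
            await asyncio.sleep((1 - self.tokens) / self.rate)


class CircuitBreaker:
    """Opens after `threshold` consecutive failures, recovers after `timeout` seconds."""

    def __init__(self, threshold: int = 3, timeout: float = 15.0):
        self.threshold = threshold
        self.timeout = timeout
        self.failures = 0
        self.opened_at = None  # monotonic time when the breaker opened

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.timeout:
            # Half-open: let the next request probe the backend
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        if success:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()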

Performance Characteristics

| Metric | V1 (Fire-and-forget) | V2 (Queue-based) | Improvement |
| --- | --- | --- | --- |
| Response Time | 1.8-4.7s | 1-3s | 40% faster |
| Concurrent Safety | Cascade failures | Controlled limits | 100% reliable |
| Embedding Throughput | 1 req/60s | 5 batch/0.5s | 600x faster |
| Failure Recovery | Manual restart | 15s automatic | Fully automated |

Memory Architecture: Three Tiers

1. Working Memory

Purpose: Real-time conversation state and immediate context management

Implementation:

  • Storage: LMDB (Lightning Memory-Mapped Database) for high-performance key-value operations
  • Scope: Recent conversation history, scoped by conversation_id
  • Lifecycle: TTL-based expiration (24 hours default)
  • Performance: Sub-millisecond access, memory-mapped files
  • Dual Role: Serves both immediate context AND conversation history (no separate episodic tier needed)

Data Structures:

# Messages stored with key pattern: {conversation_id}:{timestamp}
# Retrieved by conversation_id prefix scan
{
    "conversation_id": "user123_1699123456",
    "user_id": "user123",
    "role": "user" | "assistant",
    "content": "message text",
    "timestamp": "2025-11-05T15:30:00Z"
}

Responsibilities:

  • Store all conversation messages per conversation_id
  • Provide fast retrieval for context assembly
  • Automatic expiration of old messages (24hr TTL)
  • Cross-conversation message queries for semantic memory
  • Serve as conversation history store (replaces traditional "episodic memory")
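
A minimal sketch of this access pattern using the lmdb Python package and the {conversation_id}:{timestamp} key layout shown above. The actual WorkingMemoryStore additionally handles encryption-at-rest, TTL enforcement, and configuration.

import json
import lmdb


def append_message(env: lmdb.Environment, conversation_id: str, message: dict) -> None:
    """Store one message under the {conversation_id}:{timestamp} key pattern."""
    key = f"{conversation_id}:{message['timestamp']}".encode()
    with env.begin(write=True) as txn:
        txn.put(key, json.dumps(message).encode())


def get_conversation(env: lmdb.Environment, conversation_id: str) -> list:
    """Prefix scan over memory-mapped keys; keys sort by timestamp within a conversation."""
    prefix = f"{conversation_id}:".encode()
    messages = []
    with env.begin() as txn:
        cursor = txn.cursor()
        if cursor.set_range(prefix):
            for key, value in cursor:
                if not key.startswith(prefix):
                    break
                messages.append(json.loads(value))
    return messages


# Usage sketch (path and map size are illustrative)
env = lmdb.open("/tmp/aico_working_memory", map_size=128 * 1024 * 1024)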

2. Semantic Memory

Purpose: Long-term knowledge storage with semantic search and graph relationships

V3 Implementation:

A. Conversation Segments (ChromaDB)

  • Storage: ChromaDB with cosine similarity
  • Search: Hybrid search combining semantic similarity with BM25 keyword matching
  • Fusion: Reciprocal Rank Fusion (RRF) for robust score combination
  • Filtering: IDF-based term filtering (min_idf=0.6) + semantic relevance thresholds (min_score=0.35)
  • Embeddings: paraphrase-multilingual (768-dim) via Ollama modelservice
  • Scope: Conversation chunks scoped by conversation_id and user_id
  • Performance: Full-corpus BM25 for accurate IDF statistics

B. Knowledge Graph (ChromaDB + libSQL)

  • Storage: Hybrid ChromaDB (vectors) + libSQL (relational queries)
  • Extraction: Multi-pass with GLiNER (entities) + LLM (relationships)
  • Entity Resolution: 3-step process (semantic blocking → LLM matching → LLM merging); the blocking step is sketched below
  • Graph Fusion: Conflict resolution, temporal updates, canonical IDs
  • Schema: kg_nodes, kg_edges, kg_node_properties, kg_edge_properties (with triggers)
  • Production Status: 204 nodes, 27 edges, 552 indexed properties
  • Scope: Cross-conversation knowledge accumulation
  • Integration: Tightly coupled with semantic memory for fact extraction and retrieval
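
The semantic-blocking step groups entity mentions whose embeddings exceed a cosine-similarity threshold, so that only in-block pairs are sent to the LLM matching and merging passes. The sketch below is illustrative (numpy only); the 0.85 threshold is an assumption, not the production setting.

import numpy as np


def semantic_blocks(names, embeddings: np.ndarray, threshold: float = 0.85):
    """Group entity mentions into candidate blocks by cosine similarity.

    Keeps the number of expensive pairwise LLM comparisons small.
    """
    # Normalise rows so a dot product equals cosine similarity
    normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    assigned = [False] * len(names)
    blocks = []

    for i in range(len(names)):
        if assigned[i]:
            continue
        block = [names[i]]
        assigned[i] = True
        for j in range(i + 1, len(names)):
            if not assigned[j] and float(normed[i] @ normed[j]) >= threshold:
                block.append(names[j])
                assigned[j] = True
        blocks.append(block)
    return blocks


# e.g. ["Dr. Smith", "John Smith", "Acme Corp"] -> [["Dr. Smith", "John Smith"], ["Acme Corp"]]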

Hybrid Search Pipeline:

# Three-stage retrieval pipeline (sketch: collection, calculate_scores, fuse_with_rrf,
# max_results and the precomputed query embedding are provided by the semantic memory store)
def query_semantic_memory(query_text: str, user_id: str):
    # Stage 1: Full corpus retrieval for proper IDF statistics
    all_documents = collection.query(
        query_embeddings=[embedding],
        n_results=collection.count()  # CRITICAL: Full corpus
    )

    # Stage 2: Dual scoring (semantic + BM25 with IDF filtering)
    scored_docs = calculate_scores(
        documents=all_documents,
        query_text=query_text,
        min_idf=0.6  # Filter common terms
    )

    # Stage 3: RRF fusion with semantic relevance filtering
    results = fuse_with_rrf(
        scored_docs,
        k=60,  # Rank constant (adaptive when rrf_rank_constant=0 in config)
        min_semantic_score=0.35  # Relevance threshold
    )

    return results[:max_results]

Key Features:

  • ✅ Full corpus BM25: Accurate IDF calculation on all documents
  • ✅ IDF term filtering: Removes overly common words (min_idf=0.6)
  • ✅ Semantic threshold: Filters irrelevant results (min_semantic_score=0.35)
  • ✅ RRF fusion: Industry-standard rank-based combination
  • ✅ Configurable: All thresholds tunable via config

For detailed hybrid search documentation, see Hybrid Search Guide.
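
Reciprocal Rank Fusion scores each document as the sum of 1/(k + rank) over the individual rankings. The function below is a minimal sketch of that combination, not necessarily the shipped fusion.py implementation; threshold filtering is applied separately.

def rrf_fuse(semantic_ranking, bm25_ranking, k: int = 60):
    """Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank(d))."""
    scores = {}
    for ranking in (semantic_ranking, bm25_ranking):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Documents ranked highly by either signal bubble up; ones ranked highly by both dominate
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)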

Multi-Threading Architecture:

# Thread Pool Strategy
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


class SemanticRequestQueue:
    def __init__(self):
        # I/O bound operations (network requests)
        self._thread_pool = ThreadPoolExecutor(max_workers=32)

        # CPU bound operations (large batch processing)
        self._cpu_intensive_pool = ProcessPoolExecutor(max_workers=4)

    async def _process_embedding_batch(self, batch):
        loop = asyncio.get_running_loop()
        batch_size = len(batch)

        if batch_size >= 10:  # Large batches
            # Use process pool for CPU-intensive work
            return await loop.run_in_executor(
                self._cpu_intensive_pool,
                self._process_large_batch, batch
            )
        elif batch_size >= 3:  # Medium batches
            # Use thread pool for I/O bound work
            tasks = [loop.run_in_executor(
                self._thread_pool, self._process_single, item
            ) for item in batch]
            return await asyncio.gather(*tasks)
        else:  # Small batches
            # Use regular async processing
            return await self._async_process(batch)

Hardware Utilization Analysis:

  • CPU Cores: Automatically detects os.cpu_count() and scales thread pools
  • Thread Pool: 32 threads for I/O bound operations (network, disk)
  • Process Pool: 4 processes for CPU-intensive batch processing
  • Memory Efficiency: Bounded queues prevent memory exhaustion
  • Cache Optimization: Thread-local storage for embedding model caching
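
Core-aware pool sizing can be sketched as below; the scaling formula is an assumption for illustration, while the caps (32 threads, 4 processes) match the documented defaults.

import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

cores = os.cpu_count() or 4

# I/O bound work tolerates many more threads than cores; cap to avoid oversubscription
io_pool = ThreadPoolExecutor(max_workers=min(32, cores * 4))

# CPU bound batch work: roughly one worker per core, capped at the documented default
cpu_pool = ProcessPoolExecutor(max_workers=min(4, cores))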

Data Structures:

from dataclasses import dataclass
from datetime import datetime
from typing import List

import numpy as np


@dataclass
class SemanticFact:
    id: str
    user_id: str
    fact_type: str  # 'preference', 'knowledge', 'relationship'
    content: str
    confidence: float
    source_episodes: List[str]  # episode IDs that support this fact
    embedding: np.ndarray
    created_at: datetime
    last_confirmed: datetime
    contradiction_count: int

Vector Collections:

  • User Preferences: Food, activities, communication style, values
  • Factual Knowledge: Personal information, relationships, history
  • Domain Knowledge: Work context, technical interests, hobbies
  • Communication Patterns: Successful interaction strategies

Embedding Model Architecture:

# Unified model access via modelservice (DRY principle)
from typing import List

import numpy as np

# ModelserviceClient and ConfigurationManager are AICO shared-module imports (omitted here)
class EmbeddingService:
    """Abstraction layer for embedding generation via modelservice."""

    def __init__(self, config: ConfigurationManager):
        self.modelservice_client = ModelserviceClient()
        self.model_config = config.get("memory.semantic", {})
        self.embedding_model = self.model_config.get("embedding_model", "paraphrase-multilingual")

    async def generate_embeddings(self, texts: List[str]) -> List[np.ndarray]:
        """Generate embeddings via modelservice/Ollama."""
        embeddings = []
        for text in texts:
            response = await self.modelservice_client.generate_embeddings(
                model=self.embedding_model,
                prompt=text
            )
            embeddings.append(np.array(response.embedding))
        return embeddings

Model Selection Strategy:

  • Primary: paraphrase-multilingual (278M parameters, multilingual)
  • Fallback: all-minilm (22M parameters, English-focused, faster)
  • Auto-management: Ollama handles model downloads and lifecycle
  • Local-first: No external API dependencies
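
A hedged sketch of the primary/fallback choice against the memory.semantic configuration keys shown later in this document; the helper name is illustrative, not the shipped API.

def select_embedding_model(available: set, config: dict):
    """Pick the configured embedding model, falling back to the lightweight option."""
    primary = config.get("embedding_model", "paraphrase-multilingual")
    if primary in available:
        return primary, config.get("dimensions", 768)
    fallback = config.get("fallback_model", "all-minilm")
    return fallback, config.get("fallback_dimensions", 384)


# select_embedding_model({"all-minilm"}, {}) -> ("all-minilm", 384)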

V2 Responsibilities:

  • Controlled Processing: Queue-based request management with circuit breaker
  • Batch Optimization: Intelligent batching for 600x performance improvement
  • Hardware Utilization: Multi-threading for CPU and I/O bound operations
  • Graceful Degradation: Fallback mechanisms maintain user experience
  • Monitoring: Real-time performance metrics and health monitoring
  • Store and retrieve user preferences and facts
  • Enable semantic search across conversation history
  • Maintain knowledge consistency and conflict resolution
  • Support cross-conversation learning and adaptation
  • Generate embeddings via unified modelservice interface

Performance Characteristics:

  • Throughput: 1000+ messages/second queue processing
  • Latency: <100ms queue processing overhead
  • Concurrency: 2-32 configurable workers
  • Batch Efficiency: 5-10 requests per batch
  • Circuit Recovery: 15-30 seconds automatic recovery
  • Thread Utilization: Scales with available CPU cores

3. Behavioral Learning / Procedural Memory (Planned)

Purpose: Learn user interaction patterns, preferences, and behavioral adaptation

Planned Implementation:

  • Storage: libSQL for fast pattern queries (no embeddings needed)
  • Scope: User-level behavioral patterns across all conversations
  • Learning: Continuous observation and pattern extraction
  • Adaptation: Real-time response style and content adjustment

Pattern Types:

# Interaction patterns to learn
- Response length preferences (by time of day, topic, context)
- Topic interests and avoidances
- Communication style preferences (formal/casual, brief/detailed)
- Engagement signals (follow-up questions, topic changes)
- Successful conversation patterns
- Time-of-day behavioral patterns

Data Structures:

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict


@dataclass
class UserPattern:
    pattern_id: str
    user_id: str
    pattern_type: str  # 'response_length', 'topic_preference', 'time_of_day'
    pattern_data: Dict[str, Any]
    confidence: float  # Based on observation frequency
    observation_count: int
    last_observed: datetime
    created_at: datetime

Responsibilities:

  • Track user interaction patterns and preferences
  • Learn optimal response styles and timing
  • Enable adaptive personalization
  • Provide conversation quality metrics
  • Support proactive engagement strategies
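
A sketch of how a single observation could be folded into a UserPattern, with confidence derived from observation frequency as described above. The saturation formula (count / (count + saturation)) and its constant are assumptions for illustration only.

from datetime import datetime, timezone


def observe(pattern, observation: dict, saturation: int = 20) -> None:
    """Fold one observation into a UserPattern and refresh its confidence."""
    pattern.pattern_data.update(observation)
    pattern.observation_count += 1
    # Confidence approaches 1.0 as the pattern is observed more often
    pattern.confidence = pattern.observation_count / (pattern.observation_count + saturation)
    pattern.last_observed = datetime.now(timezone.utc)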

Model Management Architecture

Unified Model Service Integration

AICO's memory system integrates with the existing modelservice architecture to maintain DRY principles and provide a single, consistent interface for all AI model interactions.

class ModelServiceIntegration:
    """Unified interface to modelservice for all AI model needs."""

    def __init__(self, config: ConfigurationManager):
        self.config = config
        self.modelservice_client = ModelserviceClient()

        # Model configurations from unified config
        self.llm_model = config.get("core.modelservice.default_models.conversation", "hermes3:8b")
        self.embedding_model = config.get("memory.semantic.embedding_model", "paraphrase-multilingual")

    async def generate_embeddings(self, texts: List[str]) -> List[np.ndarray]:
        """Generate embeddings via modelservice/Ollama."""
        embeddings = []
        for text in texts:
            response = await self.modelservice_client.generate_embeddings(
                model=self.embedding_model,
                prompt=text
            )
            embeddings.append(np.array(response.embedding))
        return embeddings

    async def ensure_models_available(self) -> bool:
        """Ensure required models are available via Ollama."""
        try:
            # Check if embedding model is available
            models_response = await self.modelservice_client.list_models()
            available_models = [m.name for m in models_response.models]

            if self.embedding_model not in available_models:
                # Request model download via modelservice
                await self.modelservice_client.pull_model(self.embedding_model)

            return True
        except Exception as e:
            logger.error(f"Failed to ensure models available: {e}")
            return False

Ollama Model Management Strategy

Embedding Models Supported by Ollama:

  • ✅ paraphrase-multilingual (278M) - Primary choice for multilingual support
  • ✅ all-minilm (22M/33M) - Lightweight fallback option
  • ✅ bge-m3 (567M) - Advanced multilingual option
  • ✅ mxbai-embed-large (335M) - High-performance English option
  • ✅ nomic-embed-text - Large context window option

Why Ollama vs. HuggingFace/PyTorch:

  1. Consistency: Same management system as LLM models
  2. Simplicity: No additional PyTorch/transformers dependencies
  3. Performance: Ollama's optimized inference engine
  4. Local-first: Unified local model management
  5. DRY Compliance: Single modelservice interface for all models

System Integration

Memory Manager Coordination

class AICOMemoryManager:
    """Unified memory management with coordinated access across all tiers"""

    def __init__(self, config: MemoryConfig):
        # Storage backends
        self.working_memory = LMDBStore(config.working_memory_path)  # Conversation history + context
        self.semantic_store = ChromaDBStore(config.semantic_db_path)  # Segments + KG
        self.procedural_store = LibSQLStore(config.procedural_db_path)  # User patterns (planned)

        # Unified model service integration
        self.model_service = ModelServiceIntegration(config)

        # Coordination components
        self.context_assembler = ContextAssembler()
        self.relevance_scorer = RelevanceScorer()
        self.conflict_resolver = ConflictResolver()

    async def assemble_context(self, user_id: str, message: str) -> ConversationContext:
        """Coordinate memory retrieval across all tiers with unified model service."""
        # 1. Get working memory (conversation history + immediate context)
        working_ctx = await self.working_memory.get_conversation_context(user_id)

        # 2. Generate embeddings for semantic search via modelservice
        message_embeddings = await self.model_service.generate_embeddings([message])
        message_embedding = message_embeddings[0]

        # 3. Get semantic knowledge using hybrid search (segments + KG)
        semantic_results = await self.semantic_store.query_hybrid(
            message, message_embedding, user_id, threshold=0.4
        )

        # 4. Get knowledge graph facts
        kg_facts = await self.semantic_store.query_knowledge_graph(
            user_id, entities=semantic_results.entities
        )

        # 5. Apply procedural patterns (when implemented)
        interaction_patterns = await self.procedural_store.get_user_patterns(user_id)

        # 6. Assemble and optimize context
        return self.context_assembler.build_context(
            working_ctx, semantic_results, kg_facts, interaction_patterns
        )

Cross-Tier Communication

Memory Update Pipeline:

  1. Real-time Updates: Working memory receives and stores all conversation messages
  2. Semantic Extraction: Background process extracts segments and KG facts from conversations (see the sketch below)
  3. Pattern Learning: Procedural memory updated based on interaction outcomes (planned)
  4. Memory Consolidation: Periodic cleanup and optimization of stored data
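
The background extraction step (2) can be pictured as a periodic task that drains recent working-memory messages into the semantic store. This is a minimal asyncio sketch; the method names (get_unprocessed_messages, extract_segments, store_segments, mark_processed) are hypothetical, not the shipped API.

import asyncio


async def semantic_extraction_loop(working_memory, semantic_store, interval_s: float = 30.0):
    """Periodically move conversation content from working memory into semantic memory."""
    while True:
        # Hypothetical helper names, for illustration only
        recent = await working_memory.get_unprocessed_messages()
        if recent:
            segments = await semantic_store.extract_segments(recent)
            await semantic_store.store_segments(segments)
            await working_memory.mark_processed(recent)
        await asyncio.sleep(interval_s)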

Consistency Management:

  • Conflict Detection: Identify contradictory information across memory tiers
  • Confidence Scoring: Weight information based on recency and source reliability
  • Resolution Strategies: Automated conflict resolution with user confirmation when needed
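
Recency weighting can be expressed as exponential decay of a fact's stored confidence with the time since it was last confirmed; the 90-day decay constant below is an assumption for illustration.

import math
from datetime import datetime, timezone


def effective_confidence(stored_confidence: float, last_confirmed: datetime, decay_days: float = 90.0) -> float:
    """Decay a fact's confidence exponentially with age (timezone-aware datetimes assumed)."""
    age_days = (datetime.now(timezone.utc) - last_confirmed).total_seconds() / 86400.0
    return stored_confidence * math.exp(-age_days / decay_days)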

Performance Characteristics

Resource Usage

Memory Footprint:

  • Working Memory (LMDB): 50-100MB (conversation history + context)
  • Semantic Memory (ChromaDB): 200-500MB (segments + KG embeddings)
  • Database Connections: 20-50MB
  • Total: ~300-650MB typical usage

CPU Usage:

  • Context Assembly: 10-50ms per message
  • Vector Similarity: 5-20ms per query
  • Database Operations: 1-10ms per query
  • Background Processing: 5-10% CPU during idle

Storage Requirements:

  • Working Memory: ~1MB per 1000 conversation turns (24hr TTL)
  • Semantic Segments: ~500MB per 100,000 segments (768-dim float32 vectors alone take ~3KB each, ~300MB per 100,000 segments, plus text and metadata)
  • Knowledge Graph: ~10MB per 1000 nodes + edges
  • Procedural Patterns: ~1MB per user (planned)

Optimization Strategies

Lazy Loading:

  • Load context components only when needed
  • Cache frequently accessed patterns
  • Preload based on conversation patterns

Batch Operations:

  • Group database writes to reduce I/O
  • Batch vector embedding generation
  • Periodic memory consolidation

Adaptive Performance:

  • Scale context depth based on available resources
  • Reduce embedding dimensions on low-end hardware
  • Defer non-critical operations during high load

Configuration and Deployment

Model Configuration Strategy

# Unified configuration in core.yaml
core:
  modelservice:
    default_models:
      conversation: "hermes3:8b"
      embedding: "paraphrase-multilingual"  # Primary multilingual model
      embedding_fallback: "all-minilm"      # Lightweight fallback
    ollama:
      auto_install: true
      auto_pull_models: true
      host: "127.0.0.1"
      port: 11434

memory:
  semantic:
    embedding_model: "paraphrase-multilingual"
    dimensions: 768  # Matches paraphrase-multilingual output
    fallback_model: "all-minilm"
    fallback_dimensions: 384
    auto_model_download: true

    # Hybrid search configuration (V3)
    fusion_method: "rrf"  # "rrf" (recommended) or "weighted" (legacy)
    rrf_rank_constant: 0  # 0 = adaptive, or 10-60 manual
    bm25_min_idf: 0.6     # Minimum IDF threshold for query term filtering
    min_semantic_score: 0.35  # Minimum semantic score for relevance filtering
    min_similarity: 0.4   # Minimum hybrid score for final results

    # Legacy weighted fusion (if fusion_method="weighted")
    semantic_weight: 0.7  # Weight for semantic similarity
    bm25_weight: 0.3      # Weight for BM25 score

Deployment Considerations

Model Download Strategy:

  1. Automatic: Models downloaded on first use via Ollama
  2. LLM vs Embedding: LLM models are "started" (kept in memory), embedding models are loaded on-demand
  3. Validation: Verify model availability before semantic operations
  4. Caching: Models persist locally after download

Resource Requirements:

  • paraphrase-multilingual: ~278MB disk, ~512MB RAM during inference
  • all-minilm: ~22MB disk, ~128MB RAM during inference
  • Ollama overhead: ~200MB base memory usage

Local-First Design Principles

No External Dependencies

  • All processing happens locally
  • No cloud services or external APIs required
  • Embedded databases with no server processes
  • Offline-capable with full functionality
  • Model Privacy: All embedding generation happens locally via Ollama

Privacy-First Architecture

  • All personal data encrypted at rest
  • User-specific encryption keys
  • No data transmission outside device
  • Granular privacy controls

Hardware Efficiency

  • Optimized for consumer-grade hardware
  • Configurable resource limits
  • Graceful degradation on resource constraints
  • Battery-aware processing modes

Implementation Location and Integration

The memory system is implemented as a shared AI module at shared/aico/ai/memory/, following AICO's established patterns and guidelines:

shared/aico/ai/memory/
├── __init__.py          # Module exports and public interface
├── manager.py           # MemoryManager - central coordinator extending BaseAIProcessor
├── working.py           # WorkingMemoryStore - LMDB conversation history + context
├── semantic.py          # SemanticMemoryStore - ChromaDB segments with hybrid search
├── procedural.py        # ProceduralMemoryStore - libSQL user patterns (planned)
├── context/             # Context assembly module
│   ├── assembler.py     # ContextAssembler - cross-tier coordination
│   ├── retrievers.py    # Memory tier retrievers
│   ├── scorers.py       # Relevance scoring
│   └── graph_ranking.py # KG-based context ranking
├── fusion.py            # Hybrid search fusion (RRF, weighted)
├── bm25.py              # BM25 keyword ranking
└── memory_album.py      # Memory consolidation and browsing (planned)

Architecture Integration Patterns

Configuration Management:

  • Uses ConfigurationManager for hierarchical configuration following AICO patterns
  • Memory-specific settings under the memory.* configuration namespace
  • Supports environment-specific and user-specific configuration overrides

Database Integration:

  • Reuses EncryptedLibSQLConnection for persistent storage components
  • Follows existing schema management and migration patterns
  • Maintains encryption-at-rest for all persistent data using AICO's key management

AI Processing Pipeline:

  • MemoryManager extends BaseAIProcessor for seamless message bus integration
  • Processes ProcessingContext objects from the AI coordination system
  • Returns ProcessingResult with assembled memory context for downstream processors

Frontend Integration:

  • Flutter frontend accesses memory functionality through REST API endpoints
  • Backend exposes memory operations via the API Gateway following existing patterns
  • Maintains clean separation between frontend and Python shared modules

Message Bus Communication:

  • Memory operations publish to memory.* topics for loose coupling (a hedged sketch follows below)
  • Subscribes to conversation events for real-time context updates
  • Integrates with existing message routing and processing infrastructure
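
The subscribe/publish flow can be sketched as follows; the bus client, method names, topic string, and payload shape are hypothetical, shown only to illustrate the memory.* topic convention, not AICO's actual message-bus API.

async def on_conversation_message(bus, memory_manager, event: dict) -> None:
    """Subscribe handler: store the incoming message, then announce that context changed."""
    # Hypothetical method and topic names, for illustration only
    await memory_manager.working_memory.store_message(event)
    await bus.publish("memory/context/updated", {"conversation_id": event["conversation_id"]})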

This architecture provides the foundation for AICO's sophisticated memory capabilities while maintaining the local-first, privacy-first principles essential to the companion's design philosophy.