Conversation System Architecture¶
Overview β ¶
The AICO conversation system provides REST API endpoints for managing conversation threads and processing user messages. It features automatic thread resolution, real-time WebSocket delivery, and integration with AI services for response generation.
Current Implementation: Basic conversation API with ThreadManager for automatic thread resolution and direct LLM integration.
Core Principles β ¶
- Thread-Based Organization: Conversations organized into threads with automatic resolution
- REST API Design: Clean separation between thread management and message processing
- Real-time Delivery: WebSocket support for immediate AI response delivery
- Local Processing: All conversation data remains local with encrypted storage
- Extensible Design: Plugin architecture allows for conversation capability extensions
Thread Architecture β ¶
Current Implementation¶
AdvancedThreadManager Service: - Multi-factor thread resolution (semantic + temporal + behavioral + intent-based) - Vector similarity engine using AICO embeddings for semantic thread matching - Advanced intent classification using XLM-RoBERTa transformer model - Entity extraction and context continuity analysis - Robust fallback chain with graceful degradation - UUID-based thread identifiers with confidence scoring
Thread Resolution Logic:
class ThreadResolution:
thread_id: str
action: str # "continued", "created", "reactivated"
confidence: float
reasoning: str
created_at: Optional[datetime]
Thread States: - Active: Recent activity with high semantic continuity - Dormant: No activity or low semantic similarity to recent messages - Created: New thread based on intent shift or topic boundary detection - Branched: New thread created from specific message with semantic clustering - Reactivated: Dormant thread resumed based on semantic similarity
Multi-Factor Scoring: - Semantic Similarity: Vector similarity between message embeddings - Temporal Continuity: Exponential decay based on time gaps - Intent Alignment: Intent classification consistency across messages - Entity Overlap: Named entity recognition and continuity - Conversation Flow: Boundary detection and topic shift analysis - User Pattern Match: Behavioral pattern recognition
User Management π§¶
Current Implementation¶
JWT-Based Authentication:
- User identification via JWT tokens
- user_uuid
extracted from token for thread association
- Single-user model (no multi-user recognition)
Thread-User Association:
- Threads associated with user_id
from JWT token
- Basic access validation in ThreadManager
- No complex user relationship modeling
Planned Features: - Multi-user family recognition - Voice biometrics integration - Behavioral pattern analysis - Relationship context modeling
Current API Implementation β ¶
REST Endpoints¶
Primary Endpoint:
- POST /api/v1/conversation/messages
- Unified message processing with auto-thread resolution
Advanced Endpoints:
- POST /api/v1/conversation/threads
- Explicit thread creation
- POST /api/v1/conversation/threads/{thread_id}/messages
- Send to specific thread
- GET /api/v1/conversation/threads/{thread_id}
- Get thread metadata
- GET /api/v1/conversation/threads
- List user threads
Real-time:
- WebSocket /api/v1/conversation/ws/{thread_id}
- Real-time AI responses
Message Processing Flow β ¶
Current Implementation:
1. REST Endpoint receives user message
2. ThreadManager resolves appropriate thread (continue existing or create new)
3. Message Bus publishes to conversation/user/input
topic
4. ModelService Client generates LLM response directly
5. Response returned immediately with AI reply
6. WebSocket delivers real-time updates (if connected)
LLM Integration:
# Direct modelservice integration
modelservice_client = get_modelservice_client(config_manager)
llm_response = await modelservice_client.get_completions(
model="hermes3:8b",
prompt=request.message
)
AI Processing π§¶
Current State: - Direct LLM Integration: Simple prompt-response via modelservice - No Complex AI Coordination: Single LLM call without emotion/personality processing - Basic Error Handling: Fallback messages on LLM failures
Planned Enhancements: - Emotion Recognition: User emotional state analysis - Personality Integration: AICO personality traits application - Memory Retrieval: Context from conversation history - Parallel Processing: Multiple AI systems coordination - Context Assembly: Rich context from multiple sources
Context Management¶
Context Types¶
- Thread Context: Current conversation state and history
- User Context: Individual user relationship and preferences
- Emotional Context: Current emotional state and mood
- Personality Context: Active personality traits and behavioral parameters
- Memory Context: Relevant episodic and semantic memories
- Environmental Context: Time, location, device, and situational factors
Context Integration Strategy¶
Context integration is critical for maintaining conversation coherence and relationship continuity. The system assembles context from multiple sources to provide AI systems with comprehensive situational awareness.
Context Sources: - Thread Context: Conversation history, topic evolution, turn patterns - User Context: Individual preferences, communication style, relationship dynamics - Emotional Context: Current emotional state, mood trends, emotional history - Personality Context: Active personality traits, behavioral parameters, expression style - Memory Context: Relevant episodic memories, learned preferences, relationship milestones - Environmental Context: Time of day, location, device context, situational factors
Integration Process: 1. Context Assembly: Gather context from all available sources 2. Relevance Filtering: Prioritize context based on current conversation topic 3. Context Weighting: Apply importance weights based on recency and relevance 4. Context Compression: Optimize context size for AI system consumption 5. Context Validation: Ensure context consistency and remove conflicts
Message Bus Integration¶
Topic Structure¶
Following the standardized pattern: {domain}/{action}/{type}
Core Conversation Topics:
conversation/thread/created
conversation/thread/updated
conversation/message/received
conversation/response/generated
conversation/context/updated
conversation/memory/stored
AI Processing Topics:
ai/emotion/analyze/request
ai/emotion/analyze/response
ai/personality/express/request
ai/personality/express/response
ai/memory/retrieve/request
ai/memory/retrieve/response
ai/llm/generate/request
ai/llm/generate/response
Message Processing Flow¶
The conversation system uses a message-driven architecture where the REST endpoint publishes to the message bus and background systems handle all AI processing:
Flow Sequence:
1. REST Endpoint: Receives user message, resolves thread, publishes to conversation/user/input
2. Conversation Engine: Subscribes to user input, coordinates AI processing pipeline
3. AI Systems: Process emotion, personality, memory, and LLM generation in parallel
4. Response Assembly: Conversation engine assembles final response
5. Response Delivery: Published to conversation/ai/response
for client consumption
Message Bus Topics:
- conversation/user/input
- User messages from REST layer
- conversation/ai/response
- AI responses for client delivery
- conversation/thread/created
- New thread notifications
- conversation/context/updated
- Thread context changes
Persistence Architecture¶
Database Schema¶
The conversation system uses a normalized database schema optimized for thread retrieval, message history, and metadata queries. All sensitive data is encrypted at rest using AES-256-GCM.
Core Tables: - conversation_threads: Thread lifecycle and ownership - conversation_messages: Complete message history with turn tracking - thread_metadata: Conversation topic, phase, and analytics
Schema Design Principles: - Normalized Structure: Separate concerns for optimal query performance - Encrypted Content: All user content encrypted with user-specific keys - Audit Trail: Complete timestamp tracking for all operations - Flexible Metadata: JSON fields for extensible conversation analytics - Foreign Key Integrity: Referential integrity across all relationships
Memory Integration¶
The conversation system integrates with AICO's comprehensive memory system to maintain long-term relationship context and learned preferences. Memory formation happens automatically during conversation processing.
Memory Types: - Episodic Memory: Specific conversation events, shared experiences, emotional moments - Semantic Memory: Learned facts about users, preferences, relationship dynamics - Procedural Memory: Communication patterns, successful interaction strategies
Memory Formation Process: 1. Automatic Extraction: Identify significant conversation moments during processing 2. Context Enrichment: Add emotional, temporal, and relational context to memories 3. Vector Embedding: Generate semantic embeddings for similarity-based retrieval 4. Memory Classification: Categorize memories by type, importance, and privacy level 5. Storage Integration: Persist memories with conversation thread references
Memory Retrieval Strategy: - Semantic Search: Vector similarity matching for relevant memory retrieval - Temporal Filtering: Weight recent memories higher than older ones - Relationship Context: Prioritize memories specific to current user relationship - Privacy Boundaries: Respect user-specific memory access controls
Automatic Thread Management¶
Modern 2025 Approach¶
The conversation system implements intelligent, automatic thread management that eliminates manual thread switching while maintaining natural conversation flow. This approach uses semantic analysis, temporal context, and user behavior patterns to seamlessly manage conversation continuity.
Core Principles¶
Semantic Thread Detection: - Topic Segmentation: Automatically detect topic shifts using semantic similarity analysis - Intent Classification: Identify when user intent changes significantly from current thread context - Contextual Continuity: Maintain thread when topics are related or build upon previous discussion - Natural Boundaries: Recognize natural conversation breakpoints (greetings, farewells, major topic shifts)
Temporal Context Management: - Session Boundaries: Automatically create new threads after extended inactivity periods - Daily Rhythms: Respect natural daily conversation patterns (morning greetings, evening check-ins) - Context Decay: Gradually reduce thread relevance as time passes without interaction - Reactivation Logic: Intelligently reactivate dormant threads when topics resurface
User Behavior Integration: - Communication Patterns: Learn individual user preferences for conversation continuity - Context Switching Tolerance: Adapt to user comfort level with topic jumping vs. linear conversation - Explicit Override Support: Allow manual thread specification when automatic detection isn't desired - Feedback Learning: Improve thread management based on user corrections and preferences
ThreadManager Service Architecture¶
Core Components:
class ThreadManager:
def __init__(self):
self.semantic_analyzer = SemanticAnalyzer()
self.temporal_analyzer = TemporalAnalyzer()
self.user_behavior_analyzer = UserBehaviorAnalyzer()
self.thread_cache = ThreadCache()
self.decision_engine = ThreadDecisionEngine()
Decision Engine Logic: 1. Semantic Analysis: Compare incoming message with active thread topics using vector similarity 2. Temporal Analysis: Evaluate time gaps and conversation rhythm patterns 3. Behavioral Analysis: Consider user-specific conversation preferences and patterns 4. Context Evaluation: Assess thread relevance and continuation appropriateness 5. Decision Synthesis: Combine all factors to make thread continuation or creation decision
Implementation Strategy¶
Semantic Similarity Detection: - Vector Embeddings: Generate semantic embeddings for all messages using modern language models - Similarity Thresholds: Dynamic thresholds based on conversation context and user preferences - Topic Modeling: Identify conversation topics using clustering and classification techniques - Context Windows: Analyze message context within configurable conversation windows
Temporal Pattern Recognition: - Activity Patterns: Learn user's typical conversation timing and frequency patterns - Session Detection: Identify natural conversation session boundaries - Dormancy Thresholds: Configure appropriate timeouts for thread dormancy - Reactivation Triggers: Detect when dormant threads should be reactivated
User Preference Learning: - Implicit Feedback: Learn from user behavior patterns and conversation flow - Explicit Feedback: Allow users to correct thread management decisions - Preference Profiles: Build individual user profiles for thread management preferences - Adaptive Algorithms: Continuously improve thread decisions based on user interactions
Thread Decision Matrix¶
Continue Existing Thread When: - Semantic similarity > 0.7 with recent messages - Time gap < user's typical conversation pause duration - Topic relates to ongoing thread discussion - User explicitly references previous context - No clear conversation boundary markers detected
Create New Thread When: - Semantic similarity < 0.4 with active thread - Time gap > dormancy threshold (default: 2 hours) - Clear topic shift detected (greetings, new subjects) - User indicates new conversation ("Let's talk about something else") - Maximum thread length reached (configurable limit)
Reactivate Dormant Thread When: - High semantic similarity with dormant thread content - User references specific past conversation elements - Temporal patterns suggest natural conversation resumption - Explicit thread reference or continuation cues detected
Integration with Conversation Flow¶
Thread Manager Integration:
@router.post("/threads", response_model=ThreadResponse)
async def create_thread(
request: ThreadCreateRequest,
current_user = Depends(get_current_user),
thread_manager = Depends(get_thread_manager)
):
# Create new thread with automatic context detection
thread_decision = await thread_manager.create_or_resolve_thread(
user_id=current_user['user_uuid'],
initial_message=request.initial_message,
context=request.context
)
return ThreadResponse(
success=True,
thread_id=thread_decision.thread_id,
status="created",
created_at=thread_decision.created_at
)
@router.post("/threads/{thread_id}/messages", response_model=MessageResponse)
async def send_message(
thread_id: str,
request: MessageSendRequest,
current_user = Depends(get_current_user),
message_service = Depends(get_message_service)
):
# Validate and process message
message_result = await message_service.send_message(
thread_id=thread_id,
user_id=current_user['user_uuid'],
message=request.message,
metadata=request.metadata
)
return MessageResponse(
success=True,
message_id=message_result.message_id,
thread_id=thread_id,
status="processing",
timestamp=message_result.timestamp
)
Thread Resolution Process: 1. Input Analysis: Analyze incoming message for semantic content and intent 2. Active Thread Evaluation: Check if active threads are suitable for continuation 3. Dormant Thread Scanning: Search dormant threads for potential reactivation 4. Decision Calculation: Apply decision matrix to determine optimal thread choice 5. User Notification: Transparently inform user of thread management decisions when appropriate
Advanced Features¶
Multi-Thread Awareness: - Parallel Conversations: Support multiple active conversation threads simultaneously - Thread Prioritization: Rank threads by relevance, recency, and user engagement - Context Bleeding Prevention: Ensure thread isolation while allowing beneficial context sharing - Thread Merging: Intelligently merge related threads when appropriate
Proactive Thread Management: - Thread Suggestions: Suggest thread reactivation based on environmental triggers - Context Bridging: Help users transition between related conversation threads - Thread Summarization: Provide thread summaries when reactivating dormant conversations - Relationship Threading: Maintain separate threads for different relationship contexts
Performance Optimization: - Lazy Loading: Load thread context only when needed for decision making - Caching Strategy: Cache frequently accessed thread metadata and decision factors - Batch Processing: Process multiple thread decisions efficiently - Background Analysis: Continuously analyze conversation patterns in background
User Experience Design¶
Transparency Principles: - Invisible When Working: Thread management should be seamless and unnoticed - Visible When Needed: Clearly communicate thread decisions when user awareness is beneficial - User Control: Always allow manual override of automatic thread decisions - Learning Feedback: Provide mechanisms for users to train the system
Notification Strategy: - Subtle Indicators: Use UI elements to indicate thread context without interrupting flow - Transition Smoothing: Provide context bridges when switching between threads - History Access: Easy access to recent thread history and conversation context - Thread Visualization: Optional thread relationship visualization for power users
Configuration and Customization¶
System-Level Configuration:
thread_management:
semantic_similarity:
continuation_threshold: 0.7
creation_threshold: 0.4
embedding_model: "sentence-transformers/all-MiniLM-L6-v2"
temporal_analysis:
dormancy_threshold: "2h"
session_gap_threshold: "30m"
daily_reset_time: "06:00"
behavioral_learning:
adaptation_rate: 0.1
feedback_weight: 0.8
pattern_history_days: 30
User-Level Customization: - Thread Sensitivity: Adjust how easily new threads are created - Context Preferences: Configure desired level of conversation context retention - Notification Preferences: Control when and how thread decisions are communicated - Manual Override Frequency: Set preference for automatic vs. manual thread management
API Design¶
Separated Resource Architecture¶
The conversation API follows a clean resource-based design that separates thread management from message processing. This approach provides clear boundaries, better testability, and optimal performance characteristics.
Core Design Philosophy: - Resource Separation: Clear distinction between threads (conversation contexts) and messages (content) - Single Responsibility: Each endpoint has one clear, well-defined purpose - RESTful Design: Follows REST principles for resource creation, retrieval, and manipulation - Asynchronous Processing: All AI processing happens in background via message bus - Progressive Enhancement: Clients can build sophisticated conversation management incrementally
Thread Management Endpoints:
- POST /api/v1/conversation/threads
- Create new conversation thread
- GET /api/v1/conversation/threads/{thread_id}
- Get thread metadata and status
- GET /api/v1/conversation/threads
- List user's conversation threads
- DELETE /api/v1/conversation/threads/{thread_id}
- Archive/delete thread
Message Processing Endpoints:
- POST /api/v1/conversation/threads/{thread_id}/messages
- Send message to thread
- GET /api/v1/conversation/threads/{thread_id}/messages
- Get message history
- WebSocket /api/v1/conversation/threads/{thread_id}/ws
- Real-time message streaming
Implementation Details¶
Thread Creation Schema:
class ThreadCreateRequest(BaseModel):
initial_message: Optional[str] = Field(None, description="Optional first message")
context: Optional[Dict[str, Any]] = Field(None, description="Initial context metadata")
thread_type: str = Field("conversation", description="Thread type for categorization")
class ThreadResponse(BaseModel):
success: bool
thread_id: str
status: str # "created", "active", "processing"
created_at: datetime
message_count: int = 0
Message Sending Schema:
class MessageSendRequest(BaseModel):
message: str = Field(..., description="Message content")
message_type: str = Field("text", description="Message type")
metadata: Optional[Dict[str, Any]] = Field(None, description="Message metadata")
class MessageResponse(BaseModel):
success: bool
message_id: str
thread_id: str
status: str # "received", "processing", "completed"
timestamp: datetime
Advantages of Separated Architecture: - Clear Resource Boundaries: Threads and messages are distinct, cacheable resources - Independent Scaling: Thread creation and message processing can scale independently - Better Error Handling: Thread errors vs message errors are clearly separated - Enhanced Testing: Each operation can be tested in isolation - Client Flexibility: Clients can implement sophisticated conversation management - Monitoring Clarity: Separate metrics for thread lifecycle vs message processing - Caching Optimization: Thread metadata cached separately from message content
Thread Creation Flow: 1. Thread Creation: Client creates thread, optionally with initial message 2. Thread Validation: System validates user permissions and thread limits 3. Context Initialization: Initialize thread with user context and metadata 4. Initial Message Processing: If provided, queue initial message for processing 5. Response: Return thread ID and status immediately
Message Processing Flow: 1. Thread Validation: Verify thread exists and user has access 2. Message Creation: Create message with unique ID and metadata 3. Message Publication: Publish to conversation message bus 4. Immediate Response: Return message ID and processing status 5. Background Processing: Conversation engine processes asynchronously 6. Response Delivery: AI response delivered via WebSocket or polling
Proactive Agency Integration¶
Background Conversation Awareness¶
AICO maintains continuous awareness of conversation threads to identify opportunities for proactive engagement. This background monitoring enables natural follow-ups, check-ins, and relationship building initiatives.
Monitoring Capabilities: - Thread Activity Tracking: Monitor conversation patterns, response times, topic evolution - Emotional State Monitoring: Track emotional trends and identify support opportunities - Follow-up Detection: Identify unresolved topics or promised actions requiring follow-up - Relationship Milestone Recognition: Detect significant relationship moments for acknowledgment - Context Change Detection: Notice environmental or situational changes affecting conversation
Proactive Initiative Types: - Check-in Initiatives: "How did your presentation go?" based on previous conversations - Follow-up Questions: "Did you finish that book you mentioned?" for topic continuity - Emotional Support: "You seem stressed lately, want to talk?" based on pattern recognition - Relationship Building: "I've been thinking about what you said..." for deeper connection - Contextual Awareness: "Good morning! How are you feeling today?" based on time and mood patterns
Scheduling Strategy: - Optimal Timing: Consider user availability, mood, and conversation context - Natural Integration: Initiatives feel organic, not automated or intrusive - User Preferences: Respect individual preferences for proactive engagement frequency - Context Sensitivity: Adapt initiative style to current relationship dynamics
Initiative Types¶
- Check-in Initiatives: "How did your presentation go?"
- Follow-up Initiatives: "Did you finish that book you were reading?"
- Contextual Initiatives: "I noticed you seem stressed lately"
- Learning Initiatives: "I've been thinking about what you said about..."
- Relationship Initiatives: "I appreciate how patient you've been with me"
Error Handling & Resilience¶
Thread Recovery¶
The conversation system includes robust recovery mechanisms to handle thread corruption, data loss, or system failures. Recovery prioritizes conversation continuity over perfect historical accuracy.
Recovery Strategies: - Database Recovery: Attempt to restore thread from primary database backup - Memory System Recovery: Reconstruct thread context from episodic memory system - Minimal Recovery: Create functional thread with basic user context when full recovery fails - Graceful Degradation: Continue conversation with reduced context rather than complete failure
Recovery Process: 1. Corruption Detection: Identify thread integrity issues during normal operations 2. Recovery Prioritization: Attempt recovery methods in order of data completeness 3. Context Reconstruction: Rebuild conversation context from available sources 4. User Notification: Transparently inform user of any context limitations 5. Background Restoration: Continue attempting full recovery in background
Graceful Degradation¶
- Context Loss: Continue conversation with reduced context
- AI System Failure: Fallback to simpler response generation
- Memory System Failure: Continue without memory integration
- Emotion System Failure: Use default emotional parameters
Performance Considerations¶
Thread Caching Strategy¶
The conversation system uses intelligent caching to optimize thread access patterns and minimize database queries. Caching strategy balances memory usage with conversation responsiveness.
Cache Hierarchy: - Active Cache: Recently accessed threads kept in memory for immediate access - Dormant Cache: Less active threads cached with compressed context - Database Storage: Full thread persistence with encrypted content
Cache Management: - LRU Eviction: Least recently used threads moved to dormant cache or storage - Predictive Loading: Preload threads based on user activity patterns - Context Compression: Reduce memory footprint of dormant threads - Cache Warming: Proactively load threads for expected user interactions
Performance Optimization: - Batch Operations: Group database operations for efficiency - Async Loading: Non-blocking thread loading with fallback to cached data - Memory Monitoring: Dynamic cache sizing based on available system resources - Access Pattern Learning: Adapt caching strategy based on user behavior
Message Processing Optimization¶
- Async Processing: All AI processing happens asynchronously
- Streaming Responses: Progressive response delivery via WebSocket
- Context Preloading: Predictive context loading for active threads
- Batch Processing: Group similar operations for efficiency
Security & Privacy¶
Thread Access Control¶
class ThreadAccessManager:
def verify_thread_access(self, thread_id: str, user_context: UserContext) -> bool:
"""Verify user has access to thread"""
thread = self.get_thread(thread_id)
return (
thread.user_context.user_id == user_context.user_id or
self.is_family_member(user_context, thread.user_context) or
self.has_explicit_permission(user_context, thread_id)
)
Data Encryption¶
- Thread Data: AES-256-GCM encryption for all thread data
- Message Content: End-to-end encryption for sensitive content
- Memory References: Encrypted links to memory system
- Context Data: Encrypted emotional and personality state
Extension Points¶
Plugin Integration¶
The conversation system provides extensive plugin capabilities for extending conversation processing, adding custom AI components, and integrating external services.
Plugin Types: - Context Enhancers: Add specialized context information (weather, calendar, location) - AI Processors: Custom emotion recognition, personality simulation, or response generation - Response Filters: Content moderation, safety checks, or response refinement - Integration Plugins: External service integration (smart home, productivity tools) - Analytics Plugins: Conversation analysis, relationship metrics, usage tracking
Plugin Architecture: - Message Bus Integration: Plugins subscribe to relevant conversation topics - Context Pipeline: Plugins can modify conversation context at various processing stages - Response Pipeline: Plugins can enhance or filter AI responses before delivery - Event Hooks: Plugins receive notifications for conversation lifecycle events - Configuration Management: Plugin-specific settings and user preferences
Plugin Development: - Standard Interface: Consistent plugin API for easy development and integration - Async Processing: Non-blocking plugin execution with timeout protection - Error Isolation: Plugin failures don't affect core conversation processing - Hot Reloading: Dynamic plugin loading and updating without system restart
Custom AI Integration¶
The conversation system supports integration of custom AI models and processing components, enabling specialized conversation capabilities and domain-specific intelligence.
AI Integration Points: - Custom LLM Models: Integrate specialized language models for specific domains - Emotion Recognition: Custom emotion detection models trained on specific datasets - Personality Simulation: Domain-specific personality models for specialized applications - Intent Classification: Custom intent recognition for application-specific commands - Response Generation: Specialized response generators for technical or creative content
Integration Architecture: - Processor Registry: Central registry for AI component discovery and routing - Request Routing: Intelligent routing based on context, user preferences, and capability requirements - Fallback Handling: Graceful fallback to default processors when custom components fail - Performance Monitoring: Track custom AI component performance and reliability - A/B Testing: Compare custom AI components against default implementations
Development Support: - Standard Interfaces: Consistent API contracts for AI component integration - Testing Framework: Comprehensive testing tools for AI component validation - Performance Profiling: Built-in profiling for AI component optimization - Documentation Templates: Standardized documentation for custom AI components