Skip to content

Conversation System Architecture

Overview βœ…

The AICO conversation system provides REST API endpoints for managing conversation threads and processing user messages. It features automatic thread resolution, real-time WebSocket delivery, and integration with AI services for response generation.

Current Implementation: Basic conversation API with ThreadManager for automatic thread resolution and direct LLM integration.

Core Principles βœ…

  1. Thread-Based Organization: Conversations organized into threads with automatic resolution
  2. REST API Design: Clean separation between thread management and message processing
  3. Real-time Delivery: WebSocket support for immediate AI response delivery
  4. Local Processing: All conversation data remains local with encrypted storage
  5. Extensible Design: Plugin architecture allows for conversation capability extensions

Thread Architecture βœ…

Current Implementation

AdvancedThreadManager Service: - Multi-factor thread resolution (semantic + temporal + behavioral + intent-based) - Vector similarity engine using AICO embeddings for semantic thread matching - Advanced intent classification using XLM-RoBERTa transformer model - Entity extraction and context continuity analysis - Robust fallback chain with graceful degradation - UUID-based thread identifiers with confidence scoring

Thread Resolution Logic:

class ThreadResolution:
    thread_id: str
    action: str  # "continued", "created", "reactivated"
    confidence: float
    reasoning: str
    created_at: Optional[datetime]

Thread States: - Active: Recent activity with high semantic continuity - Dormant: No activity or low semantic similarity to recent messages - Created: New thread based on intent shift or topic boundary detection - Branched: New thread created from specific message with semantic clustering - Reactivated: Dormant thread resumed based on semantic similarity

Multi-Factor Scoring: - Semantic Similarity: Vector similarity between message embeddings - Temporal Continuity: Exponential decay based on time gaps - Intent Alignment: Intent classification consistency across messages - Entity Overlap: Named entity recognition and continuity - Conversation Flow: Boundary detection and topic shift analysis - User Pattern Match: Behavioral pattern recognition

User Management 🚧

Current Implementation

JWT-Based Authentication: - User identification via JWT tokens - user_uuid extracted from token for thread association - Single-user model (no multi-user recognition)

Thread-User Association: - Threads associated with user_id from JWT token - Basic access validation in ThreadManager - No complex user relationship modeling

Planned Features: - Multi-user family recognition - Voice biometrics integration - Behavioral pattern analysis - Relationship context modeling

Current API Implementation βœ…

REST Endpoints

Primary Endpoint: - POST /api/v1/conversation/messages - Unified message processing with auto-thread resolution

Advanced Endpoints: - POST /api/v1/conversation/threads - Explicit thread creation - POST /api/v1/conversation/threads/{thread_id}/messages - Send to specific thread - GET /api/v1/conversation/threads/{thread_id} - Get thread metadata - GET /api/v1/conversation/threads - List user threads

Real-time: - WebSocket /api/v1/conversation/ws/{thread_id} - Real-time AI responses

Message Processing Flow βœ…

Current Implementation: 1. REST Endpoint receives user message 2. ThreadManager resolves appropriate thread (continue existing or create new) 3. Message Bus publishes to conversation/user/input topic 4. ModelService Client generates LLM response directly 5. Response returned immediately with AI reply 6. WebSocket delivers real-time updates (if connected)

LLM Integration:

# Direct modelservice integration
modelservice_client = get_modelservice_client(config_manager)
llm_response = await modelservice_client.get_completions(
    model="hermes3:8b",
    prompt=request.message
)

AI Processing 🚧

Current State: - Direct LLM Integration: Simple prompt-response via modelservice - No Complex AI Coordination: Single LLM call without emotion/personality processing - Basic Error Handling: Fallback messages on LLM failures

Planned Enhancements: - Emotion Recognition: User emotional state analysis - Personality Integration: AICO personality traits application - Memory Retrieval: Context from conversation history - Parallel Processing: Multiple AI systems coordination - Context Assembly: Rich context from multiple sources

Context Management

Context Types

  1. Thread Context: Current conversation state and history
  2. User Context: Individual user relationship and preferences
  3. Emotional Context: Current emotional state and mood
  4. Personality Context: Active personality traits and behavioral parameters
  5. Memory Context: Relevant episodic and semantic memories
  6. Environmental Context: Time, location, device, and situational factors

Context Integration Strategy

Context integration is critical for maintaining conversation coherence and relationship continuity. The system assembles context from multiple sources to provide AI systems with comprehensive situational awareness.

Context Sources: - Thread Context: Conversation history, topic evolution, turn patterns - User Context: Individual preferences, communication style, relationship dynamics - Emotional Context: Current emotional state, mood trends, emotional history - Personality Context: Active personality traits, behavioral parameters, expression style - Memory Context: Relevant episodic memories, learned preferences, relationship milestones - Environmental Context: Time of day, location, device context, situational factors

Integration Process: 1. Context Assembly: Gather context from all available sources 2. Relevance Filtering: Prioritize context based on current conversation topic 3. Context Weighting: Apply importance weights based on recency and relevance 4. Context Compression: Optimize context size for AI system consumption 5. Context Validation: Ensure context consistency and remove conflicts

Message Bus Integration

Topic Structure

Following the standardized pattern: {domain}/{action}/{type}

Core Conversation Topics:

conversation/thread/created
conversation/thread/updated
conversation/message/received
conversation/response/generated
conversation/context/updated
conversation/memory/stored

AI Processing Topics:

ai/emotion/analyze/request
ai/emotion/analyze/response
ai/personality/express/request
ai/personality/express/response
ai/memory/retrieve/request
ai/memory/retrieve/response
ai/llm/generate/request
ai/llm/generate/response

Message Processing Flow

The conversation system uses a message-driven architecture where the REST endpoint publishes to the message bus and background systems handle all AI processing:

Flow Sequence: 1. REST Endpoint: Receives user message, resolves thread, publishes to conversation/user/input 2. Conversation Engine: Subscribes to user input, coordinates AI processing pipeline 3. AI Systems: Process emotion, personality, memory, and LLM generation in parallel 4. Response Assembly: Conversation engine assembles final response 5. Response Delivery: Published to conversation/ai/response for client consumption

Message Bus Topics: - conversation/user/input - User messages from REST layer - conversation/ai/response - AI responses for client delivery - conversation/thread/created - New thread notifications - conversation/context/updated - Thread context changes

Persistence Architecture

Database Schema

The conversation system uses a normalized database schema optimized for thread retrieval, message history, and metadata queries. All sensitive data is encrypted at rest using AES-256-GCM.

Core Tables: - conversation_threads: Thread lifecycle and ownership - conversation_messages: Complete message history with turn tracking - thread_metadata: Conversation topic, phase, and analytics

Schema Design Principles: - Normalized Structure: Separate concerns for optimal query performance - Encrypted Content: All user content encrypted with user-specific keys - Audit Trail: Complete timestamp tracking for all operations - Flexible Metadata: JSON fields for extensible conversation analytics - Foreign Key Integrity: Referential integrity across all relationships

Memory Integration

The conversation system integrates with AICO's comprehensive memory system to maintain long-term relationship context and learned preferences. Memory formation happens automatically during conversation processing.

Memory Types: - Episodic Memory: Specific conversation events, shared experiences, emotional moments - Semantic Memory: Learned facts about users, preferences, relationship dynamics - Procedural Memory: Communication patterns, successful interaction strategies

Memory Formation Process: 1. Automatic Extraction: Identify significant conversation moments during processing 2. Context Enrichment: Add emotional, temporal, and relational context to memories 3. Vector Embedding: Generate semantic embeddings for similarity-based retrieval 4. Memory Classification: Categorize memories by type, importance, and privacy level 5. Storage Integration: Persist memories with conversation thread references

Memory Retrieval Strategy: - Semantic Search: Vector similarity matching for relevant memory retrieval - Temporal Filtering: Weight recent memories higher than older ones - Relationship Context: Prioritize memories specific to current user relationship - Privacy Boundaries: Respect user-specific memory access controls

Automatic Thread Management

Modern 2025 Approach

The conversation system implements intelligent, automatic thread management that eliminates manual thread switching while maintaining natural conversation flow. This approach uses semantic analysis, temporal context, and user behavior patterns to seamlessly manage conversation continuity.

Core Principles

Semantic Thread Detection: - Topic Segmentation: Automatically detect topic shifts using semantic similarity analysis - Intent Classification: Identify when user intent changes significantly from current thread context - Contextual Continuity: Maintain thread when topics are related or build upon previous discussion - Natural Boundaries: Recognize natural conversation breakpoints (greetings, farewells, major topic shifts)

Temporal Context Management: - Session Boundaries: Automatically create new threads after extended inactivity periods - Daily Rhythms: Respect natural daily conversation patterns (morning greetings, evening check-ins) - Context Decay: Gradually reduce thread relevance as time passes without interaction - Reactivation Logic: Intelligently reactivate dormant threads when topics resurface

User Behavior Integration: - Communication Patterns: Learn individual user preferences for conversation continuity - Context Switching Tolerance: Adapt to user comfort level with topic jumping vs. linear conversation - Explicit Override Support: Allow manual thread specification when automatic detection isn't desired - Feedback Learning: Improve thread management based on user corrections and preferences

ThreadManager Service Architecture

Core Components:

class ThreadManager:
    def __init__(self):
        self.semantic_analyzer = SemanticAnalyzer()
        self.temporal_analyzer = TemporalAnalyzer()
        self.user_behavior_analyzer = UserBehaviorAnalyzer()
        self.thread_cache = ThreadCache()
        self.decision_engine = ThreadDecisionEngine()

Decision Engine Logic: 1. Semantic Analysis: Compare incoming message with active thread topics using vector similarity 2. Temporal Analysis: Evaluate time gaps and conversation rhythm patterns 3. Behavioral Analysis: Consider user-specific conversation preferences and patterns 4. Context Evaluation: Assess thread relevance and continuation appropriateness 5. Decision Synthesis: Combine all factors to make thread continuation or creation decision

Implementation Strategy

Semantic Similarity Detection: - Vector Embeddings: Generate semantic embeddings for all messages using modern language models - Similarity Thresholds: Dynamic thresholds based on conversation context and user preferences - Topic Modeling: Identify conversation topics using clustering and classification techniques - Context Windows: Analyze message context within configurable conversation windows

Temporal Pattern Recognition: - Activity Patterns: Learn user's typical conversation timing and frequency patterns - Session Detection: Identify natural conversation session boundaries - Dormancy Thresholds: Configure appropriate timeouts for thread dormancy - Reactivation Triggers: Detect when dormant threads should be reactivated

User Preference Learning: - Implicit Feedback: Learn from user behavior patterns and conversation flow - Explicit Feedback: Allow users to correct thread management decisions - Preference Profiles: Build individual user profiles for thread management preferences - Adaptive Algorithms: Continuously improve thread decisions based on user interactions

Thread Decision Matrix

Continue Existing Thread When: - Semantic similarity > 0.7 with recent messages - Time gap < user's typical conversation pause duration - Topic relates to ongoing thread discussion - User explicitly references previous context - No clear conversation boundary markers detected

Create New Thread When: - Semantic similarity < 0.4 with active thread - Time gap > dormancy threshold (default: 2 hours) - Clear topic shift detected (greetings, new subjects) - User indicates new conversation ("Let's talk about something else") - Maximum thread length reached (configurable limit)

Reactivate Dormant Thread When: - High semantic similarity with dormant thread content - User references specific past conversation elements - Temporal patterns suggest natural conversation resumption - Explicit thread reference or continuation cues detected

Integration with Conversation Flow

Thread Manager Integration:

@router.post("/threads", response_model=ThreadResponse)
async def create_thread(
    request: ThreadCreateRequest,
    current_user = Depends(get_current_user),
    thread_manager = Depends(get_thread_manager)
):
    # Create new thread with automatic context detection
    thread_decision = await thread_manager.create_or_resolve_thread(
        user_id=current_user['user_uuid'],
        initial_message=request.initial_message,
        context=request.context
    )

    return ThreadResponse(
        success=True,
        thread_id=thread_decision.thread_id,
        status="created",
        created_at=thread_decision.created_at
    )

@router.post("/threads/{thread_id}/messages", response_model=MessageResponse)
async def send_message(
    thread_id: str,
    request: MessageSendRequest,
    current_user = Depends(get_current_user),
    message_service = Depends(get_message_service)
):
    # Validate and process message
    message_result = await message_service.send_message(
        thread_id=thread_id,
        user_id=current_user['user_uuid'],
        message=request.message,
        metadata=request.metadata
    )

    return MessageResponse(
        success=True,
        message_id=message_result.message_id,
        thread_id=thread_id,
        status="processing",
        timestamp=message_result.timestamp
    )

Thread Resolution Process: 1. Input Analysis: Analyze incoming message for semantic content and intent 2. Active Thread Evaluation: Check if active threads are suitable for continuation 3. Dormant Thread Scanning: Search dormant threads for potential reactivation 4. Decision Calculation: Apply decision matrix to determine optimal thread choice 5. User Notification: Transparently inform user of thread management decisions when appropriate

Advanced Features

Multi-Thread Awareness: - Parallel Conversations: Support multiple active conversation threads simultaneously - Thread Prioritization: Rank threads by relevance, recency, and user engagement - Context Bleeding Prevention: Ensure thread isolation while allowing beneficial context sharing - Thread Merging: Intelligently merge related threads when appropriate

Proactive Thread Management: - Thread Suggestions: Suggest thread reactivation based on environmental triggers - Context Bridging: Help users transition between related conversation threads - Thread Summarization: Provide thread summaries when reactivating dormant conversations - Relationship Threading: Maintain separate threads for different relationship contexts

Performance Optimization: - Lazy Loading: Load thread context only when needed for decision making - Caching Strategy: Cache frequently accessed thread metadata and decision factors - Batch Processing: Process multiple thread decisions efficiently - Background Analysis: Continuously analyze conversation patterns in background

User Experience Design

Transparency Principles: - Invisible When Working: Thread management should be seamless and unnoticed - Visible When Needed: Clearly communicate thread decisions when user awareness is beneficial - User Control: Always allow manual override of automatic thread decisions - Learning Feedback: Provide mechanisms for users to train the system

Notification Strategy: - Subtle Indicators: Use UI elements to indicate thread context without interrupting flow - Transition Smoothing: Provide context bridges when switching between threads - History Access: Easy access to recent thread history and conversation context - Thread Visualization: Optional thread relationship visualization for power users

Configuration and Customization

System-Level Configuration:

thread_management:
  semantic_similarity:
    continuation_threshold: 0.7
    creation_threshold: 0.4
    embedding_model: "sentence-transformers/all-MiniLM-L6-v2"

  temporal_analysis:
    dormancy_threshold: "2h"
    session_gap_threshold: "30m"
    daily_reset_time: "06:00"

  behavioral_learning:
    adaptation_rate: 0.1
    feedback_weight: 0.8
    pattern_history_days: 30

User-Level Customization: - Thread Sensitivity: Adjust how easily new threads are created - Context Preferences: Configure desired level of conversation context retention - Notification Preferences: Control when and how thread decisions are communicated - Manual Override Frequency: Set preference for automatic vs. manual thread management

API Design

Separated Resource Architecture

The conversation API follows a clean resource-based design that separates thread management from message processing. This approach provides clear boundaries, better testability, and optimal performance characteristics.

Core Design Philosophy: - Resource Separation: Clear distinction between threads (conversation contexts) and messages (content) - Single Responsibility: Each endpoint has one clear, well-defined purpose - RESTful Design: Follows REST principles for resource creation, retrieval, and manipulation - Asynchronous Processing: All AI processing happens in background via message bus - Progressive Enhancement: Clients can build sophisticated conversation management incrementally

Thread Management Endpoints: - POST /api/v1/conversation/threads - Create new conversation thread - GET /api/v1/conversation/threads/{thread_id} - Get thread metadata and status - GET /api/v1/conversation/threads - List user's conversation threads - DELETE /api/v1/conversation/threads/{thread_id} - Archive/delete thread

Message Processing Endpoints: - POST /api/v1/conversation/threads/{thread_id}/messages - Send message to thread - GET /api/v1/conversation/threads/{thread_id}/messages - Get message history - WebSocket /api/v1/conversation/threads/{thread_id}/ws - Real-time message streaming

Implementation Details

Thread Creation Schema:

class ThreadCreateRequest(BaseModel):
    initial_message: Optional[str] = Field(None, description="Optional first message")
    context: Optional[Dict[str, Any]] = Field(None, description="Initial context metadata")
    thread_type: str = Field("conversation", description="Thread type for categorization")

class ThreadResponse(BaseModel):
    success: bool
    thread_id: str
    status: str  # "created", "active", "processing"
    created_at: datetime
    message_count: int = 0

Message Sending Schema:

class MessageSendRequest(BaseModel):
    message: str = Field(..., description="Message content")
    message_type: str = Field("text", description="Message type")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Message metadata")

class MessageResponse(BaseModel):
    success: bool
    message_id: str
    thread_id: str
    status: str  # "received", "processing", "completed"
    timestamp: datetime

Advantages of Separated Architecture: - Clear Resource Boundaries: Threads and messages are distinct, cacheable resources - Independent Scaling: Thread creation and message processing can scale independently - Better Error Handling: Thread errors vs message errors are clearly separated - Enhanced Testing: Each operation can be tested in isolation - Client Flexibility: Clients can implement sophisticated conversation management - Monitoring Clarity: Separate metrics for thread lifecycle vs message processing - Caching Optimization: Thread metadata cached separately from message content

Thread Creation Flow: 1. Thread Creation: Client creates thread, optionally with initial message 2. Thread Validation: System validates user permissions and thread limits 3. Context Initialization: Initialize thread with user context and metadata 4. Initial Message Processing: If provided, queue initial message for processing 5. Response: Return thread ID and status immediately

Message Processing Flow: 1. Thread Validation: Verify thread exists and user has access 2. Message Creation: Create message with unique ID and metadata 3. Message Publication: Publish to conversation message bus 4. Immediate Response: Return message ID and processing status 5. Background Processing: Conversation engine processes asynchronously 6. Response Delivery: AI response delivered via WebSocket or polling

Proactive Agency Integration

Background Conversation Awareness

AICO maintains continuous awareness of conversation threads to identify opportunities for proactive engagement. This background monitoring enables natural follow-ups, check-ins, and relationship building initiatives.

Monitoring Capabilities: - Thread Activity Tracking: Monitor conversation patterns, response times, topic evolution - Emotional State Monitoring: Track emotional trends and identify support opportunities - Follow-up Detection: Identify unresolved topics or promised actions requiring follow-up - Relationship Milestone Recognition: Detect significant relationship moments for acknowledgment - Context Change Detection: Notice environmental or situational changes affecting conversation

Proactive Initiative Types: - Check-in Initiatives: "How did your presentation go?" based on previous conversations - Follow-up Questions: "Did you finish that book you mentioned?" for topic continuity - Emotional Support: "You seem stressed lately, want to talk?" based on pattern recognition - Relationship Building: "I've been thinking about what you said..." for deeper connection - Contextual Awareness: "Good morning! How are you feeling today?" based on time and mood patterns

Scheduling Strategy: - Optimal Timing: Consider user availability, mood, and conversation context - Natural Integration: Initiatives feel organic, not automated or intrusive - User Preferences: Respect individual preferences for proactive engagement frequency - Context Sensitivity: Adapt initiative style to current relationship dynamics

Initiative Types

  1. Check-in Initiatives: "How did your presentation go?"
  2. Follow-up Initiatives: "Did you finish that book you were reading?"
  3. Contextual Initiatives: "I noticed you seem stressed lately"
  4. Learning Initiatives: "I've been thinking about what you said about..."
  5. Relationship Initiatives: "I appreciate how patient you've been with me"

Error Handling & Resilience

Thread Recovery

The conversation system includes robust recovery mechanisms to handle thread corruption, data loss, or system failures. Recovery prioritizes conversation continuity over perfect historical accuracy.

Recovery Strategies: - Database Recovery: Attempt to restore thread from primary database backup - Memory System Recovery: Reconstruct thread context from episodic memory system - Minimal Recovery: Create functional thread with basic user context when full recovery fails - Graceful Degradation: Continue conversation with reduced context rather than complete failure

Recovery Process: 1. Corruption Detection: Identify thread integrity issues during normal operations 2. Recovery Prioritization: Attempt recovery methods in order of data completeness 3. Context Reconstruction: Rebuild conversation context from available sources 4. User Notification: Transparently inform user of any context limitations 5. Background Restoration: Continue attempting full recovery in background

Graceful Degradation

  • Context Loss: Continue conversation with reduced context
  • AI System Failure: Fallback to simpler response generation
  • Memory System Failure: Continue without memory integration
  • Emotion System Failure: Use default emotional parameters

Performance Considerations

Thread Caching Strategy

The conversation system uses intelligent caching to optimize thread access patterns and minimize database queries. Caching strategy balances memory usage with conversation responsiveness.

Cache Hierarchy: - Active Cache: Recently accessed threads kept in memory for immediate access - Dormant Cache: Less active threads cached with compressed context - Database Storage: Full thread persistence with encrypted content

Cache Management: - LRU Eviction: Least recently used threads moved to dormant cache or storage - Predictive Loading: Preload threads based on user activity patterns - Context Compression: Reduce memory footprint of dormant threads - Cache Warming: Proactively load threads for expected user interactions

Performance Optimization: - Batch Operations: Group database operations for efficiency - Async Loading: Non-blocking thread loading with fallback to cached data - Memory Monitoring: Dynamic cache sizing based on available system resources - Access Pattern Learning: Adapt caching strategy based on user behavior

Message Processing Optimization

  • Async Processing: All AI processing happens asynchronously
  • Streaming Responses: Progressive response delivery via WebSocket
  • Context Preloading: Predictive context loading for active threads
  • Batch Processing: Group similar operations for efficiency

Security & Privacy

Thread Access Control

class ThreadAccessManager:
    def verify_thread_access(self, thread_id: str, user_context: UserContext) -> bool:
        """Verify user has access to thread"""
        thread = self.get_thread(thread_id)
        return (
            thread.user_context.user_id == user_context.user_id or
            self.is_family_member(user_context, thread.user_context) or
            self.has_explicit_permission(user_context, thread_id)
        )

Data Encryption

  • Thread Data: AES-256-GCM encryption for all thread data
  • Message Content: End-to-end encryption for sensitive content
  • Memory References: Encrypted links to memory system
  • Context Data: Encrypted emotional and personality state

Extension Points

Plugin Integration

The conversation system provides extensive plugin capabilities for extending conversation processing, adding custom AI components, and integrating external services.

Plugin Types: - Context Enhancers: Add specialized context information (weather, calendar, location) - AI Processors: Custom emotion recognition, personality simulation, or response generation - Response Filters: Content moderation, safety checks, or response refinement - Integration Plugins: External service integration (smart home, productivity tools) - Analytics Plugins: Conversation analysis, relationship metrics, usage tracking

Plugin Architecture: - Message Bus Integration: Plugins subscribe to relevant conversation topics - Context Pipeline: Plugins can modify conversation context at various processing stages - Response Pipeline: Plugins can enhance or filter AI responses before delivery - Event Hooks: Plugins receive notifications for conversation lifecycle events - Configuration Management: Plugin-specific settings and user preferences

Plugin Development: - Standard Interface: Consistent plugin API for easy development and integration - Async Processing: Non-blocking plugin execution with timeout protection - Error Isolation: Plugin failures don't affect core conversation processing - Hot Reloading: Dynamic plugin loading and updating without system restart

Custom AI Integration

The conversation system supports integration of custom AI models and processing components, enabling specialized conversation capabilities and domain-specific intelligence.

AI Integration Points: - Custom LLM Models: Integrate specialized language models for specific domains - Emotion Recognition: Custom emotion detection models trained on specific datasets - Personality Simulation: Domain-specific personality models for specialized applications - Intent Classification: Custom intent recognition for application-specific commands - Response Generation: Specialized response generators for technical or creative content

Integration Architecture: - Processor Registry: Central registry for AI component discovery and routing - Request Routing: Intelligent routing based on context, user preferences, and capability requirements - Fallback Handling: Graceful fallback to default processors when custom components fail - Performance Monitoring: Track custom AI component performance and reliability - A/B Testing: Compare custom AI components against default implementations

Development Support: - Standard Interfaces: Consistent API contracts for AI component integration - Testing Framework: Comprehensive testing tools for AI component validation - Performance Profiling: Built-in profiling for AI component optimization - Documentation Templates: Standardized documentation for custom AI components