Core Message Bus Architecture

Overview

The Core Message Bus is the central nervous system of AICO, enabling modular, event-driven communication between all system components. It implements a publish-subscribe (pub/sub) pattern that allows modules to communicate without direct dependencies, supporting AICO's core principles of modularity, autonomy, and extensibility.

🔒 Security First: All message bus communication is encrypted using CurveZMQ with mandatory authentication. There is no plaintext fallback: the system enforces secure communication or fails completely.

⚠️ CRITICAL: Logging Recursion Prevention. Avoid standard logging within message bus operations to prevent infinite recursion loops.

This architecture document describes the design, implementation, and integration patterns of AICO's central message bus system, which serves as the foundation for inter-module communication and coordination.

Design Principles

The Core Message Bus architecture is built on the following key principles:

1. Loose Coupling

Modules communicate exclusively through the message bus rather than direct method calls, enabling:

  • Independent development and testing of modules
  • Ability to replace or upgrade modules without affecting others
  • Simplified integration of new capabilities

2. Event-Driven Architecture

The system operates on an event-driven paradigm where:

  • Modules publish events (messages) when state changes occur
  • Interested modules subscribe to relevant topics
  • Processing occurs asynchronously and reactively

3. Standardized Communication

All messages follow a consistent envelope structure defined in Protocol Buffers:

message AicoMessage {
  MessageMetadata metadata = 1;
  oneof payload {
    EmotionState emotion_state = 2;
    ConversationMessage conversation_message = 3;
    // Other message types...
  }
}

message MessageMetadata {
  string message_id = 1;       // UUID string
  string timestamp = 2;        // ISO 8601 format
  string source = 3;           // Source module name
  string message_type = 4;     // topic/subtopic format
  string version = 5;          // Schema version
}
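As a rough illustration, the metadata fields above could be populated in Python along these lines. This is only a sketch: the dataclass is a plain-Python stand-in for the generated Protocol Buffers class, `build_metadata` is a hypothetical helper, and the default version string `"1.0"` is an assumption rather than AICO's actual schema version.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class MessageMetadata:
    """Plain-Python stand-in for the generated MessageMetadata class."""
    message_id: str    # UUID string
    timestamp: str     # ISO 8601 format
    source: str        # Source module name
    message_type: str  # topic/subtopic format
    version: str       # Schema version


def build_metadata(source: str, message_type: str, version: str = "1.0") -> MessageMetadata:
    """Hypothetical helper: fill in the generated fields (ID, timestamp)."""
    return MessageMetadata(
        message_id=str(uuid.uuid4()),
        timestamp=datetime.now(timezone.utc).isoformat(),
        source=source,
        message_type=message_type,
        version=version,
    )


meta = build_metadata("emotion_simulation", "emotion/state/current")
print(meta.message_type, meta.version)
```

Generating the ID and timestamp in one place keeps every publisher consistent about the envelope format.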

4. Topic-Based Routing

Messages are organized in a hierarchical topic structure:

  • Primary category (e.g., emotion, personality, agency)
  • Subcategory (e.g., state, expression, goals)
  • Action/type (e.g., current, update, request)

5. Versioned Message Formats

All message formats are explicitly versioned to enable:

  • Backward compatibility
  • Graceful evolution of the system
  • Support for multiple message format versions simultaneously

Technical Implementation

Message Bus Architecture

The Core Message Bus implements a hybrid broker pattern with the backend service acting as the central message coordinator:

Internal Communication (Backend Modules):

  • Protocol: ZeroMQ with Protocol Buffers
  • Transport: inproc:// for same-process modules, ipc:// for cross-process
  • Pattern: Pub/Sub with topic hierarchy
  • Host: Backend service runs central ZeroMQ broker on tcp://localhost:5555

External Communication (Subsystems):

  • Frontend (Flutter): WebSocket for real-time updates, REST API for commands
  • CLI (Python): ZeroMQ IPC with localhost REST fallback
  • Studio (React): REST API for admin operations, WebSocket for monitoring
  • Transport: All external clients connect to backend's API Gateway

Message Bus Technology

The Core Message Bus uses ZeroMQ with CurveZMQ encryption:

# Example: Creating encrypted message bus client
from aico.core.bus import create_client

client = create_client("api_gateway")
await client.connect()  # Automatically sets up CurveZMQ encryption

  • High-performance: Asynchronous messaging with minimal overhead
  • Secure by default: Mandatory CurveZMQ encryption for all communication
  • Flexible patterns: Pub/sub with hierarchical topic routing
  • Embedded: No external message broker dependencies

Message Format

// Example: Core message envelope
message AicoMessage {
  MessageMetadata metadata = 1;
  google.protobuf.Any any_payload = 2;
}

message MessageMetadata {
  string message_id = 1;
  string timestamp = 2;
  string source = 3;
  string message_type = 4;
  string version = 5;
}

Protocol Buffers provide:

  • Binary serialization: Compact, fast encoding/decoding
  • Strong typing: Compile-time validation and code generation
  • Versioning: Backward compatibility through schema evolution
  • Cross-language: Python, Dart, and other language bindings

Message Validation

Messages are validated through Protocol Buffers' built-in validation:

  • Compile-time type checking
  • Runtime schema validation
  • Required fields enforcement
  • Automatic versioning support
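Type checks alone do not confirm that a string field holds a well-formed UUID, timestamp, or topic. A minimal sketch of an extra runtime check on the metadata fields might look like the following; `validate_metadata` and the topic regular expression are assumptions made for illustration, and all values are assumed to be strings.

```python
import re
import uuid
from datetime import datetime

# Assumed topic shape: lowercase segments separated by slashes,
# e.g. emotion/state/current
TOPIC_RE = re.compile(r"^[a-z_]+(/[a-z_]+)+$")
REQUIRED = ("message_id", "timestamp", "source", "message_type", "version")


def validate_metadata(meta: dict) -> list[str]:
    """Return a list of problems; an empty list means the metadata looks valid."""
    errors = [f"missing field: {name}" for name in REQUIRED if not meta.get(name)]
    if errors:
        return errors
    try:
        uuid.UUID(meta["message_id"])
    except ValueError:
        errors.append("message_id is not a UUID")
    try:
        datetime.fromisoformat(meta["timestamp"])
    except ValueError:
        errors.append("timestamp is not ISO 8601")
    if not TOPIC_RE.match(meta["message_type"]):
        errors.append("message_type is not in topic/subtopic format")
    return errors
```

Returning a list of problems rather than raising on the first one makes it easier to log every defect in a rejected message at once.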

Topic Hierarchy

The message bus uses a hierarchical topic structure that organizes messages by functional domain and purpose:

Core Domains

IMPORTANT: AICO uses a centralized topic registry (AICOTopics) with slash-based notation for all message bus topics.

ZeroMQ Subscription Behavior

Critical Considerations

  1. ZeroMQ uses prefix matching only
     • When you subscribe to a pattern, ZeroMQ converts it to a prefix filter
     • Example: logs/* becomes ZMQ filter logs/
     • This means ZeroMQ will deliver ANY message whose topic starts with that prefix

  2. Application-level pattern matching
     • After ZeroMQ delivers messages based on prefix, AICO performs application-level pattern matching
     • This is where wildcard semantics are applied

ZeroMQ Prefix Matching

ZeroMQ uses simple prefix matching (no wildcards):

| Pattern | ZMQ Filter | Behavior | Matches |
|---|---|---|---|
| logs/backend | logs/backend | Exact prefix match | logs/backend, logs/backend/main, logs/backend/api |
| logs/ | logs/ | Prefix match | All topics starting with logs/ |
| * or ** | "" (empty) | Match all | Every message on the bus |
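The two-stage behavior can be sketched in plain Python. The prefix reduction follows the table above; the second stage uses shell-style matching from `fnmatch` purely as an assumed stand-in, since the exact wildcard semantics AICO applies are not specified here. All three function names are hypothetical.

```python
from fnmatch import fnmatchcase


def to_zmq_prefix(pattern: str) -> str:
    """Reduce a subscription pattern to the prefix ZeroMQ can filter on."""
    star = pattern.find("*")
    return pattern if star == -1 else pattern[:star]


def zmq_delivers(pattern: str, topic: str) -> bool:
    """Stage 1: ZeroMQ delivers any topic that starts with the prefix filter."""
    return topic.startswith(to_zmq_prefix(pattern))


def app_matches(pattern: str, topic: str) -> bool:
    """Stage 2 (assumed semantics): shell-style wildcard match on the full topic."""
    if "*" not in pattern:
        return topic.startswith(pattern)
    return fnmatchcase(topic, pattern)


for topic in ("logs/backend/error", "logs/backend/info"):
    print(topic, zmq_delivers("logs/*/error", topic), app_matches("logs/*/error", topic))
```

With the pattern `logs/*/error`, both topics pass the ZeroMQ prefix filter `logs/`, but only the first survives the application-level check.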

Common Subscription Patterns

| Use Case | Pattern | ZMQ Filter | Matches |
|---|---|---|---|
| All logs | logs/ | logs/ | All topics starting with logs/ |
| Backend logs | logs/backend/ | logs/backend/ | All topics starting with logs/backend/ |
| Specific module | logs/backend/main | logs/backend/main | Topics starting with logs/backend/main |
| All messages | * or ** | "" (empty) | Every message on the bus |

Best Practices

  1. Use prefix patterns for hierarchical subscriptions
     • Subscribe to logs/ to receive all log messages
     • Subscribe to logs/backend/ to receive all backend logs
     • Be specific with prefixes to avoid unnecessary message delivery

  2. Understand ZeroMQ's prefix behavior
     • ZeroMQ delivers ANY message whose topic starts with your filter
     • No application-level filtering is implemented
     • Design topics carefully to leverage prefix matching effectively

Common Pitfalls

  1. Expecting wildcard behavior
     • ZeroMQ does NOT support * or ** wildcards
     • logs/* is treated as literal prefix logs/*, not a wildcard
     • Use proper prefixes like logs/ instead

  2. Over-subscribing with broad prefixes
     • Subscribing to logs/ delivers ALL log messages
     • This can cause performance issues with high message volume
     • Use specific prefixes when possible

  3. Inconsistent topic structure
     • Design hierarchical topics to work well with prefix matching
     • Use consistent separators (slashes) for topic hierarchy

The core domains and their topics are:

  • emotion/ - Emotion simulation related messages
      • emotion/state/current - Current emotional state
      • emotion/state/update - Emotional state changes
      • emotion/appraisal/event - Emotional appraisal of events

  • personality/ - Personality simulation related messages
      • personality/state/current - Current personality state
      • personality/expression/communication - Communication style parameters
      • personality/expression/decision - Decision-making parameters
      • personality/expression/emotional - Emotional tendency parameters

  • agency/ - Autonomous agency related messages
      • agency/goals/current - Current agent goals
      • agency/initiative - Proactive engagement initiatives
      • agency/decision/request - Decision-making requests
      • agency/decision/response - Decision outcomes

  • conversation/ - Conversation and dialogue related messages
      • conversation/context/current - Current conversation context
      • conversation/history/add - Historical conversation data
      • conversation/intent/detected - Detected user intents

  • memory/ - Memory and learning related messages
      • memory/store/request - Memory storage requests
      • memory/retrieve/request - Memory retrieval requests
      • memory/consolidation/start - Memory consolidation triggers

  • user/ - User-related messages
      • user/interaction/history - User interaction patterns
      • user/feedback/explicit - Explicit user feedback
      • user/state/update - Inferred user state changes

  • llm/ - Large Language Model related messages
      • llm/conversation/events - Conversation events from LLM
      • llm/prompt/conditioning/request - Requests for prompt conditioning
      • llm/prompt/conditioning/response - Prompt conditioning parameters

  • ui/ - User Interface related messages
      • ui/state/update - UI state changes (theme, navigation, connection status)
      • ui/interaction/event - User interactions (clicks, input, gestures)
      • ui/notification/show - Display notifications and alerts
      • ui/command/execute - Backend commands to frontend
      • ui/preferences/update - UI preferences and settings updates

  • system/ - System management messages
      • system/bus/started - Message bus startup events
      • system/bus/stopping - Message bus shutdown events
      • system/module/registered - Module registration events
      • system/health - System health checks

  • logs/ - Logging and audit messages
      • logs/entry - Individual log entries
      • logs/* - All log topics (wildcard subscription)

Cross-Cutting Concerns

  • crisis/ - Crisis detection and handling
      • crisis/detection - Crisis signals and alerts
      • crisis/response - Crisis response coordination

  • expression/ - Cross-modal expression coordination
      • expression/coordination - Coordinated expression directives
      • expression/feedback - Expression effectiveness feedback

  • learning/ - Shared learning coordination
      • learning/coordination - Learning signals and coordination
      • learning/feedback - Learning effectiveness feedback

Module Integration Patterns

Publisher-Subscriber Pattern

Modules interact with the message bus through a consistent pattern:

  1. Initialization:
     • Modules connect to the message bus on startup
     • They declare topic subscriptions based on their functionality
     • They register message handlers for each subscribed topic

  2. Message Publication:
     • Modules publish messages when their internal state changes
     • Messages include standardized metadata and domain-specific payloads
     • Publication is non-blocking and asynchronous

  3. Message Consumption:
     • Modules receive messages for their subscribed topics
     • Message handlers process incoming messages
     • Processing may trigger internal state changes or new message publications
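The subscribe, publish, and consume cycle can be illustrated with a toy in-memory bus. This is a sketch only: `InMemoryBus` is a hypothetical stand-in, not AICO's `MessageBusClient`, and it awaits handlers inline to keep the output deterministic, whereas the real bus publishes without waiting on subscribers.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[str, Any], Awaitable[None]]


class InMemoryBus:
    """Toy stand-in for the message bus: prefix subscriptions, async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, prefix: str, handler: Handler) -> None:
        self._handlers[prefix].append(handler)

    async def publish(self, topic: str, payload: Any) -> None:
        # Deliver to every handler whose prefix matches the topic.
        for prefix, handlers in self._handlers.items():
            if topic.startswith(prefix):
                for handler in handlers:
                    await handler(topic, payload)


async def main() -> list[str]:
    bus = InMemoryBus()
    seen: list[str] = []

    async def on_emotion(topic: str, payload: Any) -> None:
        seen.append(f"personality module saw {topic}")
        # Processing a message may trigger a new publication.
        await bus.publish("personality/state/current", {"updated": True})

    async def on_personality(topic: str, payload: Any) -> None:
        seen.append(f"emotion module saw {topic}")

    # Initialization: declare subscriptions and register handlers.
    bus.subscribe("emotion/", on_emotion)
    bus.subscribe("personality/", on_personality)
    await bus.publish("emotion/state/current", {"valence": 0.4})
    return seen


print(asyncio.run(main()))
```

Neither handler references the other module directly; the only shared knowledge is the topic name.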

Example: Emotion-Personality Integration

The Emotion Simulation and Personality Simulation modules integrate through the message bus:

  1. Personality Simulation publishes personality/expression/emotional messages
  2. Emotion Simulation subscribes to these messages to adjust emotional tendencies
  3. Emotion Simulation publishes emotion/state/current messages
  4. Personality Simulation subscribes to these messages to inform personality expression

This bidirectional communication happens without direct dependencies between the modules.

Using the Central Topic Registry

All code should use the AICOTopics class instead of string literals:

from aico.core.topics import AICOTopics

# Correct usage
await client.publish(AICOTopics.EMOTION_STATE_CURRENT, emotion_data)
await client.subscribe(AICOTopics.ALL_PERSONALITY, handler)

# Incorrect usage (deprecated)
await client.publish("emotion.state.current", emotion_data)  # DON'T DO THIS

Migration Support: The TopicMigration class provides automatic conversion from old dot notation to new slash notation for backward compatibility during the transition period.
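The internals of TopicMigration are not shown here, so the following is only a guess at the core of such a conversion. `migrate_topic` is a hypothetical function name, and the sketch assumes legacy topics never contain slashes.

```python
import warnings


def migrate_topic(topic: str) -> str:
    """Convert legacy dot notation to slash notation; slash topics pass through."""
    if "/" in topic or "." not in topic:
        return topic
    warnings.warn(
        f"dot-notation topic {topic!r} is deprecated; use slash notation",
        DeprecationWarning,
        stacklevel=2,
    )
    return topic.replace(".", "/")


print(migrate_topic("emotion.state.current"))
print(migrate_topic("emotion/state/current"))
```

Emitting a DeprecationWarning on each conversion gives callers a way to find remaining string literals before the transition period ends.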

Plugin Integration

The Plugin Manager mediates plugin access to the message bus:

  1. Topic Access Control:
     • Plugins request access to specific topics
     • Plugin Manager enforces access policies based on plugin permissions
     • Unauthorized topic access attempts are blocked and logged

  2. Message Validation:
     • All plugin-originated messages are validated before publication
     • Malformed messages are rejected to prevent system instability
     • Message rate limiting prevents denial-of-service attacks

  3. Sandboxed Publication:
     • Plugins publish through the Plugin Manager proxy
     • Messages are tagged with plugin identity for traceability
     • Plugin-specific topic prefixes isolate plugin messages
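A publish gate combining these three controls might be sketched as follows. Everything here is hypothetical: the `PluginProxy` class, the `plugin/<id>/` prefix scheme, and the fixed one-second rate window are illustrative assumptions, not the Plugin Manager's actual design.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PluginProxy:
    """Hypothetical publish gate for one plugin: topic allowlist plus rate limit."""
    plugin_id: str
    allowed_prefixes: tuple[str, ...]
    max_per_second: int = 10
    _window_start: float = field(default=0.0, init=False)
    _count: int = field(default=0, init=False)

    def check(self, topic: str, now: Optional[float] = None) -> str:
        """Return the namespaced topic, or raise if the publish is not allowed."""
        now = time.monotonic() if now is None else now
        if not any(topic.startswith(p) for p in self.allowed_prefixes):
            raise PermissionError(f"{self.plugin_id} may not publish to {topic}")
        # Fixed one-second window: reset the counter when the window rolls over.
        if now - self._window_start >= 1.0:
            self._window_start, self._count = now, 0
        if self._count >= self.max_per_second:
            raise RuntimeError(f"{self.plugin_id} exceeded {self.max_per_second} msg/s")
        self._count += 1
        # Assumed scheme: a plugin-specific prefix isolates and tags the message.
        return f"plugin/{self.plugin_id}/{topic}"


proxy = PluginProxy("weather", allowed_prefixes=("ui/notification/",))
print(proxy.check("ui/notification/show"))
```

Passing `now` explicitly is only a testing convenience; production code would rely on the monotonic clock.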

Security and Privacy Considerations

Message Security

  1. CurveZMQ Encryption:
     • Mandatory encryption: All message bus communication uses CurveZMQ with no plaintext fallback
     • Deterministic key derivation: Keys derived from master key using Argon2id + Z85 encoding
     • Mutual authentication: Both broker and clients authenticate using public key cryptography
     • Fail-secure behavior: System fails completely rather than falling back to plaintext

  2. Authentication:
     • All modules authenticate to the message bus using CurveZMQ certificates
     • Broker validates specific client public keys (no CURVE_ALLOW_ANY)
     • Unauthorized connections are rejected with comprehensive security logging
     • Plugin authentication uses separate CurveZMQ credentials

  3. Authorization:
     • Topic-level access control limits which modules can publish/subscribe
     • Sensitive topics have restricted access
     • Plugin access is limited to approved topics

Privacy Protection

  1. Data Minimization:
     • Messages contain only necessary information
     • Sensitive data is filtered before publication
     • User identifiers are anonymized where possible

  2. End-to-End Encryption:
     • Transport encryption: All message bus traffic encrypted with CurveZMQ
     • Message payload encryption: Sensitive payloads additionally encrypted at application level
     • Zero plaintext transmission: No unencrypted data crosses network boundaries
     • Key management: Automatic key derivation with secure storage integration

Performance Considerations

Message Throughput

The message bus is designed to handle:

  • High-frequency emotional state updates
  • Real-time conversation events
  • Periodic memory consolidation
  • Burst traffic during multi-modal coordination

Optimization Strategies

  1. Message Prioritization:
     • Critical messages (e.g., crisis detection) receive higher priority
     • Non-time-sensitive messages may be queued during high load

  2. Payload Optimization:
     • Large payloads may use compression
     • References instead of full content where appropriate
     • Selective field inclusion for performance-critical paths

  3. Subscription Optimization:
     • Fine-grained topic subscriptions to reduce unnecessary message processing
     • Message filtering at the source when possible
     • Local caching of frequently accessed message data
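Message prioritization can be sketched with a heap-based outbox. The priority table below is an assumption made for illustration (lower number means earlier delivery), and `PriorityOutbox` is a hypothetical class, not part of AICO.

```python
import heapq
import itertools

# Assumed priority levels: lower number = delivered first.
PRIORITY_BY_PREFIX = {"crisis/": 0, "conversation/": 1, "logs/": 9}
DEFAULT_PRIORITY = 5


class PriorityOutbox:
    """Queue that releases critical topics before routine ones."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str, object]] = []
        # The sequence number is a tie-breaker that keeps FIFO order
        # among messages with the same priority.
        self._seq = itertools.count()

    def put(self, topic: str, payload: object) -> None:
        priority = next(
            (p for prefix, p in PRIORITY_BY_PREFIX.items() if topic.startswith(prefix)),
            DEFAULT_PRIORITY,
        )
        heapq.heappush(self._heap, (priority, next(self._seq), topic, payload))

    def get(self) -> tuple[str, object]:
        _, _, topic, payload = heapq.heappop(self._heap)
        return topic, payload

    def __len__(self) -> int:
        return len(self._heap)


outbox = PriorityOutbox()
for t in ("logs/entry", "emotion/state/update", "crisis/detection"):
    outbox.put(t, {})
print([outbox.get()[0] for _ in range(len(outbox))])
```

Under load, a sender draining this outbox would emit crisis messages first while log entries wait at the back of the queue.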

Message Persistence

Storage Strategy

Database: libSQL (already integrated and encrypted)

  • Selective persistence for audit logs, debugging, and cross-device sync
  • Append-only message log with SQL queryability
  • JSON metadata support for flexible message attributes

Storage Schema:

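CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    timestamp DATETIME,
    topic TEXT,
    source TEXT,
    message_type TEXT,
    payload BLOB,      -- Protocol Buffer binary
    metadata JSON      -- Flexible attributes
);

-- SQLite-family databases declare indexes in a separate statement
CREATE INDEX idx_events_topic_timestamp ON events (topic, timestamp);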

Persistence Policy:

  • Always: Security events, audit logs, admin actions
  • Optional: Debug mode message replay, cross-device sync
  • Never: High-frequency emotion states (unless debugging)
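The policy and schema can be exercised together in a small sketch. It uses Python's built-in `sqlite3` module as a stand-in for libSQL (which is SQLite-compatible); the topic prefixes in `ALWAYS_PERSIST`, the `should_persist` and `persist` helpers, and the `debug`/`sync` flags are all assumptions for illustration.

```python
import json
import sqlite3
from datetime import datetime, timezone

ALWAYS_PERSIST = ("security/", "audit/", "admin/")  # assumed topic prefixes
NEVER_PERSIST = ("emotion/state/",)                 # high-frequency states


def should_persist(topic: str, debug: bool = False, sync: bool = False) -> bool:
    """Apply the Always / Optional / Never policy to a topic."""
    if topic.startswith(ALWAYS_PERSIST):
        return True
    if topic.startswith(NEVER_PERSIST):
        return debug  # only kept while debugging
    return debug or sync  # optional traffic


conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE events (
        id INTEGER PRIMARY KEY, timestamp DATETIME, topic TEXT, source TEXT,
        message_type TEXT, payload BLOB, metadata JSON)"""
)
conn.execute("CREATE INDEX idx_events_topic_timestamp ON events (topic, timestamp)")


def persist(topic: str, source: str, payload: bytes, metadata: dict,
            debug: bool = False, sync: bool = False) -> bool:
    """Append the message to the event log if the policy allows it."""
    if not should_persist(topic, debug, sync):
        return False
    conn.execute(
        "INSERT INTO events (timestamp, topic, source, message_type, payload, metadata) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (datetime.now(timezone.utc).isoformat(), topic, source, topic,
         payload, json.dumps(metadata)),
    )
    return True


print(persist("audit/admin/login", "api_gateway", b"\x08\x01", {"user": "admin"}))
print(persist("emotion/state/current", "emotion_simulation", b"", {}))
```

The payload column stores the serialized Protocol Buffers bytes unchanged, so replay can hand them straight back to the bus.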

Monitoring and Debugging

The message bus includes facilities for:

  1. Message Tracing:
     • Correlation IDs link related messages
     • End-to-end tracing of message flows
     • Timing metrics for message processing

  2. Traffic Monitoring:
     • Topic-level message volume metrics
     • Latency measurements for critical paths
     • Queue depth monitoring for backpressure detection

  3. Debugging Tools:
     • Message bus inspector for real-time monitoring
     • Message replay capabilities for testing
     • Topic subscription viewer to understand module connectivity
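Correlation IDs and topic-level metrics can be sketched in a few lines. Note that the envelope shown earlier has no `correlation_id` field; that field, the `new_trace` helper, and the `BusMetrics` class are assumptions introduced only for this illustration.

```python
import time
import uuid
from collections import Counter, defaultdict
from typing import Optional


def new_trace(parent_correlation_id: Optional[str] = None) -> dict:
    """Give each message a fresh ID but reuse the parent's correlation ID."""
    return {
        "message_id": str(uuid.uuid4()),
        "correlation_id": parent_correlation_id or str(uuid.uuid4()),
    }


class BusMetrics:
    """Per-topic message volume and handler latency in milliseconds."""

    def __init__(self) -> None:
        self.volume: Counter = Counter()
        self.latency_ms: dict = defaultdict(list)

    def record(self, topic: str, started: float) -> None:
        """Record one handled message; `started` comes from time.perf_counter()."""
        self.volume[topic] += 1
        self.latency_ms[topic].append((time.perf_counter() - started) * 1000)


request = new_trace()
response = new_trace(request["correlation_id"])
print(request["correlation_id"] == response["correlation_id"])
```

A request and its response share one correlation ID, so a trace viewer can group them even though their message IDs differ.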

Message Definition and Code Generation

Protocol Buffer Definitions

All message definitions are maintained as Protocol Buffer (.proto) files in the /proto/ directory:

  • Core message envelope: /proto/core/envelope.proto
  • Emotion messages: /proto/emotion/emotion.proto
  • Conversation messages: /proto/conversation/conversation.proto
  • Personality messages: /proto/personality/personality.proto
  • Integration messages: /proto/integration/integration.proto
  • UI messages: /proto/ui/ui.proto

Code Generation Pipeline

The build process automatically generates language-specific code from these definitions:

  1. Python classes for backend services
  2. Dart classes for Flutter frontend
  3. Additional language bindings as needed

CurveZMQ Implementation

Security Architecture

AICO's message bus implements mandatory CurveZMQ encryption for all inter-component communication with the following core principles:

  1. Mandatory Encryption: No plaintext fallback - system fails securely if encryption cannot be established
  2. Mutual Authentication: Both broker and clients authenticate using public key cryptography
  3. Deterministic Key Derivation: All keys derived from master key using Argon2id + Z85 encoding
  4. Fail-Secure Design: Encryption failures result in system failure, not insecure fallback

Key Management

Master Key Integration

from aico.security.key_manager import AICOKeyManager
from aico.core.config import ConfigurationManager

# Initialize key manager
config = ConfigurationManager()
key_manager = AICOKeyManager(config)

# Authenticate and get master key
master_key = key_manager.authenticate(interactive=True)

# Derive CurveZMQ keypair for specific component
public_key, secret_key = key_manager.derive_curve_keypair(master_key, "message_bus_client_api_gateway")

Key Derivation Process

  1. Input: Master key + component identifier
  2. KDF: Argon2id with fixed salt and parameters
  3. Encoding: Z85 encoding for ZeroMQ compatibility
  4. Output: 40-character public/secret key pair
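The encoding step explains the 40-character length: Z85 turns every 4 bytes into 5 characters, so a 32-byte key becomes 40 characters. The sketch below implements Z85 from its published alphabet and derives a deterministic secret. Two loud assumptions: PBKDF2 from the standard library stands in for Argon2id (which the standard library does not provide), and the component identifier is used as the salt. `derive_secret_key` is a hypothetical helper, not `AICOKeyManager.derive_curve_keypair`.

```python
import hashlib
import struct

Z85_ALPHABET = (
    "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    ".-:+=^!/*?&<>()[]{}@%$#"
)


def z85_encode(data: bytes) -> str:
    """Encode bytes (length divisible by 4) as Z85: 4 bytes -> 5 characters."""
    if len(data) % 4:
        raise ValueError("Z85 input length must be a multiple of 4")
    out = []
    for (value,) in struct.iter_unpack(">I", data):
        chunk = []
        for _ in range(5):
            value, digit = divmod(value, 85)
            chunk.append(Z85_ALPHABET[digit])
        out.append("".join(reversed(chunk)))  # most significant digit first
    return "".join(out)


def derive_secret_key(master_key: bytes, component: str) -> str:
    """Derive a 32-byte secret deterministically and return it Z85-encoded."""
    # PBKDF2 is a stdlib stand-in here; the document specifies Argon2id.
    raw = hashlib.pbkdf2_hmac("sha256", master_key, component.encode(), 100_000, dklen=32)
    return z85_encode(raw)


key = derive_secret_key(b"example-master-key", "message_bus_client_api_gateway")
print(len(key))
```

This covers only the secret half of the pair. The matching public key has to be computed from the secret with Curve25519; pyzmq exposes `zmq.curve_public` for that when libzmq is built with CURVE support.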

Broker Configuration

Authentication Setup

from aico.core.bus import MessageBusBroker

# Create encrypted broker
broker = MessageBusBroker()
await broker.start()

# Broker automatically:
# 1. Derives broker keypair from master key
# 2. Sets up ThreadAuthenticator
# 3. Configures authorized client public keys
# 4. Enables CurveZMQ on all sockets

Authorized Clients

The broker maintains a fixed list of authorized clients:

  • message_bus_client_api_gateway
  • message_bus_client_log_consumer
  • message_bus_client_scheduler
  • message_bus_client_cli
  • message_bus_client_modelservice
  • message_bus_client_system_host
  • message_bus_client_backend_modules

Client Configuration

Basic Usage

from aico.core.bus import MessageBusClient, create_client

# Create encrypted client (recommended)
client = create_client("api_gateway")
await client.connect()

# Manual creation
client = MessageBusClient("api_gateway")
await client.connect()

# Client automatically:
# 1. Derives client keypair from master key
# 2. Retrieves broker public key
# 3. Configures CurveZMQ on publisher/subscriber sockets
# 4. Authenticates with broker

Message Publishing

# Publish encrypted message
await client.publish("test/topic", {"data": "encrypted content"})  # slash notation, per the topic registry

# All messages are automatically encrypted with CurveZMQ

Message Subscription

# Subscribe to encrypted messages
def message_handler(topic: str, message: dict):
    print(f"Received encrypted message on {topic}: {message}")

await client.subscribe("test/", message_handler)  # prefix subscription in slash notation

# All received messages are automatically decrypted

Implementation Details

Socket Configuration

Publisher Socket:

# CurveZMQ configuration applied automatically
publisher.setsockopt(zmq.CURVE_SERVER, 0)  # Client mode
publisher.setsockopt_string(zmq.CURVE_SECRETKEY, secret_key)
publisher.setsockopt_string(zmq.CURVE_PUBLICKEY, public_key)
publisher.setsockopt_string(zmq.CURVE_SERVERKEY, broker_public_key)

Subscriber Socket:

# CurveZMQ configuration applied automatically
subscriber.setsockopt(zmq.CURVE_SERVER, 0)  # Client mode
subscriber.setsockopt_string(zmq.CURVE_SECRETKEY, secret_key)
subscriber.setsockopt_string(zmq.CURVE_PUBLICKEY, public_key)
subscriber.setsockopt_string(zmq.CURVE_SERVERKEY, broker_public_key)

Broker Sockets:

# Frontend (clients connect here)
frontend.setsockopt(zmq.CURVE_SERVER, 1)  # Server mode
frontend.setsockopt_string(zmq.CURVE_SECRETKEY, broker_secret_key)
frontend.setsockopt_string(zmq.CURVE_PUBLICKEY, broker_public_key)

# Backend (internal forwarding)
backend.setsockopt(zmq.CURVE_SERVER, 1)  # Server mode
backend.setsockopt_string(zmq.CURVE_SECRETKEY, broker_secret_key)
backend.setsockopt_string(zmq.CURVE_PUBLICKEY, broker_public_key)

Security Logging

All CurveZMQ operations include comprehensive security logging:

Client Logging:

self.logger.info(f"[SECURITY] CurveZMQ encryption enabled for client: {self.client_id}")
self.logger.debug(f"[SECURITY] Client public key fingerprint: {self.public_key[:8]}...")
self.logger.debug(f"[SECURITY] Authenticating broker with public key fingerprint: {broker_public_key[:8]}...")
self.logger.info(f"[SECURITY] CurveZMQ socket encryption configured for client {self.client_id}")

Broker Logging:

self.logger.info("[SECURITY] Setting up CurveZMQ authentication for message bus broker")
self.logger.debug(f"[SECURITY] Authorized CurveZMQ client: {client_name} (key: {client_public_key[:8]}...)")
self.logger.info("[SECURITY] Broker authentication setup complete - all connections will be encrypted")

Error Handling

Fail-Secure Behavior:

try:
    # Setup CurveZMQ encryption
    await self._setup_curve_encryption(config)
    self._configure_curve_sockets()
except Exception as e:
    # NO PLAINTEXT FALLBACK - Fail securely
    self.logger.error(f"[SECURITY] CRITICAL: Failed to setup CurveZMQ encryption: {e}")
    raise MessageBusError(f"CurveZMQ encryption setup failed: {e}")

Testing and Validation

Test Script

Use the provided test script to verify encryption:

python scripts/test_curve_zmq.py

Expected output:

🔒 Testing CurveZMQ Message Bus Encryption
==================================================
✅ Broker started (encryption: enabled)
✅ Publisher connected (encryption: enabled)
✅ Subscriber connected (encryption: enabled)
✅ All 3 encrypted messages received successfully!
🎉 CurveZMQ Message Bus Encryption Test: PASSED

CLI Testing

Test encrypted CLI commands:

# Test encrypted message bus
aico bus test

# Monitor encrypted traffic
aico bus monitor

# Check broker statistics
aico bus stats

Migration from Plaintext

Removed Components

  1. Plaintext fallback code: All fallback mechanisms removed
  2. CURVE_ALLOW_ANY: Replaced with explicit client authentication
  3. Raw ZMQ sockets: All components use encrypted MessageBusClient
  4. IPC adapter: Unused ZeroMQ IPC adapter removed

Breaking Changes

  • No backward compatibility: Old plaintext clients cannot connect
  • Master key required: All components require master key for operation
  • Fail-secure only: No graceful degradation to plaintext mode

Troubleshooting

Common Issues

Authentication Failures:

[SECURITY] CRITICAL: Failed to setup CurveZMQ authentication
Solution: Verify master key is available and AICOKeyManager is properly configured.

Key Derivation Errors:

[SECURITY] CRITICAL: Failed to setup CurveZMQ encryption
Solution: Check master key authentication and key manager initialization.

Connection Refused:

MessageBusError: CurveZMQ socket configuration failed
Solution: Ensure broker is running and client public key is in authorized list.

Debug Logging

Enable debug logging to see detailed CurveZMQ operations:

import logging
logging.getLogger('aico.core.bus').setLevel(logging.DEBUG)

Security Guarantees

What is Protected

✅ All message bus traffic encrypted
✅ Mutual authentication between all components
✅ No plaintext fallback possible
✅ Deterministic key derivation from master key
✅ Comprehensive security logging

What is NOT Protected

❌ Application-level message content (use additional encryption if needed)
❌ Topic names (visible in ZeroMQ subscription filters)
❌ Message timing/frequency (traffic analysis still possible)

Performance Impact

Encryption Overhead

  • CPU: ~5-10% overhead for CurveZMQ encryption/decryption
  • Memory: Minimal additional memory usage
  • Latency: <1ms additional latency per message
  • Throughput: >95% of plaintext performance maintained

Optimization Tips

  1. Reuse connections: Avoid frequent connect/disconnect cycles
  2. Batch messages: Group small messages when possible
  3. Monitor key derivation: Cache derived keys when appropriate
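Caching derived keys, as the last tip suggests, could be as simple as memoizing the derivation function. In this sketch `derived_key` is hypothetical and PBKDF2 again stands in for Argon2id; note that a cache keeps key material in process memory for longer, which may or may not be acceptable in a given deployment.

```python
import hashlib
from functools import lru_cache


@lru_cache(maxsize=32)
def derived_key(master_key: bytes, component: str) -> bytes:
    """Derive once per (master key, component) pair; later calls hit the cache."""
    # PBKDF2 is a stdlib stand-in; the document specifies Argon2id.
    return hashlib.pbkdf2_hmac("sha256", master_key, component.encode(), 100_000, dklen=32)


first = derived_key(b"example-master-key", "message_bus_client_cli")
second = derived_key(b"example-master-key", "message_bus_client_cli")
print(first is second)
```

Because key derivation functions are deliberately slow, skipping repeated derivations matters most for components that reconnect often.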

Conclusion

The Core Message Bus architecture is fundamental to AICO's modular, event-driven design. It enables:

  • Modularity: Components can be developed, tested, and deployed independently
  • Extensibility: New modules and plugins can be integrated without modifying existing code
  • Resilience: Failures in one module don't cascade to others
  • Adaptability: The system can evolve through versioned message formats
  • Autonomy: Modules can operate independently based on events
  • Performance: Binary serialization optimizes for speed and size
  • Cross-Platform: Consistent message format across all platforms and devices
  • Security: Mandatory CurveZMQ encryption ensures all communication is protected

By providing a standardized, secure communication backbone, the message bus facilitates the complex interactions required for AICO's proactive agency, emotional presence, personality consistency, and multi-modal embodiment across its federated device network.