Log Consumer Service¶
Overview¶
The Log Consumer Service receives application logs from the ZeroMQ message bus and persists them to Loki for queryable, structured log storage. It runs as a backend plugin and provides log storage for the AICO system.
Architecture ✅¶
Core Components¶
- AICOLogConsumer: Main service class with ZMQ subscription and database persistence
- Plugin Integration: Runs as
log_consumerplugin in backend service container - Message Bus Subscription: Connects to port 5556 with
logs.*topic filtering - Log Storage: Writes to Loki (HTTP push API) for LogQL queries
Message Flow ✅¶
Current Implementation:
1. Logger creates LogEntry protobuf message
2. ZMQ Transport sends to message bus on logs.* topics
3. Message Bus Broker routes to subscriber port 5556
4. Log Consumer processes and pushes to Loki
Implementation ✅¶
ZMQ Subscription¶
# Direct ZMQ subscription to message bus
self.subscriber = self.context.socket(zmq.SUB)
self.subscriber.connect(f"tcp://localhost:5556")
self.subscriber.setsockopt(zmq.SUBSCRIBE, b"logs.")
LogEntry Processing¶
Protobuf Fields:
- timestamp: ISO datetime string
- level: LogLevel enum (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- subsystem: Component name (e.g., "api_gateway", "backend")
- module: Module identifier
- function: Function name
- message: Log message text
- extra: JSON metadata (optional)
Log storage ✅¶
- Loki stores logs as streams labeled by fields like service and level
- Querying uses LogQL (via the API and
aico logs ...)
Plugin Lifecycle ✅¶
- Plugin Loading: Initialized via backend service container
- Background Thread: Runs message processing loop in separate thread
- Graceful Shutdown: Responds to plugin stop() method
- Error Recovery: Continues processing despite individual message failures
Configuration ✅¶
Message Bus Settings (config/defaults/message_bus.yaml):
Backend Integration ✅¶
Plugin System¶
Startup Sequence: 1. Message Bus Plugin starts ZMQ broker (ports 5555/5556) 2. Log Consumer Plugin initializes and connects to subscriber port 3. Coordinated Lifecycle with other backend plugins
Current Status: Active plugin in production backend
Performance ✅¶
Characteristics: - Throughput: Handles typical application log volume efficiently - Memory: Minimal footprint with streaming message processing - Error Handling: Individual message failures don't stop service - Database: Efficient single-row insertions with proper error handling
Monitoring ✅¶
Service Status¶
# Check if log consumer is active
aico gateway status # Shows active plugins
# View recent logs
aico logs tail
# Test message bus connectivity
aico bus test
Debug Information¶
- Plugin Status: Visible in backend startup logs
- Message Processing: Debug output shows successful insertions
- Database Health: Verified via CLI log retrieval
Security ✅¶
Data Protection: - Local-first storage: Logs are stored locally via Loki - Local Processing: No external log transmission - Access Control: Gateway endpoints that query logs are authenticated - CurveZMQ: Message bus encryption for transport security
Privacy: - Local-First: All processing on user device - User Control: Complete control over log retention - No Telemetry: Logs never transmitted externally
Troubleshooting ✅¶
Common Issues: 1. No Logs: Check if message bus plugin is active 2. Database Errors: Verify encryption key and database initialization 3. Connection Issues: Ensure backend service is running
Diagnostic Commands: