Skip to content

Log Consumer Service

Overview

The Log Consumer Service receives application logs from the ZeroMQ message bus and persists them to Loki for queryable, structured log storage. It runs as a backend plugin and provides log storage for the AICO system.

Architecture ✅

Core Components

  • AICOLogConsumer: Main service class with ZMQ subscription and database persistence
  • Plugin Integration: Runs as log_consumer plugin in backend service container
  • Message Bus Subscription: Connects to port 5556 with logs.* topic filtering
  • Log Storage: Writes to Loki (HTTP push API) for LogQL queries

Message Flow ✅

Application Logger → ZMQ Transport → Message Bus (5555/5556) → Log Consumer → Database
Application Logger → ZMQ Transport → Message Bus (5555/5556) → Log Consumer → Loki

Current Implementation: 1. Logger creates LogEntry protobuf message 2. ZMQ Transport sends to message bus on logs.* topics 3. Message Bus Broker routes to subscriber port 5556 4. Log Consumer processes and pushes to Loki

Implementation ✅

ZMQ Subscription

# Direct ZMQ subscription to message bus
self.subscriber = self.context.socket(zmq.SUB)
self.subscriber.connect(f"tcp://localhost:5556")
self.subscriber.setsockopt(zmq.SUBSCRIBE, b"logs.")

LogEntry Processing

Protobuf Fields: - timestamp: ISO datetime string - level: LogLevel enum (DEBUG, INFO, WARNING, ERROR, CRITICAL) - subsystem: Component name (e.g., "api_gateway", "backend") - module: Module identifier - function: Function name - message: Log message text - extra: JSON metadata (optional)

Log storage ✅

  • Loki stores logs as streams labeled by fields like service and level
  • Querying uses LogQL (via the API and aico logs ...)

Plugin Lifecycle ✅

  • Plugin Loading: Initialized via backend service container
  • Background Thread: Runs message processing loop in separate thread
  • Graceful Shutdown: Responds to plugin stop() method
  • Error Recovery: Continues processing despite individual message failures

Configuration ✅

Message Bus Settings (config/defaults/message_bus.yaml):

host: "localhost"
pub_port: 5555    # Publisher port
sub_port: 5556    # Subscriber port

Backend Integration ✅

Plugin System

Startup Sequence: 1. Message Bus Plugin starts ZMQ broker (ports 5555/5556) 2. Log Consumer Plugin initializes and connects to subscriber port 3. Coordinated Lifecycle with other backend plugins

Current Status: Active plugin in production backend

Performance ✅

Characteristics: - Throughput: Handles typical application log volume efficiently - Memory: Minimal footprint with streaming message processing - Error Handling: Individual message failures don't stop service - Database: Efficient single-row insertions with proper error handling

Monitoring ✅

Service Status

# Check if log consumer is active
aico gateway status  # Shows active plugins

# View recent logs
aico logs tail

# Test message bus connectivity
aico bus test

Debug Information

  • Plugin Status: Visible in backend startup logs
  • Message Processing: Debug output shows successful insertions
  • Database Health: Verified via CLI log retrieval

Security ✅

Data Protection: - Local-first storage: Logs are stored locally via Loki - Local Processing: No external log transmission - Access Control: Gateway endpoints that query logs are authenticated - CurveZMQ: Message bus encryption for transport security

Privacy: - Local-First: All processing on user device - User Control: Complete control over log retention - No Telemetry: Logs never transmitted externally

Troubleshooting ✅

Common Issues: 1. No Logs: Check if message bus plugin is active 2. Database Errors: Verify encryption key and database initialization 3. Connection Issues: Ensure backend service is running

Diagnostic Commands:

aico gateway status    # Check plugin status
aico logs tail         # View recent logs
aico db status         # Database health
aico bus test          # Message bus connectivity