title: Event Bus
EventBus Implementation
Overview
The EventBus is the central implementation of the Reactive Event Mesh pattern in Atlas. It provides a thread-safe, in-memory publish-subscribe system for decoupled component communication using the Blinker library.
Architectural Role
The EventBus serves as the “nervous system” of Atlas, connecting all major components:
- Providers: Emit connection events and response events
- Agents: Coordinate through task and state events
- Knowledge: Share retrieval and document processing events
- Orchestration: Emit workflow state transitions
- Core Services: Monitor system health and metrics
Implementation Details
Library: Blinker
Blinker is chosen for EventBus implementation because it provides:
- Fast signal dispatching with minimal overhead
- Thread-safe event handling for concurrent operations
- Flexible subscription model with support for named signals
- Memory-efficient design with weak references to prevent leaks
- Lightweight footprint for minimizing dependencies
Key Features
The EventBus implementation includes:
- Thread-safe event distribution using read-write locks
- Middleware pipeline for event transformation and filtering
- Event history with configurable capacity
- Typed subscribers for type-safe event handling
- Unsubscribe tokens for clean handler removal
Core Data Types
from typing import TypeVar, Generic, Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum, auto
import uuid
import time
import threading
from blinker import Signal
# Type variables
T = TypeVar('T') # Event data type
EventId = str
class EventType(Enum):
"""Core event types in the system."""
# System lifecycle events
SYSTEM_INIT = auto()
SYSTEM_SHUTDOWN = auto()
# Provider events
PROVIDER_CREATED = auto()
PROVIDER_CONNECTED = auto()
# More event types...
@dataclass
class Event(Generic[T]):
"""Core event structure for the event bus."""
id: EventId = field(default_factory=lambda: str(uuid.uuid4()))
type: EventType
data: Optional[T] = None
timestamp: float = field(default_factory=time.time)
source: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
Implementation Structure
The EventBus implementation uses Blinker’s Signal class for event distribution:
class EventBus:
"""Central event dispatch system for the Reactive Event Mesh."""
def __init__(self, history_limit: int = 1000):
"""Initialize the event bus with configurable history limit."""
self._signals: Dict[EventType, Signal] = {}
self._middleware: List[Callable] = []
self._history: List[Event[Any]] = []
self._history_limit: int = history_limit
self._lock: threading.RLock = threading.RLock()
def subscribe(self, event_type: EventType,
handler: Callable[[Event[Any]], None]) -> Callable[[], None]:
"""Subscribe to events with unsubscribe token return."""
# Implementation details...
def publish(self, event_type: EventType, data: Any = None,
source: Optional[str] = None) -> EventId:
"""Publish event to all subscribers with middleware processing."""
# Implementation details...
Performance Considerations
Blinker is designed for high performance, but there are several optimizations in the EventBus implementation:
Signal Management
The EventBus maintains a private registry of signals instead of using Blinker’s global registry:
# Private signal creation for better isolation
def get_signal(self, event_type: EventType) -> Signal:
"""Get or create a signal for an event type."""
with self._lock:
if event_type not in self._signals:
self._signals[event_type] = Signal(str(event_type))
return self._signals[event_type]
Thread Safety
The EventBus uses a reentrant lock for thread safety, ensuring concurrent access is properly synchronized:
# Thread-safe publishing with lock separation
def publish(self, event_type: EventType, data: Any = None) -> EventId:
# Create event
event = Event(type=event_type, data=data)
# Critical section: apply middleware and store history
with self._lock:
# Apply middleware...
# Store in history...
pass
# Send event outside lock to prevent deadlocks during handler execution
signal = self._signals.get(event_type)
if signal:
signal.send(event)
Memory Management
Blinker’s weak references help prevent memory leaks from accumulated handlers:
# Use weak references for handler connections
self._signals[event_type].connect(handler, weak=True)
Bounded History
The EventBus maintains a bounded history to prevent unbounded memory growth:
# Bounded history with size limit
self._history.append(current_event)
if len(self._history) > self._history_limit:
self._history = self._history[-self._history_limit:]
Integration Patterns
Provider Integration
# Type definition only - not full implementation
class ProviderManager:
"""Manager for provider lifecycle events."""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.providers = {}
# Subscribe to provider events
self.event_bus.subscribe(EventType.PROVIDER_CREATED,
self._handle_provider_created)
Agent Integration
# Type definition only - not full implementation
class AgentController:
"""Controller for agent coordination events."""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.agents = {}
# Subscribe to agent lifecycle events
self.event_bus.subscribe(EventType.AGENT_STARTED,
self._handle_agent_started)
Workflow Integration
# Type definition only - not full implementation
class WorkflowEngine:
"""Engine for workflow execution events."""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
def execute_node(self, workflow_id: str, node_id: str,
input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Execute workflow node with event notifications."""
# Emit node entered event
self.event_bus.publish(EventType.WORKFLOW_NODE_ENTERED,
{"workflow_id": workflow_id, "node_id": node_id})
Usage Patterns
Basic Publish/Subscribe
Basic Usage with Blinker
# Create event bus
event_bus = EventBus()
# Subscribe to events
unsubscribe = event_bus.subscribe(
EventType.STREAM_STARTED,
lambda event: print(f"Stream started: {event.data}")
)
# Publish event
event_id = event_bus.publish(
EventType.STREAM_STARTED,
{"stream_id": "stream1", "provider": "anthropic"}
)
# Unsubscribe when done
unsubscribe()
Event Middleware
Event Transformation
# Type definition only - not full implementation
def add_timestamp_middleware(event: Event) -> Event:
"""Add ISO timestamp to event metadata."""
if event.metadata is None:
event.metadata = {}
event.metadata["iso_time"] = format_iso_time(event.timestamp)
return event
# Add middleware to event bus
remove_middleware = event_bus.add_middleware(add_timestamp_middleware)
Error Handling
Safe Subscription
# Type definition only - not full implementation
def safe_subscribe(event_bus: EventBus, event_type: EventType,
handler: Callable[[Event], None]) -> Callable[[], None]:
"""Subscribe with error handling wrapper."""
def safe_handler(event: Event) -> None:
try:
handler(event)
except Exception as e:
# Log error and publish error event
event_bus.publish(
EventType.HANDLER_ERROR,
{"original_event": event.id, "error": str(e)}
)
return event_bus.subscribe(event_type, safe_handler)
Scoped Event Bus
Hierarchical Event Scope
# Type definition only - not full implementation
class ScopedEventBus:
"""Event bus with hierarchical scoping."""
def __init__(self, parent=None, scope_name=None):
self.event_bus = EventBus()
self.parent_bus = parent
self.scope_name = scope_name or str(uuid.uuid4())[:8]
def publish(self, event_type, data=None):
"""Publish in current scope and propagate to parent."""
# Publish locally
event_id = self.event_bus.publish(event_type, data)
# Propagate to parent if available
if self.parent_bus:
self.parent_bus.publish(event_type, data)
return event_id
Performance Optimization
The EventBus implementation includes several performance optimizations:
- Signal Caching: Reusing signal instances for each event type
- Lock Minimization: Using lock only for critical sections
- Weak References: Preventing memory leaks from handler references
- Bounded Collections: Limiting history size for memory efficiency
- Dispatch Separation: Separating event creation from dispatch
Relationship to Patterns
Implements:
- Reactive Event Mesh: Primary implementation
Supports:
- Temporal Versioning: Events can trigger version creation
- State Projection: Events can trigger projections
- Effect System: Events can be modeled as effects
- Quantum Partitioning: Events can coordinate parallel execution