Core Services Architecture
This document details the architecture for the core services module in Atlas, providing standardized interfaces for service components across the framework.
Overview
The atlas.core.services module will provide a comprehensive set of interfaces and implementations for key architectural patterns used throughout Atlas. These components will enable unified patterns for streaming, state management, resource lifecycle, and command execution.
Service Module Structure
atlas/core/services/
├── __init__.py # Module exports
├── base.py # Base service interfaces
├── buffer.py # Thread-safe buffer implementations
├── state.py # State management patterns
├── commands.py # Command pattern implementation
├── concurrency.py # Thread safety utilities
└── resources.py # Resource lifecycle management
Interface Definitions
1. Base Interfaces (base.py)
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, Optional, TypeVar, Generic
S = TypeVar('S') # State type
T = TypeVar('T') # Content type
class Controllable(Generic[S], ABC):
    """
    Interface for components that support control operations.

    This interface defines standard control operations that can be
    applied to various components throughout the system. It is generic
    over the state type S used by `state` and the state-change callback.
    """

    @property
    @abstractmethod
    def state(self) -> S:
        """Get the current state of the component."""
        pass

    @property
    @abstractmethod
    def can_pause(self) -> bool:
        """Whether this component supports pausing."""
        pass

    @property
    @abstractmethod
    def can_resume(self) -> bool:
        """Whether this component can be resumed from a paused state."""
        pass

    @property
    @abstractmethod
    def can_cancel(self) -> bool:
        """Whether this component supports cancellation."""
        pass

    @abstractmethod
    def pause(self) -> bool:
        """
        Pause the component if supported.

        Returns:
            bool: True if the component was paused, False otherwise.
        """
        pass

    @abstractmethod
    def resume(self) -> bool:
        """
        Resume the component if paused.

        Returns:
            bool: True if the component was resumed, False otherwise.
        """
        pass

    @abstractmethod
    def cancel(self) -> bool:
        """
        Cancel the component if supported.

        Returns:
            bool: True if the component was cancelled, False otherwise.
        """
        pass

    @abstractmethod
    def get_metrics(self) -> Dict[str, Any]:
        """
        Get component performance metrics.

        Returns:
            Dict containing metrics such as processing rates, throughput, etc.
        """
        pass

    @abstractmethod
    def register_state_change_callback(self, callback: Callable[[S], None]) -> None:
        """
        Register a callback to be called when the state changes.

        Args:
            callback: Function to call with the new state.
        """
        pass


class ProgressReporter(ABC):
    """Interface for components that report progress."""

    @property
    @abstractmethod
    def progress(self) -> float:
        """Get the current progress as a value between 0.0 and 1.0."""
        pass

    @abstractmethod
    def register_progress_callback(self, callback: Callable[[float], None]) -> None:
        """
        Register a callback to be called when progress changes.

        Args:
            callback: Function to call with the new progress value.
        """
        pass


class DataProducer(Generic[T], ABC):
    """Interface for components that produce data."""

    @abstractmethod
    def register_data_callback(self, callback: Callable[[T], None]) -> None:
        """
        Register a callback to be called when new data is produced.

        Args:
            callback: Function to call with the new data.
        """
        pass


class ServiceComponent(ABC):
    """
    Base interface for all service components.

    Defines the common lifecycle methods that all service components
    should implement.
    """

    @abstractmethod
    def initialize(self) -> bool:
        """
        Initialize the component.

        Returns:
            bool: True if initialization was successful, False otherwise.
        """
        pass

    @abstractmethod
    def dispose(self) -> None:
        """
        Clean up resources used by the component.

        This method should be idempotent and safe to call multiple times.
        """
        pass

    def __enter__(self) -> 'ServiceComponent':
        """Context manager entry."""
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit."""
        self.dispose()
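
To make the lifecycle contract concrete, here is a minimal sketch of a component used as a context manager. It repeats a trimmed copy of ServiceComponent so that it runs on its own. InMemoryCache, is_ready, and dispose_calls are hypothetical names invented for this illustration and are not part of the Atlas design.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class ServiceComponent(ABC):
    """Trimmed copy of the interface above, repeated so the sketch runs alone."""

    @abstractmethod
    def initialize(self) -> bool: ...

    @abstractmethod
    def dispose(self) -> None: ...

    def __enter__(self) -> "ServiceComponent":
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.dispose()


class InMemoryCache(ServiceComponent):
    """Hypothetical component used only for illustration."""

    def __init__(self) -> None:
        self._store: Optional[Dict[str, str]] = None
        self.dispose_calls = 0

    def initialize(self) -> bool:
        self._store = {}
        return True

    def dispose(self) -> None:
        # Idempotent: a second call finds nothing left to release.
        self.dispose_calls += 1
        self._store = None

    @property
    def is_ready(self) -> bool:
        return self._store is not None


cache = InMemoryCache()
with cache:
    ready_inside = cache.is_ready  # initialized on entry
cache.dispose()  # safe to call again after the context manager exits
```

Note that __enter__ as specified ignores the boolean returned by initialize(); an implementation may want to raise when initialization fails, but that choice is left open here.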
2. Buffer Interface (buffer.py)
from abc import ABC, abstractmethod
from typing import Optional, Generic, TypeVar, List, Callable, Any
T = TypeVar('T') # Generic type for buffer contents
class Buffer(Generic[T], ABC):
    """
    Thread-safe buffer for storing and retrieving content with flow control.

    This interface defines a thread-safe buffer with pause/resume functionality
    and optional capacity constraints. It's designed for producer-consumer
    scenarios where flow control is important.
    """

    @property
    @abstractmethod
    def is_paused(self) -> bool:
        """Whether the buffer is currently paused."""
        pass

    @property
    @abstractmethod
    def is_closed(self) -> bool:
        """Whether the buffer has been closed."""
        pass

    @property
    @abstractmethod
    def size(self) -> int:
        """Current size of the buffer."""
        pass

    @property
    @abstractmethod
    def capacity(self) -> Optional[int]:
        """Maximum capacity of the buffer or None if unbounded."""
        pass

    @abstractmethod
    def add(self, item: T) -> bool:
        """
        Add an item to the buffer.

        Args:
            item: The item to add to the buffer.

        Returns:
            bool: True if the item was added, False if the buffer is closed or full.
        """
        pass

    @abstractmethod
    def get(self, timeout: Optional[float] = None) -> Optional[T]:
        """
        Get an item from the buffer, waiting if none is available.

        Args:
            timeout: How long to wait for an item in seconds, or None to wait indefinitely.

        Returns:
            The next item from the buffer, or None if timeout occurred or buffer is closed.
        """
        pass

    @abstractmethod
    def peek(self) -> Optional[T]:
        """
        View the next item without removing it.

        Returns:
            The next item or None if the buffer is empty.
        """
        pass

    @abstractmethod
    def pause(self) -> None:
        """Pause the buffer, preventing consumers from getting content."""
        pass

    @abstractmethod
    def resume(self) -> None:
        """Resume the buffer, allowing consumers to get content."""
        pass

    @abstractmethod
    def close(self) -> None:
        """Close the buffer, preventing further additions."""
        pass

    @abstractmethod
    def clear(self) -> None:
        """Clear all content from the buffer."""
        pass

    @abstractmethod
    def register_item_callback(self, callback: Callable[[T], None]) -> None:
        """
        Register a callback to be called when a new item is added.

        Args:
            callback: Function to call with each new item.
        """
        pass


class MemoryBuffer(Buffer[T]):
    """Basic in-memory implementation of Buffer interface."""
    pass


class RateLimitedBuffer(Buffer[T]):
    """Buffer with rate limiting capabilities."""
    pass


class BatchingBuffer(Buffer[T]):
    """Buffer that accumulates items until batch criteria are met."""
    pass
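
The MemoryBuffer stub above leaves the locking strategy open. One possible approach, sketched here under stated assumptions, is a deque guarded by a threading.Condition. SketchMemoryBuffer is a hypothetical name; it covers only add, get, pause, resume, and close, and it does not subclass Buffer[T] so that the example stays self-contained. It also assumes that items already in the buffer can still be drained after close(), which the interface does not specify.

```python
import threading
from collections import deque
from typing import Deque, Generic, Optional, TypeVar

T = TypeVar("T")


class SketchMemoryBuffer(Generic[T]):
    """Illustrative only; a real MemoryBuffer would subclass Buffer[T]."""

    def __init__(self, capacity: Optional[int] = None) -> None:
        self._items: Deque[T] = deque()
        self._capacity = capacity
        self._paused = False
        self._closed = False
        self._cond = threading.Condition()

    def add(self, item: T) -> bool:
        with self._cond:
            full = self._capacity is not None and len(self._items) >= self._capacity
            if self._closed or full:
                return False
            self._items.append(item)
            self._cond.notify()  # wake one waiting consumer
            return True

    def get(self, timeout: Optional[float] = None) -> Optional[T]:
        with self._cond:
            # Wait until an item is readable (present and not paused) or the buffer closes.
            ready = self._cond.wait_for(
                lambda: self._closed or (bool(self._items) and not self._paused),
                timeout=timeout,
            )
            if not ready or not self._items or self._paused:
                return None
            return self._items.popleft()

    def pause(self) -> None:
        with self._cond:
            self._paused = True

    def resume(self) -> None:
        with self._cond:
            self._paused = False
            self._cond.notify_all()

    def close(self) -> None:
        with self._cond:
            self._closed = True
            self._cond.notify_all()  # release consumers blocked in get()


buf: SketchMemoryBuffer[str] = SketchMemoryBuffer(capacity=2)
added = [buf.add("a"), buf.add("b"), buf.add("c")]  # third add hits the capacity limit
buf.pause()
while_paused = buf.get(timeout=0.01)  # times out because the buffer is paused
buf.resume()
first = buf.get(timeout=0.01)
```

Using a single Condition keeps the pause flag, the closed flag, and the queue under one lock, so a consumer can never observe an item while the buffer is paused.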
3. State Management Interface (state.py)
from abc import ABC, abstractmethod
from enum import Enum
from typing import Callable, Dict, Any, List, Optional, TypeVar, Generic
S = TypeVar('S', bound=Enum) # State type variable
E = TypeVar('E') # Event type variable
class StateTransitionError(Exception):
    """Error raised when a state transition is invalid."""
    pass


class StateMachine(Generic[S, E], ABC):
    """
    Generic state machine interface for managing component states.

    This interface defines a state machine with controlled transitions
    and event-based notifications.
    """

    @property
    @abstractmethod
    def state(self) -> S:
        """Get the current state."""
        pass

    @abstractmethod
    def can_transition_to(self, target_state: S) -> bool:
        """
        Check if a transition to the target state is valid.

        Args:
            target_state: The state to check transition to.

        Returns:
            bool: True if the transition is valid, False otherwise.
        """
        pass

    @abstractmethod
    def transition_to(self, target_state: S, data: Optional[Dict[str, Any]] = None) -> bool:
        """
        Transition to the target state if valid.

        Args:
            target_state: The state to transition to.
            data: Optional data associated with the transition.

        Returns:
            bool: True if the transition was successful, False otherwise.

        Raises:
            StateTransitionError: If the transition is invalid.
        """
        pass

    @abstractmethod
    def register_state_change_callback(self, callback: Callable[[S, Dict[str, Any]], None]) -> None:
        """
        Register a callback to be called when the state changes.

        Args:
            callback: Function to call with new state and transition data.
        """
        pass

    @abstractmethod
    def handle_event(self, event: E, data: Optional[Dict[str, Any]] = None) -> bool:
        """
        Process an event that may trigger a state transition.

        Args:
            event: The event to process.
            data: Optional data associated with the event.

        Returns:
            bool: True if the event triggered a transition, False otherwise.
        """
        pass

    @abstractmethod
    def get_allowed_transitions(self) -> Dict[S, List[S]]:
        """
        Get all allowed state transitions.

        Returns:
            Dict mapping states to lists of states they can transition to.
        """
        pass


class EventDrivenStateMachine(StateMachine[S, E]):
    """Implementation of a state machine driven by events."""
    pass
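
As an illustration of how an event-driven implementation might look, the sketch below drives transitions from a lookup table keyed by (state, event). StreamState, StreamEvent, and SketchStateMachine are hypothetical names chosen for this example. The sketch assumes that an unknown event is ignored (handle_event returns False), while an explicitly requested invalid transition raises StateTransitionError.

```python
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple


class StreamState(Enum):  # hypothetical states
    IDLE = "idle"
    ACTIVE = "active"
    PAUSED = "paused"
    DONE = "done"


class StreamEvent(Enum):  # hypothetical events
    START = "start"
    PAUSE = "pause"
    RESUME = "resume"
    FINISH = "finish"


class StateTransitionError(Exception):
    """Raised when an invalid transition is requested."""


class SketchStateMachine:
    # (current state, event) -> next state
    _TABLE: Dict[Tuple[StreamState, StreamEvent], StreamState] = {
        (StreamState.IDLE, StreamEvent.START): StreamState.ACTIVE,
        (StreamState.ACTIVE, StreamEvent.PAUSE): StreamState.PAUSED,
        (StreamState.PAUSED, StreamEvent.RESUME): StreamState.ACTIVE,
        (StreamState.ACTIVE, StreamEvent.FINISH): StreamState.DONE,
    }

    def __init__(self) -> None:
        self.state = StreamState.IDLE
        self._callbacks: List[Callable[[StreamState, Dict[str, Any]], None]] = []

    def get_allowed_transitions(self) -> Dict[StreamState, List[StreamState]]:
        allowed: Dict[StreamState, List[StreamState]] = {s: [] for s in StreamState}
        for (source, _event), target in self._TABLE.items():
            allowed[source].append(target)
        return allowed

    def can_transition_to(self, target: StreamState) -> bool:
        return target in self.get_allowed_transitions()[self.state]

    def transition_to(self, target: StreamState,
                      data: Optional[Dict[str, Any]] = None) -> bool:
        if not self.can_transition_to(target):
            raise StateTransitionError(f"{self.state.name} -> {target.name} is not allowed")
        self.state = target
        for callback in self._callbacks:
            callback(target, data or {})
        return True

    def handle_event(self, event: StreamEvent,
                     data: Optional[Dict[str, Any]] = None) -> bool:
        target = self._TABLE.get((self.state, event))
        if target is None:
            return False  # the event has no meaning in the current state
        return self.transition_to(target, data)

    def register_state_change_callback(
            self, callback: Callable[[StreamState, Dict[str, Any]], None]) -> None:
        self._callbacks.append(callback)


machine = SketchStateMachine()
seen: List[StreamState] = []
machine.register_state_change_callback(lambda state, data: seen.append(state))
started = machine.handle_event(StreamEvent.START)
ignored = machine.handle_event(StreamEvent.RESUME)  # not valid while ACTIVE
machine.handle_event(StreamEvent.PAUSE)
```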
4. Command Pattern Interface (commands.py)
from abc import ABC, abstractmethod
from typing import Dict, Any, Generic, TypeVar, List, Optional, Callable
S = TypeVar('S') # State type
R = TypeVar('R') # Result type
class Command(Generic[S, R], ABC):
    """
    Command pattern interface for state-modifying operations.

    This interface defines a command that can be executed against a state,
    with execution tracking and optional undo capabilities.
    """

    @abstractmethod
    def execute(self, state: S) -> R:
        """
        Execute the command against the state, returning a result.

        Args:
            state: The state to execute against.

        Returns:
            The result of the command execution.
        """
        pass

    @abstractmethod
    def can_execute(self, state: S) -> bool:
        """
        Check if the command can be executed in the current state.

        Args:
            state: The state to check.

        Returns:
            bool: True if the command can be executed, False otherwise.
        """
        pass

    @property
    @abstractmethod
    def is_undoable(self) -> bool:
        """Whether this command can be undone."""
        pass

    @abstractmethod
    def undo(self, state: S) -> None:
        """
        Undo the command if possible.

        Args:
            state: The state to undo against.

        Raises:
            NotImplementedError: If the command is not undoable.
        """
        pass

    def get_metadata(self) -> Dict[str, Any]:
        """
        Get metadata about this command.

        Returns:
            Dict containing command metadata.
        """
        return {}


class CommandProcessor(Generic[S]):
    """
    Processes commands and maintains execution history.

    This class is responsible for executing commands against a state,
    maintaining a command history, and providing undo capabilities.
    """

    def __init__(self, initial_state: S):
        """
        Initialize the command processor.

        Args:
            initial_state: The initial state to process commands against.
        """
        self.state = initial_state
        self.history: List[Command] = []
        self.observers: List[Callable[[Command], None]] = []

    def execute(self, command: Command[S, R]) -> R:
        """
        Execute a command and record it in history.

        Args:
            command: The command to execute.

        Returns:
            The result of command execution.

        Raises:
            ValueError: If the command cannot be executed in the current state.
        """
        if not command.can_execute(self.state):
            raise ValueError(f"Command {command} cannot be executed in current state")
        result = command.execute(self.state)
        self.history.append(command)
        self._notify_observers(command)
        return result

    def undo_last(self) -> Optional[Command]:
        """
        Undo the last undoable command if any.

        Returns:
            The command that was undone, or None if no undoable commands.
        """
        # Find the most recent undoable command
        for i in range(len(self.history) - 1, -1, -1):
            cmd = self.history[i]
            if cmd.is_undoable:
                cmd.undo(self.state)
                # Remove it from history so a repeated call does not undo it again
                del self.history[i]
                return cmd
        return None

    def add_observer(self, observer: Callable[[Command], None]) -> None:
        """
        Add an observer for command execution.

        Args:
            observer: Function to call with each executed command.
        """
        self.observers.append(observer)

    def _notify_observers(self, command: Command) -> None:
        """
        Notify observers about command execution.

        Args:
            command: The command that was executed.
        """
        for observer in self.observers:
            observer(command)


class PauseCommand(Command[S, bool]):
    """Example command to pause a controllable component."""
    pass


class ResumeCommand(Command[S, bool]):
    """Example command to resume a controllable component."""
    pass


class CancelCommand(Command[S, bool]):
    """Example command to cancel a controllable component."""
    pass
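
The example commands above are left as stubs. The following sketch shows one way a PauseCommand could be filled in and run through a processor. It uses a condensed Command base class and a condensed processor (MiniProcessor) so that it runs standalone. FakeStream is a hypothetical stand-in for a Controllable component, and the sketch assumes that undoing a pause simply resumes the stream and that an undone command is dropped from history.

```python
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar

S = TypeVar("S")
R = TypeVar("R")


class Command(Generic[S, R], ABC):
    """Condensed version of the interface above so the sketch runs on its own."""

    is_undoable = False

    @abstractmethod
    def execute(self, state: S) -> R: ...

    def can_execute(self, state: S) -> bool:
        return True

    def undo(self, state: S) -> None:
        raise NotImplementedError


class FakeStream:
    """Hypothetical stand-in for a Controllable component."""

    def __init__(self) -> None:
        self.paused = False


class PauseCommand(Command[FakeStream, bool]):
    is_undoable = True

    def can_execute(self, state: FakeStream) -> bool:
        return not state.paused  # pausing twice is rejected

    def execute(self, state: FakeStream) -> bool:
        state.paused = True
        return True

    def undo(self, state: FakeStream) -> None:
        state.paused = False  # assumption: undoing a pause resumes the stream


class MiniProcessor(Generic[S]):
    """Condensed CommandProcessor: execute, record, undo."""

    def __init__(self, state: S) -> None:
        self.state = state
        self.history: List[Command] = []

    def execute(self, command: Command[S, R]) -> R:
        if not command.can_execute(self.state):
            raise ValueError(f"{type(command).__name__} cannot run in the current state")
        result = command.execute(self.state)
        self.history.append(command)
        return result

    def undo_last(self) -> Optional[Command]:
        for i in range(len(self.history) - 1, -1, -1):
            if self.history[i].is_undoable:
                cmd = self.history.pop(i)  # drop it so it is not undone twice
                cmd.undo(self.state)
                return cmd
        return None


stream = FakeStream()
processor = MiniProcessor(stream)
paused = processor.execute(PauseCommand())
undone = processor.undo_last()
```

Because every state change goes through execute(), the history list doubles as an audit trail, which is the property the later sections rely on for tracing and telemetry.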
5. Concurrency Utilities (concurrency.py)
import threading
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any, TypeVar, Generic, List

T = TypeVar('T')  # Return type for callable


class OperationCanceledException(Exception):
    """Exception raised when an operation is cancelled."""
    pass


class CancellationToken:
    """
    Token for coordinating cancellation across threads.

    This class provides a way to request cancellation of operations
    across different threads, with callback notification.
    """

    def __init__(self):
        """Initialize a new cancellation token."""
        self._cancelled = False
        self._callbacks: List[Callable[[], None]] = []
        self._lock = threading.Lock()  # Guards _cancelled and _callbacks

    @property
    def is_cancelled(self) -> bool:
        """Whether cancellation has been requested."""
        return self._cancelled

    def cancel(self) -> None:
        """Request cancellation."""
        with self._lock:
            if self._cancelled:
                return
            self._cancelled = True
            callbacks, self._callbacks = self._callbacks, []
        # Run callbacks outside the lock so they can safely use the token
        for callback in callbacks:
            try:
                callback()
            except Exception:
                pass  # Swallow errors in callbacks

    def register_callback(self, callback: Callable[[], None]) -> None:
        """
        Register a callback to be called when cancellation is requested.

        Args:
            callback: Function to call on cancellation.
        """
        with self._lock:
            if not self._cancelled:
                self._callbacks.append(callback)
                return
        # If already cancelled, call immediately
        try:
            callback()
        except Exception:
            pass

    def throw_if_cancelled(self) -> None:
        """
        Raise OperationCanceledException if cancellation has been requested.

        Raises:
            OperationCanceledException: If cancellation has been requested.
        """
        if self._cancelled:
            raise OperationCanceledException()


class ThreadSafeCounter:
    """Thread-safe counter implementation."""
    pass


class AsyncResult(Generic[T]):
    """
    A thread-safe container for an asynchronously computed value.

    This is similar to Future/Promise but simplified for our needs.
    """

    def __init__(self):
        """Initialize a new AsyncResult."""
        self._result = None
        self._exception = None
        self._is_set = False
        self._callbacks = []

    def set_result(self, result: T) -> None:
        """
        Set the result value.

        Args:
            result: The result value.
        """
        self._result = result
        self._is_set = True
        self._notify_callbacks()

    def set_exception(self, exception: Exception) -> None:
        """
        Set an exception as the result.

        Args:
            exception: The exception that occurred.
        """
        self._exception = exception
        self._is_set = True
        self._notify_callbacks()

    def get(self, timeout: Optional[float] = None) -> T:
        """
        Get the result, waiting if it's not yet available.

        Args:
            timeout: How long to wait in seconds, or None to wait indefinitely.

        Returns:
            The result value.

        Raises:
            Exception: If an exception was set as the result.
            TimeoutError: If the timeout expires before the result is available.
        """
        # Implementation details
        pass

    def add_done_callback(self, callback: Callable[[Any], None]) -> None:
        """
        Add a callback to be called when the result becomes available.

        Args:
            callback: Function to call with the result.
        """
        # Implementation details
        pass

    def _notify_callbacks(self) -> None:
        """Notify all registered callbacks."""
        # Implementation details
        pass


class ThreadPool:
    """
    Simple thread pool for executing tasks.

    This provides a simplified subset of concurrent.futures.ThreadPoolExecutor
    functionality focused on our specific needs.
    """
    pass
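
The get, add_done_callback, and _notify_callbacks methods of AsyncResult are left as implementation details above. One way they might be filled in, shown purely as a sketch, is with a threading.Event for waiting and a lock around the callback list. SketchAsyncResult and _finish are hypothetical names, and the sketch assumes callbacks receive the result value (None when an exception was set).

```python
import threading
from typing import Any, Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class SketchAsyncResult(Generic[T]):
    """Illustrative only; not the Atlas implementation."""

    def __init__(self) -> None:
        self._result: Optional[T] = None
        self._exception: Optional[BaseException] = None
        self._done = threading.Event()
        self._lock = threading.Lock()
        self._callbacks: List[Callable[[Any], None]] = []

    def set_result(self, result: T) -> None:
        self._result = result
        self._finish()

    def set_exception(self, exception: Exception) -> None:
        self._exception = exception
        self._finish()

    def _finish(self) -> None:
        with self._lock:
            self._done.set()
            callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:  # run outside the lock to avoid deadlocks
            callback(self._result)

    def get(self, timeout: Optional[float] = None) -> T:
        if not self._done.wait(timeout):
            raise TimeoutError("result not available before timeout")
        if self._exception is not None:
            raise self._exception
        return self._result  # type: ignore[return-value]

    def add_done_callback(self, callback: Callable[[Any], None]) -> None:
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return
        callback(self._result)  # already finished: call immediately


result: SketchAsyncResult[int] = SketchAsyncResult()
seen: List[Any] = []
result.add_done_callback(seen.append)
worker = threading.Thread(target=lambda: result.set_result(6 * 7))
worker.start()
value = result.get(timeout=5.0)  # blocks until the worker sets the result
worker.join()
```

Checking the event and appending the callback under the same lock closes the window in which a callback registered during completion could be missed.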
6. Resource Management Interface (resources.py)
from abc import ABC, abstractmethod
from typing import Optional, Callable, List, Dict, Any, TypeVar, Generic
T = TypeVar('T') # Resource type
class ManagedResource(ABC):
    """
    Interface for resources with lifecycle management.

    This interface defines standard lifecycle operations for resources
    that need initialization and cleanup.
    """

    @property
    @abstractmethod
    def is_initialized(self) -> bool:
        """Whether the resource has been initialized."""
        pass

    @property
    @abstractmethod
    def is_disposed(self) -> bool:
        """Whether the resource has been disposed."""
        pass

    @abstractmethod
    def initialize(self) -> bool:
        """
        Initialize the resource.

        Returns:
            bool: True if initialization was successful, False otherwise.
        """
        pass

    @abstractmethod
    def dispose(self) -> None:
        """
        Clean up the resource.

        This method should be idempotent and safe to call multiple times.
        """
        pass

    def __enter__(self) -> 'ManagedResource':
        """Context manager entry."""
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit."""
        self.dispose()


class ResourceManager(ABC):
    """
    Manager for tracking and coordinating multiple resources.

    This interface defines operations for managing multiple resources
    with dependency tracking and coordinated initialization/disposal.
    """

    @abstractmethod
    def register_resource(self, resource: ManagedResource,
                          dependencies: Optional[List[ManagedResource]] = None) -> None:
        """
        Register a resource with the manager.

        Args:
            resource: The resource to register.
            dependencies: Optional list of resources this resource depends on.
        """
        pass

    @abstractmethod
    def unregister_resource(self, resource: ManagedResource) -> None:
        """
        Unregister a resource from the manager.

        Args:
            resource: The resource to unregister.
        """
        pass

    @abstractmethod
    def initialize_all(self) -> bool:
        """
        Initialize all registered resources in dependency order.

        Returns:
            bool: True if all resources were initialized successfully, False otherwise.
        """
        pass

    @abstractmethod
    def dispose_all(self) -> None:
        """
        Dispose all registered resources in reverse dependency order.
        """
        pass

    @abstractmethod
    def get_resource_status(self) -> Dict[ManagedResource, Dict[str, Any]]:
        """
        Get status information for all registered resources.

        Returns:
            Dict mapping resources to their status information.
        """
        pass
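
The "dependency order" required by initialize_all and dispose_all can be computed with a topological sort. The sketch below uses graphlib.TopologicalSorter from the standard library (Python 3.9 or later) as one possible approach. NamedResource and SketchResourceManager are hypothetical names; NamedResource stands in for a ManagedResource implementation, and only registration, initialization, and disposal are covered.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Optional


class NamedResource:
    """Hypothetical stand-in for a ManagedResource implementation."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self._log = log
        self.is_initialized = False
        self.is_disposed = False

    def initialize(self) -> bool:
        self.is_initialized = True
        self._log.append(f"init {self.name}")
        return True

    def dispose(self) -> None:
        # Idempotent: only the first call after initialization does any work.
        if self.is_initialized and not self.is_disposed:
            self.is_disposed = True
            self._log.append(f"dispose {self.name}")


class SketchResourceManager:
    """Illustrative only; a real manager would subclass ResourceManager."""

    def __init__(self) -> None:
        self._deps: Dict[NamedResource, List[NamedResource]] = {}
        self._started: List[NamedResource] = []

    def register_resource(self, resource: NamedResource,
                          dependencies: Optional[List[NamedResource]] = None) -> None:
        self._deps[resource] = list(dependencies or [])

    def initialize_all(self) -> bool:
        # static_order() yields each resource after everything it depends on.
        for resource in TopologicalSorter(self._deps).static_order():
            if not resource.initialize():
                return False
            self._started.append(resource)
        return True

    def dispose_all(self) -> None:
        # Pop from the end so dependents are disposed before their dependencies.
        while self._started:
            self._started.pop().dispose()


log: List[str] = []
db = NamedResource("db", log)
cache = NamedResource("cache", log)
api = NamedResource("api", log)

manager = SketchResourceManager()
manager.register_resource(api, dependencies=[db, cache])
manager.register_resource(cache, dependencies=[db])
manager.register_resource(db)

ok = manager.initialize_all()
manager.dispose_all()
```

Recording the actual start order and unwinding it in reverse also handles partial failure: if one initialize() call returns False, dispose_all() releases only what was started.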
Application in Atlas Components
This standardized service architecture will be applied across multiple Atlas components:
1. Provider System Integration
The provider system will use the service components as follows:
- StreamBuffer will extend Buffer[str] from core.services.buffer
- StreamControl will implement Controllable from core.services.base
- ProviderGroup will use CommandProcessor for tracking provider selection
- ProviderReliability will integrate with ManagedResource for cleanup
2. Agent Communication
Agents will communicate using the service architecture:
- AgentStreams will use Buffer[Message] for controlled message flow
- AgentLifecycle will implement ManagedResource for proper initialization/cleanup
- AgentCommands will use the command pattern for task execution tracking
- ControllerAgent will use CommandProcessor for orchestrating worker agents
3. Knowledge Integration
The knowledge system will leverage service components:
- StreamingRetrieval will use Buffer for progressive document loading
- RetrievalCommands will track all document access operations
- KnowledgeResources will use ResourceManager for connection pooling
4. Graph Execution
The graph system will utilize the service architecture:
- NodeStreaming will implement Buffer for data flow between nodes
- EdgeTransitions will use StateMachine for managing graph traversal
- GraphCommands will track execution flow for debugging and monitoring
Implementation Timeline
Implementation will proceed in these phases:
Core Services Module Structure (May 17-18, 2025)
- Set up basic module structure and interfaces
- Implement buffer and state management
- Extract patterns from provider streaming
Command Pattern Integration (May 18-19, 2025)
- Implement command pattern interface
- Create command processor
- Build telemetry integration
Provider System Migration (May 19-20, 2025)
- Update streaming to use core services
- Integrate with command pattern
- Add telemetry hooks
Agent System Integration (May 20-24, 2025)
- Implement agent communication with services
- Create agent-specific commands
- Add command tracking to agent workflows
Knowledge and Graph Integration (May 25-31, 2025)
- Update knowledge system to use services
- Enhance graph execution with command tracking
- Implement progressive retrieval with buffers
Architectural Impact
This architecture will unify patterns across Atlas, providing:
- Complete Observability: Tracking all operations through commands
- Consistent State Management: Standardized state transitions
- Resource Safety: Proper initialization and cleanup
- Thread Safety: Consistent concurrency patterns
- Interruptible Processing: Pause/resume/cancel capabilities everywhere
- Telemetry Integration: Performance metrics across all components
The command pattern in particular will enable powerful diagnostic capabilities, execution tracing, and undo functionality that can be leveraged for more sophisticated user interactions.