# Core Architecture Plan - May 19, 2025
This document outlines the architectural planning for Atlas, including module organization, implementation strategies, and development principles.
## System Architecture Overview
The Atlas architecture follows a modular design with clear separation of concerns:
- Core Components: Base functionality, configuration, and error handling
- Agents: Specialized entities that perform specific tasks
- Knowledge System: Document storage, retrieval, and query processing
- Models: LLM provider integrations and abstraction layer
- Graph: State management and flow control
- Tools: Capability extensions for agents
- Orchestration: Coordination of multi-agent systems
## Proposed File Structure

```
atlas/
├── agents/
│   ├── __init__.py
│   ├── base.py                 # Keep - extend with tool capabilities
│   ├── controller.py           # Keep - enhance with message routing
│   ├── registry.py             # Keep - extend for capability discovery
│   ├── worker.py               # Keep - extend for specialized workers
│   ├── messaging/              # NEW directory for agent communication
│   │   ├── __init__.py
│   │   ├── message.py          # NEW - StructuredMessage implementation
│   │   ├── routing.py          # NEW - Message routing and delivery
│   │   └── serialization.py    # NEW - Message serialization/deserialization
│   └── specialized/            # NEW directory for specialized agents
│       ├── __init__.py
│       ├── knowledge_agent.py  # NEW - Specialized for knowledge operations
│       ├── tool_agent.py       # NEW - Tool-capable agent base class
│       └── research_agent.py   # NEW - Research-focused agent
├── tools/                      # NEW top-level directory for tools
│   ├── __init__.py
│   ├── base.py                 # NEW - Tool interface definition
│   ├── registry.py             # NEW - Tool registration and discovery
│   ├── permissions.py          # NEW - Tool access control
│   ├── standard/               # NEW - Standard built-in tools
│   │   ├── __init__.py
│   │   ├── knowledge_tools.py  # NEW - Knowledge base interaction tools
│   │   ├── web_tools.py        # NEW - Web interaction tools
│   │   └── utility_tools.py    # NEW - Utility functions as tools
│   └── mcp/                    # NEW - MCP integration
│       ├── __init__.py
│       ├── adapter.py          # NEW - MCP protocol adapter
│       ├── converter.py        # NEW - Convert MCP tools to Atlas tools
│       └── registry.py         # NEW - MCP tool registration
└── orchestration/              # ENHANCE existing directory
    ├── __init__.py
    ├── coordinator.py          # Enhance for tool orchestration
    ├── parallel.py             # Keep
    ├── scheduler.py            # Keep
    ├── messaging/              # NEW - Messaging infrastructure
    │   ├── __init__.py
    │   ├── broker.py           # NEW - Message broker implementation
    │   └── queue.py            # NEW - Message queue implementation
    └── workflow/               # NEW - Workflow definitions
        ├── __init__.py
        ├── patterns.py         # NEW - Common workflow patterns
        └── templates.py        # NEW - Workflow templates
```
## Implementation Approach

### 1. Unified Streaming Architecture
The unified streaming architecture is a key architectural pattern that will be used throughout Atlas:
- Thread-Safe Buffer System: Provides controlled flow with pause/resume/cancel capabilities
- Standardized State Management: Consistent state transitions and event notifications
- Resource Lifecycle Management: Proper initialization, operation, and cleanup patterns
- Control Interfaces: Standardized interfaces for controlling components
This architecture enables:
- Controlled Data Flow: Fine-grained control over asynchronous data streams
- Backpressure Management: Prevention of memory issues with fast producers and slow consumers
- Interruptible Processing: Ability to pause, redirect, or cancel ongoing operations
- Progressive Results: Streaming partial results as they become available
The streaming architecture will be used for:
- Provider Communication: Control over model generation streams
- Agent-to-Agent Communication: Controlled flow between agents
- Knowledge Base Streaming: Progressive loading of context
- Graph Node Communication: Streaming data through workflow nodes
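
To make the buffer system above concrete, here is a minimal sketch of a thread-safe buffer with pause/resume/cancel and bounded backpressure. All names (`StreamBuffer` and its methods) are illustrative, not Atlas's final interface:

```python
import queue
import threading


class StreamBuffer:
    """Illustrative thread-safe buffer with pause/resume/cancel and backpressure."""

    def __init__(self, max_size: int = 64):
        self._queue = queue.Queue(maxsize=max_size)  # bounded: blocks fast producers
        self._resume = threading.Event()
        self._resume.set()  # start in the running (unpaused) state
        self._cancelled = threading.Event()

    def push(self, item, timeout=None):
        """Producer side; blocks when the buffer is full (backpressure)."""
        if self._cancelled.is_set():
            raise RuntimeError("stream cancelled")
        self._queue.put(item, timeout=timeout)

    def pop(self, timeout=None):
        """Consumer side; waits while the stream is paused."""
        self._resume.wait()
        if self._cancelled.is_set():
            raise RuntimeError("stream cancelled")
        return self._queue.get(timeout=timeout)

    def pause(self):
        self._resume.clear()

    def resume(self):
        self._resume.set()

    def cancel(self):
        self._cancelled.set()
        self._resume.set()  # wake any consumer blocked on pause
```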
### 2. Multi-Agent System Architecture
The multi-agent system follows a controller-worker pattern where:
- Controller Agent: Coordinates task distribution and result aggregation
- Worker Agents: Perform specialized tasks with domain expertise
- Structured Messaging: Enables rich communication between agents
- Tool System: Extends agent capabilities with external functions
Implementation details:

```python
import time
import uuid


class StructuredMessage:
    """Structured message format for agent communication with tool capability."""

    def __init__(self, content, metadata=None, task_id=None, tool_calls=None, tool_results=None):
        self.content = content
        self.metadata = metadata or {}
        self.task_id = task_id or str(uuid.uuid4())
        self.timestamp = time.time()
        self.tool_calls = tool_calls or []
        self.tool_results = tool_results or []
        self.source_agent = None
        self.target_agent = None

    def to_dict(self):
        """Convert to dictionary for serialization."""
        return {
            "content": self.content,
            "metadata": self.metadata,
            "task_id": self.task_id,
            "timestamp": self.timestamp,
            "tool_calls": self.tool_calls,
            "tool_results": self.tool_results,
            "source_agent": self.source_agent,
            "target_agent": self.target_agent,
        }

    @classmethod
    def from_dict(cls, data):
        """Create message from dictionary."""
        msg = cls(
            content=data["content"],
            metadata=data["metadata"],
            task_id=data["task_id"],
            tool_calls=data["tool_calls"],
            tool_results=data.get("tool_results"),
        )
        msg.timestamp = data["timestamp"]
        msg.source_agent = data["source_agent"]
        msg.target_agent = data["target_agent"]
        return msg
```
### 3. Tool System Architecture
The tool system provides a standardized way to extend agent capabilities:
```python
class AgentToolkit:
    """Registry for tools that agents can use."""

    def __init__(self):
        self.tools = {}        # name -> {"function", "description", "schema"}
        self.permissions = {}  # agent_id -> set of permitted tool names (or "*")

    def register_tool(self, name, function, description, schema):
        """Register a tool function with its schema."""
        self.tools[name] = {
            "function": function,
            "description": description,
            "schema": schema,
        }

    def get_tool_descriptions(self, agent_id):
        """Get tool descriptions available to an agent."""
        return {
            name: tool["description"]
            for name, tool in self.tools.items()
            if self.has_permission(agent_id, name)
        }

    def execute_tool(self, agent_id, tool_name, arguments):
        """Execute a tool if the agent has permission."""
        if not self.has_permission(agent_id, tool_name):
            raise PermissionError(f"Agent {agent_id} doesn't have permission to use {tool_name}")
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found in registry")
        tool = self.tools[tool_name]
        return tool["function"](**arguments)

    def has_permission(self, agent_id, tool_name):
        """Check if agent has permission to use tool."""
        if agent_id not in self.permissions:
            return False
        if "*" in self.permissions[agent_id]:
            return True
        return tool_name in self.permissions[agent_id]
```
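
A brief usage sketch (the tool, schema, and agent id are hypothetical, and permissions are populated directly since the design above does not yet define a grant API):

```python
toolkit = AgentToolkit()
toolkit.register_tool(
    name="word_count",
    function=lambda text: len(text.split()),
    description="Count the words in a piece of text",
    schema={"type": "object", "properties": {"text": {"type": "string"}}},
)

# "*" would grant access to every registered tool.
toolkit.permissions["research-1"] = {"word_count"}

result = toolkit.execute_tool("research-1", "word_count", {"text": "controlled agent tooling"})
print(result)  # 3
```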
### 4. Specialized Agent Architecture
Tool-capable agents extend the base worker agent with tool functionality:
```python
class ToolCapableAgent(WorkerAgent):
    """Worker agent with tool execution capabilities."""

    def __init__(self, name, model_provider, toolkit=None):
        super().__init__(name, model_provider)
        self.toolkit = toolkit or AgentToolkit()
        self.capabilities = []

    def register_capability(self, capability):
        """Register a capability this agent provides."""
        self.capabilities.append(capability)

    def handle_message(self, message):
        """Process incoming message and execute any tool calls."""
        # Handle basic message content
        response_content = self._process_content(message.content)

        # Handle any tool calls in the message
        tool_results = []
        for tool_call in message.tool_calls:
            result = self.toolkit.execute_tool(
                self.id,
                tool_call["name"],
                tool_call["arguments"],
            )
            tool_results.append({
                "name": tool_call["name"],
                "result": result,
            })

        # Create response message
        response = StructuredMessage(
            content=response_content,
            metadata={"type": "response", "in_reply_to": message.task_id},
            task_id=str(uuid.uuid4()),
            tool_results=tool_results,
        )
        response.source_agent = self.id
        response.target_agent = message.source_agent
        return response
```
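
A hedged sketch of the resulting message flow, assuming a `WorkerAgent` base that supplies `id` and `_process_content` (neither is defined in this document):

```python
# Hypothetical wiring; the model provider and agent ids are placeholders.
agent = ToolCapableAgent("researcher", model_provider=None, toolkit=toolkit)
toolkit.permissions[agent.id] = {"word_count"}

request = StructuredMessage(
    content="How long is this phrase?",
    tool_calls=[{"name": "word_count", "arguments": {"text": "controlled agent tooling"}}],
)
request.source_agent = "controller-1"

reply = agent.handle_message(request)
# reply.tool_results -> [{"name": "word_count", "result": 3}]
# reply.metadata     -> {"type": "response", "in_reply_to": request.task_id}
```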
## Implementation Status by Module

### Models Module
Completed:
- ✅ Base provider interface with standardized types
- ✅ Factory pattern with registration mechanism
- ✅ Provider auto-detection from environment
- ✅ Environment variable integration with env module
- ✅ Unit tests for provider functionality
- ✅ Streaming implementation for all providers (Anthropic, OpenAI, Ollama)
- ✅ Enhanced API key validation
- ✅ Comprehensive error handling with standardized error types
- ✅ Token usage tracking and cost calculation
- ✅ Mock provider implementation for testing without API keys
Planned:
- ⏱️ Connection pooling for improved performance
- ⏱️ Provider health monitoring
- ⏱️ Advanced error handling with retries
- ⏱️ Cost optimization strategies
### Knowledge System
Completed:
- ✅ Enhanced document chunking with semantic boundaries
  - ✅ Document type detection (Markdown, code, general text)
  - ✅ Specialized chunking strategies for different document types
  - ✅ Code chunking based on function/class definitions
  - ✅ Markdown chunking with frontmatter preservation
- ✅ Content deduplication system
  - ✅ Content hashing with normalization
  - ✅ Duplicate detection and reference tracking
  - ✅ Configurable deduplication options
- ✅ Real-time directory ingestion
  - ✅ Directory watching with watchdog integration
  - ✅ File change detection and processing
  - ✅ Cooldown mechanisms to prevent duplicate processing
- ✅ Enhanced progress indicators for ingestion and embedding
  - ✅ Performance metrics and timing information
- ✅ Enhanced retrieval with filtering
  - ✅ Metadata-based filtering with complex queries
  - ✅ RetrievalFilter class for filter composition
  - ✅ RetrievalResult with relevance scoring
  - ✅ Reranking capabilities for better relevance
- ✅ Hybrid retrieval implementation (see the sketch after this list)
  - ✅ Combined semantic and keyword search
  - ✅ Weighted result combination
  - ✅ Configurable search parameters
- ✅ Customizable embedding strategies
  - ✅ Abstract embedding strategy interface
  - ✅ Provider-specific implementations (Anthropic, ChromaDB)
  - ✅ Strategy factory for easy instantiation
  - ✅ Hybrid embedding approach support
- ✅ Enhanced document identification
  - ✅ Simplified document ID format (parent_dir/filename.md)
  - ✅ Consistent metadata with both full paths and simplified IDs
  - ✅ Improved readability for debugging and document inspection
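
As a minimal sketch of the weighted-combination idea behind hybrid retrieval (the default weight and the assumption that both scores are normalized to [0, 1] are illustrative, not Atlas's actual parameters):

```python
def combine_scores(semantic_score: float, keyword_score: float,
                   semantic_weight: float = 0.7) -> float:
    """Blend semantic and keyword relevance into a single ranking score."""
    return semantic_weight * semantic_score + (1 - semantic_weight) * keyword_score
```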
Planned:
- ⏱️ Query caching for performance optimization
- ⏱️ Performance telemetry for knowledge operations
- ⏱️ Support for multimedia document types
- ⏱️ Advanced relevance feedback mechanisms
## Acceleration Pathways
These pathways represent specialized areas of functionality that can be accelerated in parallel with the MVP approach, depending on specific user priorities and resource availability.
### Enhanced Knowledge Retrieval 🧠 [Accel] ✅
- ✅ Implement advanced embedding with multi-provider support
- ✅ Create sophisticated chunking with overlap strategies
- ✅ Enhance retrieval with hybrid search (keywords + semantic)
- ✅ Add document metadata filtering and faceted search
### Multi-Agent Intelligence 🤖 [Accel]

### Enhanced Provider System Architecture ⚡ [High Priority]
#### Current Status

- ✅ Created centralized `ProviderOptions` data class
- ✅ Moved provider detection logic from CLI to factory layer
- ✅ Implemented provider resolution system
- ✅ Updated entry points to use factory interface
- ✅ Basic capability-based model selection
- ✅ Standardized provider interface with consistent methods
- ✅ Ollama provider improvements (dynamic model discovery, error handling, timeout configuration)
#### Next Phase: Enhanced Provider System
The current provider system has limitations:
- Limited capability system only covers basic operational features (inexpensive, efficient, premium)
- No provider fallback or aggregation capabilities
- Provider and model relationships are determined through pattern matching
- No task-aware model selection based on specialized capabilities
#### Provider Registry Architecture

```python
from typing import Dict, List, Set, Type


class ProviderRegistry:
    """Central registry for provider, model, and capability information."""

    def __init__(self):
        # Core data structures
        self._providers: Dict[str, Type[BaseProvider]] = {}  # name -> provider class
        self._provider_models: Dict[str, List[str]] = {}     # provider_name -> list of models
        self._model_capabilities: Dict[str, Dict[str, CapabilityStrength]] = {}  # model_name -> {capability: strength}
        self._capability_models: Dict[str, Set[str]] = {}    # capability -> set of models
        self._model_providers: Dict[str, str] = {}           # model_name -> provider_name
```
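
The fragment above only shows the registry's data structures. As a hedged sketch, registration and capability lookup could be built on them roughly like this (method names and signatures are assumptions, written as `ProviderRegistry` methods):

```python
def register_model(self, provider_name: str, model_name: str,
                   capabilities: Dict[str, CapabilityStrength]) -> None:
    """Record a model, its provider, and its capability strengths."""
    self._provider_models.setdefault(provider_name, []).append(model_name)
    self._model_providers[model_name] = provider_name
    self._model_capabilities[model_name] = capabilities
    for capability in capabilities:
        self._capability_models.setdefault(capability, set()).add(model_name)


def models_with_capability(self, capability: str,
                           minimum: "CapabilityStrength") -> List[str]:
    """Return models meeting a minimum strength for a capability."""
    return [
        model
        for model in self._capability_models.get(capability, set())
        if self._model_capabilities[model][capability] >= minimum
    ]
```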
#### Enhanced Capability System

```python
from enum import IntEnum


class CapabilityStrength(IntEnum):
    """Enumeration of capability strength levels."""
    BASIC = 1        # Has the capability but limited
    MODERATE = 2     # Average capability
    STRONG = 3       # Excellent at this capability
    EXCEPTIONAL = 4  # Best-in-class for this capability


# Capability categories
CAPABILITY_INEXPENSIVE = "inexpensive"  # Lower cost models
CAPABILITY_CODE = "code"                # Code generation and understanding
CAPABILITY_REASONING = "reasoning"      # Logical reasoning and problem-solving
CAPABILITY_DOMAIN_SCIENCE = "science"   # Scientific domain knowledge

# Task capability mapping
TASK_CAPABILITY_REQUIREMENTS = {
    "code_generation": {
        CAPABILITY_CODE: CapabilityStrength.STRONG,
        CAPABILITY_REASONING: CapabilityStrength.MODERATE,
    }
}
```
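
Combining the task mapping with the registry, task-aware lookup might read as follows (a sketch that reuses the hypothetical `models_with_capability` above):

```python
from typing import Set


def candidates_for_task(registry: ProviderRegistry, task_type: str) -> Set[str]:
    """Models that meet every capability floor declared for a task type."""
    requirements = TASK_CAPABILITY_REQUIREMENTS[task_type]
    candidate_sets = [
        set(registry.models_with_capability(capability, minimum))
        for capability, minimum in requirements.items()
    ]
    return set.intersection(*candidate_sets) if candidate_sets else set()
```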
#### ProviderGroup Architecture

```python
from typing import Any, Callable, Dict, List


class ProviderGroup(BaseProvider):
    """A provider that encapsulates multiple providers with fallback capabilities."""

    def __init__(
        self,
        providers: List[BaseProvider],
        selection_strategy: Callable = ProviderSelectionStrategy.failover,
        name: str = "provider_group",
    ):
        """Initialize a provider group with a list of providers."""
        self.providers = providers
        self.selection_strategy = selection_strategy
        self._name = name
        self._health_status = {provider: True for provider in providers}
        self._context = {}  # Context for selection strategy
```
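
The fallback path itself is not shown above; a minimal sketch of what a failover-aware call might look like as a `ProviderGroup` method (the `generate` name and error handling are assumptions):

```python
def generate(self, request):
    """Try providers in strategy order, falling back when one fails."""
    ordered = self.selection_strategy(self.providers, self._context)
    last_error = None
    for provider in ordered:
        if not self._health_status.get(provider, True):
            continue  # skip providers currently marked unhealthy
        try:
            return provider.generate(request)
        except Exception as error:  # a real implementation would catch narrower errors
            self._health_status[provider] = False
            last_error = error
    raise RuntimeError("all providers in the group failed") from last_error
```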
#### Selection Strategies

```python
from typing import Any, Dict, List


class ProviderSelectionStrategy:
    """Strategy for selecting providers from a group."""

    @staticmethod
    def failover(providers: List[BaseProvider], context: Dict[str, Any] = None) -> List[BaseProvider]:
        """Returns providers in order, for failover purposes."""
        return providers

    @staticmethod
    def round_robin(providers: List[BaseProvider], context: Dict[str, Any] = None) -> List[BaseProvider]:
        """Rotates through providers in sequence."""
        ...  # Implementation details

    @staticmethod
    def cost_optimized(providers: List[BaseProvider], context: Dict[str, Any] = None) -> List[BaseProvider]:
        """Sorts providers by estimated cost."""
        ...  # Implementation details


class TaskAwareSelectionStrategy:
    """Selects models based on task requirements and capability strengths."""

    @staticmethod
    def select(providers, context=None):
        """Select providers optimized for specific task types."""
        ...  # Implementation details
```
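
As one possible body for the elided `round_robin` strategy (a sketch that assumes the mutable `context` dict persists the rotation offset between calls):

```python
def round_robin(providers: List[BaseProvider], context: Dict[str, Any] = None) -> List[BaseProvider]:
    """Rotate the starting provider on each call."""
    context = context if context is not None else {}
    offset = context.get("round_robin_offset", 0) % len(providers)
    context["round_robin_offset"] = offset + 1
    return providers[offset:] + providers[:offset]
```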
#### Factory Integration

```python
from typing import List, Optional


def create_provider_group(
    providers: List[str] = None,
    models: List[str] = None,
    strategy: str = "failover",
    options: Optional[ProviderOptions] = None,
) -> ProviderGroup:
    """Create a provider group with multiple providers."""
    ...  # Implementation details
```
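
Intended usage might then look like this (provider names are examples; the final signature is still a plan):

```python
# Hypothetical call: fail over from Anthropic to OpenAI, then Ollama.
group = create_provider_group(
    providers=["anthropic", "openai", "ollama"],
    strategy="failover",
)
```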
#### CLI and Configuration Integration

```python
# Provider group options
provider_group = parser.add_argument_group("Provider Group Options")
provider_group.add_argument(
    "--providers",
    type=str,
    nargs="+",
    help="Multiple providers to use as a group",
)
provider_group.add_argument(
    "--provider-strategy",
    type=str,
    choices=["failover", "round_robin", "cost_optimized", "task_aware"],
    default="failover",
    help="Strategy for selecting providers in a group",
)

# Task-aware selection options
task_group = parser.add_argument_group("Task-Aware Selection Options")
task_group.add_argument(
    "--task-type",
    type=str,
    help="Task type for automatic capability selection",
)
task_group.add_argument(
    "--capabilities",
    type=str,
    nargs="+",
    help="Specific capability requirements (e.g., code:strong reasoning:moderate)",
)
```
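
Combined with the argument definitions above, parsing would behave roughly as follows (the `prog` name is an assumption):

```python
import argparse

parser = argparse.ArgumentParser(prog="atlas")
# ... the argument groups from the snippet above are registered here ...

args = parser.parse_args(
    ["--providers", "anthropic", "openai", "--provider-strategy", "round_robin"]
)
# args.providers         -> ["anthropic", "openai"]
# args.provider_strategy -> "round_robin"
```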
This enhanced provider system architecture provides a comprehensive framework for provider management, capability-based selection, and resilient operations through provider groups with fallback capabilities.
## Core Architecture Standardization

### Unified Core Service Module
To standardize architectural patterns across Atlas, we'll create a comprehensive `atlas.core.services` module structure:

```
atlas/core/services/
├── __init__.py     # Module exports
├── base.py         # Base service interfaces
├── state.py        # State management patterns
├── buffer.py       # Thread-safe buffer implementations
├── commands.py     # Command pattern implementation
├── concurrency.py  # Thread safety utilities
└── resources.py    # Resource lifecycle management
```
### Command Pattern Architecture

The command pattern will provide complete observability into system operations:

```python
from typing import Any, List, Optional


# Command pattern interface (PEP 695 generic syntax; assumes Python 3.12+)
class Command[S, R]:
    def execute(self, state: S) -> R: ...         # Execute with state, return result
    def can_execute(self, state: S) -> bool: ...  # Check if executable in current state
    def is_undoable(self) -> bool: ...            # Whether command can be undone
    def undo(self, state: S) -> None: ...         # Undo the command if possible


# Command processor for execution and tracking
class CommandProcessor[S]:
    def execute(self, command: Command) -> Any: ...  # Execute and track command
    def undo_last(self) -> Optional[Command]: ...    # Undo last command
    def get_history(self) -> List[Command]: ...      # Get execution history
```
This pattern will provide a foundation for all services in Atlas, not just providers, ensuring consistency across components and reducing duplicated patterns.
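
As a hedged illustration of how a concrete command could fit this interface, here is an invented example over a plain list state (not an Atlas component):

```python
class AppendEntry(Command[list, int]):
    """Append a value to a list-based state; undoable by popping it back off."""

    def __init__(self, value):
        self.value = value

    def execute(self, state: list) -> int:
        state.append(self.value)
        return len(state)

    def can_execute(self, state: list) -> bool:
        return True

    def is_undoable(self) -> bool:
        return True

    def undo(self, state: list) -> None:
        state.pop()
```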
### Thread Safety Standardization

We'll standardize thread safety with a consistent approach across the codebase:

- A core `atlas.core.concurrency` module with standard patterns
- Consistent use of locks, events, and threading primitives
- Standardized thread naming and management
- Unified patterns for cancellation and interrupt handling (sketched below)
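
A minimal sketch of the cancellation pattern, built on a shared `threading.Event` (the helper name and its home in `atlas.core.concurrency` are assumptions):

```python
import threading


def run_cancellable(work_items, handle, cancel_event: threading.Event,
                    name: str = "atlas-worker") -> threading.Thread:
    """Process items on a named thread, checking for cancellation between items."""

    def loop():
        for item in work_items:
            if cancel_event.is_set():
                return  # cooperative cancellation point
            handle(item)

    thread = threading.Thread(target=loop, name=name, daemon=True)
    thread.start()
    return thread


# Usage: cancel.set() asks the worker to stop at the next item boundary.
cancel = threading.Event()
worker = run_cancellable(range(1000), print, cancel)
cancel.set()
worker.join()
```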
### Interface-First Design
The successful interface-driven approach used in the streaming system will be formalized:
- Clear interfaces in appropriate core modules
- Consistent use of abstract base classes
- Thorough documentation of interface contracts
- Base implementations for common requirements
### Error System Extensions
We’ll extend our error system to support domain-specific errors in a standardized way:
- Create extension points in core.errors for domain-specific error types
- Define consistent error hierarchies across modules
- Standardize error handling with utilities like safe_execute
- Ensure consistent telemetry for error tracking
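
`safe_execute` is referenced but not defined here; one hedged reading of such a utility (its signature and the `AtlasError` root class are assumptions):

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class AtlasError(Exception):
    """Hypothetical root of the core.errors hierarchy."""


def safe_execute(fn: Callable[..., Any], *args, default: Any = None, **kwargs) -> Any:
    """Run fn, logging unexpected failures instead of crashing the caller."""
    try:
        return fn(*args, **kwargs)
    except AtlasError:
        raise  # already a well-typed domain error; let callers handle it
    except Exception:
        logger.exception("safe_execute caught an unexpected error in %r", fn)
        return default
```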
### Streaming Architecture Applications
The streaming architecture pattern will be extracted to core modules and applied across:
- Agent-to-Agent Communication: Controlled messaging between agents
- Knowledge Base Integration: Progressive document loading and retrieval
- Graph Execution: Streaming data through LangGraph nodes
- Tool Integration: Controlled streaming for long-running tool operations
## Development Principles

### CLEAN BREAK DECISION
We are adopting an aggressive clean break approach that emphasizes comprehensive redesign over incremental integration. This represents a critical pivot to ensure architectural clarity and consistency. See the Clean Break Architecture Manifesto for complete details.
Clean Break Philosophy: Prioritize building high-quality, robust APIs over maintaining backward compatibility with legacy code.
- Decisive Redesign: When architectural improvements require it, fully redesign components rather than preserving compatibility
- Complete Component Replacement: Replace entire subsystems with service-oriented alternatives
- Unified Design Vision: Maintain architectural consistency through top-down design
- Service-First Architecture: Build all components around core services from inception
Redesign with Services: Create all components around a unified service architecture.
- Design service interfaces before implementation
- Implement versioned state containers for all components
- Use command pattern for all operations
- Ensure all components emit events for observability
Test-Driven Development: Create comprehensive tests alongside or before implementation.
Complete Documentation: Provide thorough documentation for all new components with clear examples.
Robust Error Handling: Implement consistent, informative error handling throughout all components.
Type Safety: Use comprehensive type hints and validation for better code quality and reliability.
Consistent Environment Configuration: Maintain a clear precedence model for configuration (CLI args > env vars > defaults).
Modular Design: Create loosely coupled components that can be used independently.
Documentation-Driven Implementation: Use documentation to guide implementation while resolving any discrepancies:
- Start with thoroughly documenting the expected behavior and interfaces
- When discrepancies arise between documentation and implementation needs:
  - Favor the approach that provides better API design, performance, and maintainability
  - Update documentation to match implementation decisions when technical requirements necessitate changes
  - Preserve the original design intent while adapting to technical realities
- Keep both implementation and documentation in sync throughout development