Matrix Architecture Code Examples
This document contains all Python code examples from the Matrix architecture.
Core Type System and Protocols
python
# Core protocol interfaces that define behaviors
class Observable(Protocol[E]):
"""Interface for objects that emit events."""
def add_observer(self, observer: Callable[[E, Any], None]) -> Callable[[], None]: ...
def remove_observer(self, observer: Callable[[E, Any], None]) -> None: ...
def notify(self, event: E, data: Any = None) -> None: ...
class Versioned(Protocol[S]):
"""Interface for objects with versioned state."""
def get_current_version_id(self) -> VersionId: ...
def get_version(self, version_id: Optional[VersionId] = None) -> S: ...
def commit(self, state: S, description: str = "") -> VersionId: ...
def get_history(self) -> List[Tuple[VersionId, S, str, float]]: ...
class Projectable(Protocol[S, P]):
"""Interface for objects that can be projected into different views."""
def add_projection(self, name: str, projection_fn: Callable[[S], P]) -> None: ...
def project(self, projection: str = "default") -> P: ...
class Effectful(Protocol[V]):
"""Interface for operations with explicit side effects."""
def with_effect(self, effect: Effect) -> 'Effectful[V]': ...
def map(self, fn: Callable[[V], Any]) -> 'Effectful[Any]': ...
def bind(self, fn: Callable[[V], 'Effectful[Any]']) -> 'Effectful[Any]': ...
def run(self, handler: Callable[[Effect], Any]) -> V: ...
class QuantumUnit(Protocol[S, R]):
"""Interface for parallelizable computation units."""
def can_execute(self, completed_units: Set[Any]) -> bool: ...
def execute(self, context: S) -> R: ...
def get_dependencies(self) -> List[Any]: ...
def get_result(self) -> Optional[R]: ...
class Boundary(Protocol[T_in, T_out]):
"""Interface for system boundaries that convert between domains."""
def validate(self, data: T_in) -> ValidationResult[T_out]: ...
def process(self, data: T_in) -> T_out: ...
def handle_error(self, error: Exception) -> BoundaryError: ...
Core Error Classes
python
class AtlasError(Exception):
"""Base class for all Atlas errors."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
self.message = message
self.details = details or {}
super().__init__(message)
class BoundaryError(AtlasError):
"""Error that occurs at system boundaries."""
def __init__(self, message: str, boundary: str, details: Optional[Dict[str, Any]] = None):
self.boundary = boundary
super().__init__(message, details)
class ValidationError(BoundaryError):
"""Error during data validation at a boundary."""
def __init__(self, message: str, boundary: str, validation_errors: List[Dict[str, Any]]):
self.validation_errors = validation_errors
super().__init__(message, boundary, {"validation_errors": validation_errors})
class NetworkError(BoundaryError):
"""Error during network operations at a boundary."""
def __init__(self, message: str, status_code: Optional[int] = None,
response: Optional[Any] = None):
self.status_code = status_code
self.response = response
super().__init__(message, "network", {
"status_code": status_code,
"response": str(response)[:1000] if response else None
})
Core Type Variables and Definitions
python
from typing import TypeVar, Generic, Protocol, Callable, Dict, List, Any, Optional, Union, Set, Tuple
from dataclasses import dataclass, field
from enum import Enum, auto
from datetime import datetime
import uuid
import copy
import time
import threading
import queue
from collections import defaultdict
from abc import ABC, abstractmethod
# Core type variables for generic definitions
T = TypeVar('T') # Generic data type
S = TypeVar('S') # Generic state type
R = TypeVar('R') # Generic result type
E = TypeVar('E') # Generic event type
V = TypeVar('V') # Generic value type
P = TypeVar('P') # Generic perspective type
K = TypeVar('K') # Generic key type
C = TypeVar('C') # Generic context type
M = TypeVar('M') # Generic message type
# Identity and Reference Types
EntityId = str # Unique identifier for any entity in the system
VersionId = str # Unique identifier for a versioned state
ResourceId = str # Identifier for a resource
EventId = str # Identifier for an event
# Event System Types
class EventType(Enum):
"""Core event types in the system."""
# System lifecycle events
SYSTEM_INIT = auto()
SYSTEM_SHUTDOWN = auto()
# Provider events
PROVIDER_CREATED = auto()
PROVIDER_CONNECTED = auto()
PROVIDER_DISCONNECTED = auto()
PROVIDER_ERROR = auto()
# Agent events
AGENT_CREATED = auto()
AGENT_STARTED = auto()
AGENT_STOPPED = auto()
AGENT_ERROR = auto()
# Stream events
STREAM_STARTED = auto()
STREAM_CHUNK = auto()
STREAM_PAUSED = auto()
STREAM_RESUMED = auto()
STREAM_COMPLETED = auto()
STREAM_ERROR = auto()
# Workflow events
WORKFLOW_STARTED = auto()
WORKFLOW_NODE_ENTERED = auto()
WORKFLOW_NODE_EXITED = auto()
WORKFLOW_COMPLETED = auto()
WORKFLOW_ERROR = auto()
# Knowledge events
DOCUMENT_ADDED = auto()
DOCUMENT_PROCESSED = auto()
RETRIEVAL_STARTED = auto()
RETRIEVAL_COMPLETED = auto()
# Command events
COMMAND_EXECUTED = auto()
COMMAND_REVERTED = auto()
COMMAND_FAILED = auto()
@dataclass
class Event(Generic[T]):
"""Core event structure for the event bus."""
id: EventId = field(default_factory=lambda: str(uuid.uuid4()))
type: EventType
data: Optional[T] = None
timestamp: float = field(default_factory=time.time)
source: Optional[EntityId] = None
# Effect System Types
class EffectType(Enum):
"""Types of side effects in the system."""
# I/O effects
FILE_READ = auto()
FILE_WRITE = auto()
NETWORK_REQUEST = auto()
DATABASE_QUERY = auto()
# State effects
STATE_READ = auto()
STATE_MODIFY = auto()
# System effects
THREAD_CREATE = auto()
PROCESS_SPAWN = auto()
RESOURCE_ALLOCATE = auto()
RESOURCE_RELEASE = auto()
# External effects
MODEL_CALL = auto()
TOOL_INVOKE = auto()
EXTERNAL_SERVICE = auto()
@dataclass
class Effect:
"""Representation of a side effect."""
type: EffectType
payload: Any = None
description: str = ""
# Lifecycle State Types
class LifecycleState(Enum):
"""Lifecycle states for stateful components."""
CREATED = auto()
INITIALIZING = auto()
READY = auto()
ACTIVE = auto()
PAUSED = auto()
STOPPING = auto()
STOPPED = auto()
ERROR = auto()
DISPOSED = auto()
# Stream Control Types
class StreamState(Enum):
"""States for streaming operations."""
PENDING = auto()
ACTIVE = auto()
PAUSED = auto()
COMPLETED = auto()
CANCELLED = auto()
ERROR = auto()
# Resource Management Types
class ResourceType(Enum):
"""Types of managed resources."""
CONNECTION = auto()
FILE = auto()
THREAD = auto()
PROCESS = auto()
SOCKET = auto()
DATABASE = auto()
MODEL = auto()
@dataclass
class Resource:
"""Representation of a managed resource."""
id: ResourceId
type: ResourceType
state: LifecycleState = LifecycleState.CREATED
created_at: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
Reactive Event Mesh Implementation
python
class EventBus:
"""Central event dispatch system for the Reactive Event Mesh."""
def __init__(self):
self._subscribers: Dict[EventType, List[Callable[[Event[Any]], None]]] = defaultdict(list)
self._middleware: List[Callable[[Event[Any]], Optional[Event[Any]]]] = []
self._history: List[Event[Any]] = []
self._history_limit: int = 1000
self._lock: threading.RLock = threading.RLock()
def subscribe(self, event_type: EventType, handler: Callable[[Event[Any]], None]) -> Callable[[], None]:
"""Subscribe to events of a specific type.
Args:
event_type: Type of events to subscribe to
handler: Callback function to handle events
Returns:
A function that can be called to unsubscribe
"""
with self._lock:
self._subscribers[event_type].append(handler)
def unsubscribe():
with self._lock:
if handler in self._subscribers[event_type]:
self._subscribers[event_type].remove(handler)
return unsubscribe
def publish(self, event_type: EventType, data: Any = None, source: Optional[EntityId] = None) -> EventId:
"""Publish an event to all subscribers.
Args:
event_type: Type of event to publish
data: Data payload for the event
source: Identifier for the event source
Returns:
The ID of the published event
"""
event = Event(type=event_type, data=data, source=source)
with self._lock:
# Apply middleware
current_event = event
for middleware in self._middleware:
current_event = middleware(current_event)
if current_event is None:
return event.id # Event filtered out
# Save to history
self._history.append(current_event)
if len(self._history) > self._history_limit:
self._history = self._history[-self._history_limit:]
# Notify subscribers
for handler in self._subscribers.get(event_type, []):
try:
handler(current_event)
except Exception as e:
# In a real implementation, log this error
pass
return event.id
def add_middleware(self, middleware: Callable[[Event[Any]], Optional[Event[Any]]]) -> Callable[[], None]:
"""Add middleware to process events before they reach subscribers.
Args:
middleware: Function that transforms events or filters them out
Returns:
A function that can be called to remove the middleware
"""
with self._lock:
self._middleware.append(middleware)
def remove_middleware():
with self._lock:
if middleware in self._middleware:
self._middleware.remove(middleware)
return remove_middleware
def get_history(self, event_type: Optional[EventType] = None, limit: int = 100) -> List[Event[Any]]:
"""Get recent events from the history.
Args:
event_type: Optional filter for specific event types
limit: Maximum number of events to return
Returns:
List of recent events, most recent first
"""
with self._lock:
if event_type is None:
return list(reversed(self._history))[:limit]
else:
return list(reversed([e for e in self._history if e.type == event_type]))[:limit]
Temporal Versioning Implementation
python
@dataclass
class VersionedState:
version_id: str
data: Any
parent_version_id: Optional[str] = None
timestamp: float = field(default_factory=time.time)
change_description: str = ""
class TemporalStore:
def __init__(self):
self._versions = {}
self._current_version_id = None
def commit(self, data, description=""):
version_id = str(uuid.uuid4())
version = VersionedState(
version_id=version_id,
data=copy.deepcopy(data),
parent_version_id=self._current_version_id,
change_description=description
)
self._versions[version_id] = version
self._current_version_id = version_id
return version_id
def get(self, version_id=None):
if version_id is None:
version_id = self._current_version_id
return self._versions[version_id].data
def get_history(self):
# Return full version history
history = []
version_id = self._current_version_id
while version_id:
version = self._versions[version_id]
history.append(version)
version_id = version.parent_version_id
return list(reversed(history))
Perspective Shifting Implementation
python
class PerspectiveAware(Generic[S, P]):
"""A class that provides multiple views of the same data based on context.
This implementation supports dynamic shifting between different abstraction
levels or viewpoints on the same underlying data.
"""
def __init__(self, data: S):
"""Initialize with the raw data and empty perspectives map.
Args:
data: The raw data to provide perspectives on
"""
self._data: S = data
self._perspectives: Dict[str, Callable[[S], P]] = {}
self._current_perspective: str = "default"
self._lock: threading.RLock = threading.RLock()
def add_perspective(self, name: str, transform_fn: Callable[[S], P]) -> None:
"""Add a new perspective transformation function.
Args:
name: The name of the perspective
transform_fn: Function that transforms data to this perspective
"""
with self._lock:
self._perspectives[name] = transform_fn
def remove_perspective(self, name: str) -> bool:
"""Remove a perspective transformation.
Args:
name: The name of the perspective to remove
Returns:
True if the perspective was removed, False if it wasn't found
"""
with self._lock:
if name in self._perspectives:
del self._perspectives[name]
if self._current_perspective == name:
self._current_perspective = "default"
return True
return False
def set_default_perspective(self, name: str) -> None:
"""Set the current default perspective.
Args:
name: Name of the perspective to use as default
Raises:
ValueError: If the perspective doesn't exist
"""
with self._lock:
if name not in self._perspectives and name != "default":
raise ValueError(f"Perspective '{name}' not found")
self._current_perspective = name
def view(self, perspective: Optional[str] = None) -> Union[S, P]:
"""View the data through a specific perspective.
Args:
perspective: Name of the perspective to use,
or None to use the current default
Returns:
The transformed data, or raw data if perspective not found
"""
with self._lock:
perspective = perspective or self._current_perspective
if perspective in self._perspectives:
return self._perspectives[perspective](self._data)
# Fall back to raw data
return self._data
def get_available_perspectives(self) -> List[str]:
"""Get the names of all available perspectives.
Returns:
List of perspective names
"""
with self._lock:
return list(self._perspectives.keys())
def update_data(self, data: S) -> None:
"""Update the underlying data without changing perspectives.
Args:
data: New data to use
"""
with self._lock:
self._data = data
State Projection Implementation
python
class Delta(Generic[S]):
"""Represents a change to be applied to a state."""
@staticmethod
def function_delta(fn: Callable[[S], S]) -> 'Delta[S]':
"""Create a delta from a function that transforms state.
Args:
fn: Function that takes state and returns modified state
Returns:
A new Delta instance
"""
return FunctionDelta(fn)
@staticmethod
def patch_delta(patch: Dict[str, Any]) -> 'Delta[S]':
"""Create a delta from a dictionary patch.
Args:
patch: Dictionary of updates to apply to state
Returns:
A new Delta instance
"""
return PatchDelta(patch)
@abstractmethod
def apply(self, state: S) -> S:
"""Apply this delta to a state.
Args:
state: The state to update
Returns:
The updated state
"""
pass
class FunctionDelta(Delta[S]):
"""A delta represented by a function that transforms state."""
def __init__(self, fn: Callable[[S], S]):
self.fn = fn
def apply(self, state: S) -> S:
return self.fn(state)
class PatchDelta(Delta[S]):
"""A delta represented by a dictionary patch."""
def __init__(self, patch: Dict[str, Any]):
self.patch = patch
def apply(self, state: S) -> S:
if not isinstance(state, dict):
raise TypeError("PatchDelta can only be applied to dict-like states")
return {**state, **self.patch}
@dataclass
class DeltaMetadata:
"""Metadata about a delta change."""
timestamp: float = field(default_factory=time.time)
source: Optional[EntityId] = None
description: str = ""
tags: List[str] = field(default_factory=list)
class StateProjector(Generic[S, P]):
"""Manages state through deltas and supports multiple projections.
This class implements a space-efficient approach to state management by
storing only the initial state and subsequent changes (deltas), then
applying projection functions to provide different views.
"""
def __init__(self, initial_state: S):
"""Initialize with initial state.
Args:
initial_state: The initial state
"""
self._initial_state: S = copy.deepcopy(initial_state)
self._deltas: List[Tuple[Delta[S], DeltaMetadata]] = []
self._projections: Dict[str, Callable[[S], P]] = {}
self._lock: threading.RLock = threading.RLock()
def apply_delta(self, delta: Union[Delta[S], Dict[str, Any], Callable[[S], S]],
description: str = "",
source: Optional[EntityId] = None,
tags: Optional[List[str]] = None) -> int:
"""Apply a delta to the state.
Args:
delta: The delta to apply (Delta object, dict patch or function)
description: Description of the change
source: Source of the change
tags: Optional tags for categorizing the change
Returns:
Index of the newly added delta
"""
with self._lock:
# Convert to Delta if needed
if not isinstance(delta, Delta):
if callable(delta):
delta = Delta.function_delta(delta)
else:
delta = Delta.patch_delta(delta)
# Create metadata
metadata = DeltaMetadata(
description=description,
source=source,
tags=tags or []
)
# Store delta with metadata
self._deltas.append((delta, metadata))
return len(self._deltas) - 1
def add_projection(self, name: str, projection_fn: Callable[[S], P]) -> None:
"""Add a new projection function.
Args:
name: Name of the projection
projection_fn: Function that transforms state into projection
"""
with self._lock:
self._projections[name] = projection_fn
def project(self, projection: str = "default", until: Optional[int] = None) -> P:
"""Project the current state through a specific projection.
Args:
projection: Name of the projection to use
until: Optional index to stop applying deltas at
Returns:
The projected state
Raises:
ValueError: If projection doesn't exist
"""
with self._lock:
# Validate projection
if projection not in self._projections:
raise ValueError(f"Projection '{projection}' not found")
# Compute current state
state = copy.deepcopy(self._initial_state)
deltas = self._deltas if until is None else self._deltas[:until]
# Apply deltas sequentially
for delta, _ in deltas:
state = delta.apply(state)
# Apply projection
return self._projections[projection](state)
def get_current_state(self) -> S:
"""Get the current state without any projection.
Returns:
The current state
"""
with self._lock:
# Compute current state
state = copy.deepcopy(self._initial_state)
# Apply all deltas
for delta, _ in self._deltas:
state = delta.apply(state)
return state
def get_delta_history(self, with_tags: Optional[List[str]] = None) -> List[Tuple[int, DeltaMetadata]]:
"""Get history of changes with their metadata.
Args:
with_tags: Optional filter to include only deltas with specific tags
Returns:
List of (index, metadata) pairs
"""
with self._lock:
if with_tags is None:
return [(i, meta) for i, (_, meta) in enumerate(self._deltas)]
else:
return [
(i, meta) for i, (_, meta) in enumerate(self._deltas)
if any(tag in meta.tags for tag in with_tags)
]
def reset_to_initial(self) -> None:
"""Clear all deltas and return to initial state."""
with self._lock:
self._deltas = []
Effect System Implementation
python
class EffectfulOperation(Generic[V]):
"""Interface for operations with tracked effects."""
@abstractmethod
def map(self, fn: Callable[[V], Any]) -> 'EffectfulOperation[Any]':
"""Transform the result value without adding effects.
Args:
fn: Function to transform the result
Returns:
New operation with transformed result
"""
pass
@abstractmethod
def bind(self, fn: Callable[[V], 'EffectfulOperation[Any]']) -> 'EffectfulOperation[Any]':
"""Chain with another effectful operation.
Args:
fn: Function that returns another effectful operation
Returns:
New operation with combined effects
"""
pass
@abstractmethod
def run(self, handler: Callable[[Effect], Any]) -> V:
"""Execute the operation with a specific effect handler.
Args:
handler: Function to handle effects
Returns:
The result value
"""
pass
@abstractmethod
def get_effects(self) -> List[Effect]:
"""Get all effects in this operation.
Returns:
List of effects
"""
pass
@classmethod
def pure(cls, value: V) -> 'EffectfulOperation[V]':
"""Create an effectful operation with no effects.
Args:
value: The value to wrap
Returns:
New effectful operation
"""
return EffectMonad(value, [])
@classmethod
def effect(cls, effect_type: EffectType, payload: Any = None,
description: str = "") -> 'EffectfulOperation[None]':
"""Create an operation with a single effect.
Args:
effect_type: Type of effect
payload: Effect payload
description: Description of the effect
Returns:
New effectful operation
"""
effect = Effect(type=effect_type, payload=payload, description=description)
return EffectMonad(None, [effect])
class EffectHandler:
"""Handles and processes effect descriptions."""
def __init__(self):
self._handlers: Dict[EffectType, Callable[[Any], Any]] = {}
self._fallback: Optional[Callable[[Effect], Any]] = None
self._executed_effects: List[Tuple[Effect, Any]] = []
self._lock: threading.RLock = threading.RLock()
def register_handler(self, effect_type: EffectType,
handler: Callable[[Any], Any]) -> None:
"""Register a handler for a specific effect type.
Args:
effect_type: Type of effect to handle
handler: Function to handle the effect
"""
with self._lock:
self._handlers[effect_type] = handler
def set_fallback_handler(self, handler: Callable[[Effect], Any]) -> None:
"""Set a fallback handler for unknown effect types.
Args:
handler: Function to handle unknown effects
"""
with self._lock:
self._fallback = handler
def handle(self, effect: Effect) -> Any:
"""Handle a specific effect.
Args:
effect: The effect to handle
Returns:
Result of handling the effect
Raises:
ValueError: If no handler exists for the effect type
"""
with self._lock:
if effect.type in self._handlers:
result = self._handlers[effect.type](effect.payload)
self._executed_effects.append((effect, result))
return result
elif self._fallback is not None:
result = self._fallback(effect)
self._executed_effects.append((effect, result))
return result
raise ValueError(f"No handler for effect: {effect.type}")
def get_executed_effects(self) -> List[Tuple[Effect, Any]]:
"""Get all effects that have been handled.
Returns:
List of (effect, result) pairs
"""
with self._lock:
return list(self._executed_effects)
def clear_history(self) -> None:
"""Clear the history of executed effects."""
with self._lock:
self._executed_effects = []
class EffectMonad(EffectfulOperation[V]):
"""Implementation of the effectful operation interface using a monad pattern.
This class encapsulates a value and associated effects, allowing for
functional composition of operations with explicit tracking of side effects.
"""
def __init__(self, value: V, effects: List[Effect]):
"""Initialize with a value and list of effects.
Args:
value: The wrapped value
effects: List of effects associated with this value
"""
self.value: V = value
self.effects: List[Effect] = effects
def map(self, fn: Callable[[V], Any]) -> 'EffectMonad[Any]':
"""Transform the result value without adding effects.
Args:
fn: Function to transform the value
Returns:
New EffectMonad with transformed value and same effects
"""
return EffectMonad(fn(self.value), self.effects)
def bind(self, fn: Callable[[V], 'EffectfulOperation[Any]']) -> 'EffectMonad[Any]':
"""Chain with another effectful operation.
Args:
fn: Function that returns another effectful operation
Returns:
New EffectMonad with combined value and effects
"""
result = fn(self.value)
return EffectMonad(result.run(lambda e: None),
self.effects + result.get_effects())
def run(self, handler: Callable[[Effect], Any]) -> V:
"""Execute the operation, handling all effects.
Args:
handler: Function to handle effects
Returns:
The wrapped value
"""
for effect in self.effects:
handler(effect)
return self.value
def get_effects(self) -> List[Effect]:
"""Get all effects in this operation.
Returns:
List of effects
"""
return list(self.effects)
def with_effect(self, effect: Effect) -> 'EffectMonad[V]':
"""Add an effect to this operation.
Args:
effect: The effect to add
Returns:
New EffectMonad with the additional effect
"""
return EffectMonad(self.value, self.effects + [effect])
def then(self, other: 'EffectfulOperation[Any]') -> 'EffectMonad[Any]':
"""Sequence this operation with another, ignoring this operation's value.
Args:
other: Next operation to execute
Returns:
New EffectMonad with the other's value and combined effects
"""
return EffectMonad(other.run(lambda e: None),
self.effects + other.get_effects())
@staticmethod
def sequence(operations: List['EffectfulOperation[Any]']) -> 'EffectMonad[List[Any]]':
"""Combine multiple effectful operations into one.
Args:
operations: List of operations to combine
Returns:
New EffectMonad with list of results and combined effects
"""
if not operations:
return EffectMonad.pure([])
results = []
all_effects = []
for op in operations:
results.append(op.run(lambda e: None))
all_effects.extend(op.get_effects())
return EffectMonad(results, all_effects)
Quantum Partitioning Implementation
python
class UnitState(Enum):
"""Possible states for a quantum unit."""
PENDING = auto() # Not yet executed
READY = auto() # Ready to execute (dependencies satisfied)
RUNNING = auto() # Currently executing
COMPLETED = auto() # Successfully completed
FAILED = auto() # Execution failed
CANCELLED = auto() # Execution cancelled
@dataclass
class UnitResult(Generic[R]):
"""Result of a quantum unit execution."""
success: bool
value: Optional[R] = None
error: Optional[Exception] = None
execution_time: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)
class QuantumUnit(Generic[S, R]):
"""A self-contained unit of computation with explicit dependencies.
This class represents a parallelizable computation unit that can be
scheduled based on its dependencies. Units exist in a state of
potential execution until their dependencies are resolved.
"""
def __init__(self, fn: Callable[[S], R],
dependencies: Optional[List['QuantumUnit']] = None,
name: Optional[str] = None,
timeout: Optional[float] = None):
"""Initialize with function and dependencies.
Args:
fn: Function that performs the computation
dependencies: Other units that must complete before this one
name: Optional name for the unit
timeout: Optional timeout in seconds
"""
self.id: str = str(uuid.uuid4())
self.name: str = name or f"Unit-{self.id[:8]}"
self.fn: Callable[[S], R] = fn
self.dependencies: List[QuantumUnit] = dependencies or []
self.state: UnitState = UnitState.PENDING
self.result: Optional[UnitResult[R]] = None
self.timeout: Optional[float] = timeout
self.metadata: Dict[str, Any] = {}
self._lock: threading.RLock = threading.RLock()
def can_execute(self, completed_units: Set[str]) -> bool:
"""Check if this unit can be executed.
Args:
completed_units: Set of IDs of completed units
Returns:
True if all dependencies are satisfied
"""
with self._lock:
if self.state != UnitState.PENDING:
return False
return all(dep.id in completed_units for dep in self.dependencies)
def execute(self, context: S) -> UnitResult[R]:
"""Execute this unit's function.
Args:
context: The execution context
Returns:
Result of the execution
"""
with self._lock:
if self.state == UnitState.COMPLETED:
return self.result
if self.state == UnitState.CANCELLED:
return UnitResult(success=False, error=ValueError("Unit was cancelled"))
self.state = UnitState.RUNNING
start_time = time.time()
try:
# Execute with optional timeout
if self.timeout is not None:
# A real implementation would use a proper timeout mechanism
value = self.fn(context)
else:
value = self.fn(context)
execution_time = time.time() - start_time
result = UnitResult(
success=True,
value=value,
execution_time=execution_time
)
except Exception as e:
execution_time = time.time() - start_time
result = UnitResult(
success=False,
error=e,
execution_time=execution_time
)
with self._lock:
self.result = result
self.state = UnitState.COMPLETED if result.success else UnitState.FAILED
return result
def cancel(self) -> bool:
"""Cancel this unit if it hasn't started yet.
Returns:
True if successfully cancelled
"""
with self._lock:
if self.state == UnitState.PENDING:
self.state = UnitState.CANCELLED
return True
return False
def get_dependencies(self) -> List['QuantumUnit']:
"""Get this unit's dependencies.
Returns:
List of dependency units
"""
return list(self.dependencies)
def get_result(self) -> Optional[R]:
"""Get this unit's result value, if any.
Returns:
Result value if execution was successful, None otherwise
"""
with self._lock:
if self.result and self.result.success:
return self.result.value
return None
class ExecutionPlan:
"""A structured plan for executing quantum units."""
def __init__(self, units: List[QuantumUnit]):
"""Build an execution plan from units.
Args:
units: List of units to schedule
"""
self.units = {unit.id: unit for unit in units}
self.graph = self._build_dependency_graph()
self.levels = self._compute_levels()
def _build_dependency_graph(self) -> Dict[str, Set[str]]:
"""Build a dependency graph from units.
Returns:
Dict mapping unit IDs to sets of dependency IDs
"""
graph = {}
for unit_id, unit in self.units.items():
graph[unit_id] = {dep.id for dep in unit.get_dependencies()}
return graph
def _compute_levels(self) -> List[List[str]]:
"""Compute execution levels for parallelization.
Units in the same level can be executed in parallel.
Returns:
List of levels, each containing unit IDs
"""
# Compute in-degree (number of dependencies) for each unit
in_degree = {unit_id: len(deps) for unit_id, deps in self.graph.items()}
# Units with no dependencies go in the first level
zero_degree = [unit_id for unit_id, degree in in_degree.items() if degree == 0]
levels = []
while zero_degree:
levels.append(zero_degree)
next_zero_degree = []
for unit_id in zero_degree:
# Find units that depend on the current units
for next_id, deps in self.graph.items():
if unit_id in deps:
in_degree[next_id] -= 1
if in_degree[next_id] == 0:
next_zero_degree.append(next_id)
zero_degree = next_zero_degree
# Check for cycles
if sum(len(level) for level in levels) < len(self.units):
raise ValueError("Circular dependencies detected in quantum units")
return levels
def get_level_count(self) -> int:
"""Get the number of execution levels.
Returns:
Number of levels
"""
return len(self.levels)
def get_units_at_level(self, level: int) -> List[QuantumUnit]:
"""Get units at a specific execution level.
Args:
level: The level to retrieve
Returns:
List of units at that level
"""
if level < 0 or level >= len(self.levels):
return []
return [self.units[unit_id] for unit_id in self.levels[level]]
def get_maximum_parallelism(self) -> int:
"""Get the maximum number of units that can run in parallel.
Returns:
Maximum parallelism
"""
return max(len(level) for level in self.levels) if self.levels else 0
class QuantumPartitioner(Generic[S, R]):
"""Executes quantum units with maximum parallelism while respecting dependencies."""
def __init__(self):
"""Initialize an empty partitioner."""
self.units: List[QuantumUnit[S, R]] = []
self.results: Dict[str, UnitResult[R]] = {}
self.completed: Set[str] = set()
self.failed: Set[str] = set()
self._lock: threading.RLock = threading.RLock()
def add_unit(self, fn: Callable[[S], R],
dependencies: Optional[List[QuantumUnit]] = None,
name: Optional[str] = None,
timeout: Optional[float] = None) -> QuantumUnit[S, R]:
"""Add a unit to the partitioner.
Args:
fn: Function that performs the computation
dependencies: Other units that must complete before this one
name: Optional name for the unit
timeout: Optional timeout in seconds
Returns:
The created unit
"""
with self._lock:
unit = QuantumUnit(fn, dependencies, name, timeout)
self.units.append(unit)
return unit
def build_execution_plan(self) -> ExecutionPlan:
"""Build an execution plan for the current units.
Returns:
An execution plan
"""
with self._lock:
return ExecutionPlan(self.units)
def execute(self, context: S, max_parallel: Optional[int] = None) -> Dict[str, R]:
"""Execute all units with maximum parallelism.
Args:
context: The execution context
max_parallel: Maximum number of units to run in parallel
Returns:
Dict mapping unit IDs to results
Raises:
ValueError: If circular dependencies are detected
"""
with self._lock:
# Reset state
self.results = {}
self.completed = set()
self.failed = set()
# Build execution plan
plan = self.build_execution_plan()
# In a real implementation, this would use a thread/process pool
for level in range(plan.get_level_count()):
level_units = plan.get_units_at_level(level)
# Process units in this level (would be parallel in real implementation)
for unit in level_units:
# Skip if dependencies failed
if any(dep.id in self.failed for dep in unit.get_dependencies()):
continue
# Execute the unit
result = unit.execute(context)
self.results[unit.id] = result
if result.success:
self.completed.add(unit.id)
else:
self.failed.add(unit.id)
# Collect successful results
return {unit_id: result.value
for unit_id, result in self.results.items()
if result.success}
def get_failed_units(self) -> List[Tuple[QuantumUnit[S, R], Exception]]:
"""Get units that failed during execution.
Returns:
List of (unit, error) pairs
"""
with self._lock:
return [
(unit, self.results[unit.id].error)
for unit in self.units
if unit.id in self.failed and self.results[unit.id].error is not None
]
def cancel_all_pending(self) -> int:
"""Cancel all pending units.
Returns:
Number of units cancelled
"""
with self._lock:
cancelled = 0
for unit in self.units:
if unit.cancel():
cancelled += 1
return cancelled
Basic Building Blocks Implementation
python
class Observable(Generic[E]):
"""Base class for objects that emit observable events."""
def __init__(self):
self._observers: List[Callable[[E, Any], None]] = []
self._lock: threading.RLock = threading.RLock()
def add_observer(self, observer: Callable[[E, Any], None]) -> Callable[[], None]:
"""Add an observer to this object.
Args:
observer: Callback to handle events
Returns:
Function that removes this observer when called
"""
with self._lock:
self._observers.append(observer)
def remove():
with self._lock:
if observer in self._observers:
self._observers.remove(observer)
return remove
def notify(self, event: E, data: Any = None) -> None:
"""Notify all observers of an event.
Args:
event: The event that occurred
data: Optional data associated with the event
"""
observers = []
with self._lock:
observers = list(self._observers) # Copy to avoid mutation during iteration
for observer in observers:
try:
observer(event, data)
except Exception as e:
# In a real implementation, log this error
pass
class Versioned(Generic[S]):
"""Base class for objects that maintain version history."""
def __init__(self, initial_state: Optional[S] = None):
"""Initialize with an optional initial state.
Args:
initial_state: The starting state, if any
"""
self._history: List[Tuple[S, str, float]] = []
self._current_state: Optional[S] = initial_state
self._lock: threading.RLock = threading.RLock()
if initial_state is not None:
self._history.append((initial_state, "Initial state", time.time()))
def update(self, new_state: S, description: str = "") -> None:
"""Update to a new state, preserving history.
Args:
new_state: The new state
description: Description of the change
"""
with self._lock:
if self._current_state is not None:
self._history.append((self._current_state, description, time.time()))
self._current_state = new_state
def get_history(self) -> List[Tuple[S, str, float]]:
"""Get the complete state history.
Returns:
List of (state, description, timestamp) tuples
"""
with self._lock:
return list(self._history) # Return a copy
def get_state(self) -> Optional[S]:
"""Get the current state.
Returns:
The current state, or None if no state exists
"""
with self._lock:
return self._current_state
def rollback(self, steps: int = 1) -> Optional[S]:
"""Roll back to a previous state.
Args:
steps: Number of steps to roll back
Returns:
The restored state, or None if rollback not possible
Raises:
ValueError: If steps is non-positive or exceeds history
"""
if steps <= 0:
raise ValueError("Steps must be positive")
with self._lock:
if len(self._history) < steps:
return None
# Roll back the specified number of steps
for _ in range(steps):
if not self._history:
return None
self._current_state = self._history.pop()[0]
return self._current_state
class Projectable(Generic[S, P]):
"""Base class for objects that support multiple views/projections."""
def __init__(self, data: S):
"""Initialize with data and empty projections map.
Args:
data: The source data
"""
self._data: S = data
self._projections: Dict[str, Callable[[S], P]] = {}
self._lock: threading.RLock = threading.RLock()
def add_projection(self, name: str, fn: Callable[[S], P]) -> None:
"""Add a named projection function.
Args:
name: Projection name
fn: Transformation function
"""
with self._lock:
self._projections[name] = fn
def has_projection(self, name: str) -> bool:
"""Check if a projection exists.
Args:
name: Projection name to check
Returns:
True if the projection exists
"""
with self._lock:
return name in self._projections
def project(self, projection: str = "default") -> Union[S, P]:
"""Apply a projection to the data.
Args:
projection: Name of projection to apply
Returns:
Transformed data, or raw data if projection not found
"""
transform_fn = None
with self._lock:
transform_fn = self._projections.get(projection)
if transform_fn is not None:
return transform_fn(self._data)
return self._data
def update_data(self, data: S) -> None:
"""Update the underlying data.
Args:
data: New data to use
"""
with self._lock:
self._data = data
def get_raw_data(self) -> S:
"""Get the raw underlying data.
Returns:
The raw data
"""
with self._lock:
return self._data
class Effectful(Generic[V]):
"""Base class for operations with tracked side effects."""
def __init__(self, value: V, effects: List[Effect] = None):
"""Initialize with a value and optional effects.
Args:
value: The value produced by this operation
effects: Side effects caused by this operation
"""
self.value: V = value
self.effects: List[Effect] = effects or []
def map(self, fn: Callable[[V], Any]) -> "Effectful[Any]":
"""Transform the value without adding effects.
Args:
fn: Function to transform the value
Returns:
New operation with transformed value
"""
return Effectful(fn(self.value), self.effects)
def bind(self, fn: Callable[[V], "Effectful[Any]"]) -> "Effectful[Any]":
"""Chain with another effectful operation.
Args:
fn: Function returning another effectful operation
Returns:
Combined operation with all effects
"""
next_op = fn(self.value)
return Effectful(next_op.value, self.effects + next_op.effects)
def add_effect(self, effect: Effect) -> "Effectful[V]":
"""Add a side effect to this operation.
Args:
effect: The effect to add
Returns:
New operation with the additional effect
"""
return Effectful(self.value, self.effects + [effect])
def run(self, handler: Callable[[Effect], Any]) -> V:
"""Execute this operation, handling all effects.
Args:
handler: Function to handle each effect
Returns:
The final value
"""
for effect in self.effects:
handler(effect)
return self.value
@classmethod
def pure(cls, value: V) -> "Effectful[V]":
"""Create an operation with no effects.
Args:
value: The value to wrap
Returns:
Pure effectful operation
"""
return cls(value, [])
@classmethod
def of_effect(cls, effect_type: EffectType, payload: Any = None) -> "Effectful[None]":
"""Create an operation with a single effect.
Args:
effect_type: Type of the effect
payload: Optional effect payload
Returns:
Effectful operation with no value
"""
effect = Effect(type=effect_type, payload=payload)
return cls(None, [effect])
class QuantumComputation(Generic[S, R]):
"""Base class for computations with explicit dependencies."""
def __init__(self, fn: Callable[[S], R], dependencies: List["QuantumComputation"] = None,
name: str = None):
"""Initialize with function and dependencies.
Args:
fn: Function implementing the computation
dependencies: Other computations this one depends on
name: Optional name for debugging
"""
self.id: str = str(uuid.uuid4())
self.name: str = name or f"Quantum-{self.id[:8]}"
self.fn: Callable[[S], R] = fn
self.dependencies: List[QuantumComputation] = dependencies or []
self.state: UnitState = UnitState.PENDING
self.result: Optional[R] = None
self._lock: threading.RLock = threading.RLock()
def can_execute(self, completed: Set[str]) -> bool:
"""Check if this computation can be executed.
Args:
completed: Set of IDs of completed computations
Returns:
True if all dependencies are satisfied
"""
with self._lock:
if self.state != UnitState.PENDING:
return False
return all(dep.id in completed for dep in self.dependencies)
def execute(self, context: S) -> R:
"""Execute this computation.
Args:
context: Context object for the computation
Returns:
Result of the computation
Raises:
RuntimeError: If already executed or dependencies not satisfied
"""
with self._lock:
if self.state == UnitState.COMPLETED:
return self.result
if self.state != UnitState.PENDING:
raise RuntimeError(f"Cannot execute computation in state {self.state}")
self.state = UnitState.RUNNING
try:
result = self.fn(context)
with self._lock:
self.result = result
self.state = UnitState.COMPLETED
return result
except Exception as e:
with self._lock:
self.state = UnitState.FAILED
raise
Type-Safe Boundary Implementation
python
# Import specific types instead of using Any
from typing import TypeVar, Generic, Protocol, Callable, Dict, List, Optional, Union, Set, Tuple
from dataclasses import dataclass, field
from enum import Enum, auto
import threading
import time
import uuid
import copy
import json
from abc import ABC, abstractmethod
# Type variables for boundary interfaces
T_in = TypeVar('T_in') # Input type
T_out = TypeVar('T_out') # Output type
T_payload = TypeVar('T_payload') # Payload type for effects
@dataclass
class ValidationResult(Generic[T_out]):
"""Result of data validation at a boundary."""
is_valid: bool
data: Optional[T_out] = None
errors: List[Dict[str, Union[str, int, float, bool]]] = None
def __post_init__(self):
if self.errors is None:
self.errors = []
# Replace Any with specific types in interfaces
class Observable(Protocol[E]):
"""Interface for objects that emit events."""
def add_observer(self, observer: Callable[[E, Dict[str, Union[str, int, float, bool]]], None]) -> Callable[[], None]: ...
def remove_observer(self, observer: Callable[[E, Dict[str, Union[str, int, float, bool]]], None]) -> None: ...
def notify(self, event: E, data: Optional[Dict[str, Union[str, int, float, bool]]] = None) -> None: ...
class Boundary(Protocol[T_in, T_out]):
"""Interface for system boundaries that convert between domains."""
def validate(self, data: T_in) -> ValidationResult[T_out]: ...
def process(self, data: T_in) -> T_out: ...
def handle_error(self, error: Exception) -> 'BoundaryError': ...
# Type-safe implementations for effect handling
class EffectHandler:
"""Type-safe handler for effects."""
def __init__(self):
self._handlers: Dict[EffectType, Callable[[T_payload], Union[str, int, float, bool, Dict, List]]] = {}
self._fallback: Optional[Callable[[Effect], Union[str, int, float, bool, Dict, List]]] = None
self._executed_effects: List[Tuple[Effect, Union[str, int, float, bool, Dict, List]]] = []
self._lock: threading.RLock = threading.RLock()
def register_handler(self, effect_type: EffectType,
handler: Callable[[T_payload], Union[str, int, float, bool, Dict, List]]) -> None:
"""Register a handler for a specific effect type."""
with self._lock:
self._handlers[effect_type] = handler
def set_fallback_handler(self, handler: Callable[[Effect], Union[str, int, float, bool, Dict, List]]) -> None:
"""Set a fallback handler for unknown effect types."""
with self._lock:
self._fallback = handler
def handle(self, effect: Effect) -> Union[str, int, float, bool, Dict, List]:
"""Handle an effect with the appropriate handler."""
with self._lock:
if effect.type in self._handlers:
result = self._handlers[effect.type](effect.payload)
self._executed_effects.append((effect, result))
return result
elif self._fallback is not None:
result = self._fallback(effect)
self._executed_effects.append((effect, result))
return result
raise ValueError(f"No handler for effect: {effect.type}")