Inner Universe Implementation Guide
This document provides a comprehensive guide for implementing the SpacetimeDB persistence layer within the NERV architecture. The guide focuses on practical implementation steps, code examples, and best practices for integrating the two systems.
Overview
The Inner Universe persistence layer provides durable storage for NERV components using SpacetimeDB, a WASM-powered database that supports both storage and computation. This implementation guide will cover:
- Setting up the Python client to connect to SpacetimeDB
- Mapping NERV components to SpacetimeDB tables and reducers
- Implementing key operations for entity and relation management
- Testing the integration
- Handling errors
- Optimizing performance
Python Client Setup
Installation Requirements
Start by installing the required dependencies for the Python client:
# asyncio ships with the Python standard library; the PyPI package of the
# same name is an obsolete backport and must not be installed.
uv pip install spacetimedb-sdk typing-extensions
Basic Client Configuration
Create a base configuration class for Inner Universe:
# atlas/graph/persistence/config.py
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
@dataclass
class InnerUniverseConfig:
    """Configuration for the Inner Universe persistence layer.

    Holds connection, module, deployment, feature-flag and performance
    settings. :meth:`to_dict` renders them as the nested dictionary that
    the controller serializes to a JSON config file.
    """

    # SpacetimeDB connection settings
    address: str = "127.0.0.1"
    port: int = 3000

    # Module settings
    module_path: Optional[str] = None
    db_path: str = "./inner_universe_db"

    # Deployment settings
    build_module: bool = False
    module_dir: Optional[str] = None

    # Feature flags; a factory is used so instances never share one dict
    features: Dict[str, bool] = field(default_factory=lambda: {
        "enable_state_versioning": True,
        "enable_audit_log": True,
        "enable_metrics": False,
        "enable_caching": True,
    })

    # Performance settings
    cache_size_mb: int = 256
    log_level: str = "info"

    def to_dict(self) -> Dict[str, Any]:
        """Convert the configuration to a nested dictionary.

        Returns:
            A dict with the sections ``spacetimedb_config``,
            ``module_config``, ``server_config`` and ``feature_flags``.
            ``feature_flags`` is a copy, so editing the result never
            changes this configuration object.
        """
        return {
            "spacetimedb_config": {
                "address": self.address,
                "port": self.port,
                "db_path": self.db_path,
                "log_level": self.log_level,
            },
            "module_config": {
                "module_path": self.module_path,
                "module_dir": self.module_dir,
                "build_module": self.build_module,
            },
            "server_config": {
                "cache_size_mb": self.cache_size_mb,
            },
            # Copy rather than expose the live dict held by the config.
            "feature_flags": dict(self.features),
        }
Client Implementation
Next, create the Inner Universe client that will interface with SpacetimeDB:
# atlas/graph/persistence/inner_universe.py
import asyncio
import logging
from typing import Dict, List, Optional, Any, Callable, Union, TypeVar, Generic
from dataclasses import dataclass, asdict
from spacetimedb_sdk.client import SpacetimeDBClient
from spacetimedb_sdk.subscription import SubscriptionHandle
from atlas.graph.persistence.config import InnerUniverseConfig
from atlas.graph.persistence.types import (
EntityMetadata, RelationMetadata, QueryFilter,
FilterOperator, DeltaOperation, DeltaMetadata
)
from atlas.core.logging import get_logger
T = TypeVar('T')
class InnerUniverseClient:
    """Client for the Inner Universe persistence layer.

    Wraps a ``SpacetimeDBClient``: writes are forwarded to reducers via
    ``call_reducer``, reads are issued as SQL strings via ``query``, and
    subscription handles are tracked so :meth:`disconnect` can release
    them. Every operation raises ``RuntimeError("Client not connected")``
    when no connection is open.
    """

    def __init__(self, config: InnerUniverseConfig):
        """Store the configuration; nothing is connected until :meth:`connect`."""
        self.config = config
        self.client: Optional[SpacetimeDBClient] = None
        self.process = None
        self.logger = get_logger("inner_universe")
        self.subscriptions: List[SubscriptionHandle] = []

    async def connect(self) -> bool:
        """Connect to the SpacetimeDB instance.

        Returns:
            True when the connection succeeded, False when the attempt
            raised (the error is logged, not propagated).
        """
        try:
            self.logger.info(f"Connecting to SpacetimeDB at {self.config.address}:{self.config.port}")
            client = SpacetimeDBClient(
                address=self.config.address,
                port=self.config.port,
            )
            await client.connect()
        except Exception as e:
            # Never keep a half-initialized client: the "not connected"
            # guards below test self.client for truthiness.
            self.client = None
            self.logger.error(f"Failed to connect to SpacetimeDB: {e}")
            return False
        self.client = client
        self.logger.info("Connected to SpacetimeDB")
        return True

    async def disconnect(self) -> None:
        """Release all subscriptions and disconnect; a no-op when not connected."""
        if not self.client:
            return
        for subscription in self.subscriptions:
            self.client.unsubscribe(subscription)
        # Drop released handles so a later disconnect does not reuse them.
        self.subscriptions.clear()
        await self.client.disconnect()
        self.client = None
        self.logger.info("Disconnected from SpacetimeDB")

    # Internal guards / converters

    def _require_client(self) -> SpacetimeDBClient:
        """Return the connected SDK client or raise ``RuntimeError``."""
        if not self.client:
            raise RuntimeError("Client not connected")
        return self.client

    @staticmethod
    def _as_plain_dict(payload: Any) -> Dict[str, Any]:
        """Return ``payload`` as a dict.

        Dataclass instances go through ``asdict``; plain dicts (which
        ``asdict`` rejects with ``TypeError``) are shallow-copied.
        """
        if isinstance(payload, dict):
            return dict(payload)
        return asdict(payload)

    # Entity operations

    async def create_entity(self, entity_type: str, metadata: EntityMetadata) -> str:
        """Create a new entity by calling the ``create_entity`` reducer.

        Args:
            entity_type: Type label for the new entity.
            metadata: ``EntityMetadata`` instance, or an equivalent dict.

        Returns:
            Whatever the reducer call returns.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Creating entity of type {entity_type}")
        return await client.call_reducer(
            "create_entity", entity_type, self._as_plain_dict(metadata)
        )

    async def get_entity(self, entity_id: str) -> Dict[str, Any]:
        """Retrieve an entity by ID from the ``entity`` table.

        Raises:
            RuntimeError: If the client is not connected.
            ValueError: If the query returns no rows.
        """
        client = self._require_client()
        self.logger.debug(f"Retrieving entity {entity_id}")
        # Quote through the shared formatter so an apostrophe in the id can
        # neither break nor extend the statement.
        id_literal = self._format_value_for_query(entity_id)
        entities = await client.query(f"SELECT * FROM entity WHERE id = {id_literal}")
        if not entities:
            raise ValueError(f"Entity not found: {entity_id}")
        return entities[0]

    async def update_entity(self, entity_id: str, metadata: EntityMetadata) -> bool:
        """Update an entity by calling the ``update_entity`` reducer.

        Args:
            entity_id: ID of the entity to update.
            metadata: ``EntityMetadata`` instance, or an equivalent dict.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Updating entity {entity_id}")
        return await client.call_reducer(
            "update_entity", entity_id, self._as_plain_dict(metadata)
        )

    async def delete_entity(self, entity_id: str) -> bool:
        """Delete an entity by calling the ``delete_entity`` reducer.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Deleting entity {entity_id}")
        return await client.call_reducer("delete_entity", entity_id)

    # Relation operations

    async def create_relation(
        self,
        from_entity: str,
        to_entity: str,
        relation_type: str,
        metadata: RelationMetadata
    ) -> str:
        """Create a relation by calling the ``create_relation`` reducer.

        Args:
            from_entity: ID of the source entity.
            to_entity: ID of the target entity.
            relation_type: Type label for the relation.
            metadata: ``RelationMetadata`` instance, or an equivalent dict.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Creating relation {relation_type} from {from_entity} to {to_entity}")
        return await client.call_reducer(
            "create_relation",
            from_entity,
            to_entity,
            relation_type,
            self._as_plain_dict(metadata),
        )

    # State version operations

    async def commit_state_version(
        self,
        parent_version_id: Optional[str],
        description: str,
        data_hash: str
    ) -> str:
        """Commit a state version by calling the ``commit_state_version`` reducer.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Committing state version: {description}")
        return await client.call_reducer(
            "commit_state_version",
            parent_version_id,
            description,
            data_hash,
        )

    async def record_delta(
        self,
        state_version_id: str,
        operations: List[DeltaOperation],
        metadata: DeltaMetadata
    ) -> str:
        """Record a delta by calling the ``record_delta`` reducer.

        Args:
            state_version_id: Version the delta belongs to.
            operations: ``DeltaOperation`` instances (or equivalent dicts).
            metadata: ``DeltaMetadata`` instance (or an equivalent dict).

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Recording delta for state version {state_version_id}")
        return await client.call_reducer(
            "record_delta",
            state_version_id,
            [self._as_plain_dict(op) for op in operations],
            self._as_plain_dict(metadata),
        )

    # Event operations

    async def publish_event(
        self,
        event_type: str,
        source: Optional[str],
        data: Dict[str, Any]
    ) -> str:
        """Publish an event by calling the ``publish_event`` reducer.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()
        self.logger.debug(f"Publishing event of type {event_type}")
        return await client.call_reducer("publish_event", event_type, source, data)

    # Subscription methods

    def subscribe_entity_changes(self, callback: Callable[[str, Dict[str, Any]], None]) -> None:
        """Subscribe to changes of the ``entity`` table.

        ``callback`` is invoked once per change with a change type
        (``"INSERTED"``, ``"DELETED"`` or ``"UPDATED"``) and a row: the
        row before the change for deletions, the row after it otherwise.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()

        def on_entity_update(changes):
            for change in changes:
                if change.is_insertion():
                    change_type = "INSERTED"
                elif change.is_deletion():
                    change_type = "DELETED"
                else:
                    change_type = "UPDATED"
                # A deleted row has no "after" state to report.
                entity = change.row_before if change_type == "DELETED" else change.row_after
                callback(change_type, entity)

        subscription = client.subscribe("entity", on_entity_update)
        self.subscriptions.append(subscription)

    # Query methods

    async def query_entities(
        self,
        entity_type: Optional[str] = None,
        filters: Optional[List[QueryFilter]] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """Query the ``entity`` table with optional filters.

        Args:
            entity_type: When truthy, restricts rows to this entity type.
            filters: ``QueryFilter`` objects, AND-ed together.
            limit: Maximum number of rows.
            offset: Number of rows to skip.

        Returns:
            Whatever the SDK ``query`` call returns for the built statement.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._require_client()

        conditions = []
        if entity_type:
            conditions.append(f"entity_type = {self._format_value_for_query(entity_type)}")
        for filter_item in filters or []:
            op_str = self._filter_op_to_sql(filter_item.operator)
            value = self._like_pattern(filter_item.operator, filter_item.value)
            value_str = self._format_value_for_query(value)
            # NOTE(review): filter_item.field is interpolated verbatim, so
            # field names must come from trusted code, never user input.
            conditions.append(f"{filter_item.field} {op_str} {value_str}")

        query = "SELECT * FROM entity"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        # int() guarantees that only numerals reach the statement.
        query += f" LIMIT {int(limit)} OFFSET {int(offset)}"

        self.logger.debug(f"Executing query: {query}")
        return await client.query(query)

    # Helper methods

    def _filter_op_to_sql(self, op: FilterOperator) -> str:
        """Convert a filter operator to its SQL operator (``"="`` if unknown)."""
        mapping = {
            FilterOperator.EQUALS: "=",
            FilterOperator.NOT_EQUALS: "!=",
            FilterOperator.GREATER_THAN: ">",
            FilterOperator.LESS_THAN: "<",
            FilterOperator.GREATER_OR_EQUAL: ">=",
            FilterOperator.LESS_OR_EQUAL: "<=",
            FilterOperator.CONTAINS: "LIKE",
            FilterOperator.STARTS_WITH: "LIKE",
            FilterOperator.ENDS_WITH: "LIKE",
            FilterOperator.IN: "IN",
            FilterOperator.NOT_IN: "NOT IN",
        }
        return mapping.get(op, "=")

    def _like_pattern(self, op: FilterOperator, value: Any) -> Any:
        """Add ``%`` wildcards for the substring operators.

        Without wildcards ``LIKE 'x'`` only matches the exact string, so
        CONTAINS / STARTS_WITH / ENDS_WITH would behave like EQUALS.
        Non-string values and all other operators pass through unchanged.
        ``%`` and ``_`` already present in ``value`` are not escaped.
        """
        if not isinstance(value, str):
            return value
        if op == FilterOperator.CONTAINS:
            return f"%{value}%"
        if op == FilterOperator.STARTS_WITH:
            return f"{value}%"
        if op == FilterOperator.ENDS_WITH:
            return f"%{value}"
        return value

    def _format_value_for_query(self, value: Any) -> str:
        """Format a Python value as a SQL literal.

        ``None`` becomes ``NULL``, booleans ``TRUE``/``FALSE``, numbers
        their ``str()``, lists and tuples a parenthesized comma-separated
        list, and everything else a single-quoted string with embedded
        single quotes doubled.
        """
        if value is None:
            return "NULL"
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        if isinstance(value, (int, float)):
            return str(value)
        if isinstance(value, (list, tuple)):
            items = [self._format_value_for_query(item) for item in value]
            return f"({', '.join(items)})"
        # Strings and the generic fallback share the same escaping.
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"
Custom Type Definitions
Create type definitions for the Python client that map to the Rust types:
# atlas/graph/persistence/types.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Any, Union
@dataclass
class EntityMetadata:
    """Metadata associated with an entity.

    Every field has a default, so ``EntityMetadata()`` is a valid, empty
    metadata record. The list and dict defaults are created per instance.
    """
    # Optional name; None when not provided
    name: Optional[str] = None
    # Optional free-text description; None when not provided
    description: Optional[str] = None
    # Tag strings; defaults to a fresh empty list
    tags: List[str] = field(default_factory=list)
    # Arbitrary additional key/value properties; defaults to a fresh empty dict
    properties: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RelationMetadata:
    """Metadata associated with a relation.

    Every field has a default, so ``RelationMetadata()`` is valid. The list
    and dict defaults are created per instance.
    """
    # Optional free-text description; None when not provided
    description: Optional[str] = None
    # Relation strength; defaults to 1.0 (valid range is not defined here)
    strength: float = 1.0
    # Whether the relation is flagged as bidirectional; defaults to False
    bidirectional: bool = False
    # Tag strings; defaults to a fresh empty list
    tags: List[str] = field(default_factory=list)
    # Arbitrary additional key/value properties; defaults to a fresh empty dict
    properties: Dict[str, Any] = field(default_factory=dict)
class FilterOperator(Enum):
    """Query filter operators.

    Member values are PascalCase strings.
    NOTE(review): presumably these mirror the variant names of the
    Rust-side enum — confirm against the module definition.
    """
    # Comparison operators
    EQUALS = "Equals"
    NOT_EQUALS = "NotEquals"
    GREATER_THAN = "GreaterThan"
    LESS_THAN = "LessThan"
    GREATER_OR_EQUAL = "GreaterOrEqual"
    LESS_OR_EQUAL = "LessOrEqual"
    # Substring operators
    CONTAINS = "Contains"
    STARTS_WITH = "StartsWith"
    ENDS_WITH = "EndsWith"
    # Set-membership operators
    IN = "In"
    NOT_IN = "NotIn"
@dataclass
class QueryFilter:
    """Filter for database queries.

    All three fields are required and positional, in the order
    ``field``, ``operator``, ``value``.
    """
    # Name of the field the filter applies to
    field: str
    # Comparison to apply between the field and the value
    operator: FilterOperator
    # Value to compare against; its type is not constrained here
    value: Any
@dataclass
class DeltaMetadata:
    """Metadata for a delta record.

    Only ``description`` is required; the remaining fields have defaults.
    """
    # Required description of the delta
    description: str
    # Optional author; None when not provided
    author: Optional[str] = None
    # Tag strings; defaults to a fresh empty list
    tags: List[str] = field(default_factory=list)
    # Optional ID of a parent delta; None when not provided
    parent_delta_id: Optional[str] = None
@dataclass
class DeltaOperation:
    """A single operation in a delta.

    ``operation_type`` and ``path`` are required. ``value``, ``index`` and
    ``patch`` are optional and default to None; which of them is meaningful
    presumably depends on ``operation_type`` — confirm against the reducer.
    """
    operation_type: str  # "Set", "Delete", "Insert", "Remove", "Patch"
    # Path the operation applies to
    path: str
    # Optional value payload
    value: Optional[Any] = None
    # Optional integer index
    index: Optional[int] = None
    # Optional patch payload
    patch: Optional[Dict[str, Any]] = None
SpacetimeDB Controller Implementation
Next, create the controller class that manages the SpacetimeDB process:
# atlas/graph/persistence/controller.py
import asyncio
import os
import json
import tempfile
from typing import Dict, Any, Optional, List
from atlas.graph.persistence.config import InnerUniverseConfig
from atlas.graph.persistence.inner_universe import InnerUniverseClient
from atlas.core.logging import get_logger
class InnerUniverseController:
    """Controller for the Inner Universe persistence layer.

    Launches the ``spacetime`` command-line tool as a subprocess, either
    as a one-shot deployment or as a long-running server, and owns the
    ``InnerUniverseClient`` connected to it.
    """

    def __init__(self, config: InnerUniverseConfig):
        """Store the configuration; no process is started yet."""
        self.config = config
        self.process = None
        self.client: Optional[InnerUniverseClient] = None
        self.logger = get_logger("inner_universe_controller")

    async def start_deployment_mode(self) -> Dict[str, Any]:
        """Initialize and configure a new SpacetimeDB instance.

        Optionally builds the module, runs ``spacetime deploy`` to
        completion, then connects a client.

        Returns:
            ``{"status": "deployed", "stdout": <decoded process output>}``.

        Raises:
            ValueError: If ``config.module_path`` is not set.
            RuntimeError: If the build or the deployment exits non-zero, or
                the client cannot connect afterwards.
        """
        # Fail early with a clear message: a None module_path would only
        # surface later as an opaque TypeError while assembling the command.
        if not self.config.module_path:
            raise ValueError("module_path must be set to deploy a SpacetimeDB module")

        if self.config.build_module and self.config.module_dir:
            await self._build_module()

        config_file = await self._create_temp_config()
        try:
            cmd = [
                "spacetime", "deploy",
                "--module-path", self.config.module_path,
                "--db-path", self.config.db_path,
                "--config", config_file,
                "--log-level", self.config.log_level,
            ]
            self.logger.info(f"Starting SpacetimeDB in deployment mode: {' '.join(cmd)}")

            self.process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # communicate() drains both pipes and waits for the process to exit.
            stdout, stderr = await self.process.communicate()
        finally:
            # Remove the temporary config on every exit path, including
            # failed or cancelled deployments.
            try:
                os.unlink(config_file)
            except FileNotFoundError:
                pass

        if self.process.returncode != 0:
            error_msg = stderr.decode()
            self.logger.error(f"Deployment failed: {error_msg}")
            raise RuntimeError(f"Deployment failed: {error_msg}")

        self.logger.info("SpacetimeDB deployment completed successfully")

        client = InnerUniverseClient(self.config)
        # connect() reports failure through its return value, not by raising.
        if not await client.connect():
            self.logger.error("Failed to connect to SpacetimeDB after deployment")
            raise RuntimeError("Failed to connect to SpacetimeDB after deployment")
        self.client = client

        return {
            "status": "deployed",
            "stdout": stdout.decode(),
        }

    async def start_server_mode(self) -> InnerUniverseClient:
        """Start SpacetimeDB in server mode to handle ongoing requests.

        Returns:
            The connected ``InnerUniverseClient``.

        Raises:
            RuntimeError: If the process exits within the start-up wait or
                the client cannot connect.
        """
        cmd = [
            "spacetime", "run",
            "--db-path", self.config.db_path,
            "--address", self.config.address,
            "--port", str(self.config.port),
            "--log-level", self.config.log_level,
        ]
        self.logger.info(f"Starting SpacetimeDB in server mode: {' '.join(cmd)}")

        # NOTE(review): stdout/stderr are piped but only read on start-up
        # failure; a chatty long-running server could fill the pipe buffers.
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        # Give the server a moment to start before probing it.
        await asyncio.sleep(1)

        # A non-None returncode means the process has already exited.
        if self.process.returncode is not None:
            stderr = await self.process.stderr.read()
            error_msg = stderr.decode()
            self.logger.error(f"Server failed to start: {error_msg}")
            raise RuntimeError(f"Server failed to start: {error_msg}")

        self.client = InnerUniverseClient(self.config)
        connected = await self.client.connect()
        if not connected:
            self.logger.error("Failed to connect to SpacetimeDB server")
            await self.stop()
            raise RuntimeError("Failed to connect to SpacetimeDB server")

        self.logger.info("SpacetimeDB server running and client connected")
        return self.client

    async def stop(self):
        """Disconnect the client and terminate the SpacetimeDB process, if any."""
        if self.client:
            await self.client.disconnect()
            self.client = None

        if self.process and self.process.returncode is None:
            try:
                self.process.terminate()
            except ProcessLookupError:
                # The process exited between the returncode check and terminate().
                pass
            await self.process.wait()
            self.logger.info("SpacetimeDB process terminated")
        self.process = None

    async def _build_module(self):
        """Build the Rust module to WebAssembly inside ``config.module_dir``.

        Raises:
            RuntimeError: If ``cargo build`` exits with a non-zero status.
        """
        cmd = [
            "cargo", "build",
            "--release",
            "--target", "wasm32-unknown-unknown",
        ]
        self.logger.info(f"Building module: {' '.join(cmd)}")

        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=self.config.module_dir,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            error_msg = stderr.decode()
            self.logger.error(f"Module build failed: {error_msg}")
            raise RuntimeError(f"Module build failed: {error_msg}")

        self.logger.info("Module built successfully")

    async def _create_temp_config(self) -> str:
        """Write ``config.to_dict()`` as JSON to a new temporary file.

        Returns:
            Path of the created file; the caller is responsible for
            deleting it.
        """
        fd, path = tempfile.mkstemp(suffix=".json")
        # fdopen takes ownership of the descriptor and closes it on exit.
        with os.fdopen(fd, 'w') as f:
            json.dump(self.config.to_dict(), f)
        return path
Mapping NERV Components to SpacetimeDB
Entity-Relation Mapping
The following table shows how NERV components map to SpacetimeDB tables:
| NERV Component | SpacetimeDB Table(s) | Description |
|---|---|---|
| Entity | `entity` | Stores entities from NERV’s knowledge graph |
| Relation | `relation` | Stores relationships between entities |
| Temporal | `state_version`, `delta_record` | Stores versioned state and change history |
| Event Bus | `event_log` | Records events for synchronization |
Integration with EventBus
To integrate with NERV’s EventBus, create a SpacetimeEventBus adapter:
# atlas/graph/persistence/event_bus.py
from typing import Dict, Any, Optional, Callable, List
import asyncio
from atlas.agents.messaging.message import Message
from atlas.core.logging import get_logger
from atlas.graph.persistence.inner_universe import InnerUniverseClient
class SpacetimeEventBus:
    """Event bus adapter for SpacetimeDB integration.

    Publishes events through the persistence client and fans rows
    inserted into the ``event_log`` table out to handlers registered per
    event type.
    """

    def __init__(self, client: InnerUniverseClient):
        """Bind the adapter to a persistence client; nothing is subscribed yet."""
        self.client = client
        self.logger = get_logger("spacetime_event_bus")
        self.handlers: Dict[str, List[Callable[[Message], None]]] = {}
        # Strong references to in-flight handler tasks; the event loop only
        # keeps weak ones, so an unreferenced task may be collected mid-run.
        self._pending_tasks: set = set()

    def register_handler(self, event_type: str, handler: Callable[[Message], None]) -> None:
        """Register a handler for a specific event type.

        Handlers for one event type are kept in registration order. A
        handler may be a plain callable or a coroutine function.
        """
        self.handlers.setdefault(event_type, []).append(handler)
        self.logger.debug(f"Registered handler for event type: {event_type}")

    async def publish(self, event_type: str, data: Dict[str, Any], source: Optional[str] = None) -> str:
        """Publish an event to the SpacetimeDB event log.

        Returns:
            The value returned by the client's ``publish_event``.

        Raises:
            Exception: Any error from the client is logged and re-raised.
        """
        try:
            event_id = await self.client.publish_event(event_type, source, data)
            self.logger.debug(f"Published event {event_id} of type {event_type}")
            return event_id
        except Exception as e:
            self.logger.error(f"Failed to publish event: {e}")
            raise

    def subscribe_to_events(self) -> None:
        """Subscribe to row changes of the ``event_log`` table.

        Only inserted rows are dispatched; updates and deletions are
        ignored. The subscription handle is appended to the client's
        ``subscriptions`` list so the client's ``disconnect`` releases it.

        Raises:
            RuntimeError: If the underlying client is not connected.
        """
        sdk_client = self.client.client
        if not sdk_client:
            raise RuntimeError("Client not connected")

        def on_event_log_update(changes):
            for change in changes:
                if not change.is_insertion():
                    continue  # Only process new events
                self._dispatch(change.row_after)

        # The client's entity subscription watches the "entity" table, so the
        # event log is subscribed to directly here.
        subscription = sdk_client.subscribe("event_log", on_event_log_update)
        self.client.subscriptions.append(subscription)
        self.logger.info("Subscribed to event_log changes")

    def _dispatch(self, event: Dict[str, Any]) -> None:
        """Schedule every handler registered for the event's type."""
        event_type = event.get("event_type", "")
        handlers = self.handlers.get(event_type)
        if not handlers:
            return

        # Convert to message format
        message = Message(
            id=event.get("event_id", ""),
            type=event_type,
            source=event.get("source"),
            timestamp=event.get("timestamp", 0),
            data=event.get("data", {}),
        )
        for handler in handlers:
            task = asyncio.create_task(self._call_handler(handler, message))
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)

    async def _call_handler(self, handler: Callable[[Message], None], message: Message) -> None:
        """Call a handler, awaiting its result if needed, and log any error."""
        try:
            result = handler(message)
            # Coroutine handlers return an awaitable that must be awaited,
            # otherwise their body would never run.
            if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                await result
        except Exception as e:
            self.logger.error(f"Error in event handler: {e}")
Integration with TemporalStore
Next, create an adapter for NERV’s TemporalStore:
# atlas/graph/persistence/temporal_store.py
from typing import Dict, Any, Optional, List
import hashlib
import json
from atlas.core.logging import get_logger
from atlas.graph.persistence.inner_universe import InnerUniverseClient
from atlas.graph.persistence.types import DeltaMetadata, DeltaOperation
class SpacetimeTemporalStore:
    """TemporalStore adapter for SpacetimeDB integration.

    Tracks the ID of the most recently committed state version and uses
    it as the parent of the next commit.
    """

    def __init__(self, client: InnerUniverseClient):
        """Bind the adapter to a persistence client; no version is current yet."""
        self.client = client
        self.logger = get_logger("spacetime_temporal_store")
        self.current_version_id: Optional[str] = None

    async def commit_version(self, state: Dict[str, Any], description: str) -> str:
        """Commit a new state version.

        Only a SHA-256 hash of the state is sent, not the state itself.
        The state is serialized as JSON with sorted keys so equal states
        always hash equally.

        Returns:
            The new version ID, which also becomes the current version.
        """
        state_json = json.dumps(state, sort_keys=True)
        data_hash = hashlib.sha256(state_json.encode()).hexdigest()

        version_id = await self.client.commit_state_version(
            self.current_version_id,
            description,
            data_hash,
        )

        self.current_version_id = version_id
        self.logger.info(f"Committed state version {version_id}: {description}")
        return version_id

    async def record_state_delta(
        self,
        version_id: str,
        operations: List[Dict[str, Any]],
        description: str
    ) -> str:
        """Record a delta for a state version.

        Args:
            version_id: Version the delta belongs to.
            operations: Dicts with a ``type`` of ``"set"``, ``"delete"``,
                ``"insert"``, ``"remove"`` or ``"patch"``, a ``path``, and
                the payload keys used by that type.
            description: Description stored in the delta metadata.

        Returns:
            The value returned by the client's ``record_delta``.

        Raises:
            ValueError: If an operation has an unsupported ``type``.
                Dropping it silently would record an incomplete delta.
        """
        delta_operations = []
        for op in operations:
            op_type = op.get("type", "")
            path = op.get("path", "")

            if op_type == "set":
                delta_operations.append(DeltaOperation(
                    operation_type="Set",
                    path=path,
                    value=op.get("value"),
                ))
            elif op_type == "delete":
                delta_operations.append(DeltaOperation(
                    operation_type="Delete",
                    path=path,
                ))
            elif op_type == "insert":
                delta_operations.append(DeltaOperation(
                    operation_type="Insert",
                    path=path,
                    index=op.get("index"),
                    value=op.get("value"),
                ))
            elif op_type == "remove":
                delta_operations.append(DeltaOperation(
                    operation_type="Remove",
                    path=path,
                    index=op.get("index", 0),
                ))
            elif op_type == "patch":
                delta_operations.append(DeltaOperation(
                    operation_type="Patch",
                    path=path,
                    patch=op.get("patch", {}),
                ))
            else:
                raise ValueError(f"Unsupported delta operation type: {op_type!r}")

        metadata = DeltaMetadata(
            description=description,
            author=None,  # Could be populated from context
            tags=[],
        )

        delta_id = await self.client.record_delta(version_id, delta_operations, metadata)
        self.logger.info(f"Recorded delta {delta_id} for version {version_id}")
        return delta_id

    async def get_current_version(self) -> Optional[Dict[str, Any]]:
        """Get the current state version, or None if there is none.

        NOTE(review): this goes through ``query_entities``, which reads
        the ``entity`` table, and the unfiltered lookup has no ordering,
        so "latest" is whatever row comes back first — confirm both
        against the deployed schema.
        """
        # Imported locally: only this method needs the query filter types.
        from atlas.graph.persistence.types import FilterOperator, QueryFilter

        if not self.current_version_id:
            # No version committed through this store yet; look one up.
            versions = await self.client.query_entities(
                entity_type="state_version",
                limit=1,
                filters=[],
            )
            if not versions:
                return None
            self.current_version_id = versions[0].get("version_id")

        if not self.current_version_id:
            return None

        # query_entities reads .field/.operator/.value attributes, so a
        # QueryFilter object is required here; a plain dict would fail.
        versions = await self.client.query_entities(
            entity_type="state_version",
            filters=[QueryFilter(
                field="version_id",
                operator=FilterOperator.EQUALS,
                value=self.current_version_id,
            )],
        )
        return versions[0] if versions else None
Integration with NERV Core
Persistence Manager
Create a central manager class to coordinate the SpacetimeDB integration:
# atlas/graph/persistence/manager.py
from typing import Dict, Any, Optional, List
import asyncio
from atlas.core.logging import get_logger
from atlas.graph.persistence.config import InnerUniverseConfig
from atlas.graph.persistence.controller import InnerUniverseController
from atlas.graph.persistence.inner_universe import InnerUniverseClient
from atlas.graph.persistence.event_bus import SpacetimeEventBus
from atlas.graph.persistence.temporal_store import SpacetimeTemporalStore
class PersistenceManager:
    """Manager for the Inner Universe persistence layer.

    Owns the controller, the client it produces, and the event-bus and
    temporal-store adapters built on that client. Every operation raises
    ``RuntimeError`` until :meth:`initialize` has succeeded.
    """

    def __init__(self, config: Optional[InnerUniverseConfig] = None):
        """Create the manager; a default ``InnerUniverseConfig`` is used if none is given."""
        self.config = config or InnerUniverseConfig()
        self.controller = InnerUniverseController(self.config)
        self.client: Optional[InnerUniverseClient] = None
        self.event_bus: Optional[SpacetimeEventBus] = None
        self.temporal_store: Optional[SpacetimeTemporalStore] = None
        self.logger = get_logger("persistence_manager")
        self._initialized = False

    async def initialize(self, deploy: bool = False) -> bool:
        """Initialize the persistence layer.

        Args:
            deploy: When True, run the controller's deployment mode;
                otherwise start it in server mode.

        Returns:
            True on success. On any failure the error is logged, the
            manager is shut down again, and False is returned.
        """
        try:
            if deploy:
                await self.controller.start_deployment_mode()
                # Deployment mode returns a status dict, not the client;
                # the connected client lives on the controller.
                self.client = self.controller.client
            else:
                self.client = await self.controller.start_server_mode()

            if self.client is None:
                raise RuntimeError("Controller did not provide a connected client")

            # Create adapters
            self.event_bus = SpacetimeEventBus(self.client)
            self.temporal_store = SpacetimeTemporalStore(self.client)

            # Subscribe to events
            self.event_bus.subscribe_to_events()

            self._initialized = True
            self.logger.info("Persistence manager initialized successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to initialize persistence layer: {e}")
            await self.shutdown()
            return False

    async def shutdown(self) -> None:
        """Stop the controller and reset the manager to its uninitialized state."""
        await self.controller.stop()
        self.client = None
        self.event_bus = None
        self.temporal_store = None
        self._initialized = False
        self.logger.info("Persistence manager shut down")

    def is_initialized(self) -> bool:
        """Return True once :meth:`initialize` has succeeded and until :meth:`shutdown`."""
        return self._initialized

    # Entity operations

    async def create_entity(self, entity_type: str, metadata: Dict[str, Any]) -> str:
        """Create a new entity; ``metadata`` is forwarded to the client unchanged."""
        self._check_initialized()
        return await self.client.create_entity(entity_type, metadata)

    async def get_entity(self, entity_id: str) -> Dict[str, Any]:
        """Get an entity by ID via the client."""
        self._check_initialized()
        return await self.client.get_entity(entity_id)

    async def update_entity(self, entity_id: str, metadata: Dict[str, Any]) -> bool:
        """Update an entity's metadata; ``metadata`` is forwarded to the client unchanged."""
        self._check_initialized()
        return await self.client.update_entity(entity_id, metadata)

    async def delete_entity(self, entity_id: str) -> bool:
        """Delete an entity via the client."""
        self._check_initialized()
        return await self.client.delete_entity(entity_id)

    # Relation operations

    async def create_relation(
        self,
        from_entity: str,
        to_entity: str,
        relation_type: str,
        metadata: Dict[str, Any]
    ) -> str:
        """Create a new relation between entities via the client."""
        self._check_initialized()
        return await self.client.create_relation(from_entity, to_entity, relation_type, metadata)

    # State version operations

    async def commit_version(self, state: Dict[str, Any], description: str) -> str:
        """Commit a new state version through the temporal store.

        Raises:
            RuntimeError: If the manager or the temporal store is not initialized.
        """
        self._check_initialized()
        if not self.temporal_store:
            raise RuntimeError("TemporalStore not initialized")
        return await self.temporal_store.commit_version(state, description)

    # Event operations

    async def publish_event(
        self,
        event_type: str,
        data: Dict[str, Any],
        source: Optional[str] = None
    ) -> str:
        """Publish an event through the event bus.

        Raises:
            RuntimeError: If the manager or the event bus is not initialized.
        """
        self._check_initialized()
        if not self.event_bus:
            raise RuntimeError("EventBus not initialized")
        return await self.event_bus.publish(event_type, data, source)

    def _check_initialized(self) -> None:
        """Raise ``RuntimeError`` unless the manager is initialized and has a client."""
        if not self._initialized or not self.client:
            raise RuntimeError("Persistence layer not initialized")
Example Operations
Entity Creation & Management
# Example of using the persistence manager for entity operations
async def entity_example():
    """Create, read and update an entity, then relate it to a second entity."""
    # The client serializes metadata with dataclasses.asdict(), so metadata
    # is passed as dataclass instances rather than plain dicts.
    from atlas.graph.persistence.types import EntityMetadata, RelationMetadata

    # Create the persistence manager
    manager = PersistenceManager()

    # Initialize without deployment: this starts SpacetimeDB in server mode
    # and connects a client. initialize() reports failure via its return value.
    if not await manager.initialize():
        raise RuntimeError("Failed to initialize the persistence layer")

    try:
        # Create a new entity
        entity_id = await manager.create_entity(
            entity_type="Person",
            metadata=EntityMetadata(
                name="John Doe",
                description="A test user",
                tags=["test", "example"],
                properties={
                    "age": 30,
                    "email": "john.doe@example.com",
                },
            ),
        )
        print(f"Created entity: {entity_id}")

        # Get the entity
        entity = await manager.get_entity(entity_id)
        print(f"Retrieved entity: {entity}")

        # Update the entity
        success = await manager.update_entity(
            entity_id=entity_id,
            metadata=EntityMetadata(
                name="John Doe",
                description="An updated test user",
                tags=["test", "example", "updated"],
                properties={
                    "age": 31,
                    "email": "john.doe@example.com",
                },
            ),
        )
        print(f"Updated entity: {success}")

        # Create a related entity
        other_entity_id = await manager.create_entity(
            entity_type="Company",
            metadata=EntityMetadata(
                name="Acme Inc",
                description="A test company",
                tags=["test", "example"],
                properties={
                    "industry": "Technology",
                },
            ),
        )
        print(f"Created related entity: {other_entity_id}")

        # Create a relation between entities
        relation_id = await manager.create_relation(
            from_entity=entity_id,
            to_entity=other_entity_id,
            relation_type="WORKS_FOR",
            metadata=RelationMetadata(
                description="Employment relationship",
                strength=0.9,
                bidirectional=False,
                tags=["employment"],
                properties={
                    "start_date": "2023-01-01",
                    "position": "Software Engineer",
                },
            ),
        )
        print(f"Created relation: {relation_id}")
    finally:
        # Shut down the persistence layer
        await manager.shutdown()
State Versioning Example
# Example of using the persistence manager for state versioning
async def versioning_example():
    """Commit a state version, record a delta for it, then commit the updated state."""
    # Create the persistence manager
    manager = PersistenceManager()

    # Initialize without deployment: this starts SpacetimeDB in server mode
    # and connects a client. initialize() reports failure via its return value.
    if not await manager.initialize():
        raise RuntimeError("Failed to initialize the persistence layer")

    try:
        # Create initial state
        initial_state = {
            "version": "1.0.0",
            "settings": {
                "theme": "light",
                "notifications": True,
                "language": "en",
            },
            "data": {
                "items": [1, 2, 3],
                "count": 3,
            },
        }

        # Commit initial state version
        version_id = await manager.commit_version(
            state=initial_state,
            description="Initial state version",
        )
        print(f"Committed initial state version: {version_id}")

        # Record a delta for adding an item
        add_operations = [
            {
                "type": "insert",
                "path": "data.items",
                "index": 3,
                "value": 4,
            },
            {
                "type": "set",
                "path": "data.count",
                "value": 4,
            },
        ]
        # The temporal store method is record_state_delta; record_delta
        # exists only on the low-level client.
        delta_id = await manager.temporal_store.record_state_delta(
            version_id=version_id,
            operations=add_operations,
            description="Added item 4",
        )
        print(f"Recorded delta: {delta_id}")

        # Update state with the changes
        updated_state = {
            "version": "1.0.0",
            "settings": {
                "theme": "light",
                "notifications": True,
                "language": "en",
            },
            "data": {
                "items": [1, 2, 3, 4],
                "count": 4,
            },
        }

        # Commit updated state version
        new_version_id = await manager.commit_version(
            state=updated_state,
            description="Added item 4",
        )
        print(f"Committed updated state version: {new_version_id}")

        # Get current version
        current_version = await manager.temporal_store.get_current_version()
        print(f"Current version: {current_version}")
    finally:
        # Shut down the persistence layer
        await manager.shutdown()
Event System Example
# Example of using the persistence manager for events
async def event_example():
    """Register a handler for USER_ACTION events and publish one such event."""
    # Create the persistence manager
    manager = PersistenceManager()

    # Initialize without deployment: this starts SpacetimeDB in server mode
    # and connects a client. A failed start leaves manager.event_bus as None,
    # so stop here with a clear error instead of an AttributeError below.
    if not await manager.initialize():
        raise RuntimeError("Failed to initialize the persistence layer")

    try:
        # Register event handler
        def handle_user_event(message):
            print(f"Received user event: {message.data}")

        manager.event_bus.register_handler("USER_ACTION", handle_user_event)

        # Publish an event
        event_id = await manager.publish_event(
            event_type="USER_ACTION",
            data={
                "action": "login",
                "user_id": "12345",
                "timestamp": 1633036800,
            },
            source="auth_service",
        )
        print(f"Published event: {event_id}")

        # Wait for event processing
        await asyncio.sleep(1)
    finally:
        # Shut down the persistence layer
        await manager.shutdown()
Testing the Integration
Unit Testing Setup
Create test utilities for the SpacetimeDB integration:
# atlas/tests/graph/persistence/conftest.py
import pytest
import asyncio
import tempfile
import os
import shutil
from atlas.graph.persistence.config import InnerUniverseConfig
from atlas.graph.persistence.controller import InnerUniverseController
from atlas.graph.persistence.manager import PersistenceManager
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop for the whole test session and close it afterwards."""
    policy = asyncio.get_event_loop_policy()
    session_loop = policy.new_event_loop()
    yield session_loop
    # Teardown: runs once the session's tests are done with the loop.
    session_loop.close()
@pytest.fixture(scope="session")
async def spacetime_config():
    """Yield a test configuration whose database lives in a temporary directory.

    The directory is removed again when the session ends.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        test_db_path = os.path.join(scratch_dir, "test_db")
        yield InnerUniverseConfig(
            address="127.0.0.1",
            port=3789,  # Use a different port for testing
            db_path=test_db_path,
            log_level="info",
            module_path="/path/to/test_module.wasm",  # Replace with the actual test module path
        )
    finally:
        # Clean up temporary directory
        shutil.rmtree(scratch_dir)
@pytest.fixture(scope="session")
async def spacetime_controller(spacetime_config):
    """Yield a controller running SpacetimeDB in server mode; stop it on teardown."""
    server = InnerUniverseController(spacetime_config)
    try:
        # Deployment is intentionally skipped here (e.g. when using a mock):
        # await controller.start_deployment_mode()
        await server.start_server_mode()
        yield server
    finally:
        # Always stop the controller, even if start-up failed part-way.
        await server.stop()
@pytest.fixture(scope="function")
async def persistence_manager(spacetime_controller):
    """Yield a persistence manager wired to the session controller's client.

    The manager is marked initialized by hand instead of calling
    ``initialize()``, so it reuses the already running server and client.
    """
    # Imported locally: the module-level imports of this conftest do not
    # bring the adapter classes into scope.
    from atlas.graph.persistence.event_bus import SpacetimeEventBus
    from atlas.graph.persistence.temporal_store import SpacetimeTemporalStore

    manager = PersistenceManager(spacetime_controller.config)

    # Use the existing client
    manager.client = spacetime_controller.client
    manager.event_bus = SpacetimeEventBus(manager.client)
    manager.temporal_store = SpacetimeTemporalStore(manager.client)
    manager._initialized = True

    yield manager
Basic Test Implementation
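Create basic tests for the persistence layer. The test functions and fixtures are coroutines, so they need an asyncio-aware pytest plugin (for example pytest-asyncio with asyncio_mode = auto):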
# atlas/tests/graph/persistence/test_entities.py
import pytest
from atlas.graph.persistence.types import EntityMetadata, RelationMetadata
async def test_create_entity(persistence_manager):
    """A created entity can be read back with its type and metadata intact."""
    meta = EntityMetadata(
        name="Test Entity",
        description="A test entity",
        tags=["test", "entity"],
        properties={"test_prop": "test_value"},
    )

    new_id = await persistence_manager.create_entity("TestType", meta)

    # The manager must hand back a string identifier
    assert new_id is not None
    assert isinstance(new_id, str)

    stored = await persistence_manager.get_entity(new_id)

    # The stored record mirrors what was submitted
    assert stored is not None
    assert stored["id"] == new_id
    assert stored["entity_type"] == "TestType"
    stored_meta = stored["metadata"]
    assert stored_meta["name"] == "Test Entity"
    assert "test" in stored_meta["tags"]
async def test_create_and_update_entity(persistence_manager):
    """Updating an entity replaces its metadata with the new values."""
    before = EntityMetadata(
        name="Initial Name",
        description="Initial description",
        tags=["initial"],
        properties={"initial_prop": "initial_value"},
    )
    entity_id = await persistence_manager.create_entity("TestType", before)

    after = EntityMetadata(
        name="Updated Name",
        description="Updated description",
        tags=["initial", "updated"],
        properties={"initial_prop": "initial_value", "new_prop": "new_value"},
    )
    outcome = await persistence_manager.update_entity(entity_id, after)
    assert outcome is True

    # Read the entity back and compare against the updated metadata
    refreshed = await persistence_manager.get_entity(entity_id)
    refreshed_meta = refreshed["metadata"]
    assert refreshed_meta["name"] == "Updated Name"
    assert refreshed_meta["description"] == "Updated description"
    assert "updated" in refreshed_meta["tags"]
    assert refreshed_meta["properties"]["new_prop"] == "new_value"
async def test_create_relation(persistence_manager):
    """A relation can be created between two freshly created entities."""
    # Endpoints of the relation
    source_id = await persistence_manager.create_entity(
        "TestType1", EntityMetadata(name="Entity 1")
    )
    target_id = await persistence_manager.create_entity(
        "TestType2", EntityMetadata(name="Entity 2")
    )

    link_meta = RelationMetadata(
        description="Test relation",
        strength=0.75,
        bidirectional=True,
        tags=["test_relation"],
        properties={"rel_prop": "rel_value"},
    )

    relation_id = await persistence_manager.create_relation(
        from_entity=source_id,
        to_entity=target_id,
        relation_type="TEST_RELATION",
        metadata=link_meta,
    )

    # The manager must hand back a string identifier
    assert relation_id is not None
    assert isinstance(relation_id, str)
Error Handling Patterns
Implement consistent error handling throughout the persistence layer:
# atlas/graph/persistence/errors.py
class PersistenceError(Exception):
    """Root of the persistence layer's exception hierarchy."""


class ConnectionError(PersistenceError):
    """Connecting to SpacetimeDB failed."""
    # NOTE: this name shadows the builtin ConnectionError in any module that
    # imports it; the builtin is NOT a PersistenceError.


class EntityNotFoundError(PersistenceError):
    """The requested entity does not exist in the database."""


class RelationNotFoundError(PersistenceError):
    """The requested relation does not exist in the database."""


class VersionNotFoundError(PersistenceError):
    """The requested state version does not exist in the database."""


class AuthorizationError(PersistenceError):
    """The caller is not authorized to perform the operation."""


class ValidationError(PersistenceError):
    """The supplied input failed validation."""
Update the client to use these error types:
# Example client method with improved error handling
async def get_entity(self, entity_id: str) -> Dict[str, Any]:
    """Retrieve an entity by ID.

    Args:
        entity_id: Identifier of the entity to look up.

    Returns:
        The first row returned by the entity query.

    Raises:
        ConnectionError: If no client is connected.
        EntityNotFoundError: If the query returns no rows.
        PersistenceError: If the query fails for any other reason; the
            original exception is attached as ``__cause__``.
    """
    if not self.client:
        raise ConnectionError("Client not connected")
    self.logger.debug(f"Retrieving entity {entity_id}")

    # The ID is embedded in a quoted SQL literal, so double any single quote
    # to keep a crafted ID from terminating the literal early.
    # NOTE(review): prefer bound parameters if the client's query API offers them.
    safe_id = str(entity_id).replace("'", "''")
    try:
        # Query the entity table directly
        entities = await self.client.query(f"SELECT * FROM entity WHERE id = '{safe_id}'")
        if not entities:
            raise EntityNotFoundError(f"Entity not found: {entity_id}")
        return entities[0]
    except EntityNotFoundError:
        # Already the right error type; do not wrap it
        raise
    except Exception as e:
        self.logger.error(f"Error retrieving entity {entity_id}: {e}")
        raise PersistenceError(f"Error retrieving entity: {e}") from e
Performance Optimization Strategies
Connection Pooling
Implement connection pooling for SpacetimeDB clients:
# atlas/graph/persistence/pool.py
import asyncio
from typing import Dict, List, Optional, Any, Callable
import time
from atlas.core.logging import get_logger
from atlas.graph.persistence.inner_universe import InnerUniverseClient
from atlas.graph.persistence.config import InnerUniverseConfig
class ClientPool:
    """Connection pool for SpacetimeDB clients.

    Idle clients are kept in ``pool``; checked-out clients are tracked in
    ``in_use`` together with the time they were acquired. At most
    ``max_size`` clients are checked out at once; further ``acquire`` calls
    wait until a client is released.
    """

    def __init__(self, config: InnerUniverseConfig, min_size: int = 2, max_size: int = 10):
        """Store the pool settings; no connection is opened until ``initialize``.

        Args:
            config: Configuration handed to every ``InnerUniverseClient``.
            min_size: Number of clients connected up front by ``initialize``.
            max_size: Maximum number of clients checked out at the same time.
        """
        self.config = config
        self.min_size = min_size
        self.max_size = max_size
        self.pool: List[InnerUniverseClient] = []
        self.in_use: Dict[InnerUniverseClient, float] = {}
        self.lock = asyncio.Lock()
        # Shares self.lock, so waiting releases the lock and a release wakes
        # exactly one waiter instead of every waiter polling on a timer.
        self._client_released = asyncio.Condition(self.lock)
        self.logger = get_logger("client_pool")

    async def initialize(self) -> None:
        """Connect ``min_size`` clients and place them in the idle pool."""
        async with self.lock:
            self.logger.info(f"Initializing client pool with min_size={self.min_size}")
            # Create initial connections
            for _ in range(self.min_size):
                client = InnerUniverseClient(self.config)
                await client.connect()
                self.pool.append(client)
            self.logger.info(f"Client pool initialized with {len(self.pool)} connections")

    async def acquire(self) -> InnerUniverseClient:
        """Check out a client, creating or waiting for one as needed.

        Returns:
            An idle client if one exists, otherwise a newly connected client
            while fewer than ``max_size`` are in use, otherwise the next
            client that becomes available.
        """
        async with self._client_released:
            warned = False
            while True:
                # Prefer an idle client
                if self.pool:
                    client = self.pool.pop(0)
                    self.in_use[client] = time.time()
                    return client
                # Pool is empty but there is still capacity: open a new connection
                if len(self.in_use) < self.max_size:
                    client = InnerUniverseClient(self.config)
                    await client.connect()
                    self.in_use[client] = time.time()
                    self.logger.debug(f"Created new client, pool size: {len(self.pool)}, in use: {len(self.in_use)}")
                    return client
                if not warned:
                    self.logger.warning(f"Client pool exhausted (max_size={self.max_size}), waiting for release")
                    warned = True
                # Releases the lock while waiting, so release() can proceed
                await self._client_released.wait()

    async def release(self, client: InnerUniverseClient) -> None:
        """Return a checked-out client to the idle pool and wake one waiter.

        Clients that are not currently tracked as in use are ignored with a
        warning.
        """
        async with self._client_released:
            if client in self.in_use:
                del self.in_use[client]
                self.pool.append(client)
                self.logger.debug(f"Released client to pool, pool size: {len(self.pool)}, in use: {len(self.in_use)}")
                self._client_released.notify()
            else:
                self.logger.warning("Attempted to release a client that isn't in use")

    async def shutdown(self) -> None:
        """Disconnect every idle and checked-out client and empty the pool.

        Every client gets a disconnect attempt even if an earlier one fails.

        Raises:
            Exception: The first error raised by a ``disconnect`` call,
                re-raised after all clients were attempted.
        """
        async with self.lock:
            clients = [*self.pool, *self.in_use]
            # Forget the clients first so the pool never hands out a
            # connection that is being closed
            self.pool.clear()
            self.in_use.clear()
            first_error: Optional[Exception] = None
            for client in clients:
                try:
                    await client.disconnect()
                except Exception as e:
                    self.logger.error(f"Error disconnecting client during shutdown: {e}")
                    if first_error is None:
                        first_error = e
            self.logger.info("Client pool shut down")
            if first_error is not None:
                raise first_error
Request Batching
Implement batching for entity operations, so that requests queued within a short time window are processed together:
# atlas/graph/persistence/batch.py
import asyncio
from typing import Any, Dict, Generic, List, Optional, TypeVar

from atlas.core.logging import get_logger
from atlas.graph.persistence.inner_universe import InnerUniverseClient
from atlas.graph.persistence.types import EntityMetadata
T = TypeVar('T')


class BatchOperation(Generic[T]):
    """Handle for one queued operation.

    The outcome of the operation is delivered through ``future``.
    """

    def __init__(self) -> None:
        pending: asyncio.Future[T] = asyncio.Future()
        self.future = pending
class EntityBatch:
    """Batch processor for entity operations.

    Callers queue work through ``create_entity``. A single background task
    collects whatever was queued within ``max_wait_ms`` (or as soon as
    ``max_batch_size`` operations are pending), runs it against the client
    and resolves each caller's future. The task keeps draining the queue
    until it is empty, so operations queued while a batch is running are
    not left behind.
    """

    def __init__(self, client: InnerUniverseClient, max_batch_size: int = 50, max_wait_ms: int = 100):
        """Store the batching settings; no task is started yet.

        Args:
            client: Client whose ``create_entity`` is called for each operation.
            max_batch_size: Queue length that triggers processing without
                waiting for the timeout.
            max_wait_ms: Longest time, in milliseconds, the worker waits for
                more operations before processing the queue.
        """
        self.client = client
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.logger = get_logger("entity_batch")
        self.pending_operations: List[Dict[str, Any]] = []
        self.pending_futures: List[asyncio.Future] = []
        self.batch_task: Optional[asyncio.Task] = None
        self.lock = asyncio.Lock()
        # Set when the queue reaches max_batch_size so the worker stops
        # waiting; used instead of cancelling the worker, which could kill
        # a batch that is already being processed.
        self._flush_requested = asyncio.Event()

    async def create_entity(self, entity_type: str, metadata: EntityMetadata) -> str:
        """Queue an entity creation and wait for its result.

        Args:
            entity_type: Type passed to the client's ``create_entity``.
            metadata: Metadata passed to the client's ``create_entity``.

        Returns:
            The entity ID returned by the client.

        Raises:
            Exception: Whatever error failed the batch this operation was in.
        """
        operation = BatchOperation[str]()
        async with self.lock:
            # Add to pending operations
            self.pending_operations.append({
                "type": "create_entity",
                "entity_type": entity_type,
                "metadata": metadata,
                "operation": operation,
            })
            self.pending_futures.append(operation.future)
            # Start the worker if none is running. The worker resets
            # batch_task to None under this lock before it exits, so this
            # check cannot miss a worker that is about to stop.
            if self.batch_task is None or self.batch_task.done():
                self.batch_task = asyncio.create_task(self._process_batch())
            # Process immediately if batch size reached
            if len(self.pending_operations) >= self.max_batch_size:
                self._flush_requested.set()
        # Wait for result
        return await operation.future

    async def _process_batch(self) -> None:
        """Worker loop: wait, drain the queue, run the batch, repeat until idle."""
        while True:
            # Wait for more operations, a flush request, or the timeout
            try:
                await asyncio.wait_for(
                    self._flush_requested.wait(),
                    timeout=self.max_wait_ms / 1000,
                )
            except asyncio.TimeoutError:
                pass

            async with self.lock:
                # Take ownership of everything queued so far
                operations = list(self.pending_operations)
                self.pending_operations.clear()
                self.pending_futures.clear()
                self._flush_requested.clear()

            if operations:
                await self._run_operations(operations)

            async with self.lock:
                # Exit only when nothing arrived while the batch was running
                if not self.pending_operations:
                    self.batch_task = None
                    return

    async def _run_operations(self, operations: List[Dict[str, Any]]) -> None:
        """Run one drained batch and resolve the futures that belong to it.

        The first failing operation fails every still-unresolved operation
        of the same batch with that error.
        """
        self.logger.debug(f"Processing batch of {len(operations)} operations")
        try:
            for op in operations:
                future = op["operation"].future
                if future.done():
                    # The caller stopped waiting (e.g. was cancelled)
                    continue
                if op["type"] != "create_entity":
                    future.set_exception(
                        ValueError(f"Unsupported batch operation: {op['type']}")
                    )
                    continue
                entity_id = await self.client.create_entity(
                    op["entity_type"],
                    op["metadata"],
                )
                if not future.done():
                    future.set_result(entity_id)
        except asyncio.CancelledError:
            # The worker itself is being cancelled: release the callers of
            # this batch instead of leaving them waiting forever.
            for op in operations:
                if not op["operation"].future.done():
                    op["operation"].future.cancel()
            raise
        except Exception as e:
            self.logger.error(f"Error processing batch: {e}")
            # Set exception for all unresolved futures of this batch
            for op in operations:
                if not op["operation"].future.done():
                    op["operation"].future.set_exception(e)
        else:
            self.logger.debug("Batch processing completed")
Caching Layer
Implement a caching layer for frequently accessed data:
# atlas/graph/persistence/cache.py
from typing import Dict, Any, Optional, List, Callable, TypeVar, Generic
import time
import asyncio
T = TypeVar('T')


class CacheEntry(Generic[T]):
    """A cached value together with its expiry time."""

    def __init__(self, value: T, ttl_seconds: float):
        """Store ``value`` and compute its expiry ``ttl_seconds`` from now."""
        self.value = value
        # Wall-clock timestamp (time.time()) after which the entry is stale
        self.expires_at = time.time() + ttl_seconds

    def is_expired(self) -> bool:
        """Return True once the current time is past ``expires_at``."""
        return time.time() > self.expires_at


class Cache(Generic[T]):
    """Simple in-memory cache with per-entry expiration."""

    def __init__(self, ttl_seconds: float = 300, cleanup_interval_seconds: float = 60):
        """Create an empty cache.

        Args:
            ttl_seconds: Default lifetime of an entry, in seconds.
            cleanup_interval_seconds: Pause between two sweeps of the
                background cleanup task started by ``cleanup``.
        """
        self.ttl_seconds = ttl_seconds
        self.cleanup_interval_seconds = cleanup_interval_seconds
        self.entries: Dict[str, CacheEntry[T]] = {}
        self.cleanup_task: Optional[asyncio.Task] = None

    def get(self, key: str) -> Optional[T]:
        """Return the cached value for ``key``, or None if absent or expired.

        An expired entry is removed as a side effect.
        """
        entry = self.entries.get(key)
        if entry is None:
            return None
        if entry.is_expired():
            del self.entries[key]
            return None
        return entry.value

    def set(self, key: str, value: T, ttl_seconds: Optional[float] = None) -> None:
        """Store ``value`` under ``key``, using ``ttl_seconds`` or the default TTL."""
        ttl = ttl_seconds if ttl_seconds is not None else self.ttl_seconds
        self.entries[key] = CacheEntry(value, ttl)

    def invalidate(self, key: str) -> None:
        """Remove ``key`` from the cache; unknown keys are ignored."""
        self.entries.pop(key, None)

    def clear(self) -> None:
        """Remove all entries from the cache."""
        self.entries.clear()

    async def get_or_compute(self, key: str, compute_func: Callable[[], Any]) -> T:
        """Return the cached value for ``key``, computing and caching it on a miss.

        Args:
            key: Cache key.
            compute_func: Zero-argument callable producing the value. It may
                be a plain function or return an awaitable (e.g. an
                ``async def`` function); an awaitable is awaited so that the
                result, not the coroutine object, is cached.

        Returns:
            The cached or freshly computed value. A value of None is treated
            as a miss, so None results are recomputed on every call.
        """
        import inspect  # local import: only needed on this path

        value = self.get(key)
        if value is None:
            value = compute_func()
            if inspect.isawaitable(value):
                value = await value
            self.set(key, value)
        return value

    async def cleanup(self) -> None:
        """Start the background cleanup task unless it is already running.

        This does not sweep the cache itself; it only schedules
        ``_cleanup_task``, which runs until cancelled.
        """
        if self.cleanup_task is None or self.cleanup_task.done():
            self.cleanup_task = asyncio.create_task(self._cleanup_task())

    async def _cleanup_task(self) -> None:
        """Background loop that periodically deletes expired entries."""
        while True:
            # Collect first, then delete: the dict must not change size
            # while it is being iterated.
            expired_keys = [
                key for key, entry in self.entries.items() if entry.is_expired()
            ]
            for key in expired_keys:
                self.entries.pop(key, None)
            # Wait before next cleanup
            await asyncio.sleep(self.cleanup_interval_seconds)
Add caching to the client:
# Enhanced client method with caching
async def get_entity_with_cache(self, entity_id: str) -> Dict[str, Any]:
    """Get an entity by ID, consulting the cache before the database.

    Declared ``async`` because a cache miss awaits ``self.get_entity``.

    Args:
        entity_id: Identifier of the entity to look up.

    Returns:
        The cached entity if present, otherwise the entity returned by
        ``self.get_entity``, which is stored in the cache before returning.
    """
    # Check cache first
    cache_key = f"entity:{entity_id}"
    cached_entity = self.cache.get(cache_key)
    # Compare against None so a falsy cached value still counts as a hit
    if cached_entity is not None:
        self.logger.debug(f"Cache hit for entity {entity_id}")
        return cached_entity
    # Not in cache, query from database
    entity = await self.get_entity(entity_id)
    # Store in cache
    self.cache.set(cache_key, entity)
    return entity
Conclusion
This implementation guide provides a comprehensive approach to integrating SpacetimeDB with the NERV architecture. By following these patterns and examples, you can create a robust persistence layer that aligns with NERV’s event-driven design, effect system, and state projection capabilities.
Key benefits of this integration include:
- Persistent Storage: Durable storage for entities, relations, and events
- Real-time Synchronization: Event-driven updates between clients
- Temporal State Management: Support for versioned state and deltas
- Performance Optimization: Connection pooling, batching, and caching
- Error Handling: Robust error patterns and recovery mechanisms
Next steps for the integration include:
- Implementing the full set of query capabilities
- Adding transaction support for multi-step operations
- Creating advanced analytics and reporting features
- Implementing sharding for horizontal scaling
- Adding monitoring and observability instrumentation