
NERV Type System Cheatsheet

This cheatsheet provides a quick reference for Atlas’s NERV architecture type system, with concise definitions and usage guidance.

Type Variables

| TypeVar | Purpose | Used For |
|---------|---------|----------|
| T | Generic data type | General purpose type variable for any data |
| S | Generic state type | State representations in components |
| R | Generic result type | Return values from operations |
| E | Generic event type | Event data in the event system |
| V | Generic value type | Values in effectful operations |
| P | Generic perspective type | Projected views of data |
| K | Generic key type | Keys in dictionaries/maps |
| C | Generic context type | Context passed between components |
| M | Generic message type | Messages in communication |
| T_in | Input boundary type | Input data at system boundaries |
| T_out | Output boundary type | Output data at system boundaries |
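
These are ordinary typing.TypeVar declarations. A minimal sketch of how they might be declared (collecting them in one place is an assumption of this sketch):

python
# Type variables used throughout the NERV type system (sketch)
from typing import TypeVar

T = TypeVar("T")          # Generic data type
S = TypeVar("S")          # Generic state type
R = TypeVar("R")          # Generic result type
E = TypeVar("E")          # Generic event type
V = TypeVar("V")          # Generic value type
P = TypeVar("P")          # Generic perspective type
K = TypeVar("K")          # Generic key type
C = TypeVar("C")          # Generic context type
M = TypeVar("M")          # Generic message type
T_in = TypeVar("T_in")    # Input boundary type
T_out = TypeVar("T_out")  # Output boundary type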

Identity Types

| Type | Definition | Purpose |
|------|------------|---------|
| EntityId | str | Unique identifier for any entity in the system |
| VersionId | str | Identifier for a versioned state |
| ResourceId | str | Identifier for a managed resource |
| EventId | str | Identifier for an event occurrence |
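
Per the table, each identity type is simply a str. A sketch of the aliases (NewType would be an alternative if stricter separation between identifier kinds is wanted):

python
# Identity types: plain aliases over str, as defined in the table above
EntityId = str    # Unique identifier for any entity in the system
VersionId = str   # Identifier for a versioned state
ResourceId = str  # Identifier for a managed resource
EventId = str     # Identifier for an event occurrence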

Core Enums

| Enum | Values | Purpose |
|------|--------|---------|
| EventType | SYSTEM_INIT, PROVIDER_CREATED, etc. | Categorize events in the system |
| EffectType | FILE_READ, MODEL_CALL, etc. | Categorize side effects |
| LifecycleState | CREATED, READY, ACTIVE, etc. | Track component lifecycle |
| StreamState | PENDING, ACTIVE, COMPLETED, etc. | Control streaming operations |
| ResourceType | CONNECTION, FILE, THREAD, etc. | Categorize managed resources |
| UnitState | PENDING, RUNNING, COMPLETED, etc. | Track quantum unit execution |
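
A hedged sketch of two of these enums, using only the values the table names (the remaining members are elided, not invented):

python
from enum import Enum, auto

class EffectType(Enum):
    """Categorize side effects."""
    FILE_READ = auto()
    MODEL_CALL = auto()
    # ... further members not listed in this cheatsheet

class LifecycleState(Enum):
    """Track component lifecycle."""
    CREATED = auto()
    READY = auto()
    ACTIVE = auto()
    # ... further members not listed in this cheatsheet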

Core Data Classes

| Class | Key Fields | Purpose |
|-------|------------|---------|
| Event[T] | id, type, data, timestamp, source | Structured event data |
| Effect | type, payload, description | Side effect representation |
| Resource | id, type, state, metadata | Managed system resource |
| VersionedState | version_id, data, parent_version_id | State with version info |
| UnitResult[R] | success, value, error | Computation unit result |
| ValidationResult[T] | is_valid, data, errors | Validation outcome |
| DeltaMetadata | timestamp, source, description | Change metadata |
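
Building on the type variables and identity aliases sketched above, two of these classes might look like the following (field types are assumptions inferred from the listed key fields, not the actual definitions):

python
from dataclasses import dataclass
from typing import Generic, Optional

@dataclass
class Event(Generic[T]):
    """Structured event data flowing through the event system."""
    id: EventId
    type: EventType
    data: Optional[T]
    timestamp: float
    source: Optional[str]

@dataclass
class UnitResult(Generic[R]):
    """Result of executing a quantum unit."""
    success: bool
    value: Optional[R]
    error: Optional[Exception]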

Core Protocol Interfaces

| Protocol | Key Methods | Purpose |
|----------|-------------|---------|
| Observable[E] | add_observer(), notify() | Emit observable events |
| Versioned[S] | get_version(), commit() | Version history management |
| Projectable[S,P] | add_projection(), project() | Multiple views of data |
| Effectful[V] | with_effect(), map(), bind() | Track operation side effects |
| QuantumUnit[S,R] | can_execute(), execute() | Parallelizable computation |
| Boundary[T_in,T_out] | validate(), process() | System boundary handling |
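
As a sketch, two of these protocols expressed with typing.Protocol (anything beyond the method names in the table, such as parameter and return types, is an assumption):

python
from typing import Callable, Protocol

class Observable(Protocol[E]):
    """Emits observable events to registered observers."""
    def add_observer(self, observer: Callable[[E], None]) -> Callable[[], None]: ...
    def notify(self, event: E) -> None: ...

class Boundary(Protocol[T_in, T_out]):
    """Validates and processes data crossing a system boundary."""
    def validate(self, data: T_in) -> "ValidationResult[T_in]": ...
    def process(self, data: T_in) -> T_out: ...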

Error Hierarchy

AtlasError
 └── BoundaryError
      ├── ValidationError
      └── NetworkError
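
A minimal sketch of this hierarchy as exception classes:

python
class AtlasError(Exception):
    """Base class for all Atlas errors."""

class BoundaryError(AtlasError):
    """Error raised at a system boundary."""

class ValidationError(BoundaryError):
    """Boundary input failed validation."""

class NetworkError(BoundaryError):
    """A network operation at a boundary failed."""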

Implementation Classes

| Class | Implements Pattern | Key Features |
|-------|--------------------|--------------|
| EventBus | Reactive Event Mesh | Manages pub/sub, middleware, history |
| TemporalStore | Temporal Versioning | Complete version history |
| PerspectiveAware | Perspective Shifting | Context-specific data views |
| EffectMonad | Effect System | Monadic effect handling |
| StateProjector | State Projection | Delta-based state projections |
| QuantumUnitImpl | Quantum Partitioning | Parallelizable computation unit |
| ExecutionPlan | Quantum Partitioning | Dependency-aware scheduling |
| QuantumPartitioner | Quantum Partitioning | Maximizes parallel execution |

Primitive Design Patterns

| Pattern | Implementers | Usage |
|---------|--------------|-------|
| Observer | Observable, EventBus | Event notification system |
| Command | Effect, EffectHandler | Operation encapsulation |
| Monad | EffectMonad | Functional composition |
| Strategy | Projectable, StateProjector | Interchangeable algorithms |
| Builder | QuantumPartitioner | Step-by-step construction |
| DAG | ExecutionPlan | Dependency management |
| Factory | Delta, EffectMonad | Delegated object creation |
| Decorator | EffectMonad, PerspectiveAware | Dynamic behavior addition |

Relationship Visualizer

Pattern Integration

The following table shows how NERV’s core patterns integrate to solve common problems:

| Pattern Combination | Use Case | Key Benefits |
|---------------------|----------|--------------|
| Temporal Versioning + State Projection | Document versioning systems | Command-level granularity with milestone snapshots |
| Reactive Event Mesh + Effect System | Streaming response handling | Decoupled side effects with observable events |
| Perspective Shifting + Temporal Versioning | Multi-user content management | Role-based views with full history tracking |
| Quantum Partitioning + Event-Driven | Parallel workflow orchestration | Optimized execution with decoupled messaging |
| State Projection + Effect System | Transactional operations | Explicit effects with delta-based changes |

Integration with Atlas Core Systems

| NERV Component | Atlas Integration | Purpose |
|----------------|-------------------|---------|
| EventBus | Integrates with Provider and Agent systems | Central communication backbone |
| TemporalStore | Supports Provider configuration and Agent state | Versioned state preservation |
| PerspectiveAware | Provides role-based API views | Context-specific representations |
| EffectMonad | Powers tool invocation and external calls | Clean effect handling |
| QuantumPartitioner | Optimizes multi-agent orchestration | Parallel execution control |

Usage Examples

Event System

python
# Create and use EventBus
bus = EventBus()
unsubscribe = bus.subscribe(EventType.STREAM_STARTED, lambda e: print(f"Stream started: {e.data}"))
event_id = bus.publish(EventType.STREAM_STARTED, data="Stream 1")

Versioned State

python
# Create and use TemporalStore
store = TemporalStore()
v1 = store.commit({"count": 0}, "Initial state")
v2 = store.commit({"count": 1}, "Increment count")
history = store.get_history()  # Get all versions

Perspective Shifting

python
# Create and use PerspectiveAware
raw_data = {"users": [{"id": 1, "name": "Alice", "email": "alice@example.com", "role": "admin"}]}
multi_view = PerspectiveAware(raw_data)
multi_view.add_perspective("public", lambda d: {"users": [{"id": u["id"], "name": u["name"]} for u in d["users"]]})
public_view = multi_view.view("public")  # Filtered view

Effect System

python
# Create and use EffectMonad and EffectHandler
handler = EffectHandler()
handler.register_handler(EffectType.MODEL_CALL, lambda p: f"Called model with: {p}")

operation = EffectMonad.pure("Start") \
    .with_effect(Effect(EffectType.MODEL_CALL, "What's 2+2?")) \
    .map(lambda s: s + " -> Processing") \
    .with_effect(Effect(EffectType.STATE_MODIFY, "Update result"))

result = operation.run(handler.handle)  # "Start -> Processing"
executed = handler.get_executed_effects()  # List of all effects that were handled

Quantum Partitioning

python
# Create and use QuantumPartitioner
partitioner = QuantumPartitioner()
unit1 = partitioner.add_unit(lambda ctx: ctx["data"] * 2, name="Double")
unit2 = partitioner.add_unit(lambda ctx: ctx["data"] + 10, name="Add10")
unit3 = partitioner.add_unit(lambda ctx: f"Result: {ctx['data']}", dependencies=[unit1, unit2], name="Format")

results = partitioner.execute({"data": 5})  # Parallel execution with dependencies

Integrated Examples

Document-Based Blog Platform with Versioning

python
# Combining Temporal Versioning and State Projection for a blogging platform
# This demonstrates the synergy between command-pattern changes and milestone versions
import copy
import time
import uuid

# Initialize the systems
bus = EventBus()
store = TemporalStore()  # For major version snapshots
projector = StateProjector({"posts": []})  # For granular changes

# Create perspectives for different user roles
blog_views = PerspectiveAware(projector.get_current_state())
blog_views.add_perspective("admin", lambda s: s)  # Full access
blog_views.add_perspective("public", lambda s: {
    "posts": [
        {k: v for k, v in post.items() if k != "draft_content"}
        for post in s["posts"] if post.get("published", False)
    ]
})

# Register event handlers
def on_content_change(event):
    # Apply delta to state projector for granular changes
    delta = event.data.get("delta")
    projector.apply_delta(delta,
                          description=event.data.get("description", ""),
                          source=event.source)

    # Update the view data
    blog_views.update_data(projector.get_current_state())

    # If this is a major change (e.g., publish), create a snapshot in temporal store
    if event.data.get("major_change", False):
        store.commit(projector.get_current_state(),
                    description=event.data.get("description", ""))

bus.subscribe(EventType.DOCUMENT_PROCESSED, on_content_change)

# Example: Create a new blog post (fine-grained change)
def add_post(title, content, author):
    post = {
        "id": str(uuid.uuid4()),
        "title": title,
        "content": content,
        "draft_content": content,
        "author": author,
        "created_at": time.time(),
        "updated_at": time.time(),
        "published": False
    }

    # Create a delta that adds this post to the posts array
    def add_post_delta(state):
        new_state = copy.deepcopy(state)
        new_state["posts"].append(post)
        return new_state

    # Publish the change event
    bus.publish(
        EventType.DOCUMENT_PROCESSED,
        data={
            "delta": add_post_delta,
            "description": f"Add post: {title}",
            "major_change": False
        },
        source="user_editor"
    )

    return post["id"]

# Example: Update a post (fine-grained change)
def update_post(post_id, new_content):
    # Create a delta that updates specific post content
    def update_post_delta(state):
        new_state = copy.deepcopy(state)
        for post in new_state["posts"]:
            if post["id"] == post_id:
                post["draft_content"] = new_content
                post["updated_at"] = time.time()
                break
        return new_state

    # Publish the change event
    bus.publish(
        EventType.DOCUMENT_PROCESSED,
        data={
            "delta": update_post_delta,
            "description": f"Update post draft: {post_id}",
            "major_change": False
        },
        source="user_editor"
    )

# Example: Publish a post (major change - create a snapshot)
def publish_post(post_id):
    # Create a delta that marks the post as published and updates content
    def publish_post_delta(state):
        new_state = copy.deepcopy(state)
        for post in new_state["posts"]:
            if post["id"] == post_id:
                post["published"] = True
                post["content"] = post["draft_content"]  # Promote draft content
                post["updated_at"] = time.time()
                break
        return new_state

    # Publish the change event
    bus.publish(
        EventType.DOCUMENT_PROCESSED,
        data={
            "delta": publish_post_delta,
            "description": f"Publish post: {post_id}",
            "major_change": True  # This will trigger a temporal store snapshot
        },
        source="user_editor"
    )

# Example usage
post_id = add_post("Getting Started with NERV", "Initial content...", "Alice")
update_post(post_id, "Updated draft content...")
update_post(post_id, "Final draft content ready for publishing...")
publish_post(post_id)

# Get current public view
public_view = blog_views.view("public")

# Get full history of major versions
version_history = store.get_history()

# Get fine-grained change history
change_history = projector.get_delta_history()

Parallel Provider Management with Effect Tracking

python
# This example shows how to combine QuantumPartitioning, Effect System, and Event Bus
# for robust provider operations with full effect tracking

# Initialize systems
bus = EventBus()
effect_handler = EffectHandler()
partitioner = QuantumPartitioner()

# Set up effect handler
effect_handler.register_handler(
    EffectType.MODEL_CALL,
    lambda payload: f"Called model: {payload['model']} with content: {payload['content'][:30]}..."
)
effect_handler.register_handler(
    EffectType.NETWORK_REQUEST,
    lambda payload: f"Network request to {payload['url']}"
)

# Notify event bus on effects
def effect_to_event(effect, result):
    bus.publish(
        EventType.PROVIDER_CONNECTED if effect.type == EffectType.NETWORK_REQUEST else EventType.MODEL_CALL,
        data={"effect": effect, "result": result},
        source="provider_manager"
    )

# Wrap effect handling
def handle_effect_with_events(effect):
    result = effect_handler.handle(effect)
    effect_to_event(effect, result)
    return result

# Create provider operation units
def create_provider_unit(provider_config):
    def provider_operation(context):
        # Create a chain of effects for this provider
        return EffectMonad.pure(provider_config) \
            .with_effect(Effect(
                EffectType.NETWORK_REQUEST,
                {"url": provider_config["endpoint"]}
            )) \
            .map(lambda config: {**config, "status": "connected"}) \
            .with_effect(Effect(
                EffectType.MODEL_CALL,
                {"model": provider_config["model"], "content": context["prompt"]}
            )) \
            .run(handle_effect_with_events)

    return partitioner.add_unit(
        provider_operation,
        name=f"Provider-{provider_config['name']}",
        timeout=provider_config.get("timeout", 10.0)
    )

# Define providers in priority order
providers = [
    {"name": "primary", "endpoint": "https://api.provider1.com", "model": "gpt-4", "priority": 1},
    {"name": "backup", "endpoint": "https://api.provider2.com", "model": "claude-3", "priority": 2},
    {"name": "fallback", "endpoint": "https://api.provider3.com", "model": "local-model", "priority": 3}
]

# Create units for each provider
provider_units = [create_provider_unit(p) for p in providers]

# Define collection unit that depends on all provider operations
def collect_results(context):
    # Get results from provider units that succeeded
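    # Assumption in this sketch: the partitioner exposes completed upstream
    # unit results to downstream units under the "provider_results" context key.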
    results = [r for r in context.get("provider_results", {}).values() if r]
    if not results:
        return {"status": "failed", "message": "All providers failed"}

    # Return the first successful result based on priority
    priority_order = {p["name"]: p["priority"] for p in providers}
    results.sort(key=lambda r: priority_order.get(r.get("name"), 999))
    return results[0]

result_collector = partitioner.add_unit(
    collect_results,
    dependencies=provider_units,
    name="ResultCollector"
)

# Execute the parallel provider operations with fallbacks
execution_context = {
    "prompt": "Tell me about the NERV architecture",
    "max_tokens": 1000
}

results = partitioner.execute(execution_context)
final_result = results.get(result_collector.id)

# Get history of effects that were executed
effect_history = effect_handler.get_executed_effects()

# Get events that were published
event_history = bus.get_history(limit=20)
