Skip to content

title: Diff Synchronizer


DiffSynchronizer Implementation

Overview

The DiffSynchronizer component implements the State Synchronization pattern in NERV, providing mechanisms for reconciling state between different systems, components, or representations. It handles bidirectional updates while managing conflicts and transformations, ensuring consistent state across system boundaries.

Architectural Role

The DiffSynchronizer serves as a key integration component in Atlas:

  • Model Reconciliation: Synchronizes data between different representations
  • Conflict Resolution: Manages conflicting updates systematically
  • Efficient Updates: Transfers only changed data rather than complete state
  • Bidirectional Flow: Handles updates in both directions between systems
  • Schema Adaptation: Manages transforms between different data schemas

Implementation Details

Library: DiffSync

DiffSync was chosen for the DiffSynchronizer implementation because it provides:

  • Efficient differencing: Smart comparison of models to identify changes
  • Conflict resolution: Strategies for handling concurrent modifications
  • Bidirectional sync: Support for updates flowing in both directions
  • Flexible models: Support for different model structures and types
  • Adapter framework: Easy integration with external systems

Key Features

The DiffSynchronizer implementation includes:

  1. Flexible model adapters for adapting different data representations
  2. Multiple conflict resolution strategies for handling inconsistencies
  3. Incremental synchronization for efficient updates
  4. Bidirectional and unidirectional modes for different sync scenarios
  5. Event-driven synchronization integration with the event system

Core Data Types

python
from typing import Dict, List, Any, Optional, Type, TypeVar, Generic, Callable, Set
from diffsync import DiffSync as DiffSyncBase, DiffModel
from enum import Enum, auto
import uuid
import time

# Type variables
T = TypeVar('T')  # Source model type
S = TypeVar('S')  # Target model type

class ConflictStrategy(Enum):
    """Strategies for handling synchronization conflicts."""
    SOURCE_WINS = auto()
    TARGET_WINS = auto()
    LAST_UPDATE_WINS = auto()
    MERGE = auto()
    MANUAL = auto()

class SyncDirection(Enum):
    """Direction of synchronization flow."""
    BIDIRECTIONAL = auto()
    SOURCE_TO_TARGET = auto()
    TARGET_TO_SOURCE = auto()

class DiffResult(Generic[T, S]):
    """Result of a synchronization operation."""
    def __init__(self):
        self.changes: Dict[str, Any] = {}
        self.conflicts: List[Dict[str, Any]] = []
        self.source_model: Optional[T] = None
        self.target_model: Optional[S] = None
        self.timestamp: float = time.time()
        self.success: bool = True
        self.error: Optional[Exception] = None

Implementation Structure

The DiffSynchronizer builds on DiffSync’s core functionality while adding a more robust framework:

python
class DiffSynchronizer:
    """Central component for state synchronization in NERV."""

    def __init__(self, event_bus=None):
        """Initialize the diff synchronizer."""
        self.adapters = {}
        self.event_bus = event_bus
        self._lock = threading.RLock()

    def register_adapter(self, model_type: Type, adapter_class: Type) -> str:
        """Register a model adapter for a specific model type."""
        # Implementation details...

    def create_sync_engine(self, source_type: Type, target_type: Type,
                         conflict_strategy: ConflictStrategy = ConflictStrategy.LAST_UPDATE_WINS) -> "SyncEngine":
        """Create a synchronization engine for specific model types."""
        # Implementation details...

    def sync(self, source: Any, target: Any,
           direction: SyncDirection = SyncDirection.BIDIRECTIONAL,
           conflict_strategy: ConflictStrategy = ConflictStrategy.LAST_UPDATE_WINS) -> DiffResult:
        """Synchronize state between source and target models."""
        # Implementation details...

Performance Considerations

DiffSync is designed for efficiency, but the DiffSynchronizer implementation adds several optimizations:

Adapter Caching

The DiffSynchronizer caches model adapters for improved performance:

python
# Adapter caching for better performance
def _get_adapter_for_model(self, model: Any) -> "ModelAdapter":
    """Get or create adapter for a specific model instance."""
    model_type = type(model)

    # Check adapter cache
    if model_type in self._adapter_cache:
        adapter_class = self._adapter_cache[model_type]
        return adapter_class(model)

    # Find adapter by type hierarchy
    for registered_type, adapter_class in self.adapters.items():
        if isinstance(model, registered_type):
            # Cache for future use
            self._adapter_cache[model_type] = adapter_class
            return adapter_class(model)

    raise ValueError(f"No adapter registered for model type: {model_type}")

Incremental Synchronization

The DiffSynchronizer supports incremental syncs for large models:

python
# Incremental synchronization for large models
def incremental_sync(self, source: Any, target: Any,
                 path: str = None) -> DiffResult:
    """Synchronize only a portion of the models specified by path."""
    source_adapter = self._get_adapter_for_model(source)
    target_adapter = self._get_adapter_for_model(target)

    # Extract subset of models at path
    source_subset = source_adapter.get_subset(path)
    target_subset = target_adapter.get_subset(path)

    # Perform sync on subset
    result = self.sync(source_subset, target_subset)

    # Apply changes back to original models
    source_adapter.apply_subset_changes(path, result.changes)
    target_adapter.apply_subset_changes(path, result.changes)

    return result

Conflict Resolution Optimization

The DiffSynchronizer implements efficient conflict resolution strategies:

python
# Efficient conflict resolution
def _resolve_conflicts(self, conflicts: List[Dict], strategy: ConflictStrategy) -> Dict:
    """Resolve conflicts based on selected strategy efficiently."""
    if not conflicts:
        return {}

    resolutions = {}

    if strategy == ConflictStrategy.SOURCE_WINS:
        # Batch process all conflicts for source wins
        for conflict in conflicts:
            resolutions[conflict["path"]] = conflict["source_value"]

    elif strategy == ConflictStrategy.TARGET_WINS:
        # Batch process all conflicts for target wins
        for conflict in conflicts:
            resolutions[conflict["path"]] = conflict["target_value"]

    elif strategy == ConflictStrategy.LAST_UPDATE_WINS:
        # Choose based on timestamps
        for conflict in conflicts:
            if conflict["source_timestamp"] > conflict["target_timestamp"]:
                resolutions[conflict["path"]] = conflict["source_value"]
            else:
                resolutions[conflict["path"]] = conflict["target_value"]

    return resolutions

Change Batching

The DiffSynchronizer groups changes for more efficient application:

python
# Batch change application for better performance
def _apply_changes(self, model: Any, changes: Dict[str, Any]) -> None:
    """Apply changes to model in efficient batches."""
    adapter = self._get_adapter_for_model(model)

    # Group changes by section
    changes_by_section = {}
    for path, value in changes.items():
        section = path.split('.')[0]
        if section not in changes_by_section:
            changes_by_section[section] = {}
        changes_by_section[section][path] = value

    # Apply changes section by section
    for section, section_changes in changes_by_section.items():
        adapter.apply_changes_batch(section, section_changes)

Integration Patterns

Configuration Synchronization

python
# Type definition only - not full implementation
class ConfigurationSync:
    """Synchronizes configuration across components."""

    def __init__(self, diff_synchronizer: DiffSynchronizer):
        """Initialize with diff synchronizer."""
        self.diff_synchronizer = diff_synchronizer

        # Register configuration adapters
        self.diff_synchronizer.register_adapter(
            dict, DictConfigAdapter
        )
        self.diff_synchronizer.register_adapter(
            Config, ConfigModelAdapter
        )

    def sync_component_config(self, central_config: Dict[str, Any],
                            component_config: Any) -> DiffResult:
        """Synchronize central configuration with component config."""
        return self.diff_synchronizer.sync(
            central_config,
            component_config,
            direction=SyncDirection.SOURCE_TO_TARGET
        )

Document Reconciliation

python
# Type definition only - not full implementation
class DocumentSync:
    """Synchronizes document state between systems."""

    def __init__(self, diff_synchronizer: DiffSynchronizer,
               event_bus: Optional[EventBus] = None):
        """Initialize with diff synchronizer and optional event bus."""
        self.diff_synchronizer = diff_synchronizer
        self.event_bus = event_bus

        # Register document adapters
        self.diff_synchronizer.register_adapter(
            Document, DocumentAdapter
        )
        self.diff_synchronizer.register_adapter(
            ExternalDocument, ExternalDocumentAdapter
        )

    def sync_with_external(self, internal_doc: Document,
                         external_doc: ExternalDocument) -> DiffResult:
        """Synchronize internal document with external representation."""
        result = self.diff_synchronizer.sync(
            internal_doc,
            external_doc,
            conflict_strategy=ConflictStrategy.LAST_UPDATE_WINS
        )

        # Emit events for document changes
        if self.event_bus and result.changes:
            self.event_bus.publish(
                EventType.DOCUMENT_SYNCHRONIZED,
                {
                    "document_id": internal_doc.id,
                    "changes": result.changes,
                    "conflicts": result.conflicts
                }
            )

        return result

Model Versioning

python
# Type definition only - not full implementation
class VersionedModelSync:
    """Synchronizes models with version tracking."""

    def __init__(self, diff_synchronizer: DiffSynchronizer,
               temporal_store: TemporalStore):
        """Initialize with diff synchronizer and temporal store."""
        self.diff_synchronizer = diff_synchronizer
        self.temporal_store = temporal_store

    def sync_with_history(self, source_model: Any, target_model: Any) -> DiffResult:
        """Synchronize models while recording version history."""
        # Perform sync
        result = self.diff_synchronizer.sync(source_model, target_model)

        # Record versions if changes were made
        if result.changes:
            # Create version snapshots
            source_version = self.temporal_store.commit(
                result.source_model,
                f"Sync with {type(target_model).__name__}"
            )

            target_version = self.temporal_store.commit(
                result.target_model,
                f"Sync with {type(source_model).__name__}"
            )

            # Link versions
            result.metadata = {
                "source_version": source_version,
                "target_version": target_version
            }

        return result

Usage Patterns

Basic Model Synchronization

Basic Synchronization

python
# Initialize synchronizer
diff_synchronizer = DiffSynchronizer()

# Register model adapters
diff_synchronizer.register_adapter(Document, DocumentAdapter)
diff_synchronizer.register_adapter(ExternalDocument, ExternalDocumentAdapter)

# Create source and target model instances
internal_doc = Document(id="doc1", title="Internal Document", content="Content")
external_doc = ExternalDocument(id="ext1", title="External Doc", body="Content")

# Perform bidirectional synchronization
result = diff_synchronizer.sync(
    internal_doc,
    external_doc,
    direction=SyncDirection.BIDIRECTIONAL
)

# Check for conflicts
if result.conflicts:
    print(f"Resolved {len(result.conflicts)} conflicts")

Conflict Resolution Strategy

Conflict Management

python
# Create synchronizer with event notification
diff_synchronizer = DiffSynchronizer(event_bus=event_bus)

# Register custom conflict resolver
class MergeFieldsResolver:
    """Custom resolver that merges fields from both models."""

    def resolve(self, conflict):
        """Merge text fields, use newer timestamp for other fields."""
        if conflict["field_type"] == "text":
            # Merge text content
            return f"{conflict['source_value']} | {conflict['target_value']}"
        else:
            # Use newer value for non-text fields
            if conflict["source_timestamp"] > conflict["target_timestamp"]:
                return conflict["source_value"]
            else:
                return conflict["target_value"]

# Perform sync with custom conflict resolution
result = diff_synchronizer.sync(
    source_model,
    target_model,
    conflict_strategy=ConflictStrategy.MANUAL,
    conflict_resolver=MergeFieldsResolver()
)

Incremental Synchronization

Large Model Sync

python
# Synchronize large models incrementally
large_source = LargeDataModel()  # Model with many nested sections
large_target = ExternalLargeModel()  # External system representation

# Register adapters for large models
diff_synchronizer.register_adapter(LargeDataModel, LargeModelAdapter)
diff_synchronizer.register_adapter(ExternalLargeModel, ExternalLargeModelAdapter)

# Sync only the "metadata" section
metadata_result = diff_synchronizer.incremental_sync(
    large_source,
    large_target,
    path="metadata"
)

# Sync only the "content.chapter1" section
chapter1_result = diff_synchronizer.incremental_sync(
    large_source,
    large_target,
    path="content.chapter1"
)

Performance Optimization

The DiffSynchronizer implementation includes several performance optimizations:

  1. Adapter Caching: Reusing model adapters for consistent model types
  2. Incremental Synchronization: Supporting partial model updates for large structures
  3. Change Batching: Grouping related changes for efficient application
  4. Strategic Differencing: Using efficient differencing algorithms for specific model types
  5. Optimized Conflict Resolution: Fast-path handling of common conflict scenarios

Relationship to Patterns

Implements:

Supports:

Implementation Reference

NERV’s DiffSynchronizer implementation provides a powerful data synchronization mechanism that supports bidirectional state reconciliation across system boundaries.

Released under the MIT License.