title: Diff Synchronizer
DiffSynchronizer Implementation
Overview
The DiffSynchronizer component implements the State Synchronization pattern in NERV, providing mechanisms for reconciling state between different systems, components, or representations. It handles bidirectional updates while managing conflicts and transformations, ensuring consistent state across system boundaries.
Architectural Role
The DiffSynchronizer serves as a key integration component in Atlas:
- Model Reconciliation: Synchronizes data between different representations
- Conflict Resolution: Manages conflicting updates systematically
- Efficient Updates: Transfers only changed data rather than complete state
- Bidirectional Flow: Handles updates in both directions between systems
- Schema Adaptation: Manages transforms between different data schemas
Implementation Details
Library: DiffSync
DiffSync was chosen for the DiffSynchronizer implementation because it provides:
- Efficient differencing: Smart comparison of models to identify changes
- Conflict resolution: Strategies for handling concurrent modifications
- Bidirectional sync: Support for updates flowing in both directions
- Flexible models: Support for different model structures and types
- Adapter framework: Easy integration with external systems
Key Features
The DiffSynchronizer implementation includes:
- Flexible model adapters for adapting different data representations
- Multiple conflict resolution strategies for handling inconsistencies
- Incremental synchronization for efficient updates
- Bidirectional and unidirectional modes for different sync scenarios
- Event-driven synchronization integration with the event system
Core Data Types
from typing import Dict, List, Any, Optional, Type, TypeVar, Generic, Callable, Set
from diffsync import DiffSync as DiffSyncBase, DiffModel
from enum import Enum, auto
import uuid
import time
# Type variables
T = TypeVar('T') # Source model type
S = TypeVar('S') # Target model type
class ConflictStrategy(Enum):
"""Strategies for handling synchronization conflicts."""
SOURCE_WINS = auto()
TARGET_WINS = auto()
LAST_UPDATE_WINS = auto()
MERGE = auto()
MANUAL = auto()
class SyncDirection(Enum):
"""Direction of synchronization flow."""
BIDIRECTIONAL = auto()
SOURCE_TO_TARGET = auto()
TARGET_TO_SOURCE = auto()
class DiffResult(Generic[T, S]):
"""Result of a synchronization operation."""
def __init__(self):
self.changes: Dict[str, Any] = {}
self.conflicts: List[Dict[str, Any]] = []
self.source_model: Optional[T] = None
self.target_model: Optional[S] = None
self.timestamp: float = time.time()
self.success: bool = True
self.error: Optional[Exception] = None
Implementation Structure
The DiffSynchronizer builds on DiffSync’s core functionality while adding a more robust framework:
class DiffSynchronizer:
"""Central component for state synchronization in NERV."""
def __init__(self, event_bus=None):
"""Initialize the diff synchronizer."""
self.adapters = {}
self.event_bus = event_bus
self._lock = threading.RLock()
def register_adapter(self, model_type: Type, adapter_class: Type) -> str:
"""Register a model adapter for a specific model type."""
# Implementation details...
def create_sync_engine(self, source_type: Type, target_type: Type,
conflict_strategy: ConflictStrategy = ConflictStrategy.LAST_UPDATE_WINS) -> "SyncEngine":
"""Create a synchronization engine for specific model types."""
# Implementation details...
def sync(self, source: Any, target: Any,
direction: SyncDirection = SyncDirection.BIDIRECTIONAL,
conflict_strategy: ConflictStrategy = ConflictStrategy.LAST_UPDATE_WINS) -> DiffResult:
"""Synchronize state between source and target models."""
# Implementation details...
Performance Considerations
DiffSync is designed for efficiency, but the DiffSynchronizer implementation adds several optimizations:
Adapter Caching
The DiffSynchronizer caches model adapters for improved performance:
# Adapter caching for better performance
def _get_adapter_for_model(self, model: Any) -> "ModelAdapter":
"""Get or create adapter for a specific model instance."""
model_type = type(model)
# Check adapter cache
if model_type in self._adapter_cache:
adapter_class = self._adapter_cache[model_type]
return adapter_class(model)
# Find adapter by type hierarchy
for registered_type, adapter_class in self.adapters.items():
if isinstance(model, registered_type):
# Cache for future use
self._adapter_cache[model_type] = adapter_class
return adapter_class(model)
raise ValueError(f"No adapter registered for model type: {model_type}")
Incremental Synchronization
The DiffSynchronizer supports incremental syncs for large models:
# Incremental synchronization for large models
def incremental_sync(self, source: Any, target: Any,
path: str = None) -> DiffResult:
"""Synchronize only a portion of the models specified by path."""
source_adapter = self._get_adapter_for_model(source)
target_adapter = self._get_adapter_for_model(target)
# Extract subset of models at path
source_subset = source_adapter.get_subset(path)
target_subset = target_adapter.get_subset(path)
# Perform sync on subset
result = self.sync(source_subset, target_subset)
# Apply changes back to original models
source_adapter.apply_subset_changes(path, result.changes)
target_adapter.apply_subset_changes(path, result.changes)
return result
Conflict Resolution Optimization
The DiffSynchronizer implements efficient conflict resolution strategies:
# Efficient conflict resolution
def _resolve_conflicts(self, conflicts: List[Dict], strategy: ConflictStrategy) -> Dict:
"""Resolve conflicts based on selected strategy efficiently."""
if not conflicts:
return {}
resolutions = {}
if strategy == ConflictStrategy.SOURCE_WINS:
# Batch process all conflicts for source wins
for conflict in conflicts:
resolutions[conflict["path"]] = conflict["source_value"]
elif strategy == ConflictStrategy.TARGET_WINS:
# Batch process all conflicts for target wins
for conflict in conflicts:
resolutions[conflict["path"]] = conflict["target_value"]
elif strategy == ConflictStrategy.LAST_UPDATE_WINS:
# Choose based on timestamps
for conflict in conflicts:
if conflict["source_timestamp"] > conflict["target_timestamp"]:
resolutions[conflict["path"]] = conflict["source_value"]
else:
resolutions[conflict["path"]] = conflict["target_value"]
return resolutions
Change Batching
The DiffSynchronizer groups changes for more efficient application:
# Batch change application for better performance
def _apply_changes(self, model: Any, changes: Dict[str, Any]) -> None:
"""Apply changes to model in efficient batches."""
adapter = self._get_adapter_for_model(model)
# Group changes by section
changes_by_section = {}
for path, value in changes.items():
section = path.split('.')[0]
if section not in changes_by_section:
changes_by_section[section] = {}
changes_by_section[section][path] = value
# Apply changes section by section
for section, section_changes in changes_by_section.items():
adapter.apply_changes_batch(section, section_changes)
Integration Patterns
Configuration Synchronization
# Type definition only - not full implementation
class ConfigurationSync:
"""Synchronizes configuration across components."""
def __init__(self, diff_synchronizer: DiffSynchronizer):
"""Initialize with diff synchronizer."""
self.diff_synchronizer = diff_synchronizer
# Register configuration adapters
self.diff_synchronizer.register_adapter(
dict, DictConfigAdapter
)
self.diff_synchronizer.register_adapter(
Config, ConfigModelAdapter
)
def sync_component_config(self, central_config: Dict[str, Any],
component_config: Any) -> DiffResult:
"""Synchronize central configuration with component config."""
return self.diff_synchronizer.sync(
central_config,
component_config,
direction=SyncDirection.SOURCE_TO_TARGET
)
Document Reconciliation
# Type definition only - not full implementation
class DocumentSync:
"""Synchronizes document state between systems."""
def __init__(self, diff_synchronizer: DiffSynchronizer,
event_bus: Optional[EventBus] = None):
"""Initialize with diff synchronizer and optional event bus."""
self.diff_synchronizer = diff_synchronizer
self.event_bus = event_bus
# Register document adapters
self.diff_synchronizer.register_adapter(
Document, DocumentAdapter
)
self.diff_synchronizer.register_adapter(
ExternalDocument, ExternalDocumentAdapter
)
def sync_with_external(self, internal_doc: Document,
external_doc: ExternalDocument) -> DiffResult:
"""Synchronize internal document with external representation."""
result = self.diff_synchronizer.sync(
internal_doc,
external_doc,
conflict_strategy=ConflictStrategy.LAST_UPDATE_WINS
)
# Emit events for document changes
if self.event_bus and result.changes:
self.event_bus.publish(
EventType.DOCUMENT_SYNCHRONIZED,
{
"document_id": internal_doc.id,
"changes": result.changes,
"conflicts": result.conflicts
}
)
return result
Model Versioning
# Type definition only - not full implementation
class VersionedModelSync:
"""Synchronizes models with version tracking."""
def __init__(self, diff_synchronizer: DiffSynchronizer,
temporal_store: TemporalStore):
"""Initialize with diff synchronizer and temporal store."""
self.diff_synchronizer = diff_synchronizer
self.temporal_store = temporal_store
def sync_with_history(self, source_model: Any, target_model: Any) -> DiffResult:
"""Synchronize models while recording version history."""
# Perform sync
result = self.diff_synchronizer.sync(source_model, target_model)
# Record versions if changes were made
if result.changes:
# Create version snapshots
source_version = self.temporal_store.commit(
result.source_model,
f"Sync with {type(target_model).__name__}"
)
target_version = self.temporal_store.commit(
result.target_model,
f"Sync with {type(source_model).__name__}"
)
# Link versions
result.metadata = {
"source_version": source_version,
"target_version": target_version
}
return result
Usage Patterns
Basic Model Synchronization
Basic Synchronization
# Initialize synchronizer
diff_synchronizer = DiffSynchronizer()
# Register model adapters
diff_synchronizer.register_adapter(Document, DocumentAdapter)
diff_synchronizer.register_adapter(ExternalDocument, ExternalDocumentAdapter)
# Create source and target model instances
internal_doc = Document(id="doc1", title="Internal Document", content="Content")
external_doc = ExternalDocument(id="ext1", title="External Doc", body="Content")
# Perform bidirectional synchronization
result = diff_synchronizer.sync(
internal_doc,
external_doc,
direction=SyncDirection.BIDIRECTIONAL
)
# Check for conflicts
if result.conflicts:
print(f"Resolved {len(result.conflicts)} conflicts")
Conflict Resolution Strategy
Conflict Management
# Create synchronizer with event notification
diff_synchronizer = DiffSynchronizer(event_bus=event_bus)
# Register custom conflict resolver
class MergeFieldsResolver:
"""Custom resolver that merges fields from both models."""
def resolve(self, conflict):
"""Merge text fields, use newer timestamp for other fields."""
if conflict["field_type"] == "text":
# Merge text content
return f"{conflict['source_value']} | {conflict['target_value']}"
else:
# Use newer value for non-text fields
if conflict["source_timestamp"] > conflict["target_timestamp"]:
return conflict["source_value"]
else:
return conflict["target_value"]
# Perform sync with custom conflict resolution
result = diff_synchronizer.sync(
source_model,
target_model,
conflict_strategy=ConflictStrategy.MANUAL,
conflict_resolver=MergeFieldsResolver()
)
Incremental Synchronization
Large Model Sync
# Synchronize large models incrementally
large_source = LargeDataModel() # Model with many nested sections
large_target = ExternalLargeModel() # External system representation
# Register adapters for large models
diff_synchronizer.register_adapter(LargeDataModel, LargeModelAdapter)
diff_synchronizer.register_adapter(ExternalLargeModel, ExternalLargeModelAdapter)
# Sync only the "metadata" section
metadata_result = diff_synchronizer.incremental_sync(
large_source,
large_target,
path="metadata"
)
# Sync only the "content.chapter1" section
chapter1_result = diff_synchronizer.incremental_sync(
large_source,
large_target,
path="content.chapter1"
)
Performance Optimization
The DiffSynchronizer implementation includes several performance optimizations:
- Adapter Caching: Reusing model adapters for consistent model types
- Incremental Synchronization: Supporting partial model updates for large structures
- Change Batching: Grouping related changes for efficient application
- Strategic Differencing: Using efficient differencing algorithms for specific model types
- Optimized Conflict Resolution: Fast-path handling of common conflict scenarios
Relationship to Patterns
Implements:
- State Synchronization: Primary implementation
Supports:
- Reactive Event Mesh: Emits events for synchronization actions
- Temporal Versioning: Integrates with version tracking
- Perspective Shifting: Supports different views of the same data
Implementation Reference
NERV’s DiffSynchronizer implementation provides a powerful data synchronization mechanism that supports bidirectional state reconciliation across system boundaries.