Save Node Edit History: Before & After JSON

by Alex Johnson

In today's data-driven world, maintaining the integrity and traceability of information is paramount. Whether you're managing complex knowledge graphs, intricate datasets, or critical business information, understanding how your data evolves over time is essential. This is where a robust edit tracking system becomes invaluable. We'll explore how to implement a feature that meticulously saves the edit process of nodes, capturing their state before and after any modification, along with their associated relationships. This ensures a complete audit trail, empowers data rollback capabilities, and enhances overall data governance. Let's dive into how we can achieve this using Python, focusing on a practical implementation within a framework like Iroko.

Understanding the Need for Edit History

Imagine a scenario where a critical node in your system is updated, and shortly after, inconsistencies or errors appear. Without a proper edit history, pinpointing the cause, identifying who made the change, and reverting to a previous stable state can be a daunting, if not impossible, task. This is precisely why saving the before and after states of a node, along with its relationships, is so crucial. It provides a detailed narrative of data evolution, offering:

  • Accountability: Clearly identifies the user responsible for each change.
  • Reproducibility: Allows for the recreation of past data states.
  • Auditing: Provides a verifiable record for compliance and security purposes.
  • Debugging: Simplifies the process of identifying the root cause of data anomalies.
  • Rollback: Enables seamless restoration of previous versions of data.

This comprehensive approach to tracking edits moves beyond simple logging; it preserves the context of the data at the moment of modification. By saving the complete JSON representation of the node and its relationships, we capture not just the changed values but the entire snapshot, making restoration and analysis far more straightforward. This detailed capture is what we aim to implement.

Technical Implementation: A Step-by-Step Approach

To implement a feature that saves the before and after JSON states of edited nodes, we need to integrate several components: database models, API endpoints, and logic to capture and store the relevant data. The examples use Python with FastAPI, SQLAlchemy, and Pydantic, with a graph database queried through Cypher for node storage and a relational database for the audit log.

1. Database Model for Edit History

First, we need a dedicated table to store the edit history. This table should record key information about each edit operation. Essential fields include a unique identifier for the edit record, the ID of the user who performed the edit, the ID of the node that was edited, the type of operation performed (e.g., update, create, delete, relationship change), a timestamp of when the edit occurred, and crucially, the JSON representations of the node's state before and after the edit. We might also store a diff of the changes made and specific data related to relationship modifications.

Example EditHistory Model:

from sqlalchemy import Column, DateTime, String, JSON, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from iroko.database import Base
import uuid

class EditHistory(Base):
    __tablename__ = "edit_history"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    node_id = Column(String(255), nullable=False, index=True)  # iroko_uuid of edited node
    operation = Column(String(50), nullable=False)  # e.g. 'create', 'update_properties', 'delete', 'relationship'
    timestamp = Column(DateTime(timezone=True), server_default=func.now())
    
    # Store the changes as JSON
    before_state = Column(JSON, nullable=True)  # Node state before edit
    after_state = Column(JSON, nullable=True)   # Node state after edit
    changes = Column(JSON, nullable=True)       # Diff of what changed
    
    # Relationship changes (for relationship operations)
    relationship_data = Column(JSON, nullable=True)
    
    # User relationship
    user = relationship("User", backref="edit_history")

This model lays the foundation for storing comprehensive edit logs. The node_id will link to the specific node in our graph, operation categorizes the change, and before_state/after_state will hold the detailed JSON snapshots.
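
To make the shape of a stored row concrete, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for the relational database. The column names mirror the model above, but the node ID, user ID, and property values are made up for illustration, and the JSON columns are stored as plain text purely to keep the sketch dependency-free.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone

# In-memory SQLite database as a stand-in for the real audit-log store.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE edit_history (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        node_id TEXT NOT NULL,
        operation TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        before_state TEXT,
        after_state TEXT,
        changes TEXT
    )
    """
)

# Hypothetical snapshots of one node before and after an update.
before = {"name": "John Doe", "age": 30, "_labels": ["Person"]}
after = {"name": "John Doe", "age": 31, "_labels": ["Person"]}

conn.execute(
    "INSERT INTO edit_history VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    (
        str(uuid.uuid4()),                       # edit record ID
        str(uuid.uuid4()),                       # made-up user ID
        "some-uuid-123",                         # made-up node ID
        "update_properties",
        datetime.now(timezone.utc).isoformat(),
        json.dumps(before),
        json.dumps(after),
        json.dumps({"modified": {"age": {"before": 30, "after": 31}}}),
    ),
)

# Read the row back and decode the JSON columns into dictionaries again.
row = conn.execute(
    "SELECT operation, before_state, after_state FROM edit_history WHERE node_id = ?",
    ("some-uuid-123",),
).fetchone()
operation = row[0]
restored_before = json.loads(row[1])
restored_after = json.loads(row[2])
print(operation, restored_before["age"], "->", restored_after["age"])
```

The point of the round trip is that both snapshots come back as complete dictionaries, so a later reader does not need the diff to reconstruct either state.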

2. API Schema Definitions

To interact with this new history tracking, we need to define API schemas. These schemas will dictate the structure of data sent to and received from the API. We'll need schemas for creating new edit history entries, and response schemas for retrieving them. The creation schema should accommodate optional fields for before_state, after_state, changes, and relationship_data, as not all operations might require all these fields. The response schema should include all relevant details, potentially linking to user information.

Example EditHistory Schemas:

from pydantic import BaseModel
from typing import Optional, Dict, Any
from uuid import UUID
from datetime import datetime

# Shown here for completeness; in a real project UserResponse likely already lives in your auth module
class UserResponse(BaseModel):
    id: UUID
    email: str
    full_name: Optional[str] = None
    is_active: bool
    is_superuser: bool
    created_at: datetime
    updated_at: datetime

class EditHistoryBase(BaseModel):
    node_id: str
    operation: str
    timestamp: datetime

class EditHistoryCreate(EditHistoryBase):
    before_state: Optional[Dict[str, Any]] = None
    after_state: Optional[Dict[str, Any]] = None
    changes: Optional[Dict[str, Any]] = None
    relationship_data: Optional[Dict[str, Any]] = None

class EditHistoryResponse(EditHistoryBase):
    id: UUID
    user_id: UUID
    user: Optional[UserResponse] = None
    before_state: Optional[Dict[str, Any]] = None
    after_state: Optional[Dict[str, Any]] = None
    changes: Optional[Dict[str, Any]] = None
    relationship_data: Optional[Dict[str, Any]] = None
    
    model_config = {"from_attributes": True}  # Pydantic v2 style; replaces the deprecated inner Config class

These schemas ensure data consistency and facilitate clear communication between the client and the server.

3. Edit History Service Layer

A service layer is essential for abstracting the database operations related to edit history. This service will provide methods for creating new edit records and for querying existing history, either for a specific node or for a specific user. This separation of concerns makes the codebase cleaner and easier to maintain.

Example EditHistoryService:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from typing import Any, Dict, List, Optional
from uuid import UUID

from iroko.auth.models import EditHistory
from iroko.auth.schemas import EditHistoryResponse, UserResponse

class EditHistoryService:
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def create_edit_record(
        self, 
        user_id: UUID, 
        node_id: str, 
        operation: str,
        before_state: Optional[Dict] = None,
        after_state: Optional[Dict] = None,
        changes: Optional[Dict] = None,
        relationship_data: Optional[Dict] = None
    ) -> EditHistory:
        """Create a new edit history record"""
        edit_record = EditHistory(
            user_id=user_id,
            node_id=node_id,
            operation=operation,
            before_state=before_state,
            after_state=after_state,
            changes=changes,
            relationship_data=relationship_data
        )
        
        self.db.add(edit_record)
        await self.db.commit()
        await self.db.refresh(edit_record)
        return edit_record
    
    async def get_edit_history_by_node(
        self, 
        node_id: str, 
        skip: int = 0, 
        limit: int = 100
    ) -> List[EditHistory]:
        """Get edit history for a specific node"""
        result = await self.db.execute(
            select(EditHistory)
            .options(selectinload(EditHistory.user))
            .filter(EditHistory.node_id == node_id)
            .order_by(EditHistory.timestamp.desc())
            .offset(skip)
            .limit(limit)
        )
        return result.scalars().all()
    
    async def get_edit_history_by_user(
        self, 
        user_id: UUID, 
        skip: int = 0, 
        limit: int = 100
    ) -> List[EditHistory]:
        """Get edit history for a specific user"""
        result = await self.db.execute(
            select(EditHistory)
            .options(selectinload(EditHistory.user))
            .filter(EditHistory.user_id == user_id)
            .order_by(EditHistory.timestamp.desc())
            .offset(skip)
            .limit(limit)
        )
        return result.scalars().all()

This service handles the persistence and retrieval logic, keeping the API endpoints focused on request handling and response formatting.
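
When unit-testing code that depends on this service, an in-memory substitute can be handy. The sketch below is one possible stand-in, not part of the original design: the class names InMemoryEditHistoryService and FakeEditRecord are hypothetical, and only two of the service's methods are mirrored. It also illustrates how skip and limit page through results ordered newest first.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeEditRecord:
    """Hypothetical plain-Python stand-in for an EditHistory row."""
    user_id: str
    node_id: str
    operation: str
    before_state: Optional[Dict[str, Any]] = None
    after_state: Optional[Dict[str, Any]] = None


class InMemoryEditHistoryService:
    """Keeps records in a list; mirrors two method names of the real service."""

    def __init__(self) -> None:
        self._records: List[FakeEditRecord] = []

    async def create_edit_record(
        self,
        user_id: str,
        node_id: str,
        operation: str,
        before_state: Optional[Dict[str, Any]] = None,
        after_state: Optional[Dict[str, Any]] = None,
    ) -> FakeEditRecord:
        record = FakeEditRecord(user_id, node_id, operation, before_state, after_state)
        self._records.append(record)
        return record

    async def get_edit_history_by_node(
        self, node_id: str, skip: int = 0, limit: int = 100
    ) -> List[FakeEditRecord]:
        # Records are appended chronologically, so reversing yields newest first.
        matching = [r for r in reversed(self._records) if r.node_id == node_id]
        return matching[skip : skip + limit]


async def demo() -> List[int]:
    service = InMemoryEditHistoryService()
    for age in (30, 31, 32):
        await service.create_edit_record(
            "user-1", "node-a", "update_properties", {"age": age}, {"age": age + 1}
        )
    await service.create_edit_record("user-1", "node-b", "create", None, {"name": "Bob"})
    # Skip the newest edit of node-a and take the next two.
    page = await service.get_edit_history_by_node("node-a", skip=1, limit=2)
    return [record.after_state["age"] for record in page]


ages = asyncio.run(demo())
print(ages)
```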

4. Modifying Existing Edit Routers

The core of our implementation lies in modifying the existing edit functionality to incorporate the tracking logic. For any endpoint that modifies a node or its relationships, we need to:

  1. Check if saving edits is enabled: This can be controlled by a new parameter in the request.
  2. Capture the before_state: Before making any modifications, retrieve the current state of the node (including properties and labels) as a JSON object.
  3. Perform the edit operation: Execute the Cypher query to update or modify the node/relationship.
  4. Capture the after_state: After the edit, retrieve the new state of the node.
  5. Calculate changes: Compare the before_state and after_state to identify specific additions, modifications, and removals. This helps in creating a concise diff.
  6. Create an EditHistory record: Use the EditHistoryService to save all the captured information (user ID, node ID, operation type, before state, after state, changes, relationship data) to the database.
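
The steps above can be sketched independently of any database as a small wrapper. This is only an illustration of the ordering: run_tracked_edit and the three callables it takes are hypothetical names, the diff step is left out for brevity, and a plain dictionary plays the role of the graph node.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

State = Dict[str, Any]


async def run_tracked_edit(
    get_state: Callable[[], Awaitable[State]],
    apply_edit: Callable[[], Awaitable[None]],
    save_record: Callable[[Optional[State], State], Awaitable[None]],
    save_edit: bool = True,
) -> None:
    """Apply an edit and, if requested, record the before/after snapshots."""
    # Steps 1-2: take the "before" snapshot only when tracking is enabled.
    before = await get_state() if save_edit else None
    # Step 3: perform the edit itself.
    await apply_edit()
    if save_edit:
        # Step 4: take the "after" snapshot.
        after = await get_state()
        # Step 6: persist the record (step 5, the diff, is omitted here).
        await save_record(before, after)


# A dictionary stands in for the node; a list stands in for the history table.
node: State = {"name": "John Doe", "age": 30}
history: List[Dict[str, Optional[State]]] = []


async def get_state() -> State:
    return dict(node)  # copy, so later edits do not mutate the snapshot


async def apply_edit() -> None:
    node["age"] = 31


async def save_record(before: Optional[State], after: State) -> None:
    history.append({"before": before, "after": after})


asyncio.run(run_tracked_edit(get_state, apply_edit, save_record))
print(history)
```

Returning a copy from get_state matters: if the snapshot shared storage with the live node, the "before" state would silently change when the edit is applied.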

Example: Updating Node Properties

Let's consider modifying an endpoint that updates node properties. We'll add the save_edit parameter and integrate the capturing and saving logic.

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from neo4j import AsyncSession  # Assumption: the graph session comes from the Neo4j async driver
from sqlalchemy.ext.asyncio import AsyncSession as SQLAsyncSession
from iroko.database import get_db_session as get_sql_session
from iroko.auth.service import EditHistoryService
from iroko.auth.dependencies import require_edit_permission, get_current_user
from iroko.auth.schemas import TokenUser
from iroko.logger import logger
from typing import List, Dict, Any
from uuid import UUID

# Assuming these are defined in your project
# from iroko.cypher.routers.edit import router, _node_exists, _sanitize_property_key, EditResponse
# from iroko.cypher.schemas_edit import NodePropertyUpdate, RelationshipUpdate, NodeEditRequest, RelationshipDeleteRequest
# from iroko.auth.schemas import EditHistoryResponse # Ensure this is imported

router = APIRouter()

# Placeholder for existing functions and models
async def _node_exists(session, iroko_uuid):
    # Dummy implementation; async because the endpoint awaits it
    return True

def _sanitize_property_key(key):
    # Dummy implementation
    return key

class EditResponse(BaseModel):
    success: bool
    message: str
    updated_properties: int = 0

# Dummy schemas
class NodePropertyUpdate(BaseModel):
    iroko_uuid: str
    properties: Dict[str, Any]
    save_edit: bool = True

async def _get_node_state(session: AsyncSession, node_uuid: str) -> Dict[str, Any]: # Defined below
    # Dummy implementation for example, actual query is complex
    print(f"Fetching state for {node_uuid}")
    return {"_labels": ["Person"], "name": "John Doe", "age": 30}

def _calculate_changes(before_state: Dict, after_state: Dict, updated_properties: Dict) -> Dict[str, Any]: # Defined below
    print("Calculating changes")
    return {"modified": {"age": {"before": 30, "after": 31}}}


@router.patch("/node/properties", response_model=EditResponse)
async def update_node_properties(
    update: NodePropertyUpdate,
    session: AsyncSession = Depends(get_sql_session),  # Placeholder: swap in the dependency that provides your graph DB session
    db_session: SQLAsyncSession = Depends(get_sql_session),  # Relational DB session used for the audit log
    current_user: TokenUser = Depends(require_edit_permission),
):
    """
    Update properties of a node by iroko_uuid
    """
    try:
        # Check if node exists
        if not await _node_exists(session, update.iroko_uuid):
            raise HTTPException(status_code=404, detail=f"Node with iroko_uuid '{update.iroko_uuid}' not found")
        
        # Capture before state if save_edit is enabled
        before_state = None
        if update.save_edit:
            before_state = await _get_node_state(session, update.iroko_uuid)
        
        # Build SET clause for properties
        set_clauses = []
        parameters = {"iroko_uuid": update.iroko_uuid}
        
        for i, (key, value) in enumerate(update.properties.items()):
            sanitized_key = _sanitize_property_key(key)
            param_name = f"prop_{i}"
            set_clauses.append(f"n.{sanitized_key} = ${param_name}")
            parameters[param_name] = value
        
        if not set_clauses:
            return EditResponse(success=True, message="No properties to update")
        
        set_clause = "SET " + ", ".join(set_clauses)
        
        # Execute update
        query = f"""
        MATCH (n {{iroko_uuid: $iroko_uuid}})
        {set_clause}
        RETURN n, count(n) as updated_count
        """
        
        # NOTE: This part needs to be adapted to your specific graph DB driver (e.g., neo4j-driver)
        # For demonstration, assuming session.run is the correct method.
        # In reality, it might be something like: await session.execute_write(lambda tx: tx.run(query, **parameters))
        print(f"Executing query: {query} with params: {parameters}")
        # Dummy result for demonstration
        updated_count = 1 
        
        # Capture after state and save edit history if enabled
        if update.save_edit and updated_count > 0:
            after_state = await _get_node_state(session, update.iroko_uuid)
            # Pass the actual updated properties to _calculate_changes for accurate diff
            changes = _calculate_changes(before_state, after_state, update.properties)
            
            history_service = EditHistoryService(db_session)
            await history_service.create_edit_record(
                user_id=current_user.id,
                node_id=update.iroko_uuid,
                operation="update_properties",
                before_state=before_state,
                after_state=after_state,
                changes=changes
            )
        
        logger.info(f"User {current_user.email} updated properties for node {update.iroko_uuid}")
        
        return EditResponse(
            success=True,
            message=f"Successfully updated {updated_count} node(s)",
            updated_properties=updated_count
        )
        
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating node properties: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to update node: {str(e)}")

This shows how the save_edit flag controls the capture and logging process. We also introduce helper functions to fetch node states and calculate differences.
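
Because property keys are interpolated directly into the Cypher SET clause, the placeholder _sanitize_property_key deserves a real implementation. The sketch below shows one conservative option, an allow-list of identifier characters. The exact rule is an assumption rather than the project's actual policy, and the same check could be applied to any other identifier that ends up inside a query string.

```python
import re

# Allow-list: a letter or underscore, followed by letters, digits, or underscores.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def sanitize_property_key(key: str) -> str:
    """Return the key unchanged if it is a safe identifier, else raise ValueError."""
    # fullmatch (rather than match with '$') also rejects a trailing newline.
    if not isinstance(key, str) or not _IDENTIFIER_RE.fullmatch(key):
        raise ValueError(f"Invalid property key: {key!r}")
    return key


print(sanitize_property_key("age"))
try:
    sanitize_property_key("age = 1 DETACH DELETE n //")
except ValueError as exc:
    print("rejected:", exc)
```

Rejecting bad keys outright, rather than trying to strip unsafe characters, keeps the stored history honest: the key recorded in the audit log is always the key the client actually sent.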

5. Helper Functions for State Capture and Diff Calculation

To make the edit tracking robust, we need precise functions to retrieve the complete state of a node and to calculate the differences between states. The _get_node_state function should fetch all properties and labels of a node. The _calculate_changes function will compare the before and after states, identifying exactly which properties were added, modified, or removed.

Helper Function Implementations:

async def _get_node_state(session: AsyncSession, node_uuid: str) -> Dict[str, Any]:
    """Get complete node state including properties and labels"""
    # This query needs to be adapted based on your graph database syntax (e.g., Neo4j Cypher)
    query = """
    MATCH (n {iroko_uuid: $node_uuid})
    RETURN n, labels(n) as labels
    """
    
    # Placeholder for actual graph database execution
    # Example using a hypothetical session.run method:
    print(f"Executing _get_node_state for {node_uuid}")
    # result = await session.run(query, node_uuid=node_uuid)
    # record = await result.single()
    
    # Dummy return for demonstration:
    if node_uuid == "some-uuid-123":
        return {"name": "Alice", "age": 30, "_labels": ["User"], "_iroko_uuid": node_uuid}
    elif node_uuid == "some-uuid-456":
        return {"name": "Bob", "city": "New York", "_labels": ["Location"], "_iroko_uuid": node_uuid}
    else:
        return {"_labels": ["Unknown"], "_iroko_uuid": node_uuid}

async def _get_relationship_state(session: AsyncSession, from_uuid: str, to_uuid: str, relation_type: str) -> Dict[str, Any]:
    """Get relationship state properties"""
    # Adapt query to your graph database
    query = f"""
    MATCH (a {{iroko_uuid: $from_uuid}})-[r:{relation_type}]->(b {{iroko_uuid: $to_uuid}})
    RETURN properties(r) as relationship_properties
    """
    print(f"Fetching relationship state: {from_uuid} -[{relation_type}]-> {to_uuid}")
    # Placeholder for actual graph database execution
    # result = await session.run(query, from_uuid=from_uuid, to_uuid=to_uuid)
    # record = await result.single()
    
    # Dummy return
    if relation_type == "WORKS_FOR":
        return {"since": "2022-01-01", "role": "Developer"}
    return {}

def _calculate_changes(before_state: Dict, after_state: Dict, updated_properties: Dict) -> Dict[str, Any]:
    """Calculate what changed between before and after states"""
    changes = {
        "added": {},
        "modified": {},
        "removed": {}
    }
    
    # Consider every key that appears in either snapshot or was targeted by the update
    all_keys = set(before_state) | set(after_state) | set(updated_properties)
    
    for key in all_keys:
        if key.startswith('_'):  # Skip internal metadata like _labels, _iroko_uuid
            continue
        
        in_before = key in before_state
        in_after = key in after_state
        
        if in_after and not in_before:
            # Property did not exist before the edit
            changes["added"][key] = after_state[key]
        elif in_before and not in_after:
            # Property existed before the edit but is gone now
            changes["removed"][key] = before_state[key]
        elif in_before and in_after and before_state[key] != after_state[key]:
            # Property exists in both snapshots with different values
            changes["modified"][key] = {
                "before": before_state[key],
                "after": after_state[key]
            }
    
    return changes

These helpers are critical for generating accurate diffs and full state snapshots. The _get_node_state function must be carefully implemented to query the graph database efficiently and return a structured dictionary that includes node properties and labels. The _calculate_changes function then uses this structured data to pinpoint the exact modifications.
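
The dummy _get_node_state above hides the step where a query result is turned into a snapshot dictionary. A minimal sketch of that step follows, assuming the driver hands back a mapping of properties and a list of labels. The helper name build_node_state is hypothetical, and the underscore-prefixed metadata keys follow the convention used by the dummy return values.

```python
import json
from typing import Any, Dict, Iterable, Mapping


def build_node_state(
    properties: Mapping[str, Any], labels: Iterable[str], node_uuid: str
) -> Dict[str, Any]:
    """Merge node properties and labels into one snapshot dictionary."""
    state = dict(properties)            # copy, so the driver's object is untouched
    state["_labels"] = sorted(labels)   # sorted so equal label sets compare equal
    state["_iroko_uuid"] = node_uuid
    return state


state = build_node_state({"name": "Alice", "age": 30}, ["User", "Admin"], "some-uuid-123")
# The snapshot must survive JSON serialisation to be stored in a JSON column.
serialized = json.dumps(state, sort_keys=True)
print(serialized)
```

Property values that are not JSON-serialisable (for example driver-specific date or spatial types) would need converting before the snapshot is stored; that conversion is not covered here.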

6. New API Endpoints for Retrieving Edit History

To make the captured history accessible, we need new API endpoints. These endpoints will allow users to query the edit history either by a specific node_id or by the currently authenticated user (user/me). These endpoints will leverage the EditHistoryService to fetch the data and format it using the EditHistoryResponse schema.

Example History Retrieval Endpoints:

# Ensure necessary imports are present
# from fastapi import APIRouter, Depends, HTTPException, Query
# from sqlalchemy.ext.asyncio import AsyncSession as SQLAsyncSession
# from iroko.database import get_db_session as get_sql_session
# from iroko.auth.service import EditHistoryService
# from iroko.auth.dependencies import require_edit_permission, get_current_user
# from iroko.auth.schemas import TokenUser, UserResponse, EditHistoryResponse
# from iroko.logger import logger
# from typing import List, Dict, Any
# from uuid import UUID

# Assuming 'router' is already defined APIRouter()

@router.get("/history/node/{node_id}", response_model=List[EditHistoryResponse])
async def get_node_edit_history(
    node_id: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db_session: SQLAsyncSession = Depends(get_sql_session), # Relational DB session
    current_user: TokenUser = Depends(require_edit_permission) # For permission check
):
    """Get edit history for a specific node"""
    try:
        history_service = EditHistoryService(db_session)
        history_records = await history_service.get_edit_history_by_node(node_id, skip, limit)
        
        # Convert ORM objects to Pydantic models for response
        result = []
        for record in history_records:
            # Construct user data for response
            user_data = UserResponse(
                id=record.user.id,
                email=record.user.email,
                full_name=record.user.full_name,
                is_active=record.user.is_active,
                is_superuser=record.user.is_superuser,
                created_at=record.user.created_at,
                updated_at=record.user.updated_at
            )
            
            history_response = EditHistoryResponse(
                id=record.id,
                user_id=record.user_id,
                user=user_data, # Embed user info
                node_id=record.node_id,
                operation=record.operation,
                timestamp=record.timestamp,
                before_state=record.before_state,
                after_state=record.after_state,
                changes=record.changes,
                relationship_data=record.relationship_data
            )
            result.append(history_response)
        
        return result
        
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving edit history for node {node_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve edit history: {str(e)}")

@router.get("/history/user/me", response_model=List[EditHistoryResponse])
async def get_my_edit_history(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db_session: SQLAsyncSession = Depends(get_sql_session), # Relational DB session
    current_user: TokenUser = Depends(get_current_user) # Get current user info
):
    """Get edit history for the current user"""
    try:
        history_service = EditHistoryService(db_session)
        history_records = await history_service.get_edit_history_by_user(current_user.id, skip, limit)
        
        # Convert ORM objects to Pydantic models for response (similar to above)
        result = []
        for record in history_records:
            user_data = UserResponse(
                id=record.user.id,
                email=record.user.email,
                full_name=record.user.full_name,
                is_active=record.user.is_active,
                is_superuser=record.user.is_superuser,
                created_at=record.user.created_at,
                updated_at=record.user.updated_at
            )
            
            history_response = EditHistoryResponse(
                id=record.id,
                user_id=record.user_id,
                user=user_data,
                node_id=record.node_id,
                operation=record.operation,
                timestamp=record.timestamp,
                before_state=record.before_state,
                after_state=record.after_state,
                changes=record.changes,
                relationship_data=record.relationship_data
            )
            result.append(history_response)
        
        return result
        
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving edit history for user {current_user.id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve edit history: {str(e)}")

These endpoints provide the necessary interface for users and administrators to audit changes effectively.

7. Updating Request Schemas for save_edit Parameter

Finally, we need to ensure that the request schemas for edit operations include the save_edit parameter. This allows clients to explicitly control whether the edit should be logged. By defaulting save_edit to True, we ensure that logging is enabled by default, but provide flexibility to disable it if needed for bulk operations or specific scenarios.

Example Schema Updates:

from pydantic import BaseModel
from typing import Optional, Dict, List, Any

class NodePropertyUpdate(BaseModel):
    iroko_uuid: str
    properties: Dict[str, Any]
    save_edit: bool = True  # New parameter to control saving history

class RelationshipUpdate(BaseModel):
    from_uuid: str
    to_uuid: str
    relation_type: str
    properties: Optional[Dict[str, Any]] = None
    save_edit: bool = True  # New parameter

class NodeEditRequest(BaseModel):
    # This might be a composite request combining node properties and relationships
    iroko_uuid: str
    properties: Dict[str, Any]
    relationships: List[RelationshipUpdate]
    save_edit: bool = True  # New parameter

class RelationshipDeleteRequest(BaseModel):
    from_uuid: str
    to_uuid: str
    relation_type: str
    save_edit: bool = True  # New parameter

By adding save_edit: bool = True to these schemas, history saving is on by default, and clients can opt out on a per-request basis.

How to Use the Edit Tracking Feature

Once implemented, using this feature is straightforward:

  1. Default Behavior: All edit operations (node property updates, relationship modifications, etc.) will automatically have their history saved to the edit_history table because save_edit defaults to True.
  2. Capturing Data: For each edit operation, the system will retrieve and store:
    • The complete JSON state of the node before the change.
    • The complete JSON state of the node after the change.
    • A detailed diff indicating which specific properties were added, modified, or removed.
    • Metadata such as the user performing the action, the node_id, the operation type, and the timestamp.
    • If relationships are involved, relevant relationship_data will also be stored.
  3. Accessing History: You can retrieve the edit history using the new API endpoints (the paths below assume the router is mounted under /api/v1/edit):
    • GET /api/v1/edit/history/node/{node_id}: To view all historical edits for a particular node.
    • GET /api/v1/edit/history/user/me: To view all edits performed by the currently authenticated user.
  4. Controlling Logging: If you need to disable history saving for a specific operation (e.g., during large batch imports or routine maintenance), you can set save_edit to false in your API request payload.
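
As a small illustration of the last point, a request body that opts out of history saving might be built like this. The node ID and property values are made up; the field names follow the NodePropertyUpdate schema.

```python
import json

# Hypothetical payload for the PATCH /node/properties endpoint.
payload = {
    "iroko_uuid": "some-uuid-123",   # made-up node ID
    "properties": {"age": 31},
    "save_edit": False,              # skip the history record for this request
}

body = json.dumps(payload)
print(body)
# {"iroko_uuid": "some-uuid-123", "properties": {"age": 31}, "save_edit": false}
```

Omitting save_edit entirely has the same effect as sending true, because the schema supplies that default.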

The Power of Comprehensive Edit Tracking

Implementing a feature that saves the before and after JSON states of node edits, along with their relationships, offers significant advantages:

  • Detailed Audit Trail: Every edit made with save_edit enabled is recorded with its user, timestamp, and full before and after state, which is valuable for compliance, security, and debugging.
  • Rollback Support: Because each record holds a complete snapshot of the node's previous state, you can restore a node to a known good state if errors are introduced. Reverting a whole subgraph means replaying the records of each affected node and relationship.
  • Enhanced Data Governance: This feature supports better data management policies by enforcing accountability and transparency in data handling.
  • Simplified Debugging and Forensics: When data anomalies occur, tracing the exact changes and the users responsible becomes a far more efficient process.

This detailed approach ensures that your system not only stores data but also maintains a rich, accessible history of its evolution, making it a more reliable and manageable asset.
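
As a rough illustration of the rollback idea, the sketch below derives from a stored record which properties would need to be set and which removed to return a node to its before_state. The function name build_rollback is hypothetical, it handles only properties (not labels or relationships), and turning its output into an actual Cypher statement is left to your graph driver.

```python
from typing import Any, Dict, List, Tuple


def build_rollback(
    before_state: Dict[str, Any], after_state: Dict[str, Any]
) -> Tuple[Dict[str, Any], List[str]]:
    """Return (properties to set, property keys to remove) that undo an edit."""

    def props(state: Dict[str, Any]) -> Dict[str, Any]:
        # Ignore internal metadata such as _labels and _iroko_uuid.
        return {k: v for k, v in state.items() if not k.startswith("_")}

    before, after = props(before_state), props(after_state)
    # Restore every property that was changed or removed by the edit.
    to_set = {k: v for k, v in before.items() if k not in after or after[k] != v}
    # Drop every property that the edit introduced.
    to_remove = sorted(k for k in after if k not in before)
    return to_set, to_remove


before = {"name": "Alice", "age": 30, "_labels": ["User"]}
after = {"name": "Alice", "age": 31, "nickname": "Al", "_labels": ["User"]}

to_set, to_remove = build_rollback(before, after)
print(to_set, to_remove)
```

A rollback applied this way is itself an edit, so running it through the same tracked endpoint would leave its own entry in the history rather than erasing the original one.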

For further reading on best practices in data versioning and auditing, you can explore resources from The Data Vault Alliance or research concepts related to event sourcing, which share similar principles of capturing state changes over time.