Save Node Edit History: Before & After JSON
When you manage knowledge graphs, large datasets, or critical business information, you need to know how each record changed over time. This article walks through a feature that saves the edit process of nodes: the node's state before and after every modification, along with its associated relationships. The result is a complete audit trail that supports rollback and strengthens data governance. The implementation uses Python, with examples written for a framework like Iroko.
Understanding the Need for Edit History
Imagine a scenario where a critical node in your system is updated, and shortly after, inconsistencies or errors appear. Without a proper edit history, pinpointing the cause, identifying who made the change, and reverting to a previous stable state can be a daunting, if not impossible, task. This is precisely why saving the before and after states of a node, along with its relationships, is so crucial. It provides a detailed narrative of data evolution, offering:
- Accountability: Clearly identifies the user responsible for each change.
- Reproducibility: Allows for the recreation of past data states.
- Auditing: Provides a verifiable record for compliance and security purposes.
- Debugging: Simplifies the process of identifying the root cause of data anomalies.
- Rollback: Enables seamless restoration of previous versions of data.
This comprehensive approach to tracking edits moves beyond simple logging; it preserves the context of the data at the moment of modification. By saving the complete JSON representation of the node and its relationships, we capture not just the changed values but the entire snapshot, making restoration and analysis far more straightforward. This detailed capture is what we aim to implement.
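To make the idea of a full snapshot concrete, here is a minimal sketch of what one stored edit could look like. The field names follow the ones used throughout this article; the values are invented for illustration only.

```python
import json

# Illustrative values only: a node before and after a single property update.
before_state = {"_labels": ["Person"], "name": "John Doe", "age": 30}
after_state = {"_labels": ["Person"], "name": "John Doe", "age": 31}

edit_record = {
    "node_id": "some-uuid-123",
    "operation": "update_properties",
    "before_state": before_state,
    "after_state": after_state,
    "changes": {"modified": {"age": {"before": 30, "after": 31}}},
}

# Snapshots must survive a JSON round trip to be stored in a JSON column.
serialized = json.dumps(edit_record, sort_keys=True)
restored = json.loads(serialized)
```

Because both snapshots are complete, the unchanged `name` property is stored twice. That costs space, but it means a single record is enough to reconstruct either state.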
Technical Implementation: A Step-by-Step Approach
To save the edit process of nodes with before and after JSON states, we need to integrate several components: database models, API schemas and endpoints, and the logic that captures and stores the relevant data. The examples use Python with FastAPI, SQLAlchemy, and Pydantic, with a graph database for node storage and a relational database for the audit log.
1. Database Model for Edit History
First, we need a dedicated table to store the edit history. This table should record key information about each edit operation. Essential fields include a unique identifier for the edit record, the ID of the user who performed the edit, the ID of the node that was edited, the type of operation performed (e.g., update, create, delete, relationship change), a timestamp of when the edit occurred, and crucially, the JSON representations of the node's state before and after the edit. We might also store a diff of the changes made and specific data related to relationship modifications.
Example EditHistory Model:
from sqlalchemy import Column, DateTime, String, JSON, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from iroko.database import Base
import uuid


class EditHistory(Base):
    __tablename__ = "edit_history"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    node_id = Column(String(255), nullable=False, index=True)  # iroko_uuid of edited node
    operation = Column(String(50), nullable=False)  # 'create', 'update', 'delete', 'relationship'
    timestamp = Column(DateTime(timezone=True), server_default=func.now())

    # Store the changes as JSON
    before_state = Column(JSON, nullable=True)  # Node state before edit
    after_state = Column(JSON, nullable=True)  # Node state after edit
    changes = Column(JSON, nullable=True)  # Diff of what changed

    # Relationship changes (for relationship operations)
    relationship_data = Column(JSON, nullable=True)

    # User relationship
    user = relationship("User", backref="edit_history")
This model lays the foundation for storing comprehensive edit logs. The node_id will link to the specific node in our graph, operation categorizes the change, and before_state/after_state will hold the detailed JSON snapshots.
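The table shape can be tried out without any framework. The following is a rough sketch using only the standard library's `sqlite3`, not the SQLAlchemy model itself: the column list is trimmed, JSON is stored as text, and `insert_edit` is a hypothetical helper. It shows why the state columns are nullable: a create operation has no before state.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE edit_history (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        node_id TEXT NOT NULL,
        operation TEXT NOT NULL,
        timestamp TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        before_state TEXT,
        after_state TEXT
    )
    """
)
# History is usually read per node, so index the lookup column.
conn.execute("CREATE INDEX ix_edit_history_node_id ON edit_history (node_id)")


def insert_edit(node_id, user_id, operation, before, after):
    """Insert one edit row; snapshots are serialized to JSON text, None stays NULL."""
    record_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO edit_history "
        "(id, user_id, node_id, operation, before_state, after_state) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (
            record_id,
            user_id,
            node_id,
            operation,
            json.dumps(before) if before is not None else None,
            json.dumps(after) if after is not None else None,
        ),
    )
    return record_id


# A 'create' edit has no before state, so that column is NULL.
record_id = insert_edit("node-1", "user-1", "create", None, {"name": "Alice"})
row = conn.execute(
    "SELECT operation, before_state, after_state FROM edit_history WHERE id = ?",
    (record_id,),
).fetchone()
```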
2. API Schema Definitions
To interact with this new history tracking, we need to define API schemas. These schemas will dictate the structure of data sent to and received from the API. We'll need schemas for creating new edit history entries, and response schemas for retrieving them. The creation schema should accommodate optional fields for before_state, after_state, changes, and relationship_data, as not all operations might require all these fields. The response schema should include all relevant details, potentially linking to user information.
Example EditHistory Schemas:
from pydantic import BaseModel
from typing import Optional, Dict, Any
from uuid import UUID
from datetime import datetime


# Assuming UserResponse schema is defined elsewhere in your auth module
class UserResponse(BaseModel):
    id: UUID
    email: str
    full_name: Optional[str] = None
    is_active: bool
    is_superuser: bool
    created_at: datetime
    updated_at: datetime


class EditHistoryBase(BaseModel):
    node_id: str
    operation: str
    timestamp: datetime


class EditHistoryCreate(EditHistoryBase):
    before_state: Optional[Dict[str, Any]] = None
    after_state: Optional[Dict[str, Any]] = None
    changes: Optional[Dict[str, Any]] = None
    relationship_data: Optional[Dict[str, Any]] = None


class EditHistoryResponse(EditHistoryBase):
    id: UUID
    user_id: UUID
    user: Optional[UserResponse] = None
    before_state: Optional[Dict[str, Any]] = None
    after_state: Optional[Dict[str, Any]] = None
    changes: Optional[Dict[str, Any]] = None
    relationship_data: Optional[Dict[str, Any]] = None

    class Config:
        from_attributes = True
These schemas ensure data consistency and facilitate clear communication between the client and the server.
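The schemas above type `operation` as a free-form string. One possible tightening, sketched here with only the standard library, is to restrict it to a known set of values. `EditOperation` and `parse_operation` are hypothetical names, and the members are taken from the comment on the model's `operation` column.

```python
from enum import Enum


class EditOperation(str, Enum):
    """Allowed operation labels (assumed from the model's column comment)."""

    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    RELATIONSHIP = "relationship"


def parse_operation(value: str) -> EditOperation:
    """Return the matching enum member or raise a descriptive ValueError."""
    try:
        return EditOperation(value)
    except ValueError:
        allowed = ", ".join(op.value for op in EditOperation)
        raise ValueError(
            f"Unknown operation {value!r}; expected one of: {allowed}"
        ) from None


parsed = parse_operation("update")
```

Note that the router example in this article records the label `update_properties`, which is not in this set. If you adopt a closed set, it needs to include every label your endpoints actually write.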
3. Edit History Service Layer
A service layer is essential for abstracting the database operations related to edit history. This service will provide methods for creating new edit records and for querying existing history, either for a specific node or for a specific user. This separation of concerns makes the codebase cleaner and easier to maintain.
Example EditHistoryService:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from typing import List, Dict, Any, Optional  # Optional is used in create_edit_record
from uuid import UUID
from iroko.auth.models import EditHistory
from iroko.auth.schemas import EditHistoryResponse, UserResponse


class EditHistoryService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_edit_record(
        self,
        user_id: UUID,
        node_id: str,
        operation: str,
        before_state: Optional[Dict] = None,
        after_state: Optional[Dict] = None,
        changes: Optional[Dict] = None,
        relationship_data: Optional[Dict] = None
    ) -> EditHistory:
        """Create a new edit history record"""
        edit_record = EditHistory(
            user_id=user_id,
            node_id=node_id,
            operation=operation,
            before_state=before_state,
            after_state=after_state,
            changes=changes,
            relationship_data=relationship_data
        )
        self.db.add(edit_record)
        await self.db.commit()
        await self.db.refresh(edit_record)
        return edit_record

    async def get_edit_history_by_node(
        self,
        node_id: str,
        skip: int = 0,
        limit: int = 100
    ) -> List[EditHistory]:
        """Get edit history for a specific node"""
        result = await self.db.execute(
            select(EditHistory)
            .options(selectinload(EditHistory.user))
            .filter(EditHistory.node_id == node_id)
            .order_by(EditHistory.timestamp.desc())
            .offset(skip)
            .limit(limit)
        )
        return result.scalars().all()

    async def get_edit_history_by_user(
        self,
        user_id: UUID,
        skip: int = 0,
        limit: int = 100
    ) -> List[EditHistory]:
        """Get edit history for a specific user"""
        result = await self.db.execute(
            select(EditHistory)
            .options(selectinload(EditHistory.user))
            .filter(EditHistory.user_id == user_id)
            .order_by(EditHistory.timestamp.desc())
            .offset(skip)
            .limit(limit)
        )
        return result.scalars().all()
This service handles the persistence and retrieval logic, keeping the API endpoints focused on request handling and response formatting.
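One benefit of keeping persistence behind a service is that endpoint logic can be exercised without a database. The sketch below is a hypothetical in-memory stand-in that mirrors two of the service's method names; `FakeEditRecord` and `InMemoryEditHistoryService` are illustrative names, and the `changes` and `relationship_data` fields are omitted for brevity.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid4


@dataclass
class FakeEditRecord:
    user_id: UUID
    node_id: str
    operation: str
    before_state: Optional[Dict[str, Any]] = None
    after_state: Optional[Dict[str, Any]] = None
    id: UUID = field(default_factory=uuid4)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class InMemoryEditHistoryService:
    """Test double that keeps records in a list instead of a database."""

    def __init__(self) -> None:
        self._records: List[FakeEditRecord] = []

    async def create_edit_record(
        self, user_id, node_id, operation, before_state=None, after_state=None
    ) -> FakeEditRecord:
        record = FakeEditRecord(user_id, node_id, operation, before_state, after_state)
        self._records.append(record)
        return record

    async def get_edit_history_by_node(
        self, node_id: str, skip: int = 0, limit: int = 100
    ) -> List[FakeEditRecord]:
        # Newest first: reverse insertion order, then apply offset and limit.
        matches = [r for r in reversed(self._records) if r.node_id == node_id]
        return matches[skip: skip + limit]


async def _demo() -> List[FakeEditRecord]:
    service = InMemoryEditHistoryService()
    user = uuid4()
    await service.create_edit_record(user, "node-1", "create", None, {"age": 30})
    await service.create_edit_record(user, "node-1", "update", {"age": 30}, {"age": 31})
    await service.create_edit_record(user, "node-2", "create", None, {"name": "Bob"})
    return await service.get_edit_history_by_node("node-1", skip=0, limit=1)


latest = asyncio.run(_demo())
```

With `limit=1`, the query returns only the most recent edit for `node-1`, which is the `update` record.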
4. Modifying Existing Edit Routers
The core of our implementation lies in modifying the existing edit functionality to incorporate the tracking logic. For any endpoint that modifies a node or its relationships, we need to:
- Check if saving edits is enabled: This can be controlled by a new parameter in the request.
- Capture the before_state: Before making any modifications, retrieve the current state of the node (including properties and labels) as a JSON object.
- Perform the edit operation: Execute the Cypher query to update or modify the node/relationship.
- Capture the after_state: After the edit, retrieve the new state of the node.
- Calculate changes: Compare the before_state and after_state to identify specific additions, modifications, and removals. This helps in creating a concise diff.
- Create an EditHistory record: Use the EditHistoryService to save all the captured information (user ID, node ID, operation type, before state, after state, changes, relationship data) to the database.
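The sequence above can be sketched independently of any database. In this simplified illustration a plain dictionary stands in for the graph and a list stands in for the history table; `update_with_history` is a hypothetical function, and the diff covers only the properties named in the request.

```python
import copy
from typing import Any, Dict, List, Optional

# Stand-ins for the graph database and the edit_history table.
graph: Dict[str, Dict[str, Any]] = {"some-uuid-123": {"name": "Alice", "age": 30}}
history: List[Dict[str, Any]] = []


def update_with_history(
    node_id: str, properties: Dict[str, Any], user_id: str, save_edit: bool = True
) -> Dict[str, Any]:
    # Capture the before state as a deep copy so the edit cannot alter it.
    before: Optional[Dict[str, Any]] = (
        copy.deepcopy(graph[node_id]) if save_edit else None
    )

    # Perform the edit.
    graph[node_id].update(properties)

    if save_edit:
        # Capture the after state and compute a simplified diff.
        after = copy.deepcopy(graph[node_id])
        changes = {
            key: {"before": before.get(key), "after": after[key]}
            for key in properties
            if before.get(key) != after[key]
        }
        # Create the history record.
        history.append(
            {
                "user_id": user_id,
                "node_id": node_id,
                "operation": "update_properties",
                "before_state": before,
                "after_state": after,
                "changes": changes,
            }
        )
    return graph[node_id]


update_with_history("some-uuid-123", {"age": 31}, user_id="user-1")
update_with_history("some-uuid-123", {"age": 32}, user_id="user-1", save_edit=False)
```

The deep copy matters: if the before state were only a reference to the live node, the edit would silently overwrite the snapshot. The second call shows the flag at work, since the node changes but no history record is written.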
Example: Updating Node Properties
Let's consider modifying an endpoint that updates node properties. We'll add the save_edit parameter and integrate the capturing and saving logic.
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession as SQLAsyncSession
from iroko.database import get_db_session as get_sql_session
from iroko.auth.service import EditHistoryService
from iroko.auth.dependencies import require_edit_permission, get_current_user
from iroko.auth.schemas import TokenUser
from iroko.logger import logger
from typing import List, Dict, Any
from uuid import UUID

# Assumption: the graph session type comes from the neo4j driver; use your own driver's type if it differs
from neo4j import AsyncSession

# Assuming these are defined in your project
# from iroko.cypher.routers.edit import router, _node_exists, _sanitize_property_key, EditResponse
# from iroko.cypher.schemas_edit import NodePropertyUpdate, RelationshipUpdate, NodeEditRequest, RelationshipDeleteRequest
# from iroko.auth.schemas import EditHistoryResponse # Ensure this is imported

router = APIRouter()


# Placeholder for existing functions and models
async def _node_exists(session, iroko_uuid):
    # Dummy implementation (async, because the endpoint awaits it)
    return True


def _sanitize_property_key(key):
    # Dummy implementation
    return key


class EditResponse(BaseModel):
    success: bool
    message: str
    updated_properties: int = 0


# Dummy schemas
class NodePropertyUpdate(BaseModel):
    iroko_uuid: str
    properties: Dict[str, Any]
    save_edit: bool = True


async def _get_node_state(session: AsyncSession, node_uuid: str) -> Dict[str, Any]:  # Defined below
    # Dummy implementation for example, actual query is complex
    print(f"Fetching state for {node_uuid}")
    return {"_labels": ["Person"], "name": "John Doe", "age": 30}


def _calculate_changes(before_state: Dict, after_state: Dict, updated_properties: Dict) -> Dict[str, Any]:  # Defined below
    print("Calculating changes")
    return {"modified": {"age": {"before": 30, "after": 31}}}


@router.patch("/node/properties", response_model=EditResponse)
async def update_node_properties(
    update: NodePropertyUpdate,
    # Placeholder: replace get_sql_session here with your graph database session dependency
    session: AsyncSession = Depends(get_sql_session),
    db_session: SQLAsyncSession = Depends(get_sql_session),  # Relational DB session for the history table
    current_user: TokenUser = Depends(require_edit_permission),
):
    """
    Update properties of a node by iroko_uuid
    """
    try:
        # Check if node exists
        if not await _node_exists(session, update.iroko_uuid):
            raise HTTPException(status_code=404, detail=f"Node with iroko_uuid '{update.iroko_uuid}' not found")

        # Capture before state if save_edit is enabled
        before_state = None
        if update.save_edit:
            before_state = await _get_node_state(session, update.iroko_uuid)

        # Build SET clause for properties
        set_clauses = []
        parameters = {"iroko_uuid": update.iroko_uuid}
        for i, (key, value) in enumerate(update.properties.items()):
            sanitized_key = _sanitize_property_key(key)
            param_name = f"prop_{i}"
            set_clauses.append(f"n.{sanitized_key} = ${param_name}")
            parameters[param_name] = value

        if not set_clauses:
            return EditResponse(success=True, message="No properties to update")

        set_clause = "SET " + ", ".join(set_clauses)

        # Execute update
        query = f"""
        MATCH (n {{iroko_uuid: $iroko_uuid}})
        {set_clause}
        RETURN n, count(n) as updated_count
        """
        # NOTE: This part needs to be adapted to your specific graph DB driver (e.g., neo4j-driver)
        # For demonstration, assuming session.run is the correct method.
        # In reality, it might be something like: await session.execute_write(lambda tx: tx.run(query, **parameters))
        print(f"Executing query: {query} with params: {parameters}")
        # Dummy result for demonstration
        updated_count = 1

        # Capture after state and save edit history if enabled
        if update.save_edit and updated_count > 0:
            after_state = await _get_node_state(session, update.iroko_uuid)
            # Pass the actual updated properties to _calculate_changes for accurate diff
            changes = _calculate_changes(before_state, after_state, update.properties)
            history_service = EditHistoryService(db_session)
            await history_service.create_edit_record(
                user_id=current_user.id,
                node_id=update.iroko_uuid,
                operation="update_properties",
                before_state=before_state,
                after_state=after_state,
                changes=changes
            )

        logger.info(f"User {current_user.email} updated properties for node {update.iroko_uuid}")
        return EditResponse(
            success=True,
            message=f"Successfully updated {updated_count} node(s)",
            updated_properties=updated_count
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating node properties: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to update node: {str(e)}")
This shows how the save_edit flag controls the capture and logging process. We also introduce helper functions to fetch node states and calculate differences.
5. Helper Functions for State Capture and Diff Calculation
To make the edit tracking robust, we need precise functions to retrieve the complete state of a node and to calculate the differences between states. The _get_node_state function should fetch all properties and labels of a node. The _calculate_changes function will compare the before and after states, identifying exactly which properties were added, modified, or removed.
Helper Function Implementations:
async def _get_node_state(session: AsyncSession, node_uuid: str) -> Dict[str, Any]:
    """Get complete node state including properties and labels"""
    # This query needs to be adapted based on your graph database syntax (e.g., Neo4j Cypher)
    query = """
    MATCH (n {iroko_uuid: $node_uuid})
    RETURN n, labels(n) as labels
    """
    # Placeholder for actual graph database execution
    # Example using a hypothetical session.run method:
    print(f"Executing _get_node_state for {node_uuid}")
    # result = await session.run(query, node_uuid=node_uuid)
    # record = await result.single()
    # Dummy return for demonstration:
    if node_uuid == "some-uuid-123":
        return {"name": "Alice", "age": 30, "_labels": ["User"], "_iroko_uuid": node_uuid}
    elif node_uuid == "some-uuid-456":
        return {"name": "Bob", "city": "New York", "_labels": ["Location"], "_iroko_uuid": node_uuid}
    else:
        return {"_labels": ["Unknown"], "_iroko_uuid": node_uuid}


async def _get_relationship_state(session: AsyncSession, from_uuid: str, to_uuid: str, relation_type: str) -> Dict[str, Any]:
    """Get relationship state properties"""
    # Adapt query to your graph database.
    # relation_type is interpolated into the query text rather than passed as a
    # parameter, so validate it against known relationship types before calling this.
    query = f"""
    MATCH (a {{iroko_uuid: $from_uuid}})-[r:{relation_type}]->(b {{iroko_uuid: $to_uuid}})
    RETURN properties(r) as relationship_properties
    """
    print(f"Fetching relationship state: {from_uuid} -[{relation_type}]-> {to_uuid}")
    # Placeholder for actual graph database execution
    # result = await session.run(query, from_uuid=from_uuid, to_uuid=to_uuid)
    # record = await result.single()
    # Dummy return
    if relation_type == "WORKS_FOR":
        return {"since": "2022-01-01", "role": "Developer"}
    return {}
def _calculate_changes(before_state: Dict, after_state: Dict, updated_properties: Dict) -> Dict[str, Any]:
    """Calculate what changed between before and after states"""
    changes = {
        "added": {},
        "modified": {},
        "removed": {}
    }
    # Treat a missing snapshot as an empty state
    before_state = before_state or {}
    after_state = after_state or {}

    # Combine all keys from both states and the updated properties to ensure we check everything
    all_keys = set(before_state.keys()) | set(after_state.keys()) | set(updated_properties.keys())
    for key in all_keys:
        if key.startswith('_'):  # Skip internal metadata like _labels, _iroko_uuid
            continue
        in_before = key in before_state
        in_after = key in after_state

        # Classify by presence first, so a property created by this update
        # is reported as added rather than modified
        if in_after and not in_before:
            changes["added"][key] = after_state[key]
        elif in_before and not in_after:
            changes["removed"][key] = before_state[key]
        elif in_before and in_after and before_state[key] != after_state[key]:
            changes["modified"][key] = {
                "before": before_state[key],
                "after": after_state[key]
            }
    return changes
These helpers are critical for generating accurate diffs and full state snapshots. The _get_node_state function must be carefully implemented to query the graph database efficiently and return a structured dictionary that includes node properties and labels. The _calculate_changes function then uses this structured data to pinpoint the exact modifications.
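A small worked example helps check the expected shape of a diff. The sketch below uses a standalone function, `diff_states`, which is a hypothetical name and not the article's helper. It classifies each property purely by whether it is present in the before and after snapshots.

```python
from typing import Any, Dict


def diff_states(before: Dict[str, Any], after: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Classify each non-metadata property as added, modified, or removed."""
    changes: Dict[str, Dict[str, Any]] = {"added": {}, "modified": {}, "removed": {}}
    for key in sorted(set(before) | set(after)):
        if key.startswith("_"):
            continue  # skip metadata such as _labels and _iroko_uuid
        if key not in before:
            changes["added"][key] = after[key]
        elif key not in after:
            changes["removed"][key] = before[key]
        elif before[key] != after[key]:
            changes["modified"][key] = {"before": before[key], "after": after[key]}
    return changes


before = {"_labels": ["User"], "name": "Alice", "age": 30, "nickname": "Al"}
after = {"_labels": ["User", "Admin"], "name": "Alice", "age": 31, "city": "Lagos"}
result = diff_states(before, after)
# result == {
#     "added": {"city": "Lagos"},
#     "modified": {"age": {"before": 30, "after": 31}},
#     "removed": {"nickname": "Al"},
# }
```

One consequence of skipping underscore-prefixed keys is visible here: the label change from `["User"]` to `["User", "Admin"]` does not appear in the diff. It is still recoverable from the full snapshots, but if label changes matter to you, they would need their own entry.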
6. New API Endpoints for Retrieving Edit History
To make the captured history accessible, we need new API endpoints. These endpoints will allow users to query the edit history either by a specific node_id or by the currently authenticated user (user/me). These endpoints will leverage the EditHistoryService to fetch the data and format it using the EditHistoryResponse schema.
Example History Retrieval Endpoints:
# Ensure necessary imports are present
# from fastapi import APIRouter, Depends, HTTPException, Query
# from sqlalchemy.ext.asyncio import AsyncSession as SQLAsyncSession
# from iroko.database import get_db_session as get_sql_session
# from iroko.auth.service import EditHistoryService
# from iroko.auth.dependencies import require_edit_permission, get_current_user
# from iroko.auth.schemas import TokenUser, UserResponse, EditHistoryResponse
# from iroko.logger import logger
# from typing import List, Dict, Any
# from uuid import UUID

# Assuming 'router' is already defined APIRouter()


@router.get("/history/node/{node_id}", response_model=List[EditHistoryResponse])
async def get_node_edit_history(
    node_id: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db_session: SQLAsyncSession = Depends(get_sql_session),  # Relational DB session
    current_user: TokenUser = Depends(require_edit_permission)  # For permission check
):
    """Get edit history for a specific node"""
    try:
        history_service = EditHistoryService(db_session)
        history_records = await history_service.get_edit_history_by_node(node_id, skip, limit)

        # Convert ORM objects to Pydantic models for response
        result = []
        for record in history_records:
            # Construct user data for response
            user_data = UserResponse(
                id=record.user.id,
                email=record.user.email,
                full_name=record.user.full_name,
                is_active=record.user.is_active,
                is_superuser=record.user.is_superuser,
                created_at=record.user.created_at,
                updated_at=record.user.updated_at
            )
            history_response = EditHistoryResponse(
                id=record.id,
                user_id=record.user_id,
                user=user_data,  # Embed user info
                node_id=record.node_id,
                operation=record.operation,
                timestamp=record.timestamp,
                before_state=record.before_state,
                after_state=record.after_state,
                changes=record.changes,
                relationship_data=record.relationship_data
            )
            result.append(history_response)
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving edit history for node {node_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve edit history: {str(e)}")


@router.get("/history/user/me", response_model=List[EditHistoryResponse])
async def get_my_edit_history(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db_session: SQLAsyncSession = Depends(get_sql_session),  # Relational DB session
    current_user: TokenUser = Depends(get_current_user)  # Get current user info
):
    """Get edit history for the current user"""
    try:
        history_service = EditHistoryService(db_session)
        history_records = await history_service.get_edit_history_by_user(current_user.id, skip, limit)

        # Convert ORM objects to Pydantic models for response (similar to above)
        result = []
        for record in history_records:
            user_data = UserResponse(
                id=record.user.id,
                email=record.user.email,
                full_name=record.user.full_name,
                is_active=record.user.is_active,
                is_superuser=record.user.is_superuser,
                created_at=record.user.created_at,
                updated_at=record.user.updated_at
            )
            history_response = EditHistoryResponse(
                id=record.id,
                user_id=record.user_id,
                user=user_data,
                node_id=record.node_id,
                operation=record.operation,
                timestamp=record.timestamp,
                before_state=record.before_state,
                after_state=record.after_state,
                changes=record.changes,
                relationship_data=record.relationship_data
            )
            result.append(history_response)
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving edit history for user {current_user.id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve edit history: {str(e)}")
These endpoints provide the necessary interface for users and administrators to audit changes effectively.
7. Updating Request Schemas for save_edit Parameter
Finally, we need to ensure that the request schemas for edit operations include the save_edit parameter. This allows clients to explicitly control whether the edit should be logged. By defaulting save_edit to True, we ensure that logging is enabled by default, but provide flexibility to disable it if needed for bulk operations or specific scenarios.
Example Schema Updates:
from pydantic import BaseModel
from typing import Optional, Dict, List, Any


class NodePropertyUpdate(BaseModel):
    iroko_uuid: str
    properties: Dict[str, Any]
    save_edit: bool = True  # New parameter to control saving history


class RelationshipUpdate(BaseModel):
    from_uuid: str
    to_uuid: str
    relation_type: str
    properties: Optional[Dict[str, Any]] = None
    save_edit: bool = True  # New parameter


class NodeEditRequest(BaseModel):
    # This might be a composite request combining node properties and relationships
    iroko_uuid: str
    properties: Dict[str, Any]
    relationships: List[RelationshipUpdate]
    save_edit: bool = True  # New parameter


class RelationshipDeleteRequest(BaseModel):
    from_uuid: str
    to_uuid: str
    relation_type: str
    save_edit: bool = True  # New parameter
By adding save_edit: bool = True to these schemas, history saving is on by default, and clients opt out per request by sending save_edit as false.
How to Use the Edit Tracking Feature
Once implemented, using this feature is straightforward:
- Default Behavior: All edit operations (node property updates, relationship modifications, etc.) will automatically have their history saved to the edit_history table because save_edit defaults to True.
- Capturing Data: For each edit operation, the system will retrieve and store:
  - The complete JSON state of the node before the change.
  - The complete JSON state of the node after the change.
  - A detailed diff indicating which specific properties were added, modified, or removed.
  - Metadata such as the user performing the action, the node_id, the operation type, and the timestamp.
  - If relationships are involved, relevant relationship_data will also be stored.
- Accessing History: You can retrieve the edit history using the new API endpoints:
  - GET /api/v1/edit/history/node/{node_id}: To view all historical edits for a particular node.
  - GET /api/v1/edit/history/user/me: To view all edits performed by the currently authenticated user.
- Controlling Logging: If you need to disable history saving for a specific operation (e.g., during large batch imports or routine maintenance), you can set save_edit to false in your API request payload.
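As an illustration of the last point, the following sketch builds a property-update request body with history saving switched off. `build_property_update` is a hypothetical client-side helper; the field names come from the NodePropertyUpdate schema.

```python
import json
from typing import Any, Dict


def build_property_update(
    iroko_uuid: str, properties: Dict[str, Any], save_edit: bool = True
) -> Dict[str, Any]:
    """Assemble the JSON body for a node property update request."""
    return {"iroko_uuid": iroko_uuid, "properties": properties, "save_edit": save_edit}


# During a bulk import, skip history for this one request.
payload = build_property_update("some-uuid-123", {"age": 31}, save_edit=False)
body = json.dumps(payload)
```

Python's `False` is serialized as JSON `false`, which is the form the request payload expects. Omitting the field entirely leaves the schema default of `True` in effect.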
The Power of Comprehensive Edit Tracking
Implementing a feature that saves the before and after JSON states of node edits, along with their relationships, offers significant advantages:
- Detailed Audit Trail: Every edit made with save_edit enabled is recorded with its user, timestamp, and before and after snapshots. This is invaluable for compliance, security, and debugging.
- Rollback Capabilities: Complete snapshots of previous states give you the data needed to revert a node to a known good state if errors are introduced. The revert operation itself is not part of the implementation shown here.
- Enhanced Data Governance: This feature supports better data management policies by enforcing accountability and transparency in data handling.
- Simplified Debugging and Forensics: When data anomalies occur, tracing the exact changes and the users responsible becomes a far more efficient process.
This detailed approach ensures that your system not only stores data but also maintains a rich, accessible history of its evolution, making it a more reliable and manageable asset.
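Rollback itself is not implemented in this article, but the stored snapshots contain what it needs. As a rough sketch under that assumption, the hypothetical function `plan_rollback` below compares a saved before_state with a node's current state and works out which properties to set and which to remove. Applying the plan to the graph is left out.

```python
from typing import Any, Dict, List, Tuple

_MISSING = object()  # sentinel so a stored None is distinguishable from "absent"


def plan_rollback(
    before_state: Dict[str, Any], current_state: Dict[str, Any]
) -> Tuple[Dict[str, Any], List[str]]:
    """Return (properties to set, property keys to remove) that restore before_state."""
    to_set = {
        key: value
        for key, value in before_state.items()
        if not key.startswith("_") and current_state.get(key, _MISSING) != value
    }
    to_remove = sorted(
        key
        for key in current_state
        if not key.startswith("_") and key not in before_state
    )
    return to_set, to_remove


before = {"_labels": ["User"], "name": "Alice", "age": 30}
current = {"_labels": ["User"], "name": "Alice", "age": 31, "city": "Lagos"}
to_set, to_remove = plan_rollback(before, current)
# to_set == {"age": 30}; to_remove == ["city"]
```

This covers properties only. Labels and relationships are skipped here and would need their own handling, for example using the stored relationship_data.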
For further reading on best practices in data versioning and auditing, you can explore resources from The Data Vault Alliance or research concepts related to event sourcing, which share similar principles of capturing state changes over time.