Source code for axiom.checkpoint
"""
database/checkpoint.py
Checkpoint and rewind management for Axiom AI saves.
The CheckpointManager exposes the rewind primitive (deleting future events
and rebuilding the State_Cache), a save listing helper, and the destructive
Hardcore-mode save deletion.
"""
import os
import shutil
import sqlite3
from pathlib import Path
from axiom.schema import get_connection
from axiom.events import EventSourcer
[docs]
class CheckpointManager:
"""Manages save checkpoints, rewinds, and Hardcore deletion for one universe.
Args:
db_path: Filesystem path to an existing universe .db file created
by database.schema.create_universe_db().
"""
def __init__(self, db_path: str) -> None:
self._db_path = db_path
self._event_sourcer = EventSourcer(db_path)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def rewind(self, save_id: str, target_turn_id: int) -> dict[str, int]:
"""Revert a save to its state at target_turn_id.
Steps:
1. Count how many future events will be deleted (for the summary).
2. DELETE all Event_Log rows where turn_id > target_turn_id.
3. Rebuild State_Cache from the surviving events.
Args:
save_id: The save to rewind.
target_turn_id: The turn to revert to (inclusive). All events
with turn_id strictly greater than this value are
permanently removed.
Returns:
A summary dict: A dict with keys deleted_events and rebuilt_to_turn.
Raises:
sqlite3.Error: On any database failure.
"""
with get_connection(self._db_path) as conn:
row = conn.execute(
"""
SELECT COUNT(*) FROM Event_Log
WHERE save_id = ? AND turn_id > ?;
""",
(save_id, target_turn_id),
).fetchone()
deleted_count: int = row[0]
conn.execute(
"""
DELETE FROM Event_Log
WHERE save_id = ? AND turn_id > ?;
""",
(save_id, target_turn_id),
)
# Clean up snapshots and timeline entries for future turns
conn.execute(
"DELETE FROM Snapshots WHERE save_id = ? AND turn_id > ?;",
(save_id, target_turn_id)
)
conn.execute(
"DELETE FROM Timeline WHERE save_id = ? AND turn_id > ?;",
(save_id, target_turn_id)
)
conn.commit()
self._event_sourcer.rebuild_state_cache(save_id, up_to_turn_id=target_turn_id)
return {"deleted_events": deleted_count, "rebuilt_to_turn": target_turn_id}
[docs]
def list_checkpoints(self, save_id: str) -> list[int]:
"""Return the distinct turn IDs present in Event_Log for a save, ascending.
This list represents the turns the player could rewind to. The UI
can use it to populate a "rewind to turn …" selector.
Args:
save_id: The save whose checkpoint turns are requested.
Returns:
Sorted list of unique turn_id integers. Empty list if the save
has no recorded events.
Raises:
sqlite3.Error: On any database failure.
"""
with get_connection(self._db_path) as conn:
rows = conn.execute(
"""
SELECT DISTINCT turn_id FROM Event_Log
WHERE save_id = ?
ORDER BY turn_id ASC;
""",
(save_id,),
).fetchall()
return [row[0] for row in rows]
[docs]
def delete_save(self, save_id: str, universe_dir: str) -> None:
"""Irrevocably delete a save and its associated universe directory.
Intended exclusively for Hardcore mode upon player death. This method:
1. Removes the save row from the database (cascades to Event_Log, etc).
2. Attempts to delete the universe_dir from the filesystem.
Args:
save_id: The save to erase from the database.
universe_dir: Absolute path to the universe directory to delete.
Raises:
OSError: If the directory cannot be deleted after multiple retries.
FileNotFoundError: If universe_dir does not exist.
sqlite3.Error: On any database failure.
"""
dir_path = Path(universe_dir)
if not dir_path.exists():
# If the dir is missing, we still want to clean up the DB
pass
# 1. Remove from database (cascades to Event_Log and State_Cache)
# We do this FIRST because if the DB delete fails, we shouldn't delete files.
with get_connection(self._db_path) as conn:
conn.execute("DELETE FROM Saves WHERE save_id = ?;", (save_id,))
conn.commit()
# 2. Irrevocably delete the filesystem directory
if dir_path.exists():
import time
max_retries = 3
for attempt in range(max_retries):
try:
shutil.rmtree(str(dir_path))
break
except OSError as exc:
if attempt == max_retries - 1:
raise OSError(
f"Failed to delete universe directory after {max_retries} attempts: {exc}. "
"Some files may be locked by another process."
) from exc
time.sleep(0.5) # Wait for locks to release