# Source code for axiom.events

"""
database/event_sourcing.py

Core Event Sourcing implementation for Axiom AI.

Every state change in the game is recorded as an immutable event in
Event_Log.  The State_Cache is a performance materialisation derived by
replaying those events.  This module is the authoritative bridge between
the two.

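Supported event_type values (non-exhaustive; the engine is extensible):
    - 'entity_create'  payload: {"entity_id": str, "entity_type": str, "name": str}
    - 'stat_change'    payload: {"entity_id": str, "stat_key": str, "delta": float}
                                OR {"entity_id": str, "stat_key": str, "value": str}
    - 'stat_set'       payload: {"entity_id": str, "stat_key": str, "value": str}
    - 'chronicler_update' / 'manual_edit'
                       payload: same shape as 'stat_change' (delta OR value);
                                materialised like stat_change / stat_set, while
                                the event type keeps the change's provenance
    - 'dialogue'       payload: {"speaker": str, "text": str}   (no cache mutation)
    - 'combat_roll'    payload: {"entity_id": str, ...}         (no cache mutation)

For the stat-mutating types, "entity_id" may be omitted from the payload, in
which case the event's target_entity is used instead.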
"""

import json
import sqlite3
from typing import Any

from axiom.schema import get_connection


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a sqlite3.Row into a plain dict."""
    return dict(zip(row.keys(), tuple(row)))


# ---------------------------------------------------------------------------
# EventSourcer
# ---------------------------------------------------------------------------

[docs] class EventSourcer: """Manages event appending, querying, and State_Cache reconstruction for a single Axiom AI universe database. Args: db_path: Filesystem path to an existing universe .db file created by database.schema.create_universe_db(). """ def __init__(self, db_path: str) -> None: self._db_path = db_path # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def append_event( self, save_id: str, turn_id: int, event_type: str, target_entity: str, payload: dict[str, Any], ) -> int: """Insert a new event into Event_Log and return its auto-generated event_id. """ payload_json = json.dumps(payload) with get_connection(self._db_path) as conn: cursor = conn.execute( """ INSERT INTO Event_Log (save_id, turn_id, event_type, target_entity, payload) VALUES (?, ?, ?, ?, ?); """, (save_id, turn_id, event_type, target_entity, payload_json), ) event_id = cursor.lastrowid conn.commit() return event_id # type: ignore[return-value]
[docs] def append_events_batch( self, events: list[tuple[str, int, str, str, dict[str, Any]]], ) -> None: """Insert multiple events in a single transaction. Args: events: List of (save_id, turn_id, event_type, target_entity, payload) tuples. """ if not events: return rows = [(s, t, e, tg, json.dumps(p)) for s, t, e, tg, p in events] with get_connection(self._db_path) as conn: conn.executemany( "INSERT INTO Event_Log (save_id, turn_id, event_type, target_entity, payload) " "VALUES (?, ?, ?, ?, ?);", rows, ) conn.commit()
[docs] def get_events( self, save_id: str, start_turn_id: int = 0, up_to_turn_id: int | None = None, ) -> list[dict[str, Any]]: """Fetch events for a save, ordered chronologically. Args: save_id: The save whose events are requested. start_turn_id: Only events with turn_id > this value are returned. up_to_turn_id: If provided, only events with turn_id <= this value are returned. None means all future events. """ with get_connection(self._db_path) as conn: query = "SELECT * FROM Event_Log WHERE save_id = ? AND turn_id > ?" params: list[Any] = [save_id, start_turn_id] if up_to_turn_id is not None: query += " AND turn_id <= ?" params.append(up_to_turn_id) query += " ORDER BY event_id ASC;" rows = conn.execute(query, params).fetchall() events: list[dict[str, Any]] = [] for row in rows: event = _row_to_dict(row) event["payload"] = json.loads(event["payload"]) events.append(event) return events
[docs] def rebuild_state_cache( self, save_id: str, up_to_turn_id: int | None = None, force_full: bool = False, ) -> None: """Flush and rebuild State_Cache for a save by replaying Event_Log. Args: save_id: The save whose cache is being rebuilt. up_to_turn_id: If provided, only events up to this turn are replayed. None means the full history. force_full: If True, ignores all snapshots and replays the entire history from turn 0. Use this to fix cache corruption. Optimized to start from the nearest previous snapshot if one exists, unless force_full=True. """ target_turn = up_to_turn_id if up_to_turn_id is not None else 9999999 # 1. Load nearest snapshot cache: dict[str, dict[str, str]] = {} start_turn = -1 # Start from turn 0 if no snapshot if not force_full: with get_connection(self._db_path) as conn: row = conn.execute( """ SELECT turn_id, state_json FROM Snapshots WHERE save_id = ? AND turn_id <= ? ORDER BY turn_id DESC LIMIT 1; """, (save_id, target_turn), ).fetchone() if row: start_turn = row["turn_id"] cache = json.loads(row["state_json"]) # 2. Replay events from after the snapshot (or turn 0 if force_full) events = self.get_events(save_id, start_turn_id=start_turn, up_to_turn_id=up_to_turn_id) for event in events: cache = self._apply_event(event, cache) # 3. Atomic bulk update of State_Cache with get_connection(self._db_path) as conn: conn.execute("DELETE FROM State_Cache WHERE save_id = ?;", (save_id,)) insert_data = [] for entity_id, stats in cache.items(): for stat_key, stat_value in stats.items(): insert_data.append((save_id, entity_id, stat_key, stat_value)) if insert_data: conn.executemany( """ INSERT INTO State_Cache (save_id, entity_id, stat_key, stat_value) VALUES (?, ?, ?, ?); """, insert_data, ) conn.commit()
[docs] def update_state_cache( self, save_id: str, events: list[tuple[str, int, str, str, dict[str, Any]]], ) -> None: """Incrementally apply a just-appended batch of events to State_Cache. State_Cache is a materialised view of the base stats derived from Event_Log. rebuild_state_cache() replays the entire history (or from the nearest snapshot); this method instead applies *only* the given batch on top of the affected entities' current cached values, keeping the cache fresh after each turn without an O(history) replay. This is what keeps DB reads (the sidebar's load_full_game_state / load_stats tasks, which read State_Cache) in sync with the changes a turn just produced — see TICKET-002. Args: save_id: The save whose cache is being updated. events: List of (save_id, turn_id, event_type, target_entity, payload) tuples, in the same shape as append_events_batch(). Only entity_create / stat_change / stat_set events mutate the cache; all others are ignored. """ # Normalise to the dict shape _apply_event expects, keeping only the # cache-relevant events. relevant = [ {"event_type": etype, "target_entity": target, "payload": payload} for (_sid, _tid, etype, target, payload) in events if etype in ("entity_create", "stat_change", "stat_set", "chronicler_update", "manual_edit") ] if not relevant: return entity_ids = { e["payload"].get("entity_id", e["target_entity"]) for e in relevant } entity_ids.discard("") if not entity_ids: return # Seed an in-memory cache with the affected entities' current base # stats, then replay the batch on top (handles intra-batch chained # deltas, e.g. rule-engine cascades touching the same stat twice). cache: dict[str, dict[str, str]] = {} placeholders = ",".join("?" * len(entity_ids)) with get_connection(self._db_path) as conn: rows = conn.execute( f"SELECT entity_id, stat_key, stat_value FROM State_Cache " f"WHERE save_id = ? 
AND entity_id IN ({placeholders});", (save_id, *entity_ids), ).fetchall() for r in rows: cache.setdefault(r["entity_id"], {})[r["stat_key"]] = r["stat_value"] for event in relevant: cache = self._apply_event(event, cache) upsert_data = [ (save_id, eid, sk, sv) for eid, stats in cache.items() for sk, sv in stats.items() ] if not upsert_data: return with get_connection(self._db_path) as conn: conn.executemany( """ INSERT INTO State_Cache (save_id, entity_id, stat_key, stat_value) VALUES (?, ?, ?, ?) ON CONFLICT(save_id, entity_id, stat_key) DO UPDATE SET stat_value = excluded.stat_value; """, upsert_data, ) conn.commit()
[docs] def validate_integrity(self, save_id: str) -> tuple[bool, dict[str, Any]]: """Verify that the current State_Cache matches a fresh replay of history. This is a diagnostic tool to detect corruption in the materialised cache. It does NOT use snapshots. Returns: A (passed, mismatches) tuple — mismatches maps entity_id to a dict of stat_key to (cached_val, actual_val) pairs. """ # 1. Get current cache from DB with get_connection(self._db_path) as conn: rows = conn.execute( "SELECT entity_id, stat_key, stat_value FROM State_Cache WHERE save_id = ?;", (save_id,), ).fetchall() db_cache: dict[str, dict[str, str]] = {} for r in rows: db_cache.setdefault(r["entity_id"], {})[r["stat_key"]] = r["stat_value"] # 2. Freshly replay all events events = self.get_events(save_id) replayed_cache: dict[str, dict[str, str]] = {} for event in events: replayed_cache = self._apply_event(event, replayed_cache) # 3. Compare mismatches: dict[str, Any] = {} all_entity_ids = set(db_cache.keys()) | set(replayed_cache.keys()) for eid in all_entity_ids: db_stats = db_cache.get(eid, {}) re_stats = replayed_cache.get(eid, {}) all_stat_keys = set(db_stats.keys()) | set(re_stats.keys()) for sk in all_stat_keys: dv = db_stats.get(sk) rv = re_stats.get(sk) if dv != rv: mismatches.setdefault(eid, {})[sk] = (dv, rv) return (len(mismatches) == 0, mismatches)
[docs] def state_at( self, save_id: str, up_to_turn_id: int | None = None, ) -> dict[str, dict[str, str]]: """Compute the materialised state by replaying events, without touching the DB. Pure read: replays Event_Log (optionally up to a turn) and returns the resulting `entity_id -> {stat_key: stat_value}` map. Used by snapshotting and by the save editor (`axiom.saves`) to materialise state at any point. """ # start_turn_id=-1 : rejoue depuis le tout début, **tour 0 inclus** (le # défaut start_turn_id=0 de get_events exclut le tour 0, réservé aux events # « genesis » d'une save importée — cf. axiom.saves). cache: dict[str, dict[str, str]] = {} for event in self.get_events(save_id, start_turn_id=-1, up_to_turn_id=up_to_turn_id): cache = self._apply_event(event, cache) return cache
[docs] def take_snapshot(self, save_id: str, turn_id: int) -> None: """Capture the current materialised state and store it in Snapshots. This is an expensive operation (JSON-serialising the full state) and should be called sparingly. """ state_json = json.dumps(self.state_at(save_id, up_to_turn_id=turn_id)) with get_connection(self._db_path) as conn: conn.execute( "INSERT OR REPLACE INTO Snapshots (save_id, turn_id, state_json) VALUES (?, ?, ?);", (save_id, turn_id, state_json), ) conn.commit()
[docs] def get_current_stats( self, save_id: str, entity_id: str, ) -> dict[str, str]: """Read the current materialised stats for one entity from State_Cache. Args: save_id: The active save identifier. entity_id: The entity whose stats are requested. Returns: Dict mapping stat_key -> stat_value strings. Empty dict if the entity has no cached stats or does not exist. Raises: sqlite3.Error: On any database failure. """ with get_connection(self._db_path) as conn: rows = conn.execute( """ SELECT stat_key, stat_value FROM State_Cache WHERE save_id = ? AND entity_id = ?; """, (save_id, entity_id), ).fetchall() return {row[0]: row[1] for row in rows}
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @staticmethod def _apply_event( event: dict[str, Any], cache: dict[str, dict[str, str]], ) -> dict[str, dict[str, str]]: """Apply a single event to an in-memory stats cache and return the result. This is a pure function: it does not mutate the database. A new copy of the cache is returned when modifications are made. Handled event types: entity_create — registers the entity in the cache with no stats. stat_change — adjusts a numeric stat by delta, or sets a string value when a 'value' key is present instead of 'delta'. stat_set — unconditionally sets a stat to a string value. chronicler_update — world-simulation stat change (TICKET-006) : même payload (delta|value) ; matérialisé comme un stat_change/stat_set tout en conservant sa provenance « chronicler » dans le journal. manual_edit — correction manuelle (humain/LLM via l'éditeur de saves) : même payload (delta|value), provenance « édition » conservée. All other event types (e.g. 'dialogue', 'combat_roll') are ignored because they carry no cache-relevant state. Args: event: A single event dict (as returned by get_events). cache: The current in-memory stats dict (entity_id -> stats). Returns: Updated cache dict (may be the same object if no mutation needed). 
""" event_type: str = event["event_type"] payload: dict[str, Any] = event["payload"] if event_type == "entity_create": entity_id: str = payload["entity_id"] if entity_id not in cache: cache[entity_id] = {} elif event_type in ("stat_change", "stat_set", "chronicler_update", "manual_edit"): entity_id = payload.get("entity_id", event.get("target_entity", "")) stat_key: str = payload["stat_key"] if entity_id not in cache: cache[entity_id] = {} if event_type == "stat_set" or "value" in payload: # Unconditional string assignment cache[entity_id][stat_key] = str(payload["value"]) else: # Numeric delta delta: float = float(payload["delta"]) current_raw = cache[entity_id].get(stat_key, "0") try: current = float(current_raw) except ValueError: current = 0.0 new_val = current + delta # Preserve integer display when the result is whole if new_val == int(new_val): cache[entity_id][stat_key] = str(int(new_val)) else: cache[entity_id][stat_key] = str(new_val) # All other event types (dialogue, combat_roll, …) produce no cache change return cache