Source code for axiom.arbitrator

"""
core/arbitrator.py

The ArbitratorEngine — Axiom AI's deterministic firewall between LLM creativity and
the game's mathematical state.

On every narrative turn the ArbitratorEngine:

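1. Fetches current entity stats from State_Cache and applies the modifier overlay.
2. Retrieves relevant narrative memories from VectorMemory (RAG) and matching
   Lore_Book entries.
3. Builds the full narrative prompt (injecting any pending correction).
4. Calls the LLM, parses its response, and estimates the in-game minutes that
   elapsed (via a Timekeeper LLM call, or from the scene pace when that call is
   disabled or fails).
5. Validates every proposed state change against current stats.
6. Persists valid changes via EventSourcer and queues corrections for invalid
   ones; records one Timeline row for the advanced clock and applies validated
   inventory changes.
7. Runs the Rules Engine for each mutated entity; persists triggered actions.
8. Ticks modifier durations by the elapsed in-game minutes.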
9. Embeds the narrative chunk into VectorMemory.
10. Returns an ArbitratorResult with full detail for the UI / tests.

The Correction Loop (spec §4-B)
---------------------------------
If a change is rejected, a hidden system message is stored in
`_pending_correction`.  On the VERY NEXT turn this message is injected into
the prompt immediately before the user's input, then immediately cleared
so it cannot affect turn N+2.
"""

import json
import re
import sqlite3
from dataclasses import dataclass, field
from typing import Any, Callable

from axiom.logger import logger
from axiom.rules import RulesEngine
from axiom.events import EventSourcer
from axiom.modifiers import ModifierProcessor
from axiom.schema import get_connection
from axiom.backends.base import LLMBackend, LLMMessage, LLMResponse
from axiom.prompts import (
    HISTORY_TURN_CAP,
    build_narrative_prompt,
    build_timekeeper_prompt,
    format_entity_stats_block,
)
from axiom.memory import VectorMemory
from axiom.db_helpers import (
    get_current_time,
    get_spatial_context,
    get_time_of_day_context,
)


# Common words ignored when matching a turn's input against Lore_Book keywords
# and entry names — otherwise articles/prepositions (notably the leading "The"
# in most entry names) match nearly every entry and drown the ranking in noise.
_LORE_STOPWORDS: frozenset[str] = frozenset({
    "the", "a", "an", "of", "and", "or", "to", "in", "on", "at", "for", "with",
    "is", "are", "was", "were", "be", "by", "as", "it", "its", "this", "that",
    "these", "those", "i", "me", "my", "you", "your", "we", "our", "they",
    "them", "he", "she", "his", "her", "about", "tell", "what", "who", "where",
    "from", "into", "do", "does", "did", "go", "get", "us",
})


# ---------------------------------------------------------------------------
# Result type
# ---------------------------------------------------------------------------

[docs] @dataclass class ArbitratorResult: """The complete output of one ArbitratorEngine turn. Attributes: narrative_text: The prose to display to the player. Always present. applied_changes: State changes that passed validation and were persisted. rejected_changes: State changes that failed validation, each augmented with a "reason" key explaining the failure. triggered_rules: Rule actions fired as a consequence of applied changes. rule_chain_warning: True if the rules engine reached its iteration limit, indicating a possible infinite loop in creator rules. game_state_tag: The ambiance tag returned by the LLM (e.g. 'exploration'). player_entity_id: The ID of the player who sent the message for this turn. image_path: The file path of the generated image for this turn. """ narrative_text: str applied_changes: list[dict[str, Any]] = field(default_factory=list) rejected_changes: list[dict[str, Any]] = field(default_factory=list) inventory_changes: list[dict[str, Any]] = field(default_factory=list) triggered_rules: list[dict[str, Any]] = field(default_factory=list) rule_chain_warning: bool = False game_state_tag: str = "exploration" player_entity_id: str = "player" elapsed_minutes: int = 1 scene_pace: str = "deliberate" image_path: str | None = None #: Absolute in-game time (minutes) after this turn — i.e. the value written #: to the Timeline. Lets the GUI refresh its clock without a main-thread DB #: read (see ui/tabletop_view._on_turn_complete). in_game_time: int = 0
# --------------------------------------------------------------------------- # ArbitratorEngine # ---------------------------------------------------------------------------
class ArbitratorEngine:
    """Validates and applies LLM-proposed state changes for one narrative turn.

    Args:
        db_path: Path to the universe .db for direct entity queries.
        rules_list: List of creator-defined rules.
    """

    def __init__(
        self,
        db_path: str,
        rules_list: list[dict],
    ) -> None:
        self._db_path = db_path
        self._rules_engine = RulesEngine(rules_list)
        self._event_sourcer = EventSourcer(db_path)
        self._modifier_processor = ModifierProcessor(db_path)

        # Dependencies to be injected via configure(). configure() also sets
        # `_time_llm`; it is deliberately not initialised here so readers that
        # use getattr(..., self._llm) keep falling back to the narrative LLM.
        self._llm: LLMBackend | None = None
        self._vector_memory: VectorMemory | None = None

        # Per-turn state.
        self._pending_correction: str | None = None
        self._mode: str = "Normal"
        self._hero_entity_id: str | None = None
        # Primary (non-hero) actor of the current turn. process_turn overwrites
        # it at the start of every turn; "player" matches the fallback used
        # before any turn has run.
        self._player_entity_id: str = "player"
[docs] def configure(self, llm: LLMBackend, vector_memory: VectorMemory, time_llm: LLMBackend | None = None) -> None: """Inject runtime dependencies before process_turn.""" self._llm = llm self._vector_memory = vector_memory self._time_llm = time_llm if time_llm is not None else llm
[docs] def invalidate_stats_cache(self) -> None: """No-op kept for API compatibility. Effective stats (base + modifier overlay) are now re-read from State_Cache + Active_Modifiers on every turn (see `_fetch_effective_stats`), so there is no stale cross-turn cache to clear. Previously this dropped an in-memory snapshot that could keep an *expired* modifier's delta baked in until the next chronicler/rewind — callers (rewind, post-chronicler) still call this harmlessly. """
# ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def process_turn( self, save_id: str, turn_id: int, intents: dict[str, str], universe_system_prompt: str, history: list[LLMMessage], stream_token_callback: Callable[[str], None] | None = None, temperature: float = 0.7, top_p: float = 1.0, verbosity_level: str = "balanced", mode: str = "Normal", hero_entity_id: str | None = None, ) -> ArbitratorResult: """Execute one full ArbitratorEngine turn and return the result. Args: save_id: The active save identifier. turn_id: The current turn number (monotonically increasing). intents: Dict mapping actor entity_id to their intent text. universe_system_prompt: The universe's foundational system prompt. history: Prior conversation turns for context. stream_token_callback: Optional callable invoked with each streaming token as it arrives from the LLM. temperature: Sampling temperature (0.0 to 1.0). top_p: Nucleus sampling parameter (0.0 to 1.0). verbosity_level: 'short', 'balanced', or 'talkative'. mode: Game mode ('Normal', 'Hardcore', 'Companion'). hero_entity_id: Optional ID of the AI Hero entity. Returns: ArbitratorResult containing narrative text and all change outcomes. Raises: LLMConnectionError: If the LLM backend is unreachable. 
""" self._mode = mode self._hero_entity_id = hero_entity_id # Determine the primary player ID for legacy events and UI fallback player_entity_id = next((aid for aid in intents.keys() if aid != hero_entity_id), "player") if intents else "player" self._player_entity_id = player_entity_id # Step 0 — Log intents _pending_events: list[tuple] = [] for actor_id, intent_text in intents.items(): # For backward compatibility with existing tests, the primary player gets 'user_input' event_type = "hero_intent" if actor_id == hero_entity_id else "user_input" self._event_sourcer.append_event( save_id, turn_id, event_type, actor_id, {"text": intent_text} ) combined_intents_text = " ".join(intents.values()) if intents else "" # Step 1 — Fetch and overlay stats for all active entities all_stats = self._fetch_effective_stats(save_id) # Step 2 — RAG retrieval (Narrative Memories + Lore Book) # We query for both prior narrative chunks and structured lore # Point 2: Pass current_turn_id for Time-Weighted search # Mission: Exclude turns that are already in the conversation history (HISTORY_TURN_CAP) # NB: imported locally so tests can patch `axiom.config.load_config`. from axiom.config import load_config cfg = load_config() # Calculate the oldest turn ID still in the history window max_turn_id = max(0, turn_id - HISTORY_TURN_CAP) rag_results = self._vector_memory.query( save_id, combined_intents_text, k=cfg.rag_chunk_count, current_turn_id=turn_id, max_turn_id=max_turn_id ) rag_chunks = [r["text"] for r in rag_results if r.get("metadata", {}).get("type") != "lore"] # Step 3 — Relevant Context Filtering (Context Optimization) # Only send stats for entities that are "active" in the current context # to save tokens and reduce LLM confusion. 
relevant_entity_ids = self._identify_relevant_entities( save_id, combined_intents_text, history, rag_chunks, all_stats ) logger.debug(f"[ARBITRATOR] Identified {len(relevant_entity_ids)} relevant entities: {sorted(list(relevant_entity_ids))}") # Always include the actors that submitted intents for actor_id in intents.keys(): relevant_entity_ids.add(actor_id) if not intents: relevant_entity_ids.add("player") filtered_stats = { eid: stats for eid, stats in all_stats.items() if eid in relevant_entity_ids } # Fetch entity names/types AND the player persona in a single connection # (both feed the prompt below — no need for two round-trips). player_persona = "" with get_connection(self._db_path) as conn: name_rows = conn.execute("SELECT entity_id, name, entity_type FROM Entities").fetchall() id_to_name = {r["entity_id"]: r["name"] for r in name_rows} id_to_type = {r["entity_id"]: r["entity_type"] for r in name_rows} # The player's ID is often "player", but might have a real name. # If not explicitly named, default to "Player" to give it a proper noun. 
if "player" not in id_to_name: id_to_name["player"] = "Player" try: row = conn.execute( "SELECT player_persona FROM Saves WHERE save_id = ?;", (save_id,), ).fetchone() if row and row["player_persona"]: player_persona = row["player_persona"] except sqlite3.Error: pass # Step 4 — Build prompt (with pending correction) entity_block = format_entity_stats_block( [ { "entity_id": eid, "name": id_to_name.get(eid, eid), "entity_type": id_to_type.get(eid, "unknown"), "stats": stats } for eid, stats in filtered_stats.items() ] ) # Fetch actual Lore Book entries if available (subset matching the query) lore_book_subset = self._fetch_relevant_lore(save_id, combined_intents_text) # Get current time context total_mins = get_current_time(self._db_path, save_id) time_ctx = get_time_of_day_context(total_mins) # Spatial Context (Approach A: Hierarchical Breadcrumbs + Neighbors) spatial_ctx = None player_loc_id = filtered_stats.get(player_entity_id, {}).get("Location") if player_loc_id: spatial_ctx = get_spatial_context(self._db_path, player_loc_id) # Phase 12.1: Fetch triggered scheduled events triggered_events = self._fetch_triggered_events(save_id, total_mins) # Map intents to names named_intents = {id_to_name.get(eid, eid): intent for eid, intent in (intents or {}).items()} hero_name_str = id_to_name.get(hero_entity_id, hero_entity_id) if hero_entity_id else None # Collect local character names for Group Awareness local_character_names = [] player_loc = filtered_stats.get(player_entity_id, {}).get("Location") if player_loc: for eid, stats in filtered_stats.items(): loc = stats.get("Location") if loc and str(loc).lower() == str(player_loc).lower(): # Map the entity ID to name name = id_to_name.get(eid, eid) if name.lower() == "player": name = "Player" local_character_names.append(name) messages = build_narrative_prompt( universe_system_prompt=universe_system_prompt, entity_stats_block=entity_block, rag_chunks=rag_chunks, history=history, intents=named_intents, 
pending_correction=self._pending_correction, player_persona=player_persona, lore_book=lore_book_subset, verbosity_level=verbosity_level, current_time_str=time_ctx, scheduled_events=triggered_events, spatial_context=spatial_ctx, mode=self._mode, hero_entity_id=hero_name_str, local_character_names=local_character_names, ) # Step 4 — Clear pending correction immediately after prompt is built self._pending_correction = None # Step 5 — Call LLM (streaming or non-streaming based on callback) # Phase 11: Dynamic stop sequences to prevent impersonation stops = ["\nUser:", "\nPlayer:", "\n[User]", "<|eot_id|>"] for actor_id in intents.keys(): stops.extend([f"\n{actor_id}:", f"\n[{actor_id}]"]) # Mapping verbosity to max_tokens to prevent runaway generation verbosity_to_tokens = { "short": 150, "balanced": 400, "talkative": 1024 } max_tokens = verbosity_to_tokens.get(verbosity_level.lower(), 400) llm_response = self._call_llm( messages, stream_token_callback, temperature, top_p, stop_sequences=stops, max_tokens=max_tokens ) narrative_text: str = llm_response.narrative_text # Step 6 — Parse tool call state_changes: list[dict[str, Any]] = [] inventory_changes: list[dict[str, Any]] = [] game_state_tag: str = "exploration" scene_pace: str = "deliberate" if llm_response.tool_call: state_changes = llm_response.tool_call.get("state_changes", []) inventory_changes = llm_response.tool_call.get("inventory_changes", []) game_state_tag = str(llm_response.tool_call.get("game_state_tag", "exploration")).strip().lower() scene_pace = str(llm_response.tool_call.get("scene_pace", "deliberate")).strip().lower() # Deduce elapsed in-game minutes. By default a dedicated "Timekeeper" LLM # call parses the action + narrative to estimate the time that passed. # This second call can be disabled (cfg.timekeeper_enabled = False) to save # an LLM round-trip per turn, in which case the time is derived from the # scene pace alone — cheaper but coarser. See Pilier 5 / TICKET-015. 
elapsed_minutes: int | None = None if cfg.timekeeper_enabled: prompt = build_timekeeper_prompt(combined_intents_text, narrative_text) try: time_llm = getattr(self, "_time_llm", self._llm) tk_resp = time_llm.complete(prompt, max_tokens=150, temperature=0.1) tk_data = getattr(tk_resp, "tool_call", {}) or {} if not tk_data: tk_text = getattr(tk_resp, "narrative_text", str(tk_resp)) match = re.search(r'\{.*\}', tk_text, re.DOTALL) if match: try: tk_data = json.loads(match.group(0)) except json.JSONDecodeError: pass if tk_data and "elapsed_minutes" in tk_data: elapsed_minutes = int(tk_data["elapsed_minutes"]) except Exception as e: logger.error(f"[ARBITRATOR] Timekeeper failed: {e}") # Fallback based on scene pace. Also the primary path when the Timekeeper # is disabled or could not produce a value. if elapsed_minutes is None: pace_defaults = { "combat": 2, "dialogue": 5, "conversation": 5, "exploration": 15, "travel": 60, "deliberate": 15, "montage": 60, "tension": 10 } elapsed_minutes = pace_defaults.get(scene_pace, 15) # In-game clock after this turn. Persisted to Timeline once, after the # state-change loop below, so a travelling turn produces a single timeline # row (with the travel note) instead of two (TICKET-019). new_time = total_mins + elapsed_minutes travel_note: str | None = None # Step 7 — Validate and apply each state change applied_changes: list[dict[str, Any]] = [] rejected_changes: list[dict[str, Any]] = [] rejection_messages: list[str] = [] # Load the set of defined stat names ONCE per turn (lowercased) instead # of re-querying Stat_Definitions for every single change (was an N+1). 
defined_stats = self._load_defined_stats() if state_changes else set() for change in state_changes: entity_id: str = change.get("entity_id", "") stat_key: str = change.get("stat_key", "") delta: float | None = change.get("delta") value: Any = change.get("value") valid, reason = self._validate_change( entity_id, stat_key, delta, value, all_stats, defined_stats ) if valid: payload: dict[str, Any] = {"entity_id": entity_id, "stat_key": stat_key} if delta is not None: payload["delta"] = delta event_type = "stat_change" else: payload["value"] = value event_type = "stat_set" # Special Case: Location Change -> annotate the timeline row with # the distance traveled. The actual in-game time cost is already # captured in elapsed_minutes (the LLM/Timekeeper accounts for the # mode of transport via the narrative); here we only enrich the # single timeline entry written after this loop (TICKET-019). if entity_id == player_entity_id and stat_key == "Location" and value: old_loc = all_stats.get(entity_id, {}).get("Location") if old_loc and old_loc != value: travel_dist = self._get_travel_distance(old_loc, value) if travel_dist > 0: travel_note = f"Traveled to {value} ({travel_dist} km)" _pending_events.append((save_id, turn_id, event_type, entity_id, payload)) # Update local snapshot for downstream Rules evaluation self._apply_local_change(entity_id, payload, all_stats) applied_changes.append(change) else: rejected = dict(change) rejected["reason"] = reason rejected_changes.append(rejected) rejection_messages.append( f"{entity_id}.{stat_key}: {reason}" ) # Queue a combined correction if any changes were rejected if rejection_messages: self._queue_correction("; ".join(rejection_messages)) # Persist the advanced in-game clock — exactly one Timeline row per turn # (TICKET-010 / TICKET-019). The description carries the travel note when # the player moved, otherwise the plain time advance. 
timeline_desc = travel_note or f"Turn advanced by {elapsed_minutes} mins" try: with get_connection(self._db_path) as conn: conn.execute( "INSERT INTO Timeline (save_id, turn_id, in_game_time, description) VALUES (?, ?, ?, ?);", (save_id, turn_id, new_time, timeline_desc) ) conn.commit() except Exception as e: logger.error(f"[ARBITRATOR] Failed to persist new time: {e}") # Step 7.5 — Process Inventory Changes applied_inventory: list[dict[str, Any]] = [] for inv_change in inventory_changes: # { "entity_id": str, "item_id": str, "action": "add"|"remove", "quantity": int } valid, reason = self._validate_inventory_change(save_id, inv_change) if valid: self._apply_inventory_change(save_id, turn_id, inv_change, _pending_events) applied_inventory.append(inv_change) else: self._queue_correction(f"Inventory: {reason}") # Step 8 — Run Rules Engine (Persistent & Chained) # Point 4: Rule actions now generate persistent events and update stats triggered_rules: list[dict[str, Any]] = [] _seen_rule_signatures: set[str] = set() mutated_entities = {c.get("entity_id") for c in applied_changes if c.get("entity_id")} rule_chain_warning = False # We keep evaluating while new rules trigger (chaining) # Max 5 iterations to prevent infinite loops from poorly defined rules for i in range(5): new_mutations = set() for entity_id in list(mutated_entities): stats = all_stats.get(entity_id, {}) triggered_actions = self._rules_engine.evaluate(entity_id, stats) for action in triggered_actions: # Check if this rule already triggered for this entity this turn # to prevent trivial infinite loops action_id = f"{entity_id}_{action.get('type')}_{action.get('stat')}_{action.get('value')}" if action_id in _seen_rule_signatures: continue _seen_rule_signatures.add(action_id) # 1. 
Map rule action to a persistent event payload: dict[str, Any] = { "entity_id": entity_id, "stat_key": action.get("stat"), "source_rule": action.get("rule_id") } event_type = "stat_set" if action["type"] == "stat_change": payload["delta"] = action.get("value") event_type = "stat_change" else: payload["value"] = action.get("value") _pending_events.append((save_id, turn_id, event_type, entity_id, payload)) # 1b. Also record the trigger itself for tracking/history _pending_events.append((save_id, turn_id, "rule_trigger", entity_id, action)) # 2. Update local stats for chaining/consistency self._apply_local_change(entity_id, payload, all_stats) triggered_rules.append(action) new_mutations.add(entity_id) if not new_mutations: break if i == 4: # Reached limit rule_chain_warning = True _pending_events.append((save_id, turn_id, "rule_engine_warning", "system", {"message": "Maximum rule chaining depth (5) reached. Possible infinite loop detected."}) ) mutated_entities = new_mutations # Step 9 — Tick modifiers self._modifier_processor.tick_modifiers(save_id, elapsed_minutes=elapsed_minutes) # Step 10 — Embed narrative chunk if narrative_text.strip(): self._vector_memory.embed_chunk(save_id, turn_id, narrative_text) # Step 11 — Log narrative event (Multiverse-compatible) text_to_log = combined_intents_text if not narrative_text.strip() else narrative_text _pending_events.append(( save_id, turn_id, "narrative_text", "system", {"active": 0, "variants": [text_to_log]}, )) # Flush all deferred events in a single transaction self._event_sourcer.append_events_batch(_pending_events) # Keep the materialised State_Cache in sync with this turn's changes so # DB reads (sidebar load tasks, next turn's stat fetch) don't show stats # frozen at session load. This is also what lets the next turn re-read # fresh effective stats without any in-memory cache. 
(TICKET-002) self._event_sourcer.update_state_cache(save_id, _pending_events) # Phase 12.1: Mark scheduled events as fired for ev in triggered_events: self._mark_event_as_fired(save_id, ev["event_id"]) return ArbitratorResult( narrative_text=narrative_text, applied_changes=applied_changes, rejected_changes=rejected_changes, inventory_changes=applied_inventory, triggered_rules=triggered_rules, rule_chain_warning=rule_chain_warning, game_state_tag=game_state_tag, player_entity_id=player_entity_id, elapsed_minutes=elapsed_minutes, scene_pace=scene_pace, in_game_time=new_time, )
# ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _call_llm( self, messages: list[LLMMessage], stream_token_callback: Callable[[str], None] | None, temperature: float = 0.7, top_p: float = 1.0, stop_sequences: list[str] | None = None, max_tokens: int | None = None, ) -> LLMResponse: """Call the LLM, optionally streaming tokens via a callback. When stream_token_callback is None: uses llm.complete() — identical to Phase 2 behaviour, all existing tests pass unchanged. When stream_token_callback is provided: uses llm.stream_tokens() to yield tokens one by one. Each token is passed to the callback (e.g. NarrativeWorker.token_received.emit) for real-time UI updates. Tokens are accumulated and assembled into a full string, then parse_tool_call() is applied to extract narrative and tool_call JSON. Args: messages: The fully built prompt message list. stream_token_callback: Optional token handler; None = non-streaming. temperature: Sampling temperature (0.0 to 1.0). top_p: Nucleus sampling parameter (0.0 to 1.0). stop_sequences: Custom strings to trigger generation stop. max_tokens: Optional limit on the number of tokens to generate. Returns: LLMResponse with narrative_text, optional tool_call, finish_reason. Raises: LLMConnectionError: If the LLM is unreachable during streaming. 
""" if stream_token_callback is None: resp = self._llm.complete( messages, temperature=temperature, top_p=top_p, stop_sequences=stop_sequences, max_tokens=max_tokens ) else: # Streaming path raw_tokens: list[str] = [] is_json_block = False buffer = "" for token in self._llm.stream_tokens( messages, temperature=temperature, top_p=top_p, stop_sequences=stop_sequences, max_tokens=max_tokens ): raw_tokens.append(token) if not is_json_block: buffer += token match = re.search(r'(~~+json|~~~|```json|```)', buffer) if match: is_json_block = True idx = match.start() if idx > 0: stream_token_callback(buffer[:idx]) elif len(buffer) > 15: stream_token_callback(buffer[:-15]) buffer = buffer[-15:] if not is_json_block and buffer: stream_token_callback(buffer) full_raw = "".join(raw_tokens) narrative, tool_call = self._llm.parse_tool_call(full_raw) resp = LLMResponse( narrative_text=narrative, tool_call=tool_call, finish_reason=self._llm.last_finish_reason, ) # Apply narrator-specific "Trim Sentences" post-processing from axiom.config import load_config try: config = load_config() if getattr(config, "trim_sentences", True): # Trim if finish_reason is length OR if the narrative text does not end with a sentence terminator is_incomplete = not re.search(r'[.!?。!?]+["\'”»\s\)]*$', resp.narrative_text.strip()) if resp.finish_reason == "length" or is_incomplete: resp.narrative_text = LLMResponse._trim_incomplete_sentence(resp.narrative_text) except Exception: pass return resp def _fetch_effective_stats(self, save_id: str) -> dict[str, dict[str, str]]: """Fetch all active entity stats and apply modifier overlays. Uses two global queries (one for all stats, one for all modifiers) in a single connection instead of per-entity round-trips. Args: save_id: The active save identifier. Returns: Dict mapping entity_id -> effective stats dict. 
""" from axiom.textfmt import fmt_num with get_connection(self._db_path) as conn: stat_rows = conn.execute( "SELECT entity_id, stat_key, stat_value FROM State_Cache WHERE save_id = ?;", (save_id,), ).fetchall() mod_rows = conn.execute( """ SELECT entity_id, stat_key, delta FROM Active_Modifiers WHERE save_id = ?; """, (save_id,), ).fetchall() base: dict[str, dict[str, str]] = {} for r in stat_rows: base.setdefault(r["entity_id"], {})[r["stat_key"]] = r["stat_value"] effective = {eid: dict(stats) for eid, stats in base.items()} for r in mod_rows: if r["entity_id"] in effective: current_raw = effective[r["entity_id"]].get(r["stat_key"], "0") try: current = float(current_raw) effective[r["entity_id"]][r["stat_key"]] = fmt_num(current + r["delta"]) except ValueError: pass return effective def _identify_relevant_entities( self, save_id: str, user_message: str, history: list[LLMMessage], rag_chunks: list[str], all_stats: dict[str, dict[str, str]], ) -> set[str]: """Identify relevant entities based on mentions, type, and location. Always includes: - Entities explicitly mentioned in the recent context. - Entities of type 'world' or 'faction' (global context). - NPCs that share the same 'Location' stat as the player. """ # 1. Mentions-based detection text_to_scan = user_message.lower() for msg in history[-3:]: text_to_scan += " " + msg["content"].lower() for chunk in rag_chunks: text_to_scan += " " + chunk.lower() words_in_text = set(re.findall(r"\b\w+\b", text_to_scan)) with get_connection(self._db_path) as conn: rows = conn.execute("SELECT entity_id, entity_type FROM Entities;").fetchall() # Map lowercase ID -> Original ID original_case_map = {row[0].lower(): row[0] for row in rows} id_to_type = {row[0]: row[1] for row in rows} all_ids_lower = set(original_case_map.keys()) matched_ids = {original_case_map[ml] for ml in words_in_text.intersection(all_ids_lower)} # 2. 
Location-based and Type-based inclusion relevant = set(matched_ids) # Get player's location player_id = getattr(self, "_player_entity_id", "player") player_stats = all_stats.get(player_id, {}) player_loc = player_stats.get("Location", "").lower() npc_count_at_loc = 0 for eid, etype in id_to_type.items(): # Include all global entities if etype in ("world", "faction"): relevant.add(eid) continue # Include NPCs at the same location (Limit to 3 to prevent bloat) if etype == "npc" and player_loc: entity_loc = all_stats.get(eid, {}).get("Location", "").lower() if entity_loc == player_loc: if npc_count_at_loc < 3: relevant.add(eid) npc_count_at_loc += 1 return relevant def _fetch_triggered_events(self, save_id: str, current_minute: int) -> list[dict]: """Fetch global scheduled events that have triggered but not yet fired for this save.""" events = [] try: with get_connection(self._db_path) as conn: rows = conn.execute( """ SELECT e.event_id, e.title, e.description FROM Scheduled_Events e LEFT JOIN Fired_Scheduled_Events f ON e.event_id = f.event_id AND f.save_id = ? WHERE e.trigger_minute <= ? AND f.event_id IS NULL; """, (save_id, current_minute) ).fetchall() events = [dict(r) for r in rows] except Exception as e: logger.error(f"[ARBITRATOR] Error fetching scheduled events: {e}") return events def _mark_event_as_fired(self, save_id: str, event_id: str) -> None: """Record that a scheduled event has occurred for this save.""" try: with get_connection(self._db_path) as conn: conn.execute( "INSERT OR IGNORE INTO Fired_Scheduled_Events (save_id, event_id) VALUES (?, ?);", (save_id, event_id) ) conn.commit() except Exception as e: logger.error(f"[ARBITRATOR] Error marking event as fired: {e}") def _fetch_relevant_lore(self, save_id: str, user_message: str, k: int = 5) -> list[dict]: """Fetch Lore Book entries relevant to the current turn. Reads the structured `Lore_Book` table directly and ranks entries by keyword / name overlap with the current input. 
Returns the top-k matches as `{category, name, content}` dicts (the shape the prompt builder expects), or an empty list when nothing matches. Previously this went through VectorMemory and filtered on a metadata key the query never returned (and lore was never embedded), so it always yielded an empty list while still paying a vector query per turn. Lore_Book is universe-level structured data with an explicit `keywords` column — a direct, deterministic SQL lookup is both correct and cheaper. """ text = (user_message or "").lower() if not text: return [] words = {w for w in re.findall(r"\b\w+\b", text) if w not in _LORE_STOPWORDS} if not words: return [] try: with get_connection(self._db_path) as conn: rows = conn.execute( "SELECT category, name, keywords, content FROM Lore_Book;" ).fetchall() except sqlite3.Error as e: logger.error(f"[ARBITRATOR] Error fetching lore book: {e}") return [] scored: list[tuple[int, dict]] = [] for r in rows: name = (r["name"] or "").strip() keywords = (r["keywords"] or "").lower() # Tokenise both the keywords column and the entry name for matching. tokens = set(re.findall(r"\b\w+\b", keywords)) | set( re.findall(r"\b\w+\b", name.lower()) ) score = len(words & tokens) if score > 0: scored.append((score, { "category": r["category"] or "", "name": name, "content": r["content"] or "", })) scored.sort(key=lambda s: s[0], reverse=True) return [entry for _score, entry in scored[:k]] def _get_travel_distance(self, source_id: str, target_id: str) -> int: """Query the distance between two locations in kilometers.""" try: with get_connection(self._db_path) as conn: row = conn.execute( "SELECT distance_km FROM Location_Connections WHERE source_id = ? AND target_id = ?;", (source_id, target_id) ).fetchone() if row: return int(row[0]) except Exception: # Distance unknown → treat as 0 (adjacent), but leave a trace: a DB # error here would otherwise silently distort travel-time logic. 
logger.debug( "Travel-distance lookup %s%s failed; defaulting to 0.", source_id, target_id, exc_info=True, ) return 0 def _load_defined_stats(self) -> set[str]: """Return the set of defined stat names (lowercased) for this universe. Read once per turn and passed to `_validate_change` so the stat-restriction rule no longer issues one query per proposed change (former N+1). """ try: with get_connection(self._db_path) as conn: rows = conn.execute("SELECT name FROM Stat_Definitions;").fetchall() return {str(r[0]).lower() for r in rows} except sqlite3.Error: return set() def _validate_change( self, entity_id: str, stat_key: str, delta: float | None, value: Any, all_effective_stats: dict[str, dict[str, str]], defined_stats: set[str], ) -> tuple[bool, str]: """Validate a single proposed state change. Rules: - Unknown entity_id → rejected (if the entity set is non-empty). - Stat key not in Stat_Definitions → rejected (except special 'Description'). - Delta change on a non-negative resource that would go below 0 → rejected. - Absolute assignment on a non-negative resource that would go below 0 → rejected. Args: entity_id: The entity to modify. stat_key: The stat to change. delta: Signed numeric change, or None if using value. value: Absolute assignment value, or None if using delta. all_effective_stats: Full map of entity_id -> stats for all active entities in this save. defined_stats: Lowercased set of stat names defined in this universe (loaded once per turn). Returns: (True, "") if valid, or (False, reason_string) if invalid. """ if not entity_id: return False, "Missing entity_id in state change." if all_effective_stats and entity_id not in all_effective_stats: # Non-empty entity set means we know all valid entities return False, f"Unknown entity: {entity_id}" # Stat Restriction Rule: Only allow stats defined in Stat_Definitions (case-insensitive) # Plus the special 'Description' stat which is allowed for all entities. 
if stat_key.lower() != "description": if stat_key.lower() not in defined_stats: return False, f"Stat '{stat_key}' is not defined in this universe. Custom stats are forbidden." # Resource sufficiency rules (prevent stats like HP, Gold, etc. from going below zero) entity_stats = all_effective_stats.get(entity_id, {}) current_raw = entity_stats.get(stat_key, "0") try: current_val = float(current_raw) except ValueError: current_val = None # Non-numeric stat # Calculate proposed new value if delta is not None: if current_val is None: return False, f"Cannot apply numeric delta to non-numeric stat {entity_id}.{stat_key}." result_val = current_val + float(delta) elif value is not None: try: result_val = float(value) except (ValueError, TypeError): result_val = None # Assigning a non-numeric string is always valid for the cache else: return False, f"State change for {entity_id}.{stat_key} has neither delta nor value." # Enforce non-negativity if it's a numeric resource if current_val is not None and result_val is not None: if current_val >= 0 and result_val < 0: # COMPANION MODE: Hero has Plot Armor (cannot drop below 0 for critical resources) if self._mode == "Companion" and entity_id == self._hero_entity_id: # Allow it but set to 0 instead of rejecting, or just ignore the reduction # Here we silently cap at 0 to ensure the turn proceeds but the hero survives. return True, "" return False, ( f"{entity_id} does not have enough {stat_key} (current: {current_val:.0f})" ) return True, "" def _queue_correction(self, reason: str) -> None: """Format and store a correction message for the next turn's prompt. If a correction is already queued (from multiple rejections), the new reason is concatenated. Args: reason: Human-readable description of what failed. """ correction = ( f"[NARRATOR HINT: The previous action failed because {reason}. " "Describe this failure naturally in the story. 
Do not mention this hint.]" ) if self._pending_correction is None: self._pending_correction = correction else: self._pending_correction += f" {correction}" def _apply_local_change(self, entity_id: str, payload: dict, all_stats: dict) -> None: """Update a local stats snapshot with a proposed change. Ensures that within a single turn, subsequent validations or rules see the effects of previous changes. """ if entity_id not in all_stats: all_stats[entity_id] = {} stat_key = payload["stat_key"] if "delta" in payload: current_raw = all_stats[entity_id].get(stat_key, "0") try: current = float(current_raw) except ValueError: current = 0.0 new_val = current + float(payload["delta"]) all_stats[entity_id][stat_key] = ( str(int(new_val)) if new_val == int(new_val) else str(new_val) ) else: all_stats[entity_id][stat_key] = str(payload["value"]) def _validate_inventory_change(self, save_id: str, change: dict) -> tuple[bool, str]: """Verify if an inventory transaction is legal.""" entity_id = change.get("entity_id") item_id = change.get("item_id") action = change.get("action") if not entity_id or not item_id or action not in ("add", "remove"): return False, "Malformed inventory change (missing entity_id, item_id, or invalid action)." # `quantity` comes straight from untrusted LLM JSON: it can be missing, # null, a non-numeric / float string, or <= 0. Coerce defensively here so # a bad value rejects the change (→ correction hint) instead of crashing # the whole turn (int(None)/int("two") raise) or later violating the # quantity >= 0 CHECK / silently turning a "remove" into an add. try: quantity = int(change.get("quantity", 1)) except (ValueError, TypeError): return False, "Inventory quantity must be a whole number." if quantity <= 0: return False, "Inventory quantity must be a positive whole number." with get_connection(self._db_path) as conn: # 1. 
Check if item exists in definitions item_exists = conn.execute( "SELECT 1 FROM Item_Definitions WHERE item_id = ?;", (item_id,) ).fetchone() if not item_exists: return False, f"Unknown item: {item_id}" # 2. Check if entity exists in this save entity_exists = conn.execute( "SELECT 1 FROM Entities WHERE entity_id = ?;", (entity_id,) ).fetchone() if not entity_exists: return False, f"Unknown entity: {entity_id}" if action == "remove": # 3. Check if entity has enough quantity row = conn.execute( "SELECT quantity FROM Items_Inventory WHERE save_id = ? AND entity_id = ? AND item_id = ?;", (save_id, entity_id, item_id) ).fetchone() current_qty = row["quantity"] if row else 0 if current_qty < quantity: return False, f"Insufficient quantity for {item_id} (has {current_qty}, needs {quantity})." return True, "" def _apply_inventory_change(self, save_id: str, turn_id: int, change: dict, pending_events: list[tuple] | None = None) -> None: """Persist an inventory transaction and log the event.""" entity_id = change["entity_id"] item_id = change["item_id"] action = change["action"] quantity = int(change.get("quantity", 1)) with get_connection(self._db_path) as conn: if action == "add": conn.execute( """ INSERT INTO Items_Inventory (save_id, entity_id, item_id, quantity) VALUES (?, ?, ?, ?) ON CONFLICT(save_id, entity_id, item_id) DO UPDATE SET quantity = quantity + excluded.quantity; """, (save_id, entity_id, item_id, quantity) ) elif action == "remove": conn.execute( """ UPDATE Items_Inventory SET quantity = quantity - ? WHERE save_id = ? AND entity_id = ? AND item_id = ?; """, (quantity, save_id, entity_id, item_id) ) # Cleanup zero-quantity items conn.execute( "DELETE FROM Items_Inventory WHERE quantity <= 0;" ) conn.commit() # Log to event source for perfect rewindability event_tuple = (save_id, turn_id, f"inventory_{action}", entity_id, change) if pending_events is not None: pending_events.append(event_tuple) else: self._event_sourcer.append_event(*event_tuple)