"""axiom.compile — Universe-as-Code: compiling a source tree into a `.db` cache.
A universe is defined as a versionable **tree of text files** (TOML/MD); the
SQLite `.db` is only a **compiled cache** derived from the text. The text is
the source of truth. This module reads the tree and populates a runtime `.db`.
Zero Qt dependency: pure engine, drivable from the CLI (`axiom compile`).
Boundary: only the **definition tables** are produced here. The runtime/save
tables (Saves, Event_Log, State_Cache, Snapshots, Timeline, …) stay empty in
the cache — they do not belong to the universe definition.
TOML reading: `tomllib` (stdlib). Writing (decompile) uses `tomlkit`.
"""
from __future__ import annotations
import hashlib
import json
import sqlite3
import tomllib
from pathlib import Path
from typing import Any
from axiom.schema import create_universe_db
from axiom.time_system import CalendarConfig
# ---------------------------------------------------------------------------
# Constantes
# ---------------------------------------------------------------------------
CACHE_DIRNAME = ".axiom-cache"
CACHE_DB_NAME = "universe.db"
CACHE_HASH_NAME = "cache_hash.txt"
# Frontmatter TOML pour les entrées de lore en Markdown : `+++\n<toml>\n+++\n<corps>`
_FRONTMATTER_DELIM = "+++"
# Clés Universe_Meta connues, gérées par des sections structurées de universe.toml.
# Toute autre clé est préservée verbatim dans [extra] (lossless).
_META_NAME = "universe_name"
_META_SYSTEM_PROMPT = "system_prompt"
_META_GLOBAL_LORE = "global_lore"
_META_FIRST_MESSAGE = "first_message"
_META_WORLD_TENSION = "world_tension_level"
_META_CALENDAR = "calendar_config"
_META_COMPANION_ENABLED = "companion_mode_enabled"
_META_COMPANION_HERO = "companion_hero_id"
_STRUCTURED_META_KEYS = frozenset({
_META_NAME,
_META_SYSTEM_PROMPT,
_META_GLOBAL_LORE,
_META_FIRST_MESSAGE,
_META_WORLD_TENSION,
_META_CALENDAR,
_META_COMPANION_ENABLED,
_META_COMPANION_HERO,
})
[docs]
class CompileError(Exception):
"""Universe compilation error (malformed source, missing required field)."""
# ---------------------------------------------------------------------------
# Hash de l'arborescence source (cache invalidation)
# ---------------------------------------------------------------------------
def _iter_source_files(src_dir: Path):
"""Itère sur les fichiers source pertinents (hors cache et VCS), triés."""
for path in sorted(src_dir.rglob("*")):
if not path.is_file():
continue
rel_parts = path.relative_to(src_dir).parts
if rel_parts and rel_parts[0] in (CACHE_DIRNAME, ".git"):
continue
yield path
[docs]
def hash_directory(src_dir: Path) -> str:
"""Stable hash of the source content (relative path + bytes of each file)."""
h = hashlib.sha256()
for path in _iter_source_files(src_dir):
rel = path.relative_to(src_dir).as_posix()
h.update(rel.encode("utf-8"))
h.update(b"\0")
h.update(path.read_bytes())
h.update(b"\0")
return h.hexdigest()
# ---------------------------------------------------------------------------
# Helpers de lecture
# ---------------------------------------------------------------------------
def _load_toml(path: Path) -> dict[str, Any]:
"""Charge un fichier TOML. Lève CompileError si malformé."""
try:
with path.open("rb") as fh:
return tomllib.load(fh)
except (tomllib.TOMLDecodeError, OSError) as exc:
raise CompileError(f"Invalid TOML: {path} — {exc}") from exc
def _require(data: dict, key: str, ctx: str) -> Any:
"""Récupère une clé requise ou lève CompileError."""
if key not in data:
raise CompileError(f"Required field '{key}' missing in {ctx}")
return data[key]
def _resolve_text(src_dir: Path, data: dict, inline_key: str, file_key: str) -> str:
"""Résout un texte fourni soit inline (`inline_key`) soit via fichier (`file_key`)."""
if data.get(file_key):
target = src_dir / str(data[file_key])
try:
# newline="" : pas de traduction de fin de ligne (fidélité LF).
# Via open() et non read_text(newline=) : ce dernier exige Python 3.13+ (TICKET-049).
with target.open("r", encoding="utf-8", newline="") as fh:
return fh.read()
except OSError as exc:
raise CompileError(f"Referenced file not found: {target} — {exc}") from exc
return str(data.get(inline_key, ""))
def _toml_files(directory: Path):
"""Itère sur les *.toml d'un dossier (non récursif), hors `_index.toml`, triés."""
if not directory.is_dir():
return
for path in sorted(directory.glob("*.toml")):
if path.name == "_index.toml":
continue
yield path
# ---------------------------------------------------------------------------
# Parsers (chacun tolérant à l'absence du fichier/dossier)
# ---------------------------------------------------------------------------
def _parse_universe(src_dir: Path) -> tuple[dict[str, str], set[str]]:
"""Parse universe.toml → dict {clé Universe_Meta: valeur str}.
Retourne aussi l'ensemble des fichiers référencés (`*_file`) pour que le
parseur de lore les exclue du Lore_Book.
"""
path = src_dir / "universe.toml"
if not path.exists():
raise CompileError(f"Required universe.toml not found in {src_dir}")
data = _load_toml(path)
meta: dict[str, str] = {}
referenced: set[str] = set()
meta_section = data.get("meta", {})
name = meta_section.get("name")
if name:
meta[_META_NAME] = str(name)
narrative = data.get("narrative", {})
if "system_prompt" in narrative:
meta[_META_SYSTEM_PROMPT] = str(narrative["system_prompt"])
if narrative.get("global_lore_file"):
referenced.add(str(narrative["global_lore_file"]))
if narrative.get("first_message_file"):
referenced.add(str(narrative["first_message_file"]))
global_lore = _resolve_text(src_dir, narrative, "global_lore", "global_lore_file")
if global_lore:
meta[_META_GLOBAL_LORE] = global_lore
first_message = _resolve_text(src_dir, narrative, "first_message", "first_message_file")
if first_message:
meta[_META_FIRST_MESSAGE] = first_message
if "world_tension_level" in narrative:
meta[_META_WORLD_TENSION] = str(narrative["world_tension_level"])
calendar = data.get("calendar")
if calendar:
cfg = CalendarConfig(
minutes_per_hour=int(calendar.get("minutes_per_hour", 60)),
hours_per_day=int(calendar.get("hours_per_day", 24)),
days_per_month=list(calendar.get("days_per_month", [30] * 12)),
month_names=list(calendar.get("month_names", [f"Month {i + 1}" for i in range(12)])),
start_day=int(calendar.get("start_day", 1)),
start_hour=int(calendar.get("start_hour", 0)),
start_minute=int(calendar.get("start_minute", 0)),
)
meta[_META_CALENDAR] = cfg.to_json()
companion = data.get("companion")
if companion is not None:
# Mapping clé par clé (symétrique avec decompile → round-trip lossless).
# Le moteur stocke `companion_mode_enabled` en "1"/"0" (comparé `== "1"`).
if "enabled" in companion:
meta[_META_COMPANION_ENABLED] = "1" if companion["enabled"] else "0"
if "hero_id" in companion:
meta[_META_COMPANION_HERO] = str(companion["hero_id"])
# Passthrough verbatim des clés inconnues (lossless).
for key, value in data.get("extra", {}).items():
meta[str(key)] = str(value)
return meta, referenced
def _parse_stat_definitions(src_dir: Path) -> list[tuple]:
"""stats/definitions.toml → lignes Stat_Definitions."""
path = src_dir / "stats" / "definitions.toml"
if not path.exists():
return []
data = _load_toml(path)
rows: list[tuple] = []
for entry in data.get("definitions", []):
params = entry.get("parameters", {})
rows.append((
_require(entry, "stat_id", path.name),
str(entry.get("name", entry["stat_id"])),
str(entry.get("description", "")),
str(entry.get("value_type", "numeric")),
json.dumps(params) if isinstance(params, (dict, list)) else str(params),
))
return rows
def _parse_entities(src_dir: Path) -> list[tuple[tuple, dict]]:
"""entities/*.toml → [(ligne Entities, dict stats)]."""
out: list[tuple[tuple, dict]] = []
for path in _toml_files(src_dir / "entities"):
data = _load_toml(path)
entity_id = _require(data, "entity_id", path.name)
row = (
entity_id,
str(data.get("entity_type", "npc")),
str(data.get("name", entity_id)),
str(data.get("description", "")),
1 if data.get("is_active", True) else 0,
)
stats = {str(k): str(v) for k, v in data.get("stats", {}).items()}
out.append((row, stats))
return out
def _parse_rules(src_dir: Path) -> list[tuple]:
"""rules/*.toml → lignes Rules (conditions/actions sérialisées en JSON)."""
rows: list[tuple] = []
for path in _toml_files(src_dir / "rules"):
data = _load_toml(path)
rows.append((
_require(data, "rule_id", path.name),
int(data.get("priority", 0)),
json.dumps(data.get("conditions", {})),
json.dumps(data.get("actions", [])),
str(data.get("target_entity", "*")),
))
return rows
def _parse_locations(src_dir: Path) -> tuple[list[tuple], list[tuple]]:
"""locations/map.toml → (lignes Locations, lignes Location_Connections)."""
path = src_dir / "locations" / "map.toml"
if not path.exists():
return [], []
data = _load_toml(path)
locations: list[tuple] = []
for loc in data.get("locations", []):
locations.append((
_require(loc, "location_id", path.name),
str(loc.get("name", loc["location_id"])),
str(loc.get("scale", "poi")),
loc.get("parent_id"),
str(loc.get("description", "")),
float(loc.get("x", 0)),
float(loc.get("y", 0)),
))
connections: list[tuple] = []
for conn in data.get("connections", []):
connections.append((
_require(conn, "source_id", path.name),
_require(conn, "target_id", path.name),
int(conn.get("distance_km", 0)),
))
return locations, connections
def _parse_lore(src_dir: Path, referenced: set[str]) -> list[tuple]:
"""lore/**/*.md → lignes Lore_Book.
Chaque .md peut porter un frontmatter TOML (`+++ … +++`) avec entry_id,
category, name, keywords ; le corps = content. Les fichiers référencés par
universe.toml (`*_file`) sont exclus.
"""
lore_dir = src_dir / "lore"
if not lore_dir.is_dir():
return []
referenced_paths = {(src_dir / r).resolve() for r in referenced}
rows: list[tuple] = []
for path in sorted(lore_dir.rglob("*.md")):
if path.resolve() in referenced_paths:
continue
# open(newline="") et non read_text(newline=) : ce dernier exige Python 3.13+ (TICKET-049).
with path.open("r", encoding="utf-8", newline="") as fh:
front, body = _split_frontmatter(fh.read())
default_id = path.relative_to(lore_dir).with_suffix("").as_posix().replace("/", "_")
rows.append((
str(front.get("entry_id", default_id)),
str(front.get("category", "")),
str(front.get("name", path.stem.replace("_", " ").title())),
str(front.get("keywords", "")),
body,
))
return rows
def _split_frontmatter(text: str) -> tuple[dict, str]:
"""Sépare un frontmatter TOML optionnel (`+++ … +++`) du corps Markdown.
Préserve le corps **octet pour octet** (pas de splitlines/strip) pour garantir
un round-trip fidèle.
"""
opener = _FRONTMATTER_DELIM + "\n"
if not text.startswith(opener):
return {}, text
rest = text[len(opener):]
closer = "\n" + _FRONTMATTER_DELIM + "\n"
end = rest.find(closer)
if end == -1:
return {}, text
front_text = rest[:end]
body = rest[end + len(closer):]
try:
return tomllib.loads(front_text), body
except tomllib.TOMLDecodeError as exc:
raise CompileError(f"Invalid TOML frontmatter — {exc}") from exc
def _parse_events(src_dir: Path) -> list[tuple]:
"""events/*.toml → lignes Scheduled_Events."""
rows: list[tuple] = []
for path in _toml_files(src_dir / "events"):
data = _load_toml(path)
rows.append((
_require(data, "event_id", path.name),
int(data.get("trigger_minute", 0)),
str(data.get("title", "")),
str(data.get("description", "")),
))
return rows
def _parse_setup(src_dir: Path) -> list[tuple]:
"""setup/questions.toml → lignes Story_Setup."""
path = src_dir / "setup" / "questions.toml"
if not path.exists():
return []
data = _load_toml(path)
rows: list[tuple] = []
for q in data.get("questions", []):
options = q.get("options", [])
rows.append((
_require(q, "setup_id", path.name),
str(q.get("question", "")),
str(q.get("type", "text")),
json.dumps(options) if isinstance(options, list) else str(options),
int(q.get("max_selections", 1)),
int(q.get("priority", 0)),
))
return rows
def _parse_items(src_dir: Path) -> list[tuple]:
"""items/*.toml → lignes Item_Definitions."""
rows: list[tuple] = []
for path in _toml_files(src_dir / "items"):
data = _load_toml(path)
rows.append((
_require(data, "item_id", path.name),
str(data.get("name", data["item_id"])),
str(data.get("description", "")),
str(data.get("category", "misc")),
float(data.get("weight", 0.0)),
str(data.get("rarity", "common")),
))
return rows
# ---------------------------------------------------------------------------
# Population de la base
# ---------------------------------------------------------------------------
def _populate(conn: sqlite3.Connection, parsed: dict[str, Any]) -> None:
"""Insère toutes les données de définition parsées dans une DB fraîche."""
conn.execute("PRAGMA foreign_keys=ON;")
conn.executemany(
"INSERT OR REPLACE INTO Universe_Meta (key, value) VALUES (?, ?);",
list(parsed["meta"].items()),
)
conn.executemany(
"INSERT INTO Stat_Definitions (stat_id, name, description, value_type, parameters) "
"VALUES (?, ?, ?, ?, ?);",
parsed["stat_definitions"],
)
for row, stats in parsed["entities"]:
conn.execute(
"INSERT INTO Entities (entity_id, entity_type, name, description, is_active) "
"VALUES (?, ?, ?, ?, ?);",
row,
)
conn.executemany(
"INSERT INTO Entity_Stats (entity_id, stat_key, stat_value) VALUES (?, ?, ?);",
[(row[0], k, v) for k, v in stats.items()],
)
conn.executemany(
"INSERT INTO Rules (rule_id, priority, conditions, actions, target_entity) "
"VALUES (?, ?, ?, ?, ?);",
parsed["rules"],
)
conn.executemany(
"INSERT INTO Locations (location_id, name, scale, parent_id, description, x, y) "
"VALUES (?, ?, ?, ?, ?, ?, ?);",
parsed["locations"],
)
conn.executemany(
"INSERT INTO Location_Connections (source_id, target_id, distance_km) "
"VALUES (?, ?, ?);",
parsed["connections"],
)
conn.executemany(
"INSERT INTO Lore_Book (entry_id, category, name, keywords, content) "
"VALUES (?, ?, ?, ?, ?);",
parsed["lore"],
)
conn.executemany(
"INSERT INTO Scheduled_Events (event_id, trigger_minute, title, description) "
"VALUES (?, ?, ?, ?);",
parsed["events"],
)
conn.executemany(
"INSERT INTO Story_Setup (setup_id, question, type, options, max_selections, priority) "
"VALUES (?, ?, ?, ?, ?, ?);",
parsed["setup"],
)
conn.executemany(
"INSERT INTO Item_Definitions (item_id, name, description, category, weight, rarity) "
"VALUES (?, ?, ?, ?, ?, ?);",
parsed["items"],
)
conn.commit()
def _parse_tree(src_dir: Path) -> dict[str, Any]:
"""Parse l'arborescence source complète en structures Python prêtes pour la DB."""
meta, referenced = _parse_universe(src_dir)
locations, connections = _parse_locations(src_dir)
return {
"meta": meta,
"stat_definitions": _parse_stat_definitions(src_dir),
"entities": _parse_entities(src_dir),
"rules": _parse_rules(src_dir),
"locations": locations,
"connections": connections,
"lore": _parse_lore(src_dir, referenced),
"events": _parse_events(src_dir),
"setup": _parse_setup(src_dir),
"items": _parse_items(src_dir),
}
# ---------------------------------------------------------------------------
# API publique
# ---------------------------------------------------------------------------
[docs]
def compile_universe(
src_dir: str | Path,
output_db: str | Path | None = None,
force: bool = False,
) -> Path:
"""Compile a source tree into a runtime SQLite database.
Args:
src_dir: Universe source folder (contains universe.toml).
output_db: Path of the `.db` to produce. Defaults to
`<src_dir>/.axiom-cache/universe.db`.
force: Recompile even if the source hash is unchanged.
Returns:
The path of the compiled `.db`.
"""
src_dir = Path(src_dir)
if not src_dir.is_dir():
raise CompileError(f"Source folder not found: {src_dir}")
if output_db is None:
output_db = src_dir / CACHE_DIRNAME / CACHE_DB_NAME
output_db = Path(output_db)
src_hash = hash_directory(src_dir)
hash_file = src_dir / CACHE_DIRNAME / CACHE_HASH_NAME
if not force and output_db.exists() and hash_file.exists():
if hash_file.read_text(encoding="utf-8").strip() == src_hash:
return output_db # cache à jour
parsed = _parse_tree(src_dir)
# Construction atomique : on écrit dans un fichier temporaire puis on remplace.
output_db.parent.mkdir(parents=True, exist_ok=True)
tmp_db = output_db.with_suffix(output_db.suffix + ".tmp")
_remove_db_files(tmp_db)
create_universe_db(str(tmp_db))
conn = sqlite3.connect(str(tmp_db))
try:
_populate(conn, parsed)
# Vide le WAL dans le fichier principal avant la bascule (sinon le .db
# déplacé serait incomplet et les sidecars -wal/-shm seraient orphelins).
conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
finally:
conn.close()
_remove_db_files(output_db)
tmp_db.replace(output_db)
_remove_db_files(tmp_db, keep_main=True) # nettoie les sidecars -wal/-shm résiduels
hash_file.parent.mkdir(parents=True, exist_ok=True)
hash_file.write_text(src_hash, encoding="utf-8")
return output_db
def _remove_db_files(db: Path, keep_main: bool = False) -> None:
"""Supprime un .db et ses sidecars WAL (-wal/-shm)."""
suffixes = ("-wal", "-shm") if keep_main else ("", "-wal", "-shm")
for suffix in suffixes:
p = Path(str(db) + suffix)
if p.exists():
p.unlink()