# Source code for axiom.backends.gemini

"""
axiom.backends.gemini

LLM backend client for Google Gemini models (remote / cloud fallback).

Uses the google-genai SDK (google.genai).  The client translates Axiom AI's
internal list[LLMMessage] format to Gemini's Content objects: the first
system-role message becomes the model's system_instruction, and the
remaining turns become the contents list.

Typical usage::

    from axiom.backends.gemini import GeminiClient

    llm = GeminiClient(api_key="YOUR_KEY", model_name="gemini-2.0-flash")
    if llm.is_available():
        response = llm.complete(messages)
        print(response.narrative_text)
"""

import re
import threading
import time
from typing import Callable, Iterator, TypeVar

from google import genai
from google.genai import types as genai_types

from axiom.backends.base import (
    GenerationCancelled,
    LLMBackend,
    LLMConnectionError,
    LLMMessage,
    LLMParseError,
    LLMResponse,
)
from axiom.backends.transport import IPv4FirstTransport
from axiom.logger import logger

_DEFAULT_MODEL: str = "gemini-2.0-flash"

# --- Quota resilience (TICKET-031) ------------------------------------------
# The Gemini free tier is limited in requests/minute AND PER MODEL. Three
# countermeasures, all implemented in the backend so that every call (Populate,
# canonisation, narration, Timekeeper, Chronicler) is covered without touching
# the callers:
# 1. retry on 429, honouring the delay returned by the API;
# 2. optional throttle (requests/minute, shared across threads);
# 3. fallback model when the primary model's quota stays exhausted.

_MAX_QUOTA_RETRIES: int = 3      # extra attempts per model
_MAX_RETRY_WAIT_S: float = 90.0  # upper bound on the wait before each retry

# Generic return type of the request callable wrapped by the quota-retry helper.
T = TypeVar("T")


def _is_quota_error(exc: Exception) -> bool:
    text = str(exc)
    return "RESOURCE_EXHAUSTED" in text or getattr(exc, "code", None) == 429 or " 429 " in f" {text} "


def _is_hard_quota_error(exc: Exception) -> bool:
    """Return True for a 429 whose quota is structurally zero.

    This happens when the model is simply not part of the API key's free tier:
    the violation reports `limit: 0` / `quotaValue: "0"`.

    Waiting can NEVER make such a request succeed — yet the API still sends a
    misleading `retryDelay` — so the caller must skip the retry/backoff for
    this model and fail fast instead of burning 1-2 min of countdown per turn
    (TICKET-050).
    """
    if not _is_quota_error(exc):
        return False
    message = str(exc)
    # The value must be exactly zero: the negative lookahead rejects
    # "limit: 100" / "quotaValue: 5"-style values that merely start with 0.
    zero_quota_patterns = (
        r"limit['\"]?\s*[:=]\s*['\"]?0(?!\d)",
        r"quotaValue['\"]?\s*[:=]\s*['\"]?0(?!\d)",
    )
    return any(
        re.search(pattern, message) is not None
        for pattern in zero_quota_patterns
    )


def _parse_retry_delay(exc_text: str) -> float | None:
    """Extrait le délai suggéré par l'API (« Please retry in 32.4s » / retryDelay)."""
    m = re.search(r"retry in ([0-9.]+)\s*s", exc_text, re.IGNORECASE)
    if not m:
        m = re.search(r"retryDelay'?\"?\s*[:=]\s*'?\"?([0-9.]+)s", exc_text)
    try:
        return float(m.group(1)) if m else None
    except ValueError:
        return None


class _RateLimiter:
    """Espacement minimal entre requêtes, par modèle, partagé entre threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._next_slot: dict[str, float] = {}

    def reserve_turn(self, key: str, min_interval: float) -> float:
        """Reserve the next slot and return the delay to wait (0 = go).

        Waiting is left to the caller, so it can stay interruptible
        (cancellation, TICKET-033).
        """
        if min_interval <= 0:
            return 0.0
        with self._lock:
            now = time.monotonic()
            slot = max(self._next_slot.get(key, now), now)
            self._next_slot[key] = slot + min_interval
        return max(0.0, slot - time.monotonic())

    def wait_turn(self, key: str, min_interval: float) -> None:
        delay = self.reserve_turn(key, min_interval)
        if delay > 0:
            time.sleep(delay)


# Module-level limiter shared by every GeminiClient in this module; slots are
# reserved per model name.
_RATE_LIMITER = _RateLimiter()

# The Gemini API rejects requests with more than 5 stop sequences
# (GenerateContentRequest.generation_config.stop_sequences). The engine builds
# a backend-agnostic list that can exceed this, so we clamp it here.
_GEMINI_MAX_STOP_SEQUENCES: int = 5


def _clamp_stop_sequences(stop_sequences: list[str] | None) -> list[str] | None:
    """Limit *stop_sequences* to what the Gemini API accepts.

    None, empty lists and lists within the limit are returned unchanged;
    longer lists are cut down to their first `_GEMINI_MAX_STOP_SEQUENCES`
    entries.
    """
    if not stop_sequences:
        return stop_sequences
    if len(stop_sequences) <= _GEMINI_MAX_STOP_SEQUENCES:
        return stop_sequences
    return stop_sequences[:_GEMINI_MAX_STOP_SEQUENCES]


class GeminiClient(LLMBackend):
    """LLM backend targeting Google Gemini via the google-genai SDK.

    Args:
        api_key: Google Generative AI API key.
        model_name: Gemini model identifier. Defaults to "gemini-2.0-flash".
        requests_per_minute: Soft rate limiter (TICKET-031). 0 = unlimited.
        fallback_model: Model tried when the primary model's quota is still
            exhausted after the retries (quotas are per-model). "" = none.
    """

    # Output-token cap applied when the caller does not pass `max_tokens`.
    _DEFAULT_MAX_OUTPUT_TOKENS: int = 1024

    def __init__(
        self,
        api_key: str,
        model_name: str = _DEFAULT_MODEL,
        requests_per_minute: int = 0,
        fallback_model: str = "",
    ) -> None:
        self._api_key = api_key
        self._model_name = model_name
        # Minimum spacing (seconds) between two requests; 0 disables pacing.
        self._min_interval = (
            60.0 / requests_per_minute if requests_per_minute > 0 else 0.0
        )
        # Tolerate None coming from an unset configuration value.
        self._fallback_model = (fallback_model or "").strip()
        # Custom transport (IPv4 first + connect timeout): the SDK ships with
        # no timeout at all and overrides client-level httpx timeouts with an
        # explicit None per request — see axiom/backends/transport.py.
        self._client = genai.Client(
            api_key=api_key,
            http_options=genai_types.HttpOptions(
                client_args={"transport": IPv4FirstTransport()},
            ),
        )

    # ------------------------------------------------------------------
    # Quota resilience (TICKET-031)
    # ------------------------------------------------------------------

    def _candidate_models(self) -> list[str]:
        """Return the models to try, in order: primary, then distinct fallback."""
        models = [self._model_name]
        if self._fallback_model and self._fallback_model != self._model_name:
            models.append(self._fallback_model)
        return models

    def _interruptible_wait(self, delay: float, label: str | None = None) -> None:
        """Wait `delay` seconds in slices of at most 5 s.

        The wait is cancellable through `cancel_event` and, when `label` is
        given, a countdown is published through `self._notify` at the start
        of every slice (TICKET-033).

        Raises:
            GenerationCancelled: If `cancel_event` is set during the wait.
        """
        deadline = time.monotonic() + delay
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            if label is not None:
                self._notify(f"{label} — retry in {max(1, round(remaining))}s")
            slice_s = min(remaining, 5.0)
            if self.cancel_event is not None:
                # Event.wait returns True as soon as the event is set.
                if self.cancel_event.wait(slice_s):
                    raise GenerationCancelled("Generation cancelled by user.")
            else:
                time.sleep(slice_s)

    def _call_with_quota_retry(self, request: Callable[[str], T]) -> T:
        """Run `request(model)` with pacing, 429 retries and a fallback model.

        For each candidate model: up to `_MAX_QUOTA_RETRIES` extra attempts,
        honouring the delay suggested by the API (exponential backoff
        otherwise). If the quota is still exhausted, the fallback model is
        tried. Any non-quota error is immediately re-raised as
        LLMConnectionError (historical behaviour).

        Args:
            request: Callable receiving the model name and performing the call.

        Returns:
            Whatever `request` returns on its first successful attempt.

        Raises:
            LLMConnectionError: On a non-quota API error, or when the quota
                stays exhausted on every candidate model.
            GenerationCancelled: If the generation is cancelled while pacing
                or waiting for a retry.
        """
        # Computed once: the candidate list cannot change during the call.
        candidates = self._candidate_models()
        last_exc: Exception | None = None
        for model in candidates:
            for attempt in range(_MAX_QUOTA_RETRIES + 1):
                self._check_cancelled()
                # Pacing: silent but interruptible (the delay can be long
                # with a low requests-per-minute setting).
                slot_delay = _RATE_LIMITER.reserve_turn(model, self._min_interval)
                if slot_delay > 0:
                    self._interruptible_wait(slot_delay)
                try:
                    return request(model)
                except GenerationCancelled:
                    raise
                except Exception as exc:  # noqa: BLE001 — sorted out just below
                    if not _is_quota_error(exc):
                        raise LLMConnectionError(
                            f"Gemini API error ({type(exc).__name__}): {exc}"
                        ) from exc
                    last_exc = exc
                    # TICKET-050: structurally zero quota (model outside the
                    # free tier) → waiting will never help. Skip the retries
                    # for this model and go straight to the fallback model
                    # (which may itself be in the free tier).
                    if _is_hard_quota_error(exc):
                        logger.warning(
                            "Gemini quota à 0 pour '%s' (modèle hors free tier) "
                            "— pas de retry, on tente le modèle de secours.",
                            model)
                        break
                    if attempt < _MAX_QUOTA_RETRIES:
                        delay = _parse_retry_delay(str(exc)) or (5.0 * (2 ** attempt))
                        delay = min(delay + 0.5, _MAX_RETRY_WAIT_S)
                        logger.warning(
                            "Gemini quota épuisé (%s) — nouvel essai dans %.1fs "
                            "(tentative %d/%d)",
                            model, delay, attempt + 1, _MAX_QUOTA_RETRIES)
                        # TICKET-033: visible, cancellable countdown.
                        self._interruptible_wait(
                            delay,
                            label=(f"Quota exhausted ({model}) "
                                   f"— attempt {attempt + 1}/{_MAX_QUOTA_RETRIES}"),
                        )
            if model != candidates[-1]:
                logger.warning(
                    "Gemini quota toujours épuisé pour '%s' — bascule sur le modèle "
                    "de secours '%s'", model, self._fallback_model)
                self._notify(f"Quota still exhausted on '{model}' "
                             f"— switching to fallback model '{self._fallback_model}'")

        # TICKET-050: actionable message when the cause is a zero quota (model
        # outside the free tier) rather than the generic "exhausted after
        # retries".
        if last_exc is not None and _is_hard_quota_error(last_exc):
            raise LLMConnectionError(
                "Gemini quota is 0 for this model — it is not included in your "
                "API key's free tier. Enable billing on the key or pick a model "
                "that is in the free tier (e.g. gemini-2.0-flash). "
                f"Original error: {last_exc}"
            ) from last_exc
        # Only mention the fallback when a distinct fallback model was tried.
        raise LLMConnectionError(
            f"Gemini API error (quota exhausted after retries"
            f"{' and fallback model' if len(candidates) > 1 else ''}): {last_exc}"
        ) from last_exc

    def _build_generation_config(
        self,
        system_instruction: str | None,
        temperature: float,
        top_p: float,
        stop_sequences: list[str] | None,
        max_tokens: int | None,
    ) -> genai_types.GenerateContentConfig:
        """Build the text-generation config shared by complete() and stream_tokens()."""
        return genai_types.GenerateContentConfig(
            system_instruction=system_instruction,
            temperature=temperature,
            top_p=top_p,
            # A falsy max_tokens (None or 0) falls back to the class default.
            max_output_tokens=(
                max_tokens if max_tokens else self._DEFAULT_MAX_OUTPUT_TOKENS
            ),
            stop_sequences=_clamp_stop_sequences(stop_sequences),
        )

    # ------------------------------------------------------------------
    # LLMBackend interface
    # ------------------------------------------------------------------

    def is_available(self) -> bool:
        """Check whether the Gemini API is reachable with the configured key.

        Attempts to list available models. Returns True on success, False on
        any exception. Never raises.

        Returns:
            True if the API responds without error, False otherwise.
        """
        try:
            # A lightweight list call — verifies connectivity and auth.
            list(self._client.models.list())
            return True
        except Exception as exc:  # noqa: BLE001 — availability probe is best-effort
            logger.debug("Gemini availability check failed: %s", exc)
            return False

    def list_models(self) -> list[str]:
        """Return the generation-capable model names, without the "models/" prefix.

        Empty list on any error — best-effort, used by the settings dialog's
        model picker (TICKET-062).
        """
        try:
            names: list[str] = []
            for m in self._client.models.list():
                actions = (getattr(m, "supported_actions", None)
                           or getattr(m, "supported_generation_methods", None)
                           or [])
                # Models that advertise no actions at all are kept.
                if actions and "generateContent" not in actions:
                    continue
                name = (getattr(m, "name", "") or "").removeprefix("models/")
                if name:
                    names.append(name)
            return names
        except Exception as exc:  # noqa: BLE001 — best-effort listing
            logger.debug("Gemini model listing failed: %s", exc)
            return []

    def complete(
        self,
        messages: list[LLMMessage],
        stream: bool = False,
        temperature: float = 0.7,
        top_p: float = 1.0,
        response_format: str | None = None,
        stop_sequences: list[str] | None = None,
        max_tokens: int | None = None,
    ) -> LLMResponse:
        """Send messages to Gemini and return a parsed LLMResponse.

        Translates list[LLMMessage] into Gemini format:
          - The first message with role="system" becomes system_instruction.
          - Remaining messages map user/assistant → user/model roles.

        Args:
            messages: Conversation turns (system, user, assistant).
            stream: Ignored; use stream_tokens() for token streaming.
            temperature: Sampling temperature, forwarded to the API unchanged.
            top_p: Nucleus sampling parameter, forwarded to the API unchanged.
            response_format: Currently unused for Gemini.
            stop_sequences: Custom strings to trigger generation stop
                (clamped to the API limit).
            max_tokens: Optional limit on the number of tokens to generate;
                defaults to 1024 when omitted.

        Returns:
            LLMResponse with narrative_text, optional tool_call, finish_reason.

        Raises:
            LLMConnectionError: On network failure or API error.
            LLMParseError: On unexpected response structure or bad tool-call JSON.
        """
        system_instruction, contents = self._translate_messages(messages)
        config = self._build_generation_config(
            system_instruction, temperature, top_p, stop_sequences, max_tokens
        )

        # TICKET-031: pacing + 429 retry (API-suggested delay) + fallback.
        response = self._call_with_quota_retry(
            lambda model: self._client.models.generate_content(
                model=model,
                contents=contents,
                config=config,
            )
        )

        try:
            raw_content: str | None = response.text
        except (AttributeError, ValueError) as exc:
            raise LLMParseError(
                f"Cannot extract text from Gemini response: {exc}"
            ) from exc

        if raw_content is None:
            raise LLMParseError("Gemini response returned None text.")

        finish_reason = self._extract_finish_reason(response)
        narrative, tool_call = self.parse_tool_call(raw_content)

        return LLMResponse(
            narrative_text=narrative,
            tool_call=tool_call,
            finish_reason=finish_reason,
        )

    def generate_image_bytes(
        self,
        prompt: str,
        aspect_ratio: str | None = None,
    ) -> bytes | None:
        """Generate an image from a text prompt and return the raw bytes.

        Used by the "gemini" image backend (axiom/image_generator.py) with an
        image-capable model (e.g. "gemini-2.5-flash-image"). Goes through the
        same quota-resilience path as text calls (TICKET-031 pacing/429 retry,
        TICKET-033 status/cancellation hooks).

        Args:
            prompt: Visual prompt describing the image.
            aspect_ratio: Optional aspect ratio supported by the API ("1:1",
                "16:9", ...). Omitted if the installed SDK does not expose
                ImageConfig.

        Returns:
            The image bytes (PNG/JPEG as returned by the API), or None if the
            response contains no image part.

        Raises:
            LLMConnectionError: On network failure, API error or exhausted quota.
            GenerationCancelled: If cancel_event is set during a retry wait.
        """
        config_kwargs: dict = {"response_modalities": ["TEXT", "IMAGE"]}
        if aspect_ratio and hasattr(genai_types, "ImageConfig"):
            config_kwargs["image_config"] = genai_types.ImageConfig(
                aspect_ratio=aspect_ratio
            )
        config = genai_types.GenerateContentConfig(**config_kwargs)

        response = self._call_with_quota_retry(
            lambda model: self._client.models.generate_content(
                model=model,
                contents=prompt,
                config=config,
            )
        )

        # Return the first non-empty inline payload found in any candidate.
        for candidate in getattr(response, "candidates", None) or []:
            content = getattr(candidate, "content", None)
            for part in getattr(content, "parts", None) or []:
                inline = getattr(part, "inline_data", None)
                data = getattr(inline, "data", None) if inline is not None else None
                if data:
                    # The SDK normally returns bytes; tolerate base64 strings.
                    if isinstance(data, str):
                        import base64
                        return base64.b64decode(data)
                    return data
        return None

    def stream_tokens(
        self,
        messages: list[LLMMessage],
        temperature: float = 0.7,
        top_p: float = 1.0,
        response_format: str | None = None,
        stop_sequences: list[str] | None = None,
        max_tokens: int | None = None,
    ) -> Iterator[str]:
        """Yield tokens from a streaming Gemini response.

        Args:
            messages: Conversation turns (system, user, assistant).
            temperature: Sampling temperature, forwarded to the API unchanged.
            top_p: Nucleus sampling parameter, forwarded to the API unchanged.
            response_format: Currently unused for Gemini.
            stop_sequences: Custom strings to trigger generation stop
                (clamped to the API limit).
            max_tokens: Optional limit on the number of tokens to generate;
                defaults to 1024 when omitted.

        Yields:
            Non-empty text fragments in arrival order.

        Raises:
            LLMConnectionError: On network failure, API error or exhausted quota.
            GenerationCancelled: If the generation is cancelled while pacing
                or waiting for a retry.
        """
        system_instruction, contents = self._translate_messages(messages)
        config = self._build_generation_config(
            system_instruction, temperature, top_p, stop_sequences, max_tokens
        )

        # TICKET-031: a stream's 429 surfaces when the stream is established
        # (first iteration) — so the first chunk is pulled INSIDE the retry
        # zone, and the rest is streamed normally afterwards.
        def _open_stream(model: str):
            stream = iter(self._client.models.generate_content_stream(
                model=model,
                contents=contents,
                config=config,
            ))
            first = next(stream, None)
            return first, stream

        first_chunk, stream = self._call_with_quota_retry(_open_stream)

        try:
            if first_chunk is not None:
                text = self._safe_chunk_text(first_chunk)
                if text:
                    yield text
            for chunk in stream:
                text = self._safe_chunk_text(chunk)
                if text:
                    yield text
        except (LLMConnectionError, GenerationCancelled):
            # Already meaningful to callers: propagate unwrapped, as
            # _call_with_quota_retry does.
            raise
        except Exception as exc:
            raise LLMConnectionError(
                f"Gemini stream error ({type(exc).__name__}): {exc}"
            ) from exc

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_chunk_text(chunk: object) -> str:
        """Return the text of a stream chunk, or "" when it has none.

        Empty or safety-blocked chunks may raise on `.text` access; they are
        skipped silently rather than aborting the stream.
        """
        try:
            return chunk.text or ""  # type: ignore[attr-defined]
        except (AttributeError, ValueError):
            return ""

    @staticmethod
    def _translate_messages(
        messages: list[LLMMessage],
    ) -> tuple[str | None, list[genai_types.ContentDict]]:
        """Translate list[LLMMessage] into (system_instruction, contents).

        The first message with role='system' is extracted as system_instruction.
        Remaining messages are mapped: user→user, assistant→model.
        Additional system messages after the first are folded into the user
        turn that follows them as a prefixed `[SYSTEM: ...]` block. If no user
        turn follows, they are appended to the last user turn, or emitted as a
        new trailing user turn, so they are never silently dropped.
        Messages with any other role are ignored.

        Args:
            messages: Axiom AI internal message list.

        Returns:
            Tuple of (system_instruction_str_or_None, gemini_contents_list).
        """

        def _render(injections: list[str]) -> str:
            return "\n".join(f"[SYSTEM: {s}]" for s in injections)

        system_instruction: str | None = None
        contents: list[dict] = []
        pending_system_injections: list[str] = []

        for msg in messages:
            role: str = msg["role"]
            content: str = msg["content"]

            if role == "system":
                if system_instruction is None:
                    system_instruction = content
                else:
                    pending_system_injections.append(content)
            elif role == "user":
                if pending_system_injections:
                    injection = _render(pending_system_injections)
                    content = f"{injection}\n\n{content}"
                    pending_system_injections = []
                contents.append({"role": "user", "parts": [{"text": content}]})
            elif role == "assistant":
                contents.append({"role": "model", "parts": [{"text": content}]})

        # Trailing system messages (nothing after them to fold into) used to
        # be lost; keep them by attaching them to the end of the conversation.
        if pending_system_injections:
            injection = _render(pending_system_injections)
            if contents and contents[-1]["role"] == "user":
                last_part = contents[-1]["parts"][-1]
                last_part["text"] = f"{last_part['text']}\n\n{injection}"
            else:
                contents.append({"role": "user", "parts": [{"text": injection}]})

        return system_instruction, contents

    @staticmethod
    def _extract_finish_reason(response: object) -> str:
        """Extract a normalised finish_reason string from a Gemini response.

        Args:
            response: The GenerateContentResponse object.

        Returns:
            "length" when generation hit the token limit, "error" when the
            response has no candidates, "stop" otherwise (including when the
            finish reason cannot be read at all).
        """
        try:
            candidates = response.candidates  # type: ignore[attr-defined]
            if not candidates:
                return "error"
            finish = candidates[0].finish_reason
            # FinishReason: STOP=1, MAX_TOKENS=2, others treated as stop
            finish_str = str(finish).upper() if finish else ""
            if "MAX_TOKENS" in finish_str or finish_str == "2":
                return "length"
            return "stop"
        except Exception:
            return "stop"