"""User identity store — SQLite-backed user management, credential vault, conversation storage, and event tracking. Provides: - Schema migrations applied idempotently at startup - Fernet encryption for per-user API tokens at rest - User resolution (provider + external_id → internal User) - Credential CRUD with lazy decryption support - Session / message / event persistence - Owner bootstrap migration from env-var credentials """ from __future__ import annotations import logging import sqlite3 import time import uuid from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any from cryptography.fernet import Fernet from config import settings logger = logging.getLogger(__name__) # ── Data classes ───────────────────────────────────────────────── def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() @dataclass class User: id: str display_name: str email: str | None created_at: str updated_at: str is_owner: bool onboarding_complete: bool telegram_approved: bool | None = None # None = pending, True = approved, False = denied is_new: bool = False # transient — not persisted @dataclass class ExternalIdentity: id: int user_id: str provider: str external_id: str metadata_json: str | None created_at: str @dataclass class ServiceCredential: id: int user_id: str service: str encrypted_token: str service_user_id: str | None service_username: str | None created_at: str expires_at: str | None last_used_at: str | None @dataclass class ProvisioningLogEntry: id: int user_id: str service: str action: str detail_json: str | None created_at: str @dataclass class Session: id: str user_id: str surface: str topic_id: str | None title: str | None created_at: str last_active_at: str @dataclass class Message: id: str session_id: str role: str content: str created_at: str @dataclass class Event: id: int user_id: str source: str event_type: str summary: str detail_json: str | None created_at: str consumed_at: str | None # ── Fernet helpers (T012) ──────────────────────────────────────── def _get_fernet() -> Fernet: key = settings.CREDENTIAL_VAULT_KEY if not key: raise RuntimeError("CREDENTIAL_VAULT_KEY is not set — cannot encrypt/decrypt credentials") return Fernet(key.encode() if isinstance(key, str) else key) def encrypt(plaintext: str) -> str: """Encrypt *plaintext* and return a base64-encoded TEXT string.""" f = _get_fernet() return f.encrypt(plaintext.encode()).decode() def decrypt(ciphertext: str) -> str: """Decrypt a Fernet ciphertext (base64 TEXT) back to the original string.""" f = _get_fernet() return f.decrypt(ciphertext.encode()).decode() # ── Schema migrations (T011) ──────────────────────────────────── _MIGRATIONS: list[tuple[int, str, str]] = [ ( 1, "initial_schema", """ CREATE TABLE IF NOT EXISTS users ( id TEXT PRIMARY KEY, display_name TEXT NOT NULL, email TEXT UNIQUE, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, is_owner INTEGER NOT NULL DEFAULT 0, onboarding_complete INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS external_identities ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL REFERENCES users(id), provider TEXT NOT NULL, external_id TEXT NOT NULL, metadata_json TEXT, created_at TEXT NOT NULL, UNIQUE(provider, external_id) ); CREATE INDEX IF NOT EXISTS idx_external_identities_lookup ON external_identities(provider, external_id); CREATE TABLE IF NOT EXISTS service_credentials ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL REFERENCES users(id), service TEXT NOT NULL, encrypted_token TEXT NOT NULL, service_user_id TEXT, service_username TEXT, created_at TEXT NOT NULL, expires_at TEXT, last_used_at TEXT, UNIQUE(user_id, service) ); CREATE INDEX IF NOT EXISTS idx_service_credentials_user_id ON service_credentials(user_id); CREATE TABLE IF NOT EXISTS provisioning_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL REFERENCES users(id), service TEXT NOT NULL, action TEXT NOT NULL, detail_json TEXT, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL REFERENCES users(id), surface TEXT NOT NULL, topic_id TEXT, title TEXT, created_at TEXT NOT NULL, last_active_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id); CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL REFERENCES sessions(id), role TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_messages_session_id ON messages(session_id); CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at); CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL REFERENCES users(id), source TEXT NOT NULL, event_type TEXT NOT NULL, summary TEXT NOT NULL, detail_json TEXT, created_at TEXT NOT NULL, consumed_at TEXT ); CREATE INDEX IF NOT EXISTS idx_events_user_created ON events(user_id, created_at); """, ), ( 2, "add_telegram_approved", """ ALTER TABLE users ADD COLUMN telegram_approved INTEGER; """, ), ] def _apply_migrations(conn: sqlite3.Connection) -> None: """Ensure all migrations are applied idempotently.""" conn.execute( """CREATE TABLE IF NOT EXISTS schema_migrations ( version INTEGER PRIMARY KEY, name TEXT NOT NULL, applied_at TEXT NOT NULL )""" ) applied = {row[0] for row in conn.execute("SELECT version FROM schema_migrations")} for version, name, ddl in _MIGRATIONS: if version in applied: continue logger.info("Applying migration %d: %s", version, name) conn.executescript(ddl) conn.execute( "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?, ?, ?)", (version, name, _now_iso()), ) conn.commit() logger.info("Schema migrations up to date (latest: %d)", max(v for v, _, _ in _MIGRATIONS)) # ── SQLite connection (T010) ───────────────────────────────────── _MAX_WRITE_RETRIES = 3 _RETRY_BASE_DELAY = 0.05 # seconds def _connect(db_path: str | Path) -> sqlite3.Connection: """Open a SQLite connection with WAL mode and foreign keys enabled.""" conn = sqlite3.connect(str(db_path), timeout=10) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.row_factory = sqlite3.Row return conn def _write_with_retry(conn: sqlite3.Connection, fn: Any) -> Any: """Execute *fn(conn)* with exponential-backoff retry on SQLITE_BUSY.""" for attempt in range(_MAX_WRITE_RETRIES): try: return fn(conn) except sqlite3.OperationalError as exc: if "database is locked" not in str(exc) or attempt == _MAX_WRITE_RETRIES - 1: raise delay = _RETRY_BASE_DELAY * (2**attempt) logger.warning("SQLITE_BUSY — retrying in %.2fs (attempt %d)", delay, attempt + 1) time.sleep(delay) return None # unreachable # ── UserStore ──────────────────────────────────────────────────── class UserStore: """Central access layer for user identity, credentials, sessions, and events.""" def __init__(self, db_path: str | Path | None = None) -> None: if db_path is None: db_path = Path(settings.DATA_DIR) / "users.db" self._db_path = Path(db_path) self._db_path.parent.mkdir(parents=True, exist_ok=True) self._conn = _connect(self._db_path) _apply_migrations(self._conn) @property def conn(self) -> sqlite3.Connection: return self._conn def close(self) -> None: self._conn.close() # ── User resolution (T013) ─────────────────────────────────── def resolve_or_create_user( self, provider: str, external_id: str, display_name: str, email: str | None = None, ) -> User: """Resolve (provider, external_id) → User. Creates if not found.""" def _do(conn: sqlite3.Connection) -> User: row = conn.execute( """SELECT u.id, u.display_name, u.email, u.created_at, u.updated_at, u.is_owner, u.onboarding_complete, u.telegram_approved FROM external_identities ei JOIN users u ON u.id = ei.user_id WHERE ei.provider = ? AND ei.external_id = ?""", (provider, external_id), ).fetchone() if row: _raw_approved = row["telegram_approved"] return User( id=row["id"], display_name=row["display_name"], email=row["email"], created_at=row["created_at"], updated_at=row["updated_at"], is_owner=bool(row["is_owner"]), onboarding_complete=bool(row["onboarding_complete"]), telegram_approved=None if _raw_approved is None else bool(_raw_approved), is_new=False, ) # Create new user now = _now_iso() user_id = str(uuid.uuid4()) conn.execute( """INSERT INTO users (id, display_name, email, created_at, updated_at, is_owner, onboarding_complete) VALUES (?, ?, ?, ?, ?, 0, 0)""", (user_id, display_name, email, now, now), ) conn.execute( """INSERT INTO external_identities (user_id, provider, external_id, created_at) VALUES (?, ?, ?, ?)""", (user_id, provider, external_id, now), ) conn.commit() return User( id=user_id, display_name=display_name, email=email, created_at=now, updated_at=now, is_owner=False, onboarding_complete=False, is_new=True, ) return _write_with_retry(self._conn, _do) def get_user(self, user_id: str) -> User | None: row = self._conn.execute( """SELECT id, display_name, email, created_at, updated_at, is_owner, onboarding_complete, telegram_approved FROM users WHERE id = ?""", (user_id,), ).fetchone() if not row: return None _raw_approved = row["telegram_approved"] return User( id=row["id"], display_name=row["display_name"], email=row["email"], created_at=row["created_at"], updated_at=row["updated_at"], is_owner=bool(row["is_owner"]), onboarding_complete=bool(row["onboarding_complete"]), telegram_approved=None if _raw_approved is None else bool(_raw_approved), ) def set_onboarding_complete(self, user_id: str) -> None: def _do(conn: sqlite3.Connection) -> None: conn.execute( "UPDATE users SET onboarding_complete = 1, updated_at = ? WHERE id = ?", (_now_iso(), user_id), ) conn.commit() _write_with_retry(self._conn, _do) def set_telegram_approval(self, user_id: str, approved: bool) -> None: """Set telegram_approved to 1 (approved) or 0 (denied).""" def _do(conn: sqlite3.Connection) -> None: conn.execute( "UPDATE users SET telegram_approved = ?, updated_at = ? WHERE id = ?", (int(approved), _now_iso(), user_id), ) conn.commit() _write_with_retry(self._conn, _do) # ── Credential CRUD (T014) ─────────────────────────────────── def get_credentials(self, user_id: str) -> dict[str, ServiceCredential]: """Return all credentials for a user keyed by service name.""" rows = self._conn.execute( """SELECT id, user_id, service, encrypted_token, service_user_id, service_username, created_at, expires_at, last_used_at FROM service_credentials WHERE user_id = ?""", (user_id,), ).fetchall() return { row["service"]: ServiceCredential( id=row["id"], user_id=row["user_id"], service=row["service"], encrypted_token=row["encrypted_token"], service_user_id=row["service_user_id"], service_username=row["service_username"], created_at=row["created_at"], expires_at=row["expires_at"], last_used_at=row["last_used_at"], ) for row in rows } def get_credential(self, user_id: str, service: str) -> ServiceCredential | None: row = self._conn.execute( """SELECT id, user_id, service, encrypted_token, service_user_id, service_username, created_at, expires_at, last_used_at FROM service_credentials WHERE user_id = ? AND service = ?""", (user_id, service), ).fetchone() if not row: return None return ServiceCredential( id=row["id"], user_id=row["user_id"], service=row["service"], encrypted_token=row["encrypted_token"], service_user_id=row["service_user_id"], service_username=row["service_username"], created_at=row["created_at"], expires_at=row["expires_at"], last_used_at=row["last_used_at"], ) def store_credential( self, user_id: str, service: str, token: str, service_user_id: str | None = None, service_username: str | None = None, expires_at: str | None = None, ) -> None: """Store (or replace) a credential. Encrypts the token before writing.""" enc_token = encrypt(token) now = _now_iso() def _do(conn: sqlite3.Connection) -> None: conn.execute( """INSERT INTO service_credentials (user_id, service, encrypted_token, service_user_id, service_username, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(user_id, service) DO UPDATE SET encrypted_token = excluded.encrypted_token, service_user_id = excluded.service_user_id, service_username = excluded.service_username, expires_at = excluded.expires_at, last_used_at = NULL""", (user_id, service, enc_token, service_user_id, service_username, now, expires_at), ) conn.commit() _write_with_retry(self._conn, _do) def delete_credential(self, user_id: str, service: str) -> None: def _do(conn: sqlite3.Connection) -> None: conn.execute( "DELETE FROM service_credentials WHERE user_id = ? AND service = ?", (user_id, service), ) conn.commit() _write_with_retry(self._conn, _do) def touch_credential(self, user_id: str, service: str) -> None: """Update last_used_at for a credential.""" def _do(conn: sqlite3.Connection) -> None: conn.execute( "UPDATE service_credentials SET last_used_at = ? WHERE user_id = ? AND service = ?", (_now_iso(), user_id, service), ) conn.commit() _write_with_retry(self._conn, _do) # ── Provisioning log ───────────────────────────────────────── def log_provisioning( self, user_id: str, service: str, action: str, detail_json: str | None = None, ) -> None: def _do(conn: sqlite3.Connection) -> None: conn.execute( """INSERT INTO provisioning_log (user_id, service, action, detail_json, created_at) VALUES (?, ?, ?, ?, ?)""", (user_id, service, action, detail_json, _now_iso()), ) conn.commit() _write_with_retry(self._conn, _do) # ── Session + message CRUD (T023) ──────────────────────────── def get_or_create_session( self, user_id: str, surface: str, topic_id: str | None = None, ) -> Session: """Resolve or create a session for the given user/surface/topic.""" def _do(conn: sqlite3.Connection) -> Session: if topic_id is not None: row = conn.execute( """SELECT id, user_id, surface, topic_id, title, created_at, last_active_at FROM sessions WHERE user_id = ? AND surface = ? AND topic_id = ?""", (user_id, surface, topic_id), ).fetchone() else: row = conn.execute( """SELECT id, user_id, surface, topic_id, title, created_at, last_active_at FROM sessions WHERE user_id = ? AND surface = ? AND topic_id IS NULL""", (user_id, surface), ).fetchone() if row: now = _now_iso() conn.execute( "UPDATE sessions SET last_active_at = ? WHERE id = ?", (now, row["id"]), ) conn.commit() return Session( id=row["id"], user_id=row["user_id"], surface=row["surface"], topic_id=row["topic_id"], title=row["title"], created_at=row["created_at"], last_active_at=now, ) now = _now_iso() session_id = str(uuid.uuid4()) conn.execute( """INSERT INTO sessions (id, user_id, surface, topic_id, title, created_at, last_active_at) VALUES (?, ?, ?, ?, NULL, ?, ?)""", (session_id, user_id, surface, topic_id, now, now), ) conn.commit() return Session( id=session_id, user_id=user_id, surface=surface, topic_id=topic_id, title=None, created_at=now, last_active_at=now, ) return _write_with_retry(self._conn, _do) def log_message(self, session_id: str, role: str, content: str) -> None: def _do(conn: sqlite3.Connection) -> None: conn.execute( """INSERT INTO messages (id, session_id, role, content, created_at) VALUES (?, ?, ?, ?, ?)""", (str(uuid.uuid4()), session_id, role, content, _now_iso()), ) conn.commit() _write_with_retry(self._conn, _do) def get_session_messages(self, session_id: str) -> list[Message]: rows = self._conn.execute( """SELECT id, session_id, role, content, created_at FROM messages WHERE session_id = ? ORDER BY created_at""", (session_id,), ).fetchall() return [ Message( id=row["id"], session_id=row["session_id"], role=row["role"], content=row["content"], created_at=row["created_at"], ) for row in rows ] # ── Event CRUD (T025) ──────────────────────────────────────── def store_event( self, user_id: str, source: str, event_type: str, summary: str, detail_json: str | None = None, ) -> None: def _do(conn: sqlite3.Connection) -> None: conn.execute( """INSERT INTO events (user_id, source, event_type, summary, detail_json, created_at) VALUES (?, ?, ?, ?, ?, ?)""", (user_id, source, event_type, summary, detail_json, _now_iso()), ) conn.commit() _write_with_retry(self._conn, _do) def get_recent_events(self, user_id: str, window_hours: int = 24) -> list[Event]: cutoff = datetime.now(timezone.utc).isoformat() # Use simple string comparison — ISO8601 sorts lexicographically rows = self._conn.execute( """SELECT id, user_id, source, event_type, summary, detail_json, created_at, consumed_at FROM events WHERE user_id = ? AND created_at >= datetime(?, '-' || ? || ' hours') ORDER BY created_at DESC""", (user_id, cutoff, window_hours), ).fetchall() return [ Event( id=row["id"], user_id=row["user_id"], source=row["source"], event_type=row["event_type"], summary=row["summary"], detail_json=row["detail_json"], created_at=row["created_at"], consumed_at=row["consumed_at"], ) for row in rows ] def mark_events_consumed(self, event_ids: list[int]) -> None: if not event_ids: return def _do(conn: sqlite3.Connection) -> None: placeholders = ",".join("?" for _ in event_ids) conn.execute( f"UPDATE events SET consumed_at = ? WHERE id IN ({placeholders})", # noqa: S608 [_now_iso(), *event_ids], ) conn.commit() _write_with_retry(self._conn, _do) # ── Owner bootstrap (T016) ─────────────────────────────────── def bootstrap_owner(self) -> User: """Idempotent: ensure the owner user exists, migrate env-var credentials.""" def _do(conn: sqlite3.Connection) -> User: row = conn.execute( """SELECT id, display_name, email, created_at, updated_at, is_owner, onboarding_complete FROM users WHERE is_owner = 1""", ).fetchone() if row: owner = User( id=row["id"], display_name=row["display_name"], email=row["email"], created_at=row["created_at"], updated_at=row["updated_at"], is_owner=True, onboarding_complete=True, ) # Still migrate any new env-var credentials that weren't there before self._migrate_env_credentials(owner.id) return owner now = _now_iso() owner_id = str(uuid.uuid4()) conn.execute( """INSERT INTO users (id, display_name, email, created_at, updated_at, is_owner, onboarding_complete) VALUES (?, ?, NULL, ?, ?, 1, 1)""", (owner_id, "Owner", now, now), ) # Create "web" external identity for owner conn.execute( """INSERT INTO external_identities (user_id, provider, external_id, created_at) VALUES (?, 'web', 'owner', ?)""", (owner_id, now), ) conn.commit() logger.info("Created owner user %s", owner_id) self._migrate_env_credentials(owner_id) return User( id=owner_id, display_name="Owner", email=None, created_at=now, updated_at=now, is_owner=True, onboarding_complete=True, is_new=True, ) return _write_with_retry(self._conn, _do) def _migrate_env_credentials(self, owner_id: str) -> None: """Migrate static env-var API keys into the owner's credential vault.""" migrations = [ ("vikunja", settings.VIKUNJA_API_KEY, settings.VIKUNJA_API_URL), ("karakeep", settings.KARAKEEP_API_KEY, settings.KARAKEEP_API_URL), ] for service, api_key, _url in migrations: if not api_key: continue existing = self.get_credential(owner_id, service) if existing: continue self.store_credential(owner_id, service, api_key) self.log_provisioning(owner_id, service, "env_migrated", '{"source": "env_var"}') logger.info("Migrated %s env-var credential for owner %s", service, owner_id) # ── Module-level singleton ─────────────────────────────────────── _store: UserStore | None = None def get_store() -> UserStore: """Return the module-level UserStore singleton, creating it if needed.""" global _store # noqa: PLW0603 if _store is None: _store = UserStore() return _store def init_store(db_path: str | Path | None = None) -> UserStore: """Initialise the UserStore singleton explicitly (e.g. in FastAPI lifespan).""" global _store # noqa: PLW0603 _store = UserStore(db_path) return _store