"""Disk-backed media blobs shared by the web UI, Telegram, and ChatKit uploads.""" from __future__ import annotations import base64 import json import shutil import uuid from pathlib import Path from typing import Any class LocalMediaStore: def __init__(self, data_dir: str = "/data"): self._root = Path(data_dir) / "uploads" self._root.mkdir(parents=True, exist_ok=True) def save_bytes( self, data: bytes, *, name: str, mime_type: str, file_id: str | None = None, ) -> dict[str, Any]: normalized_name = str(name).strip() or "upload" normalized_mime_type = str(mime_type).strip() or "application/octet-stream" normalized_file_id = str(file_id).strip() if file_id else uuid.uuid4().hex item_dir = self._item_dir(normalized_file_id) item_dir.mkdir(parents=True, exist_ok=True) (item_dir / "blob").write_bytes(data) metadata = { "id": normalized_file_id, "name": normalized_name, "mime_type": normalized_mime_type, "size": len(data), } (item_dir / "meta.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8") return dict(metadata) def get_meta(self, file_id: str) -> dict[str, Any]: meta_path = self._item_dir(file_id) / "meta.json" if not meta_path.exists(): raise KeyError(file_id) try: payload = json.loads(meta_path.read_text(encoding="utf-8")) except json.JSONDecodeError as error: raise KeyError(file_id) from error if not isinstance(payload, dict): raise KeyError(file_id) payload.setdefault("id", str(file_id)) payload.setdefault("name", "upload") payload.setdefault("mime_type", "application/octet-stream") payload.setdefault("size", self._blob_path(file_id).stat().st_size if self._blob_path(file_id).exists() else 0) return payload def read_bytes(self, file_id: str) -> bytes: blob_path = self._blob_path(file_id) if not blob_path.exists(): raise KeyError(file_id) return blob_path.read_bytes() def delete(self, file_id: str) -> None: item_dir = self._item_dir(file_id) if item_dir.exists(): shutil.rmtree(item_dir) def build_data_url(self, file_id: str) -> str: metadata = self.get_meta(file_id) encoded = base64.b64encode(self.read_bytes(file_id)).decode("ascii") return f"data:{metadata['mime_type']};base64,{encoded}" def _item_dir(self, file_id: str) -> Path: normalized_file_id = str(file_id).strip() if not normalized_file_id: raise KeyError(file_id) return self._root / normalized_file_id def _blob_path(self, file_id: str) -> Path: return self._item_dir(file_id) / "blob"