"""CodeAnywhere — FastAPI entrypoint serving Vercel AI SDK frontend, Telegram webhook, and web UI.""" import base64 import binascii import json import logging import time import uuid from pathlib import Path from typing import AsyncIterator from copilot.generated.session_events import SessionEventType from fastapi import Depends, FastAPI, HTTPException, Request, Response from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from openai import AsyncOpenAI from telegram import Update import tools as _tools_init # noqa: F401 — registers tool sets with the registry from config import settings from copilot_runtime import ( copilot, format_prompt_with_history, run_session, stream_session, ) from instance import BASE_CONTEXT, FASTAPI_TITLE, SKILLS_CONTEXT from learning import extract_learnings_from_turn, format_learnings_for_prompt from llm_costs import extract_usage_and_cost def _get_advisor_usage(thread_id: str | None) -> dict | None: """Fetch advisor sidecar usage for *thread_id*, if the module is loaded.""" if not thread_id: return None try: from tools.advisor import get_advisor_usage return get_advisor_usage(thread_id) except ImportError: return None from local_media_store import LocalMediaStore from model_selection import ( ModelSelection, ModelSelectionError, build_curated_model_options, build_provider_config, build_provider_options, default_selection, format_known_models, format_selection, resolve_selection, ) from telegram_bot import build_telegram_app, webhook_secret from tool_pipeline import ( active_toolset_names, build_capability_fragment, build_integration_tools, build_onboarding_fragment, ) from tool_registry import registry as tool_registry from user_store import init_store as _init_user_store from ux import ( append_elapsed_status, extract_final_text, format_session_error, stream_reasoning_event, stream_status_updates, stream_trace_event, ) from web_fallback_store import 
WebFallbackStore logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) MAGIC_RENAME_MODEL = "gpt-5.4-nano" MAGIC_RENAME_MAX_MESSAGES = 40 MAGIC_RENAME_MAX_CHARS = 12000 _magic_rename_client: AsyncOpenAI | None = None app = FastAPI(title=FASTAPI_TITLE, docs_url=None, redoc_url=None) # ── Data stores ────────────────────────────────────────────────────── media_store = LocalMediaStore(data_dir=settings.DATA_DIR) web_store = WebFallbackStore(data_dir=settings.DATA_DIR, media_store=media_store) # ── Telegram setup ─────────────────────────────────────────────────── tg_app = build_telegram_app(web_store) if settings.TG_BOT_TOKEN else None # ── Auth ───────────────────────────────────────────────────────────── def verify_auth(request: Request) -> None: token = request.cookies.get("auth_token") if not settings.AUTH_TOKEN or token != settings.AUTH_TOKEN: raise HTTPException(status_code=401, detail="Unauthorized") def _resolve_web_user(): """Resolve the authenticated web user (owner for now — single AUTH_TOKEN).""" from user_store import get_store store = get_store() # Web auth = owner; multi-user web auth is out of scope return store.bootstrap_owner() async def read_json(request: Request) -> dict: try: payload = await request.json() except Exception: return {} return payload if isinstance(payload, dict) else {} def get_web_thread(thread_id: str) -> dict: try: return web_store.get_thread(thread_id) except KeyError as error: raise HTTPException(status_code=404, detail="Thread not found") from error def get_web_selection(thread: dict) -> ModelSelection: return resolve_selection( model=str(thread.get("model") or settings.DEFAULT_MODEL), provider=str(thread.get("provider") or default_selection().provider), ) def _format_recent_events(user) -> str | None: """Format recent events for injection into the system message (T028).""" from datetime import datetime from datetime import timezone as tz from 
user_store import get_store store = get_store() events = store.get_recent_events(user.id, window_hours=24) if not events: return None lines = [] now = datetime.now(tz.utc) for ev in events[:15]: # cap to avoid bloating context try: created = datetime.fromisoformat(ev.created_at) delta = now - created hours = int(delta.total_seconds() // 3600) if hours < 1: ago = f"{int(delta.total_seconds() // 60)}m ago" elif hours < 24: ago = f"{hours}h ago" else: ago = "yesterday" except ValueError: ago = "recently" lines.append(f"- {ago}: {ev.summary}") # Mark as consumed store.mark_events_consumed([ev.id for ev in events[:15]]) return "Recent activity for you:\n" + "\n".join(lines) def _build_web_system_message(thread_id: str, user=None) -> str: """Build system message with learnings, integration prompts, and recent events.""" sections = [BASE_CONTEXT.rstrip(), SKILLS_CONTEXT.rstrip()] # Integration tool prompt fragments active_toolsets = active_toolset_names(user) fragments = tool_registry.get_system_prompt_fragments(active_toolsets) if fragments: sections.extend(fragments) project_learnings = web_store.get_project_learnings(thread_id) global_learnings = web_store.get_global_learnings() learnings_text = format_learnings_for_prompt(project_learnings, global_learnings) if learnings_text: sections.append(learnings_text) # Inject recent events (T028) if user: events_section = _format_recent_events(user) if events_section: sections.append(events_section) # Inject onboarding / capability status (T037) if user: onboarding = build_onboarding_fragment(user) if onboarding: sections.append(onboarding) capability = build_capability_fragment(user) if capability: sections.append(capability) return "\n\n".join(section for section in sections if section) def _sse_event(name: str, payload: dict) -> str: return f"event: {name}\ndata: {json.dumps(payload, ensure_ascii=True)}\n\n" def _ai_data(payload: dict | str) -> str: """Emit a single AI SDK data stream part (SSE `data:` line).""" if 
isinstance(payload, str): return f"data: {payload}\n\n" return f"data: {json.dumps(payload, ensure_ascii=True)}\n\n" def _ai_tool_name(event) -> str: if event.data: return event.data.tool_name or getattr(event.data, "name", None) or "tool" return "tool" def _ai_tool_args(event) -> dict: if event.data and getattr(event.data, "arguments", None): args = event.data.arguments if isinstance(args, str): try: return json.loads(args) except (json.JSONDecodeError, ValueError): return {"raw": args} if isinstance(args, dict): return args return {} def _ai_tool_output(event) -> str | dict: if event.data and getattr(event.data, "output", None): output = event.data.output if isinstance(output, str): if len(output) > 2000: return output[:2000] + "..." try: return json.loads(output) except (json.JSONDecodeError, ValueError): return output if isinstance(output, dict): return output return str(output) return "" def _finalize_web_assistant(thread_id: str, assistant_text: str) -> dict: assistant_message = web_store.add_message(thread_id, "assistant", assistant_text) return {"thread": web_store.get_thread(thread_id), "assistantMessage": assistant_message} def _decode_data_url(data_url: str) -> tuple[str, bytes]: normalized = str(data_url or "").strip() if not normalized.startswith("data:") or "," not in normalized: raise HTTPException(status_code=400, detail="Upload payload must be a base64 data URL") header, encoded = normalized.split(",", 1) if ";base64" not in header: raise HTTPException(status_code=400, detail="Upload payload must be base64 encoded") mime_type = header[5:].split(";", 1)[0].strip() or "application/octet-stream" try: return mime_type, base64.b64decode(encoded, validate=True) except (binascii.Error, ValueError) as error: raise HTTPException(status_code=400, detail="Upload payload is not valid base64") from error def _resolve_uploaded_images(payload: dict) -> list[dict]: raw_attachments = payload.get("attachments") if raw_attachments in (None, ""): return [] if not 
isinstance(raw_attachments, list): raise HTTPException(status_code=400, detail="Attachments must be provided as a list") uploads: list[dict] = [] for raw_attachment in raw_attachments: if isinstance(raw_attachment, str): file_id = raw_attachment.strip() elif isinstance(raw_attachment, dict): file_id = str(raw_attachment.get("id") or raw_attachment.get("file_id") or "").strip() else: raise HTTPException(status_code=400, detail="Attachments must be objects or IDs") if not file_id: raise HTTPException(status_code=400, detail="Attachment ID is required") try: upload = web_store.get_upload(file_id) except KeyError as error: raise HTTPException(status_code=400, detail=f"Unknown upload: {file_id}") from error if not str(upload.get("mime_type") or "").strip().lower().startswith("image/"): raise HTTPException(status_code=400, detail="Only image uploads are supported right now") uploads.append(upload) return uploads def _build_user_message_parts(content: str, uploads: list[dict]) -> list[dict]: parts: list[dict] = [] if content: parts.append({"type": "input_text", "text": content}) for upload in uploads: parts.append( { "type": "input_image", "file_id": upload["id"], "name": upload.get("name") or "image", "mime_type": upload.get("mime_type") or "image/png", "detail": "auto", } ) return parts def _build_blob_attachments(media_store: LocalMediaStore, uploads: list[dict]) -> list[dict[str, str]] | None: blob_attachments: list[dict[str, str]] = [] for upload in uploads: try: mime_type = str(upload.get("mime_type") or "application/octet-stream") encoded = base64.b64encode(media_store.read_bytes(upload["id"])).decode("ascii") except KeyError: continue blob_attachments.append( { "type": "blob", "mimeType": mime_type, "data": encoded, "displayName": str(upload.get("name") or "upload"), } ) return blob_attachments or None def _handle_web_thread_command(thread_id: str, content: str, current_selection: ModelSelection) -> str | None: if content.startswith("/model "): requested_model = 
content[7:].strip() if not requested_model: raise HTTPException(status_code=400, detail="Usage: /model ") selection = resolve_selection(model=requested_model, current=current_selection) web_store.update_thread(thread_id, model=selection.model, provider=selection.provider) return f"Switched to provider `{selection.provider}` with model `{selection.model}`." if content.startswith("/provider "): requested_provider = content[10:].strip() if not requested_provider: raise HTTPException(status_code=400, detail="Usage: /provider ") selection = resolve_selection( model=current_selection.model, provider=requested_provider, current=current_selection, ) web_store.update_thread(thread_id, model=selection.model, provider=selection.provider) return f"Provider switched to `{selection.provider}`. Active model: `{selection.model}`." if content == "/models" or content.startswith("/models "): requested_provider = content[8:].strip() if content.startswith("/models ") else None return format_known_models(current=current_selection, provider=requested_provider) if content == "/current": return format_selection(current_selection) return None def _get_magic_rename_client() -> AsyncOpenAI: if not settings.OPENAI_API_KEY: raise HTTPException(status_code=503, detail="OPENAI_API_KEY is not configured for magic rename") global _magic_rename_client if _magic_rename_client is None: _magic_rename_client = AsyncOpenAI( api_key=settings.OPENAI_API_KEY, base_url=settings.OPENAI_BASE_URL, ) return _magic_rename_client def _build_magic_rename_input(thread_id: str) -> tuple[str, str]: history = web_store.build_history(thread_id, limit=MAGIC_RENAME_MAX_MESSAGES) lines: list[str] = [] seen_user_content = False total_chars = 0 for message in history: role = str(message.get("role") or "assistant").strip().lower() content = str(message.get("content") or "").strip() if not content: continue if role == "user" and content.startswith("/"): continue if role == "user": seen_user_content = True single_line = " 
".join(content.split()) if not single_line: continue truncated = single_line[:800].rstrip() rendered = f"{role.upper()}: {truncated}" if total_chars + len(rendered) > MAGIC_RENAME_MAX_CHARS and lines: break lines.append(rendered) total_chars += len(rendered) if not lines or not seen_user_content: raise HTTPException(status_code=400, detail="This conversation does not have enough content to summarize yet") fallback_title = next( ( " ".join(str(message.get("content") or "").split()) for message in history if str(message.get("role") or "").strip().lower() == "user" and str(message.get("content") or "").strip() and not str(message.get("content") or "").lstrip().startswith("/") ), "Conversation", ) return "\n".join(lines), fallback_title def _normalize_magic_title(raw_title: str, fallback_title: str) -> str: normalized = next((line.strip() for line in str(raw_title or "").splitlines() if line.strip()), "") if normalized.lower().startswith("title:"): normalized = normalized.split(":", 1)[1].strip() while normalized[:2] in {"- ", "* ", "• ", "1.", "1)"}: normalized = normalized[2:].strip() normalized = normalized.strip("`'\" ") normalized = " ".join(normalized.split()) normalized = normalized.rstrip(".!?;:-") fallback = " ".join(str(fallback_title or "").split()) or "Conversation" if not normalized: normalized = fallback if len(normalized) <= 60: return normalized return f"{normalized[:57].rstrip()}..." async def _generate_magic_title(thread_id: str) -> str: conversation_text, fallback_title = _build_magic_rename_input(thread_id) response = await _get_magic_rename_client().responses.create( model=MAGIC_RENAME_MODEL, instructions=( "Write a short thread title that summarizes the conversation. " "Return exactly one line with no quotes, no markdown, and no trailing punctuation. " "Aim for 4 to 8 words when possible and stay within 60 characters." 
), input=conversation_text, max_output_tokens=32, ) return _normalize_magic_title(getattr(response, "output_text", ""), fallback_title) def _is_magic_rename_candidate(thread: dict) -> bool: return int(thread.get("message_count") or 0) > 0 and str(thread.get("title_source") or "").strip().lower() not in { "manual", "magic", } async def _magic_rename_thread(thread_id: str) -> dict: get_web_thread(thread_id) lock = web_store.lock(thread_id) if lock.locked(): raise HTTPException(status_code=409, detail="Still processing the previous message") async with lock: title = await _generate_magic_title(thread_id) return web_store.update_thread(thread_id, title=title, title_source="magic") # ── Event polling (T027) ──────────────────────────────────────────── async def _poll_events_loop() -> None: """Background task that polls services for events at a configurable interval.""" import asyncio interval = settings.EVENT_POLL_INTERVAL_SECONDS if interval <= 0: logger.info("Event polling disabled (interval=%d)", interval) return logger.info("Event polling started (interval=%ds)", interval) while True: try: await asyncio.sleep(interval) await _poll_events_tick() except asyncio.CancelledError: break except Exception: logger.exception("Event polling tick failed") async def _poll_events_tick() -> None: """Run one polling cycle for all supported services.""" from user_store import get_store store = get_store() owner = store.bootstrap_owner() # Vikunja: check for tasks due within 24h if settings.VIKUNJA_API_URL and settings.VIKUNJA_API_KEY: try: from datetime import datetime, timedelta from datetime import timezone as tz import httpx async with httpx.AsyncClient(timeout=15) as client: headers = {"Authorization": f"Bearer {settings.VIKUNJA_API_KEY}"} resp = await client.get( f"{settings.VIKUNJA_API_URL.rstrip('/')}/tasks", headers=headers, params={"filter": "done = false", "per_page": 50}, ) if resp.status_code == 200: tasks = resp.json() if isinstance(resp.json(), list) else [] now = 
datetime.now(tz.utc) for task in tasks: due = task.get("due_date") if not due or due == "0001-01-01T00:00:00Z": continue try: due_dt = datetime.fromisoformat(due.replace("Z", "+00:00")) except ValueError: continue if now <= due_dt <= now + timedelta(hours=24): summary = f'Task "{task.get("title", "untitled")}" is due within 24h' store.store_event(owner.id, "vikunja", "task.due", summary) except Exception: logger.exception("Vikunja polling failed") # ── Health check integration (T045) ────────────────────────────── async def _run_startup_health_checks(store) -> None: """Run health checks for the owner's credentials at startup.""" from provisioners.base import provisioner_registry owner = store.bootstrap_owner() creds = store.get_credentials(owner.id) for service, cred in creds.items(): provisioner = provisioner_registry.get(service) if provisioner is None: continue try: healthy = await provisioner.health_check(cred) if healthy: logger.info("Health check passed: %s", service) else: logger.warning("Health check FAILED: %s — credential may be expired", service) except Exception: logger.exception("Health check error: %s", service) # ── Lifecycle ──────────────────────────────────────────────────────── @app.on_event("startup") async def startup() -> None: # Initialise the user identity store and bootstrap the owner user store = _init_user_store() store.bootstrap_owner() logger.info("User store initialised, owner bootstrapped") # Register provisioners (guarded by settings availability) import provisioners as _provisioners_init # noqa: F401 # Run health checks for owner credentials (T045) await _run_startup_health_checks(store) await copilot.start() if tg_app: await tg_app.initialize() await tg_app.start() # post_init is not called automatically with manual initialize()/start(); # invoke it explicitly to register bot commands. 
if tg_app.post_init: await tg_app.post_init(tg_app) if settings.WEBHOOK_BASE_URL: webhook_url = f"{settings.WEBHOOK_BASE_URL}/telegram/webhook/{webhook_secret()}" await tg_app.bot.set_webhook(webhook_url, drop_pending_updates=True) logger.info("Telegram webhook registered (dropped pending updates) at %s", webhook_url) else: await tg_app.updater.start_polling(drop_pending_updates=True) logger.info("Telegram polling started (no WEBHOOK_BASE_URL configured)") # Start event polling background task (T027) import asyncio asyncio.create_task(_poll_events_loop()) @app.on_event("shutdown") async def shutdown() -> None: if tg_app: if tg_app.updater and tg_app.updater.running: await tg_app.updater.stop() await tg_app.stop() await tg_app.shutdown() await copilot.stop() # ── Health ─────────────────────────────────────────────────────────── @app.get("/health") async def health(): return {"status": "ok"} # ── Webhook receivers (T026) ──────────────────────────────────────── @app.post("/webhooks/vikunja") async def webhook_vikunja(request: Request): """Receive Vikunja event webhooks and store as user events.""" from user_store import get_store payload = await read_json(request) event_type = str(payload.get("event") or "unknown") task = payload.get("data") or {} title = str(task.get("title") or "untitled") summary = f"Vikunja: {event_type.replace('.', ' ')} — {title}" # Resolve user via service_user_id assignee_id = str(task.get("created_by", {}).get("id", "")) if isinstance(task.get("created_by"), dict) else "" store = get_store() if assignee_id: # Look up which internal user owns this Vikunja service_user_id row = store.conn.execute( "SELECT user_id FROM service_credentials WHERE service = 'vikunja' AND service_user_id = ?", (assignee_id,), ).fetchone() if row: store.store_event(row["user_id"], "vikunja", event_type, summary, detail_json=None) return {"ok": True} # Fallback: store for owner owner = store.bootstrap_owner() store.store_event(owner.id, "vikunja", event_type, 
summary, detail_json=None) return {"ok": True} @app.post("/webhooks/karakeep") async def webhook_karakeep(request: Request): """Receive Karakeep event webhooks and store as user events.""" from user_store import get_store payload = await read_json(request) event_type = str(payload.get("event") or "unknown") data = payload.get("data") or {} title = str(data.get("title") or data.get("name") or "item") summary = f"Karakeep: {event_type.replace('.', ' ')} — {title}" store = get_store() owner = store.bootstrap_owner() store.store_event(owner.id, "karakeep", event_type, summary, detail_json=None) return {"ok": True} # ── Login ──────────────────────────────────────────────────────────── @app.get("/login") async def login_page(error: str | None = None): msg = '

Invalid token

' if error else "" return HTMLResponse(f""" {FASTAPI_TITLE} — Login
{msg}
""") @app.post("/login") async def login_submit(request: Request): form = await request.form() if form.get("token") != settings.AUTH_TOKEN: return RedirectResponse("/login?error=1", status_code=303) response = RedirectResponse("/", status_code=303) response.set_cookie( "auth_token", settings.AUTH_TOKEN, httponly=True, secure=True, samesite="lax", max_age=86400 * 30 ) return response # ── Web UI (behind auth) ──────────────────────────────────────────── _FRONTEND_DIR = Path(__file__).parent / "static" / "dist" @app.get("/", dependencies=[Depends(verify_auth)]) async def index(): index_path = _FRONTEND_DIR / "index.html" if index_path.is_file(): return FileResponse(str(index_path)) # Fallback to legacy static/index.html during dev return FileResponse("static/index.html") @app.get("/ui-config", dependencies=[Depends(verify_auth)]) async def ui_config(): return { "defaultProvider": default_selection().provider, "defaultModel": settings.DEFAULT_MODEL, "pricingSource": "https://openrouter.ai/api/v1/models", "providerOptions": build_provider_options(), "curatedModels": build_curated_model_options(), } @app.post("/client-log", dependencies=[Depends(verify_auth)]) async def client_log(request: Request): payload = await request.json() logger.info("client-log %s", payload) return {"ok": True} @app.post("/api/uploads", dependencies=[Depends(verify_auth)]) async def create_upload(request: Request): payload = await read_json(request) name = str(payload.get("name") or "image").strip() or "image" declared_mime_type = str(payload.get("mimeType") or payload.get("mime_type") or "").strip() data_url = str(payload.get("dataUrl") or payload.get("data_url") or "").strip() if not data_url: raise HTTPException(status_code=400, detail="Upload data is required") detected_mime_type, data = _decode_data_url(data_url) mime_type = declared_mime_type or detected_mime_type if not mime_type.lower().startswith("image/"): raise HTTPException(status_code=400, detail="Only image uploads are supported 
right now") return {"upload": web_store.save_upload(name=name, mime_type=mime_type, data=data)} @app.delete("/api/uploads/{file_id}", dependencies=[Depends(verify_auth)]) async def delete_upload(file_id: str): web_store.delete_upload(file_id) return {"ok": True} @app.get("/uploads/{file_id}", dependencies=[Depends(verify_auth)]) async def get_upload(file_id: str): try: metadata = media_store.get_meta(file_id) data = media_store.read_bytes(file_id) except KeyError as error: raise HTTPException(status_code=404, detail="Upload not found") from error filename = str(metadata.get("name") or "upload").replace('"', "") return Response( content=data, media_type=str(metadata.get("mime_type") or "application/octet-stream"), headers={ "Cache-Control": "private, max-age=86400", "Content-Disposition": f'inline; filename="{filename}"', }, ) @app.get("/api/threads", dependencies=[Depends(verify_auth)]) async def list_threads(q: str | None = None): if q and q.strip(): return {"threads": web_store.search_threads(q)} return {"threads": web_store.list_threads()} @app.post("/api/threads", dependencies=[Depends(verify_auth)]) async def create_thread(request: Request): payload = await read_json(request) title = str(payload.get("title") or "New chat").strip() or "New chat" model = str(payload.get("model") or settings.DEFAULT_MODEL).strip() or settings.DEFAULT_MODEL provider = str(payload.get("provider") or default_selection().provider).strip() or default_selection().provider try: selection = resolve_selection(model=model, provider=provider) except ModelSelectionError as error: raise HTTPException(status_code=400, detail=str(error)) from error return {"thread": web_store.create_thread(title=title, model=selection.model, provider=selection.provider)} @app.post("/api/threads/magic-rename-all", dependencies=[Depends(verify_auth)]) async def magic_rename_all_threads(): threads = web_store.list_threads() eligible_threads = [thread for thread in threads if _is_magic_rename_candidate(thread)] if 
not eligible_threads: return {"threads": threads, "renamedCount": 0, "skippedCount": 0} _get_magic_rename_client() renamed_count = 0 skipped_count = 0 for thread in eligible_threads: try: await _magic_rename_thread(thread["id"]) renamed_count += 1 except HTTPException as error: logger.info("Magic rename skipped for thread %s: %s", thread["id"], error.detail) skipped_count += 1 except Exception: logger.exception("Magic rename failed for thread %s", thread["id"]) skipped_count += 1 return { "threads": web_store.list_threads(), "renamedCount": renamed_count, "skippedCount": skipped_count, } @app.get("/api/threads/{thread_id}", dependencies=[Depends(verify_auth)]) async def get_thread_detail(thread_id: str): return {"thread": get_web_thread(thread_id)} @app.patch("/api/threads/{thread_id}", dependencies=[Depends(verify_auth)]) async def update_thread(thread_id: str, request: Request): payload = await read_json(request) title = payload.get("title") model = payload.get("model") provider = payload.get("provider") if title is None and model is None and provider is None: raise HTTPException(status_code=400, detail="Nothing to update") try: current_thread = get_web_thread(thread_id) update_kwargs = {"title": title} if model is not None or provider is not None: selection = resolve_selection( model=str(model) if model is not None else current_thread.get("model"), provider=str(provider) if provider is not None else current_thread.get("provider"), current=get_web_selection(current_thread), ) update_kwargs["model"] = selection.model update_kwargs["provider"] = selection.provider thread = web_store.update_thread(thread_id, **update_kwargs) except KeyError as error: raise HTTPException(status_code=404, detail="Thread not found") from error except ModelSelectionError as error: raise HTTPException(status_code=400, detail=str(error)) from error except ValueError as error: raise HTTPException(status_code=400, detail=str(error)) from error return {"thread": thread} 
@app.post("/api/threads/{thread_id}/magic-rename", dependencies=[Depends(verify_auth)])
async def magic_rename_thread(thread_id: str):
    """Retitle *thread_id* via ``_magic_rename_thread`` and return the updated thread."""
    thread = await _magic_rename_thread(thread_id)
    return {"thread": thread}


@app.delete("/api/threads/{thread_id}", dependencies=[Depends(verify_auth)])
async def delete_thread(thread_id: str):
    """Delete a web thread; respond 404 when the store does not know it."""
    try:
        web_store.delete_thread(thread_id)
    except KeyError as error:
        raise HTTPException(status_code=404, detail="Thread not found") from error
    return {"ok": True}


# ── Shared helpers for the chat endpoints below ─────────────────────

# Assistant reply used when an agent run ends without any usable text. Every chat
# endpoint both shows and stores this same text, so reloaded history matches the UI.
_NO_USABLE_REPLY_TEXT = (
    "I finished that run but did not get a usable final reply back. "
    "Please retry, or narrow it to one repo, file, or command."
)


def _persist_turn_learnings(thread_id: str, content: str, assistant_text: str) -> None:
    """Extract learnings from one user/assistant turn and store them.

    Best effort: any failure is logged as a warning and swallowed, so learning
    extraction can never break the chat reply itself.
    """
    try:
        project_learnings, global_learnings = extract_learnings_from_turn(content, assistant_text)
        for fact, category in project_learnings:
            web_store.add_project_learning(thread_id, fact, category=category)
        for fact, category in global_learnings:
            web_store.add_global_learning(fact, category=category)
    except Exception:
        logger.warning("Learning extraction failed", exc_info=True)


def _session_error_text(event) -> str:
    """Return the message carried by a SESSION_ERROR event, or a generic fallback."""
    if event.data and isinstance(event.data.message, str):
        return event.data.message
    return "Unknown session error"


def _last_ui_message_text(raw_messages) -> str:
    """Join the ``type == "text"`` parts of the last AI SDK UIMessage in *raw_messages*.

    Malformed input yields ``""`` instead of raising, so the caller can fall back
    to the custom ``content`` field. Malformed means: *raw_messages* is not a
    non-empty list, the last message is not a dict, or ``parts`` is not a list.
    """
    if not isinstance(raw_messages, list) or not raw_messages:
        return ""
    last_msg = raw_messages[-1]
    if not isinstance(last_msg, dict):
        return ""
    parts = last_msg.get("parts") or []
    if not isinstance(parts, list):
        return ""
    text_parts = [str(p.get("text") or "") for p in parts if isinstance(p, dict) and p.get("type") == "text"]
    return "\n".join(t for t in text_parts if t).strip()


def _ai_close_frames(in_text: bool, text_id: str, reasoning_ids: set[str]) -> list[str]:
    """Build AI SDK frames closing the open text part (if any) and every open reasoning part."""
    frames = [_ai_data({"type": "text-end", "id": text_id})] if in_text else []
    frames.extend(_ai_data({"type": "reasoning-end", "id": rid}) for rid in list(reasoning_ids))
    return frames


@app.post("/api/threads/{thread_id}/messages", dependencies=[Depends(verify_auth)])
async def send_thread_message(thread_id: str, request: Request):
    """Handle one web-chat turn without streaming and return the finalized reply.

    Slash commands are answered directly, but only when no images were uploaded.
    Anything else runs a full agent session.

    Raises HTTP 400 for an empty message or a model-selection error, 404 for an
    unknown thread, and 409 while a previous turn still holds the thread lock.
    On HTTP or model-selection errors the user message is removed again. On any
    other error the formatted error text is stored as the assistant reply.
    """
    thread = get_web_thread(thread_id)
    user = _resolve_web_user()
    payload = await read_json(request)
    content = str(payload.get("content") or "").strip()
    uploads = _resolve_uploaded_images(payload)
    if not content and not uploads:
        raise HTTPException(status_code=400, detail="Message content is required")

    lock = web_store.lock(thread_id)
    if lock.locked():
        raise HTTPException(status_code=409, detail="Still processing the previous message")

    async with lock:
        user_message = web_store.add_message(
            thread_id,
            "user",
            content,
            parts=_build_user_message_parts(content, uploads),
        )
        try:
            selection = get_web_selection(thread)
            assistant_text = _handle_web_thread_command(thread_id, content, selection) if not uploads else None
            if assistant_text is None:
                history = web_store.build_agent_history(thread_id)
                prompt = format_prompt_with_history(history, content)
                # Build the system message once and hand the same text to the tool
                # pipeline, as the streaming endpoints do.
                system_message = _build_web_system_message(thread_id, user=user)
                result = await run_session(
                    copilot,
                    model=selection.model,
                    provider_config=build_provider_config(selection),
                    system_message=system_message,
                    prompt=prompt,
                    tools=build_integration_tools(user, thread_id=thread_id, system_prompt=system_message) or None,
                    attachments=_build_blob_attachments(media_store, uploads),
                    thread_id=thread_id,
                )
                # The reply text may live on ``result.data.content`` or ``result.content``.
                assistant_text = (
                    getattr(getattr(result, "data", None), "content", None)
                    or getattr(result, "content", None)
                    or ""
                )
                usage = extract_usage_and_cost(
                    selection.model,
                    selection.provider,
                    result,
                    advisor_usage=_get_advisor_usage(thread_id),
                )
                web_store.add_usage(thread_id, usage)
                if not assistant_text:
                    assistant_text = _NO_USABLE_REPLY_TEXT
                _persist_turn_learnings(thread_id, content, assistant_text)
            return _finalize_web_assistant(thread_id, assistant_text)
        except HTTPException:
            web_store.remove_message(thread_id, user_message["id"])
            raise
        except ModelSelectionError as error:
            logger.exception("Model selection error for web thread %s", thread_id)
            web_store.remove_message(thread_id, user_message["id"])
            raise HTTPException(status_code=400, detail=str(error)) from error
        except Exception as error:
            logger.exception("Agent error for web thread %s", thread_id)
            detail = format_session_error(surface="web", error=error)
            return _finalize_web_assistant(thread_id, detail)


@app.post("/api/threads/{thread_id}/messages/stream", dependencies=[Depends(verify_auth)])
async def stream_thread_message(thread_id: str, request: Request):
    """Stream one web-chat turn as server-sent events.

    Emits ``progress`` events (tool traces, accumulated reasoning text, elapsed
    status, advisor traces), then one ``final`` event with the finalized reply.
    Session and request errors are reported as an ``error`` event instead.

    Validation (400), unknown thread (404) and busy thread (409) are raised
    before the stream starts.
    """
    get_web_thread(thread_id)
    user = _resolve_web_user()
    payload = await read_json(request)
    content = str(payload.get("content") or "").strip()
    uploads = _resolve_uploaded_images(payload)
    if not content and not uploads:
        raise HTTPException(status_code=400, detail="Message content is required")

    lock = web_store.lock(thread_id)
    if lock.locked():
        raise HTTPException(status_code=409, detail="Still processing the previous message")

    async def event_stream() -> AsyncIterator[str]:
        async with lock:
            user_message = web_store.add_message(
                thread_id,
                "user",
                content,
                parts=_build_user_message_parts(content, uploads),
            )
            try:
                # Re-read the thread inside the lock so the latest model selection is used.
                current_selection = get_web_selection(get_web_thread(thread_id))
                assistant_text = (
                    _handle_web_thread_command(thread_id, content, current_selection) if not uploads else None
                )
                if assistant_text is not None:
                    yield _sse_event("final", _finalize_web_assistant(thread_id, assistant_text))
                    return

                history = web_store.build_agent_history(thread_id)
                prompt = format_prompt_with_history(history, content)
                attachments = _build_blob_attachments(media_store, uploads)
                reasoning_summaries: dict[str, str] = {}  # reasoning key -> accumulated text
                collected_events: list = []
                turn_started_at = time.perf_counter()
                web_system_message = _build_web_system_message(thread_id, user=user)

                async for event in stream_session(
                    copilot,
                    model=current_selection.model,
                    provider_config=build_provider_config(current_selection),
                    system_message=web_system_message,
                    prompt=prompt,
                    tools=build_integration_tools(user, thread_id=thread_id, system_prompt=web_system_message) or None,
                    attachments=attachments,
                    thread_id=thread_id,
                ):
                    collected_events.append(event)

                    # Handle errors
                    if event.type == SessionEventType.SESSION_ERROR:
                        yield _sse_event("error", {"message": _session_error_text(event)})
                        return

                    # Tool trace events
                    trace_event = stream_trace_event(event)
                    if trace_event:
                        yield _sse_event("progress", trace_event)

                    # Reasoning events: a final event replaces the text, a delta appends to it.
                    reasoning_event = stream_reasoning_event(event)
                    if reasoning_event:
                        key, text, is_final = reasoning_event
                        if is_final:
                            reasoning_summaries[key] = text
                        else:
                            reasoning_summaries[key] = reasoning_summaries.get(key, "") + text
                        reasoning_text = reasoning_summaries[key]
                        if reasoning_text.strip():
                            yield _sse_event(
                                "progress",
                                {
                                    "kind": "reasoning",
                                    "text": reasoning_text,
                                },
                            )

                    for status_text in stream_status_updates(event, include_reasoning_status=False):
                        yield _sse_event(
                            "progress",
                            {
                                "kind": "status",
                                "text": append_elapsed_status(
                                    status_text,
                                    elapsed_seconds=time.perf_counter() - turn_started_at,
                                ),
                            },
                        )

                assistant_text = extract_final_text(collected_events)
                usage = extract_usage_and_cost(
                    current_selection.model,
                    current_selection.provider,
                    collected_events,
                    advisor_usage=_get_advisor_usage(thread_id),
                )
                web_store.add_usage(thread_id, usage)
                if not assistant_text:
                    assistant_text = _NO_USABLE_REPLY_TEXT
                _persist_turn_learnings(thread_id, content, assistant_text)

                # Emit advisor trace events if any (only when the advisor tool set is installed).
                try:
                    from tools.advisor import get_advisor_traces

                    for trace in get_advisor_traces(thread_id):
                        yield _sse_event(
                            "progress",
                            {
                                "kind": "advisor_trace",
                                "question": trace.get("question", ""),
                                "answer": trace.get("answer", ""),
                                "stakes": trace.get("stakes", "medium"),
                                "model": trace.get("model", ""),
                                "duration_ms": trace.get("duration_ms", 0),
                            },
                        )
                except ImportError:
                    pass

                yield _sse_event("final", _finalize_web_assistant(thread_id, assistant_text))
            except HTTPException as error:
                web_store.remove_message(thread_id, user_message["id"])
                yield _sse_event("error", {"message": str(error.detail)})
            except ModelSelectionError as error:
                logger.exception("Model selection error for web thread %s", thread_id)
                web_store.remove_message(thread_id, user_message["id"])
                yield _sse_event("error", {"message": str(error)})
            except Exception as error:
                logger.exception("Streamed agent error for web thread %s", thread_id)
                detail = format_session_error(surface="web", error=error)
                yield _sse_event("final", _finalize_web_assistant(thread_id, detail))

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


# ── AI SDK data stream endpoint ──────────────────────────────────────


@app.post("/api/chat", dependencies=[Depends(verify_auth)])
async def ai_sdk_chat(request: Request):
    """Vercel AI SDK–compatible chat endpoint using the data stream protocol.

    Message text comes from the text parts of the last UIMessage in
    ``messages``, falling back to the custom ``content`` field. A thread is
    auto-created (titled after the first line) when ``threadId`` is missing.

    Validation (400), unknown thread (404) and busy thread (409) are raised
    before the stream starts. Later failures are sent as an ``error`` frame.
    """
    user = _resolve_web_user()
    payload = await read_json(request)
    thread_id = str(payload.get("threadId") or "").strip()

    content = _last_ui_message_text(payload.get("messages") or [])
    if not content:
        # Fallback: custom `content` field sent by the frontend
        content = str(payload.get("content") or "").strip()

    raw_attachments = payload.get("attachments")
    uploads = _resolve_uploaded_images({"attachments": raw_attachments}) if raw_attachments else []
    if not content and not uploads:
        raise HTTPException(status_code=400, detail="Message content is required")

    # Auto-create thread when threadId is missing
    if not thread_id:
        first_line = content.split("\n", 1)[0][:60].strip() or "New chat"
        selection = resolve_selection(model=settings.DEFAULT_MODEL)
        created = web_store.create_thread(title=first_line, model=selection.model, provider=selection.provider)
        thread_id = created["id"]
    # One lookup: 404 for an unknown threadId, the persisted record for a new one.
    thread = get_web_thread(thread_id)

    lock = web_store.lock(thread_id)
    if lock.locked():
        raise HTTPException(status_code=409, detail="Still processing the previous message")

    async def event_stream() -> AsyncIterator[str]:
        async with lock:
            user_message = web_store.add_message(
                thread_id,
                "user",
                content,
                parts=_build_user_message_parts(content, uploads),
            )
            msg_id = str(uuid.uuid4())
            text_id = f"text-{msg_id}"
            in_text = False  # True while a text part is open on the wire
            reasoning_open: set[str] = set()  # reasoning part ids started but not yet ended

            yield _ai_data({"type": "start", "messageId": msg_id})
            yield _ai_data({"type": "start-step"})

            try:
                current_selection = get_web_selection(thread)

                # Handle slash commands (never when images are attached)
                command_result = (
                    _handle_web_thread_command(thread_id, content, current_selection) if not uploads else None
                )
                if command_result is not None:
                    # Persist before streaming so the reply is not lost if the client
                    # disconnects before the generator runs to completion.
                    web_store.add_message(thread_id, "assistant", command_result)
                    yield _ai_data({"type": "text-start", "id": text_id})
                    yield _ai_data({"type": "text-delta", "id": text_id, "delta": command_result})
                    yield _ai_data({"type": "text-end", "id": text_id})
                    yield _ai_data({"type": "finish-step"})
                    yield _ai_data({"type": "finish"})
                    yield "data: [DONE]\n\n"
                    return

                history = web_store.build_agent_history(thread_id)
                prompt = format_prompt_with_history(history, content)
                attachments = _build_blob_attachments(media_store, uploads)
                collected_events: list = []
                ai_system_message = _build_web_system_message(thread_id, user=user)

                async for event in stream_session(
                    copilot,
                    model=current_selection.model,
                    provider_config=build_provider_config(current_selection),
                    system_message=ai_system_message,
                    prompt=prompt,
                    tools=build_integration_tools(user, thread_id=thread_id, system_prompt=ai_system_message) or None,
                    attachments=attachments,
                    thread_id=thread_id,
                ):
                    collected_events.append(event)

                    # ── Error: close open parts and end the stream ──
                    if event.type == SessionEventType.SESSION_ERROR:
                        for frame in _ai_close_frames(in_text, text_id, reasoning_open):
                            yield frame
                        yield _ai_data({"type": "error", "errorText": _session_error_text(event)})
                        return

                    # ── Text delta (opens a text part lazily) ──
                    if event.type == SessionEventType.ASSISTANT_MESSAGE_DELTA:
                        delta = (event.data and event.data.delta_content) or ""
                        if delta:
                            if not in_text:
                                yield _ai_data({"type": "text-start", "id": text_id})
                                in_text = True
                            yield _ai_data({"type": "text-delta", "id": text_id, "delta": delta})
                        continue

                    # ── Tool start: close the current text part; later text gets a fresh id ──
                    if event.type == SessionEventType.TOOL_EXECUTION_START:
                        if in_text:
                            yield _ai_data({"type": "text-end", "id": text_id})
                            in_text = False
                            text_id = f"text-{uuid.uuid4()}"
                        tool_name = _ai_tool_name(event)
                        tool_call_id = (event.data and event.data.tool_call_id) or str(uuid.uuid4())
                        args = _ai_tool_args(event)
                        yield _ai_data(
                            {
                                "type": "tool-input-start",
                                "toolCallId": tool_call_id,
                                "toolName": tool_name,
                            }
                        )
                        if args:
                            yield _ai_data(
                                {
                                    "type": "tool-input-delta",
                                    "toolCallId": tool_call_id,
                                    "inputTextDelta": json.dumps(args, ensure_ascii=False),
                                }
                            )
                        yield _ai_data(
                            {
                                "type": "tool-input-available",
                                "toolCallId": tool_call_id,
                                "toolName": tool_name,
                                "input": args,
                            }
                        )

                    # ── Tool complete ──
                    if event.type == SessionEventType.TOOL_EXECUTION_COMPLETE:
                        tool_call_id = (event.data and event.data.tool_call_id) or ""
                        output = _ai_tool_output(event)
                        yield _ai_data(
                            {
                                "type": "tool-output-available",
                                "toolCallId": tool_call_id,
                                "output": output,
                            }
                        )

                    # ── Reasoning delta (opens a reasoning part lazily) ──
                    if event.type == SessionEventType.ASSISTANT_REASONING_DELTA:
                        rid = (event.data and event.data.reasoning_id) or "reasoning"
                        text = (event.data and event.data.reasoning_text) or ""
                        if text:
                            if rid not in reasoning_open:
                                yield _ai_data({"type": "reasoning-start", "id": rid})
                                reasoning_open.add(rid)
                            yield _ai_data({"type": "reasoning-delta", "id": rid, "delta": text})

                    # ── Reasoning complete ──
                    if event.type == SessionEventType.ASSISTANT_REASONING:
                        rid = (event.data and event.data.reasoning_id) or "reasoning"
                        if rid in reasoning_open:
                            yield _ai_data({"type": "reasoning-end", "id": rid})
                            reasoning_open.discard(rid)

                    for status_text in stream_status_updates(event, include_reasoning_status=False):
                        yield _ai_data({"type": "data-status", "data": {"text": status_text}})

                # Close open reasoning blocks; clear so an error handler cannot close them twice.
                for rid in list(reasoning_open):
                    yield _ai_data({"type": "reasoning-end", "id": rid})
                reasoning_open.clear()

                # The same text is sent (when nothing was streamed) and stored, so a
                # reloaded thread shows what the user saw.
                assistant_text = extract_final_text(collected_events) or _NO_USABLE_REPLY_TEXT
                if in_text:
                    yield _ai_data({"type": "text-end", "id": text_id})
                    in_text = False
                else:
                    # No deltas were streamed: send the final text as one part.
                    yield _ai_data({"type": "text-start", "id": text_id})
                    yield _ai_data({"type": "text-delta", "id": text_id, "delta": assistant_text})
                    yield _ai_data({"type": "text-end", "id": text_id})

                # Store assistant reply
                usage = extract_usage_and_cost(
                    current_selection.model,
                    current_selection.provider,
                    collected_events,
                    advisor_usage=_get_advisor_usage(thread_id),
                )
                web_store.add_usage(thread_id, usage)
                web_store.add_message(thread_id, "assistant", assistant_text)
                _persist_turn_learnings(thread_id, content, assistant_text)

                yield _ai_data({"type": "finish-step"})
                yield _ai_data({"type": "finish"})
                yield "data: [DONE]\n\n"
            except HTTPException as exc:
                web_store.remove_message(thread_id, user_message["id"])
                for frame in _ai_close_frames(in_text, text_id, reasoning_open):
                    yield frame
                yield _ai_data({"type": "error", "errorText": str(exc.detail)})
            except ModelSelectionError as exc:
                logger.exception("Model selection error for AI SDK chat thread %s", thread_id)
                web_store.remove_message(thread_id, user_message["id"])
                for frame in _ai_close_frames(in_text, text_id, reasoning_open):
                    yield frame
                yield _ai_data({"type": "error", "errorText": str(exc)})
            except Exception as exc:
                logger.exception("AI SDK chat error for thread %s", thread_id)
                detail = format_session_error(surface="web", error=exc)
                for frame in _ai_close_frames(in_text, text_id, reasoning_open):
                    yield frame
                yield _ai_data({"type": "error", "errorText": detail})

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "x-vercel-ai-ui-message-stream": "v1",
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


# ── Telegram webhook (authenticated by secret path) ─────────────────


@app.post("/telegram/webhook/{secret}")
async def telegram_webhook(secret: str, request: Request):
    """Feed one Telegram update to the bot.

    The secret path segment is the only authentication, so it is compared in
    constant time. An update id already recorded is acknowledged without being
    reprocessed. Ids are recorded *before* processing, so a redelivery of an
    update whose processing failed is skipped, not retried.
    """
    import hmac

    expected_secret = webhook_secret() if tg_app else None
    if not expected_secret or not hmac.compare_digest(
        secret.encode("utf-8"), str(expected_secret).encode("utf-8")
    ):
        raise HTTPException(status_code=403)

    data = await request.json()
    update = Update.de_json(data, tg_app.bot)
    if update and web_store.is_tg_update_seen(update.update_id):
        logger.info("Skipping already-processed Telegram update %s", update.update_id)
        return {"ok": True}
    if update:
        web_store.mark_tg_update(update.update_id)
        await tg_app.process_update(update)
    return {"ok": True}


# ── Static files (CSS, JS, etc.) ────────────────────────────────────

# Built frontend assets are served only when the frontend build directory exists.
if _FRONTEND_DIR.is_dir():
    app.mount("/assets", StaticFiles(directory=str(_FRONTEND_DIR / "assets")), name="frontend-assets")
app.mount("/static", StaticFiles(directory="static"), name="static")