Replace standalone Telegram bot with full CodeAnywhere framework fork. BetterBot shares all framework code and customizes only: - instance.py: BetterBot identity, system prompt, feature flags - tools/site_editing/: list_files, read_file, write_file with auto git push - .env: model defaults and site directory paths - compose/: Docker setup with betterlifesg + memoraiz mounts - deploy script: RackNerd with Infisical secrets
2012 lines
75 KiB
Python
2012 lines
75 KiB
Python
"""Telegram bot — routes messages to Copilot SDK with persistent shared sessions."""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import re
|
|
from decimal import Decimal
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from copilot import define_tool
|
|
from copilot.generated.session_events import SessionEventType
|
|
from copilot.tools import ToolInvocation
|
|
from pydantic import BaseModel, Field
|
|
from telegram import (
|
|
BotCommand,
|
|
BotCommandScopeAllChatAdministrators,
|
|
BotCommandScopeAllGroupChats,
|
|
BotCommandScopeDefault,
|
|
InlineKeyboardButton,
|
|
InlineKeyboardMarkup,
|
|
InputFile,
|
|
Update,
|
|
User,
|
|
helpers,
|
|
)
|
|
from telegram.error import RetryAfter, TelegramError
|
|
from telegram.ext import (
|
|
Application,
|
|
ApplicationHandlerStop,
|
|
CallbackQueryHandler,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
PicklePersistence,
|
|
TypeHandler,
|
|
filters,
|
|
)
|
|
|
|
import tools as _tools_init # noqa: F401 — registers tool sets
|
|
from background_tasks import BackgroundTaskManager
|
|
from config import settings
|
|
from copilot_runtime import copilot, format_prompt_with_history, stream_session
|
|
from instance import BASE_CONTEXT, SKILLS_CONTEXT, TELEGRAM_START_MESSAGE
|
|
from learning import extract_learnings_from_turn, format_learnings_for_prompt
|
|
from llm_costs import extract_usage_and_cost, format_cost_value, summarize_usage
|
|
|
|
|
|
def _get_advisor_usage(thread_id: str | None) -> dict | None:
    """Return advisor sidecar usage for *thread_id*, or ``None`` when unavailable.

    ``None`` is returned for a falsy *thread_id* and whenever the lazy import
    of ``tools.advisor`` (or the call itself) raises ``ImportError``.
    """
    if thread_id:
        try:
            # Imported lazily so this module still loads without the advisor tool set.
            from tools.advisor import get_advisor_usage

            return get_advisor_usage(thread_id)
        except ImportError:
            pass
    return None
|
|
from model_selection import (
|
|
ModelSelection,
|
|
ModelSelectionError,
|
|
build_provider_config,
|
|
default_selection,
|
|
format_known_models,
|
|
format_selection,
|
|
resolve_selection,
|
|
)
|
|
from prompt_utils import generate_diff
|
|
from tool_pipeline import (
|
|
active_toolset_names,
|
|
build_capability_fragment,
|
|
build_integration_tools,
|
|
build_onboarding_fragment,
|
|
)
|
|
from tool_registry import registry as tool_registry
|
|
from ux import (
|
|
append_elapsed_status,
|
|
busy_message,
|
|
extract_final_text,
|
|
format_session_error,
|
|
format_tool_counts,
|
|
markdown_to_telegram_html,
|
|
stream_status_updates,
|
|
stream_trace_event,
|
|
working_message,
|
|
)
|
|
from web_fallback_store import WebFallbackStore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── Telegram access control (dynamic owner-approval) ────────────
# Parsed once at import time; stays None when the setting is blank or not an
# integer, which the access gate treats as "no owner configured".
_OWNER_TG_CHAT_ID: int | None = None
_raw_owner_chat_id = settings.OWNER_TELEGRAM_CHAT_ID.strip()
if _raw_owner_chat_id:
    try:
        _OWNER_TG_CHAT_ID = int(_raw_owner_chat_id)
    except ValueError:
        logger.warning("OWNER_TELEGRAM_CHAT_ID is not a valid integer, approval gate disabled")
    else:
        logger.info("Owner Telegram chat ID configured: %d", _OWNER_TG_CHAT_ID)

# Track pending approval requests so we don't spam the owner
_pending_approval_notified: set[int] = set()
|
|
|
|
|
|
async def _gate_telegram_access(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Gate Telegram access via owner-approval flow.

    - No owner configured → open access.
    - Callback queries for approval buttons are always let through.
    - Updates without an effective user are let through.
    - Owner (matching OWNER_TELEGRAM_CHAT_ID) → always allowed.
    - telegram_approved=True → allowed.
    - telegram_approved=False → silently blocked (denied).
    - telegram_approved=None (new/pending) → notify owner with approve/deny,
      tell user "waiting for approval", then block.

    Raises:
        ApplicationHandlerStop: when the update must not reach later handlers
            (denied or still-pending users).
    """
    import html  # local import: only needed for the owner notification below

    if not _OWNER_TG_CHAT_ID:
        return  # no owner configured, open access

    # Always let approval callback queries through
    callback = update.callback_query
    if callback and callback.data and callback.data.startswith("tg_approve:"):
        return

    tg_user = update.effective_user
    if not tg_user:
        return  # system update, let it through

    # Owner is always allowed
    if tg_user.id == _OWNER_TG_CHAT_ID:
        return

    # Look up approval status
    from user_store import get_store as _get_user_store

    store = _get_user_store()
    db_user = store.resolve_or_create_user(
        provider="telegram",
        external_id=str(tg_user.id),
        display_name=tg_user.full_name or str(tg_user.id),
    )

    if db_user.telegram_approved is True:
        return  # approved

    if db_user.telegram_approved is False:
        raise ApplicationHandlerStop()  # denied — silent drop

    # telegram_approved is None → pending approval
    if tg_user.id not in _pending_approval_notified:
        _pending_approval_notified.add(tg_user.id)

        # Send approval request to owner
        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton("✅ Approve", callback_data=f"tg_approve:yes:{db_user.id}"),
                    InlineKeyboardButton("❌ Deny", callback_data=f"tg_approve:no:{db_user.id}"),
                ]
            ]
        )
        # The message is sent with parse_mode="HTML", so user-controlled text
        # must be HTML-escaped; Markdown escaping would leave `<`/`&` intact
        # (making the send fail) and would show stray backslashes.
        safe_name = html.escape(tg_user.full_name or "unknown")
        safe_username = html.escape(tg_user.username or "none")
        user_info = (
            f"<b>New Telegram user requesting access:</b>\n"
            f"• Name: {safe_name}\n"
            f"• Username: @{safe_username}\n"
            f"• Telegram ID: <code>{tg_user.id}</code>\n"
            f"• Internal ID: <code>{db_user.id}</code>"
        )
        try:
            await context.bot.send_message(
                chat_id=_OWNER_TG_CHAT_ID,
                text=user_info,
                parse_mode="HTML",
                reply_markup=keyboard,
            )
        except TelegramError:
            logger.warning("Failed to send approval request to owner", exc_info=True)

        # Tell the user they're pending
        msg = update.effective_message
        if msg:
            try:
                await msg.reply_text("⏳ Your access request has been sent to the bot owner. Please wait for approval.")
            except TelegramError:
                # Best effort: the update is blocked below either way.
                pass

    raise ApplicationHandlerStop()
|
|
|
|
|
|
async def _handle_approval_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle approve/deny inline keyboard callbacks from the owner.

    Callback data has the form ``tg_approve:<yes|no>:<internal user id>``.
    The decision is stored, the owner's message is edited to show the outcome,
    and an approved user is notified in their Telegram chat.
    """
    query = update.callback_query
    if not query or not query.data:
        return
    await query.answer()

    # The access gate lets every "tg_approve:" callback through, so the sender
    # has to be verified here; otherwise any user able to submit such callback
    # data could approve themselves.
    sender = query.from_user
    if _OWNER_TG_CHAT_ID and (sender is None or sender.id != _OWNER_TG_CHAT_ID):
        logger.warning("Ignoring approval callback from non-owner %s", getattr(sender, "id", None))
        return

    parts = query.data.split(":")
    if len(parts) != 3:
        return
    _, decision, user_id = parts
    if decision not in ("yes", "no"):
        # Malformed data must not be interpreted as an explicit deny.
        return

    from user_store import get_store as _get_user_store

    store = _get_user_store()
    db_user = store.get_user(user_id)
    if not db_user:
        await query.edit_message_text("⚠️ User not found in database.")
        return

    approved = decision == "yes"
    store.set_telegram_approval(user_id, approved)

    status = "✅ Approved" if approved else "❌ Denied"
    await query.edit_message_text(f"{status}: {db_user.display_name} (Telegram)")

    # If approved, notify the user
    if not approved:
        return

    # Find their Telegram ID from external_identities
    row = store.conn.execute(
        "SELECT external_id FROM external_identities WHERE user_id = ? AND provider = 'telegram'",
        (user_id,),
    ).fetchone()
    if not row:
        return

    tg_id = int(row["external_id"])
    _pending_approval_notified.discard(tg_id)
    try:
        await context.bot.send_message(
            chat_id=tg_id,
            text="✅ Your access has been approved! You can now use the bot.",
        )
    except TelegramError:
        logger.warning("Failed to notify approved user %s", tg_id, exc_info=True)
|
|
|
|
|
|
# NOTE(review): not referenced in the visible part of this file — presumably
# caps how much conversation history is replayed; confirm at the use site.
MAX_HISTORY = 40
# Upper bound (in characters) applied to outgoing text by the formatters in this module.
TG_MESSAGE_LIMIT = 4000
# Longest topic name accepted by _normalize_topic_name.
TG_TOPIC_NAME_LIMIT = 128
# NOTE(review): presumably seconds between status-message edits — confirm at the use site.
TG_STATUS_EDIT_INTERVAL = 5.0
# Longest system prompt accepted by _normalize_system_prompt_value.
TG_SYSTEM_PROMPT_LIMIT = 12000
# Prefix of the callback data attached to the /skills inline buttons.
SKILL_MENU_CALLBACK_PREFIX = "skill_menu:"
# (skill directory name, button label) pairs shown in the skill menu.
SKILL_MENU_ITEMS = [
    ("create-a-skill", "Create or update skills"),
]


# Keys into application.bot_data.
_ACTIVE_RUNS_KEY = "active_runs"
_USER_MODEL_ALIASES_KEY = "user_model_aliases"
|
|
|
|
# ── Alias file persistence (belt-and-suspenders alongside PicklePersistence) ──
|
|
|
|
|
|
def _alias_file_path() -> Path:
    """Return the path of ``aliases.json`` inside the persistence (or data) directory."""
    base_dir = settings.TG_PERSISTENCE_DIR or settings.DATA_DIR
    return Path(base_dir).expanduser() / "aliases.json"
|
|
|
|
|
|
def _load_alias_file() -> dict[str, dict[str, str]]:
    """Load all aliases from the JSON file. Returns {owner_key: {alias: target}}.

    A missing, unreadable, or malformed file yields an empty dict; read or
    parse failures are logged as warnings.
    """
    alias_path = _alias_file_path()
    if not alias_path.exists():
        return {}

    try:
        parsed = json.loads(alias_path.read_text("utf-8"))
    except Exception:
        logger.warning("Failed to read alias file %s, starting fresh", alias_path, exc_info=True)
        return {}

    if not isinstance(parsed, dict):
        return {}

    # Filter out any flat alias entries that leaked into the top level;
    # only keep entries where the value is a nested dict (owner -> aliases).
    return {owner: mapping for owner, mapping in parsed.items() if isinstance(mapping, dict)}
|
|
|
|
|
|
def _save_alias_file(all_aliases: dict[str, dict[str, str]]) -> None:
    """Atomically write all aliases to the JSON file (best effort).

    The data is written to a sibling ``.tmp`` file which then replaces the
    real file. Any failure — including creating the parent directory — is
    logged as a warning and never propagated to the caller.
    """
    target = _alias_file_path()
    tmp = target.with_suffix(".tmp")
    try:
        # Directory creation lives inside the try so that it is best-effort too.
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp.write_text(json.dumps(all_aliases, indent=2, sort_keys=True), "utf-8")
        tmp.replace(target)
    except Exception:
        logger.warning("Failed to write alias file %s", target, exc_info=True)
        # Do not leave a half-written temp file behind.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            logger.debug("Could not remove temporary alias file %s", tmp, exc_info=True)
|
|
|
|
|
|
def _normalize_alias_name(raw_name: str) -> str:
    """Return *raw_name* stripped and lower-cased, validating its characters.

    Raises:
        ValueError: if the name is empty, contains whitespace or ``:``, or does
            not start with a letter/digit followed only by letters, digits,
            ``.``, ``_`` or ``-``.
    """
    candidate = str(raw_name or "").strip().lower()
    if candidate == "":
        raise ValueError("Alias name cannot be empty")

    has_whitespace = any(map(str.isspace, candidate))
    if has_whitespace or ":" in candidate:
        raise ValueError("Alias names cannot contain spaces or `:`")

    if re.fullmatch(r"[a-z0-9][a-z0-9._-]*", candidate) is None:
        raise ValueError("Alias names may only use lowercase letters, numbers, `.`, `_`, and `-`")

    return candidate
|
|
|
|
|
|
def _alias_owner_key(user: User | None) -> str | None:
    """Return the alias-storage key for *user* (their id as a string), or ``None``."""
    return None if user is None else str(user.id)
|
|
|
|
|
|
def _user_aliases(context: ContextTypes.DEFAULT_TYPE, user: User | None) -> dict[str, str]:
    """Return the normalized alias mapping for *user*, stored in ``bot_data``.

    Entries from the JSON alias file are merged in for names that ``bot_data``
    does not have yet. Invalid names and empty targets are dropped. The
    returned dict is the object stored in ``bot_data``, so mutating it updates
    the stored aliases. Returns a detached empty dict when *user* is ``None``.
    """
    owner_key = _alias_owner_key(user)
    if owner_key is None:
        return {}

    bot_data = context.application.bot_data
    all_aliases = bot_data.get(_USER_MODEL_ALIASES_KEY)
    if not isinstance(all_aliases, dict):
        all_aliases = {}
        bot_data[_USER_MODEL_ALIASES_KEY] = all_aliases

    stored = all_aliases.get(owner_key)
    if not isinstance(stored, dict):
        stored = {}
        all_aliases[owner_key] = stored

    # Merge aliases from the durable JSON file (file wins for missing keys)
    for file_name, file_target in _load_alias_file().get(owner_key, {}).items():
        stored.setdefault(file_name, file_target)

    cleaned: dict[str, str] = {}
    for raw_name, raw_target in list(stored.items()):
        try:
            alias_name = _normalize_alias_name(str(raw_name))
        except ValueError:
            continue  # drop names that no longer validate
        target = str(raw_target or "").strip()
        if target:
            cleaned[alias_name] = target

    # The cleaned mapping always replaces the stored one.
    all_aliases[owner_key] = cleaned
    return cleaned
|
|
|
|
|
|
def _resolve_user_model_alias(
    context: ContextTypes.DEFAULT_TYPE, user: User | None, requested_model: str
) -> tuple[str, str | None]:
    """Expand a per-user alias.

    Returns ``(target, alias_key)`` when the stripped, lower-cased request
    matches one of the user's aliases, otherwise ``(stripped_request, None)``.
    """
    requested = str(requested_model or "").strip()
    if requested:
        alias_key = requested.lower()
        target = _user_aliases(context, user).get(alias_key)
        if target:
            return target, alias_key
    return requested, None
|
|
|
|
|
|
def _set_user_model_alias(
    context: ContextTypes.DEFAULT_TYPE, user: User | None, alias_name: str, target_model: str
) -> tuple[str, str]:
    """Store ``alias_name -> target_model`` for *user* and persist it to the JSON file.

    Returns:
        The ``(normalized_alias, normalized_target)`` pair that was stored.

    Raises:
        ValueError: if the user is unknown, the alias name is invalid, or the
            target is empty.
    """
    if _alias_owner_key(user) is None:
        raise ValueError("User identity is unavailable for alias storage")

    alias = _normalize_alias_name(alias_name)
    target = str(target_model or "").strip()
    if not target:
        raise ValueError("Model target cannot be empty")

    _user_aliases(context, user)[alias] = target

    # Immediately persist to JSON file (durable, survives crashes)
    _persist_aliases_to_file(context)

    return alias, target
|
|
|
|
|
|
def _persist_aliases_to_file(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Write the full alias dict from bot_data to the JSON file.

    Does nothing when ``bot_data`` holds no dict under the alias key.
    """
    stored = context.application.bot_data.get(_USER_MODEL_ALIASES_KEY)
    if not isinstance(stored, dict):
        return
    _save_alias_file(stored)
|
|
|
|
|
|
def _format_user_aliases(context: ContextTypes.DEFAULT_TYPE, user: User | None) -> str:
    """Render the user's aliases as a sorted bullet list, or a usage hint when there are none."""
    aliases = _user_aliases(context, user)
    if not aliases:
        return "No aliases saved yet.\nUsage: /alias cheap vercel:google/gemma-4-31b-it"

    entries = [f"- `{name}` → `{aliases[name]}`" for name in sorted(aliases)]
    return "\n".join(["Your model aliases", *entries])
|
|
|
|
|
|
def _active_runs(context: ContextTypes.DEFAULT_TYPE) -> dict[str, dict[str, Any]]:
    """Return the active-run registry from ``bot_data``, creating it when missing or invalid."""
    bot_data = context.application.bot_data
    registry = bot_data.get(_ACTIVE_RUNS_KEY)
    if isinstance(registry, dict):
        return registry
    registry = {}
    bot_data[_ACTIVE_RUNS_KEY] = registry
    return registry
|
|
|
|
|
|
def _register_active_run(context: ContextTypes.DEFAULT_TYPE, thread_id: str, run_state: dict[str, Any]) -> None:
    """Record *run_state* as the active run of *thread_id*, replacing any previous entry."""
    registry = _active_runs(context)
    registry[thread_id] = run_state
|
|
|
|
|
|
def _clear_active_run(context: ContextTypes.DEFAULT_TYPE, thread_id: str, run_state: dict[str, Any]) -> None:
    """Remove the active run of *thread_id*, but only if it is this very *run_state* object."""
    registry = _active_runs(context)
    # Identity check: a newer run registered under the same thread must survive.
    if registry.get(thread_id) is not run_state:
        return
    registry.pop(thread_id, None)
|
|
|
|
|
|
def _get_active_run(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
    """Return ``(thread, active_run)`` for the update's session.

    Both are ``None`` when no thread exists; ``active_run`` alone is ``None``
    when the thread has no registered run.
    """
    thread = _current_thread(update, context)
    if thread is not None:
        return thread, _active_runs(context).get(thread["id"])
    return None, None
|
|
|
|
|
|
def _effective_topic_thread_id(update: Update) -> int:
    """Return the forum thread id of the update's message, or ``0`` when it is not a topic message."""
    message = update.effective_message
    if message is not None and getattr(message, "is_topic_message", False):
        return int(getattr(message, "message_thread_id", 0) or 0)
    return 0
|
|
|
|
|
|
def _is_general_topic(update: Update) -> bool:
    """Return ``True`` when the chat is a forum and the message's topic thread id is ``1``."""
    chat = update.effective_chat
    if chat is not None and getattr(chat, "is_forum", False):
        return _effective_topic_thread_id(update) == 1
    return False
|
|
|
|
|
|
def _normalize_topic_name(raw_name: str) -> str:
    """Collapse all whitespace runs in *raw_name* to single spaces and validate the result.

    Raises:
        ValueError: if the name is empty or longer than ``TG_TOPIC_NAME_LIMIT``.
    """
    collapsed = " ".join(str(raw_name or "").split())
    if collapsed == "":
        raise ValueError("Topic name cannot be empty")
    if len(collapsed) > TG_TOPIC_NAME_LIMIT:
        raise ValueError(f"Topic names must be {TG_TOPIC_NAME_LIMIT} characters or fewer")
    return collapsed
|
|
|
|
|
|
def _normalize_system_prompt_value(raw_prompt: str) -> str:
    """Strip *raw_prompt* and validate its length.

    Raises:
        ValueError: if the prompt is empty or longer than ``TG_SYSTEM_PROMPT_LIMIT``.
    """
    prompt = str(raw_prompt or "").strip()
    if prompt == "":
        raise ValueError("System instructions cannot be empty")
    if len(prompt) > TG_SYSTEM_PROMPT_LIMIT:
        raise ValueError(f"System instructions must be {TG_SYSTEM_PROMPT_LIMIT} characters or fewer.")
    return prompt
|
|
|
|
|
|
def _format_prompt_diff_message(diff: str) -> str:
    """Prefix *diff* with a header, truncating it so the message fits ``TG_MESSAGE_LIMIT``.

    An empty diff yields just the header; a truncated diff ends with a
    ``[diff truncated]`` marker.
    """
    header = "Proposed System Prompt Change:\n\n"
    if not diff:
        return header.rstrip()

    room_for_diff = TG_MESSAGE_LIMIT - len(header)
    if len(diff) > room_for_diff:
        suffix = "\n\n[diff truncated]"
        # Reserve space for the marker; never slice with a negative bound.
        keep = max(room_for_diff - len(suffix), 0)
        return header + diff[:keep].rstrip("\n") + suffix
    return header + diff
|
|
|
|
|
|
def _topic_session_key(chat_id: int, thread_id: int) -> str:
    """Return the session key for a forum topic, in the form ``topic:<chat_id>:<thread_id>``."""
    return "topic:{}:{}".format(chat_id, thread_id)
|
|
|
|
|
|
def _telegram_agent_context(update: Update) -> str | None:
    """Return extra system-prompt text describing the forum context, or ``None`` outside forums."""
    chat = update.effective_chat
    if chat is None or not getattr(chat, "is_forum", False):
        return None

    message = update.effective_message
    if _is_general_topic(update):
        location = "The current message is inside the General topic."
    elif getattr(message, "is_topic_message", False):
        location = "The current message is inside a named forum topic."
    else:
        location = "The current message is in a forum supergroup."

    parts = (
        "You are running inside a Telegram forum supergroup. ",
        f"{location} ",
        "Telegram topic-management tools are available in this run. ",
        "Use them when the user asks to create, rename, or organize forum topics. ",
        "If the user wants one topic per repo or project, inspect the directory first and then create the requested topics. ",
        "Your visible final reply still appears in the current chat or topic even if you create new topics elsewhere.",
    )
    return "".join(parts)
|
|
|
|
|
|
def _build_telegram_topic_tools(update: Update, context: ContextTypes.DEFAULT_TYPE) -> list[Any]:
    """Build the Telegram forum tools for the current update.

    Returns an empty list unless the update's chat is a forum. Otherwise
    returns three tools whose handlers close over *update* and *context*:
    ``telegram_create_topics``, ``telegram_rename_current_topic`` and
    ``propose_system_prompt``.
    """
    chat = update.effective_chat
    if chat is None or not getattr(chat, "is_forum", False):
        return []

    store = _store(context)

    class CreateTopicsParams(BaseModel):
        topic_names: list[str]
        initial_message: str | None = None

    async def _create_topics_handler(params: CreateTopicsParams, invocation: ToolInvocation) -> str:
        """Create the requested topics and report what was created, skipped, or failed."""
        unique_names: list[str] = []
        seen_names: set[str] = set()
        invalid_names: list[str] = []

        for raw_name in params.topic_names:
            try:
                normalized_name = _normalize_topic_name(raw_name)
            except ValueError as error:
                invalid_names.append(f"{raw_name!r}: {error}")
                continue

            # Case-insensitive de-duplication; the first spelling wins.
            dedupe_key = normalized_name.casefold()
            if dedupe_key in seen_names:
                continue
            seen_names.add(dedupe_key)
            unique_names.append(normalized_name)

        if not unique_names:
            if invalid_names:
                return "No valid topic names were provided. " + "; ".join(invalid_names)
            return "No topic names were provided."

        seeded_message = str(params.initial_message or "").strip()
        selection = _get_selection(update, context)
        created_topics: list[str] = []
        failed_topics: list[str] = []

        for topic_name in unique_names:
            # A Telegram failure for one topic must not hide the topics that
            # were already created, or the caller may retry and duplicate them.
            try:
                forum_topic = await chat.create_forum_topic(name=topic_name)
            except TelegramError as error:
                failed_topics.append(f"{topic_name!r}: {error}")
                continue

            thread_id = int(forum_topic.message_thread_id)
            thread = store.get_or_create_session_thread(
                session_key=_topic_session_key(chat.id, thread_id),
                title=topic_name,
                model=selection.model,
                provider=selection.provider,
                source="telegram",
                session_label=f"Telegram topic {topic_name}",
            )
            store.update_thread(thread["id"], title=topic_name, title_source="manual")

            if seeded_message:
                try:
                    await chat.send_message(seeded_message, message_thread_id=thread_id)
                except TelegramError as error:
                    failed_topics.append(f"{topic_name!r} (created, initial message not sent): {error}")
                else:
                    store.add_message(thread["id"], "assistant", seeded_message)

            created_topics.append(f"{topic_name} (thread {thread_id})")

        if created_topics:
            result = "Created topics: " + ", ".join(created_topics)
        else:
            result = "No topics were created"
        if invalid_names:
            result += ". Skipped: " + "; ".join(invalid_names)
        if failed_topics:
            result += ". Failed: " + "; ".join(failed_topics)
        return result

    class RenameTopicParams(BaseModel):
        topic_name: str

    async def _rename_topic_handler(params: RenameTopicParams, invocation: ToolInvocation) -> str:
        """Rename the topic the current message belongs to."""
        try:
            normalized_name = _normalize_topic_name(params.topic_name)
            await _rename_forum_topic(update, normalized_name)
        except (ValueError, RuntimeError) as error:
            # Report validation/context problems as text, like the other handlers.
            return f"❌ {error}"
        thread = _ensure_thread(update, context)
        store.update_thread(thread["id"], title=normalized_name, title_source="manual")
        return f"Renamed the current topic to {normalized_name}."

    create_tool = define_tool(
        name="telegram_create_topics",
        description="Create one or more Telegram forum topics in the current chat. Use this when the user asks to start new topics, open one topic per item, or organize work into forum topics.",
        handler=_create_topics_handler,
        params_type=CreateTopicsParams,
    )
    rename_tool = define_tool(
        name="telegram_rename_current_topic",
        description="Rename the current Telegram forum topic. Use this when the user asks to rename the topic they are currently chatting in.",
        handler=_rename_topic_handler,
        params_type=RenameTopicParams,
    )

    class ProposePromptParams(BaseModel):
        new_prompt: str

    async def _propose_system_prompt(params: ProposePromptParams, invocation: ToolInvocation) -> str:
        """Stage a new topic system prompt and post its diff with approve/decline buttons."""
        try:
            normalized_prompt = _normalize_system_prompt_value(params.new_prompt)
        except ValueError as error:
            return f"❌ {error}"

        session_key = _session_key(update)
        current_record = store.get_topic_system_prompt(session_key)
        old_prompt = current_record["prompt"] if current_record else ""

        diff = generate_diff(old_prompt, normalized_prompt)
        if not diff:
            return "The proposed prompt is identical to the current one."

        # Staged here; the stored prompt is not changed by this handler.
        pending = context.application.bot_data.setdefault("pending_prompts", {})
        pending[session_key] = normalized_prompt

        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton("✅ Approve", callback_data=f"prompt_app:yes:{session_key}"),
                    InlineKeyboardButton("❌ Decline", callback_data=f"prompt_app:no:{session_key}"),
                ]
            ]
        )

        diff_text = _format_prompt_diff_message(diff)

        send_kwargs: dict[str, Any] = {"reply_markup": keyboard}
        thread_id = _effective_topic_thread_id(update)
        if thread_id and not _is_general_topic(update):
            send_kwargs["message_thread_id"] = thread_id

        # `chat` is the forum chat validated at the top of the factory.
        await chat.send_message(
            diff_text,
            **send_kwargs,
        )

        return "I have submitted the prompt change for approval."

    propose_prompt_tool = define_tool(
        name="propose_system_prompt",
        description="Propose a change to the current topic's system prompt. This requires human approval via a diff message.",
        handler=_propose_system_prompt,
        params_type=ProposePromptParams,
    )

    return [create_tool, rename_tool, propose_prompt_tool]
|
|
|
|
|
|
async def _rename_forum_topic(update: Update, topic_name: str) -> None:
    """Rename the forum topic that the update's message belongs to.

    Raises:
        RuntimeError: if the update has no chat or message, is not a forum
            topic message, or the topic thread id cannot be determined.
    """
    chat = update.effective_chat
    message = update.effective_message
    if chat is None or message is None:
        raise RuntimeError("Telegram update missing chat context")

    in_forum_topic = getattr(chat, "is_forum", False) and getattr(message, "is_topic_message", False)
    if not in_forum_topic:
        raise RuntimeError("This command only works inside a forum topic")

    # The General topic is renamed through its own API call.
    if _is_general_topic(update):
        await chat.edit_general_forum_topic(name=topic_name)
        return

    thread_id = _effective_topic_thread_id(update)
    if not thread_id:
        raise RuntimeError("Could not determine the current forum topic")
    await chat.edit_forum_topic(message_thread_id=thread_id, name=topic_name)
|
|
|
|
|
|
def _format_system_prompt(prompt: str) -> str:
    """Return the stripped prompt, shortened with a trailing ``...`` when it exceeds ``TG_MESSAGE_LIMIT``."""
    text = str(prompt or "").strip()
    if len(text) > TG_MESSAGE_LIMIT:
        # Leave room for the three-character ellipsis.
        return text[: TG_MESSAGE_LIMIT - 3].rstrip() + "..."
    return text
|
|
|
|
|
|
def _topic_system_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str | None:
    """Return the stored system prompt for the update's session, or ``None`` when absent or blank."""
    session_key = _session_key(update)
    record = _store(context).get_topic_system_prompt(session_key)
    if record:
        stripped = str(record.get("prompt") or "").strip()
        return stripped or None
    return None
|
|
|
|
|
|
def _format_tg_recent_events(user) -> str | None:
    """Format recent events for injection into the Telegram system message (T028).

    Fetches the user's events from the last 24 hours, renders at most 15 of
    them as ``- <age>: <summary>`` lines, and marks those events as consumed.
    Returns ``None`` when there are no events.
    """
    from datetime import datetime
    from datetime import timezone as tz

    from user_store import get_store as _us

    store = _us()
    events = store.get_recent_events(user.id, window_hours=24)
    if not events:
        return None

    shown = events[:15]
    now = datetime.now(tz.utc)
    lines = []
    for ev in shown:
        try:
            created = datetime.fromisoformat(ev.created_at)
            if created.tzinfo is None:
                # Subtracting a naive value from the aware `now` raises TypeError.
                # NOTE(review): naive timestamps are assumed to be UTC — confirm against the store.
                created = created.replace(tzinfo=tz.utc)
            # Clamp so a timestamp slightly in the future does not render as "-3m ago".
            elapsed = max((now - created).total_seconds(), 0)
            hours = int(elapsed // 3600)
            if hours < 1:
                ago = f"{int(elapsed // 60)}m ago"
            elif hours < 24:
                ago = f"{hours}h ago"
            else:
                ago = "yesterday"
        except (TypeError, ValueError):
            # Unparseable or non-string timestamp: fall back to a vague label.
            ago = "recently"
        lines.append(f"- {ago}: {ev.summary}")

    store.mark_events_consumed([ev.id for ev in shown])
    return "Recent activity for you:\n" + "\n".join(lines)
|
|
|
|
|
|
def _compose_system_message(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    *,
    background_summary: str | None = None,
    user=None,
) -> str:
    """Assemble the system message for a run.

    Sections are appended in this order and joined with blank lines, skipping
    empty ones: base context, skills context, integration tool fragments,
    persistent topic instruction, forum context, background summary,
    learnings, and — when *user* is given — recent events, onboarding and
    capability fragments.
    """
    store = _store(context)
    sections = [BASE_CONTEXT.rstrip(), SKILLS_CONTEXT.rstrip()]

    # Integration tool prompt fragments
    tool_fragments = tool_registry.get_system_prompt_fragments(active_toolset_names(user))
    if tool_fragments:
        sections.extend(tool_fragments)

    topic_prompt = _topic_system_message(update, context)
    if topic_prompt:
        sections.append(f"Persistent topic instruction:\n{topic_prompt}")

    forum_context = _telegram_agent_context(update)
    if forum_context:
        sections.append(forum_context.strip())

    if background_summary:
        sections.append(background_summary.strip())

    # Inject learnings (global + project-scoped)
    thread = _current_thread(update, context)
    if thread:
        project_learnings = store.get_project_learnings(thread["id"])
        global_learnings = store.get_global_learnings()
        learnings_text = format_learnings_for_prompt(project_learnings, global_learnings)
        if learnings_text:
            sections.append(learnings_text)

    if user:
        # Recent events (T028), then onboarding / capability status (T037).
        user_sections = (
            _format_tg_recent_events(user),
            build_onboarding_fragment(user),
            build_capability_fragment(user),
        )
        sections.extend(text for text in user_sections if text)

    return "\n\n".join(filter(None, sections))
|
|
|
|
|
|
def _session_key(update: Update) -> str:
    """Return the session key for the update's chat.

    ``pm:<chat_id>`` for private chats, ``topic:<chat_id>:<thread_id>`` for
    forum topic messages, and ``group:<chat_id>`` otherwise.

    Raises:
        RuntimeError: if the update has no chat.
    """
    chat = update.effective_chat
    if chat is None:
        raise RuntimeError("Telegram update missing chat context")
    if chat.type == "private":
        return f"pm:{chat.id}"

    thread_id = _effective_topic_thread_id(update)
    if thread_id:
        return _topic_session_key(chat.id, thread_id)
    return f"group:{chat.id}"
|
|
|
|
|
|
def _store(context: ContextTypes.DEFAULT_TYPE) -> WebFallbackStore:
    """Return the thread store attached to the application.

    Raises:
        RuntimeError: if the application carries no ``WebFallbackStore``.
    """
    candidate = getattr(context.application, "_runtime_store", None)
    if isinstance(candidate, WebFallbackStore):
        return candidate
    raise RuntimeError("Telegram thread store is not configured")
|
|
|
|
|
|
def _bg_manager(context: ContextTypes.DEFAULT_TYPE) -> BackgroundTaskManager:
    """Return the background task manager attached to the application.

    Raises:
        RuntimeError: if the application carries no ``BackgroundTaskManager``.
    """
    candidate = getattr(context.application, "_runtime_bg_manager", None)
    if isinstance(candidate, BackgroundTaskManager):
        return candidate
    raise RuntimeError("Background task manager is not configured")
|
|
|
|
|
|
def _session_label(update: Update) -> str:
    """Return a human-readable label for the update's chat (DM, topic, or group)."""
    chat = update.effective_chat
    if chat is None:
        return "Telegram"

    if chat.type != "private":
        title = getattr(chat, "title", None) or f"chat {chat.id}"
        kind = "topic" if _effective_topic_thread_id(update) else "group"
        return f"Telegram {kind} {title}"

    # Private chat: prefer the @username, then the person's name.
    username = getattr(chat, "username", None)
    if username:
        return f"Telegram DM @{username}"

    name_parts = [
        getattr(chat, "first_name", None) or "",
        getattr(chat, "last_name", None) or "",
    ]
    full_name = " ".join(filter(None, name_parts)).strip()
    if full_name:
        return f"Telegram DM {full_name}"
    return "Telegram DM"
|
|
|
|
|
|
def _current_thread(update: Update, context: ContextTypes.DEFAULT_TYPE) -> dict | None:
    """Look up the thread for the update's session key in the store; returns the store's result."""
    store = _store(context)
    return store.get_session_thread(_session_key(update))
|
|
|
|
|
|
def _current_lock(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Return the store's lock for the current thread, or ``None`` when no thread exists."""
    thread = _current_thread(update, context)
    if thread is not None:
        return _store(context).lock(thread["id"])
    return None
|
|
|
|
|
|
def _get_selection(update: Update, context: ContextTypes.DEFAULT_TYPE) -> ModelSelection:
    """Resolve the model selection stored on the current thread, or the default when there is no thread."""
    thread = _current_thread(update, context)
    if thread is not None:
        return resolve_selection(model=thread.get("model"), provider=thread.get("provider"))
    return default_selection()
|
|
|
|
|
|
def _ensure_thread(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    *,
    selection: ModelSelection | None = None,
) -> dict:
    """Get or create the thread for the update's session.

    *selection* supplies the model and provider passed to the store; when
    omitted, the session's current selection is used.
    """
    chosen = selection or _get_selection(update, context)
    thread_kwargs = {
        "session_key": _session_key(update),
        "title": "New chat",
        "model": chosen.model,
        "provider": chosen.provider,
        "source": "telegram",
        "session_label": _session_label(update),
    }
    return _store(context).get_or_create_session_thread(**thread_kwargs)
|
|
|
|
|
|
def _apply_selection(update: Update, context: ContextTypes.DEFAULT_TYPE, selection: ModelSelection) -> None:
    """Ensure the session thread exists and store *selection* on it when it differs."""
    thread = _ensure_thread(update, context, selection=selection)
    stored = (thread.get("model"), thread.get("provider"))
    if stored != (selection.model, selection.provider):
        _store(context).update_thread(thread["id"], model=selection.model, provider=selection.provider)
|
|
|
|
|
|
def _skill_menu_markup() -> InlineKeyboardMarkup:
    """Build the inline keyboard for the skill menu: one button per row, one row per menu item."""
    rows = []
    for skill_name, label in SKILL_MENU_ITEMS:
        button = InlineKeyboardButton(label, callback_data=f"{SKILL_MENU_CALLBACK_PREFIX}{skill_name}")
        rows.append([button])
    return InlineKeyboardMarkup(rows)
|
|
|
|
|
|
def _render_skill_entry(skill_name: str) -> str:
    """Render a skill's ``SKILL.md`` as a Markdown message.

    Returns a "not available" message when *skill_name* is not a plain
    directory name or when no ``SKILL.md`` file exists for it.
    """
    unavailable = f"❌ Skill `{skill_name}` is not available."

    # skill_name comes from callback data, so it must stay a single path
    # component: no separators and no "."/".." that could escape the skills dir.
    if not skill_name or skill_name in {".", ".."} or Path(skill_name).name != skill_name:
        return unavailable

    skill_path = Path(settings.REPOS_DIR) / "code_anywhere" / ".agents" / "skills" / skill_name / "SKILL.md"
    # is_file() also rejects a directory named SKILL.md, which read_text() would choke on.
    if not skill_path.is_file():
        return unavailable

    body = skill_path.read_text("utf-8").strip()
    return f"*Skill:* `{skill_name}`\n*Path:* `{skill_path}`\n\n```md\n{body}\n```"
|
|
|
|
|
|
async def cmd_skills(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the skill menu keyboard; does nothing when the update has no message."""
    message = update.effective_message
    if message is not None:
        prompt_text = "Choose a skill to inspect. The bot can also invoke matching skills during a run."
        await message.reply_text(prompt_text, reply_markup=_skill_menu_markup())
|
|
|
|
|
|
async def handle_skill_menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a skill-menu button press by sending the rendered skill entry."""
    query = update.callback_query
    if not query or not query.data:
        return
    await query.answer()

    # The skill name is whatever follows the menu prefix in the callback data.
    requested_skill = query.data.removeprefix(SKILL_MENU_CALLBACK_PREFIX)
    await _send_long(update, _render_skill_entry(requested_skill))
|
|
|
|
|
|
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the configured start message; does nothing when the update has no message."""
    message = update.effective_message
    if message is not None:
        await message.reply_text(TELEGRAM_START_MESSAGE)
|
|
|
|
|
|
async def cmd_alias(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the alias command.

    With fewer than two arguments the user's aliases are listed. Otherwise the
    target model is resolved, stored under the alias name, persistence is
    flushed, and a confirmation is sent. Validation errors are reported back
    to the user.
    """
    message = update.effective_message
    if message is None or not message.text:
        return

    user = update.effective_user
    parts = message.text.split(maxsplit=2)
    if len(parts) < 3:
        await message.reply_text(_format_user_aliases(context, user), parse_mode="Markdown")
        return

    _, alias_name, target_model = (part.strip() for part in parts)
    current = _get_selection(update, context)

    try:
        selection = resolve_selection(model=target_model, current=current)
        normalized_alias, _ = _set_user_model_alias(context, user, alias_name, selection.ref)
    except (ModelSelectionError, ValueError) as error:
        await message.reply_text(f"❌ {error}")
        return

    # Force PTB to flush bot_data to disk immediately
    try:
        await context.application.update_persistence()
    except Exception:
        logger.warning("Failed to flush persistence after alias set", exc_info=True)

    confirmation = f"Saved alias `{normalized_alias}` → `{selection.ref}`"
    await message.reply_text(confirmation, parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_alias_export(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the calling user's model aliases back as a JSON document."""
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    aliases = _user_aliases(context, user)
    alias_count = len(aliases)

    # Serialize a small, key-sorted backup of the user's aliases.
    backup = {
        "user_id": user.id,
        "username": user.username or "",
        "aliases": dict(sorted(aliases.items())),
    }
    encoded = json.dumps(backup, indent=2, sort_keys=True).encode("utf-8")
    attachment = InputFile(BytesIO(encoded), filename=f"aliases-{user.id}.json")

    suffix = "y" if alias_count == 1 else "ies"
    await message.reply_document(
        document=attachment,
        caption=f"Alias backup with {alias_count} entr{suffix}.",
    )
|
|
|
|
|
|
async def cmd_model(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the current model selection, or switch to the requested model.

    The argument is first passed through the user's aliases, then resolved
    against the current selection. Without an argument the current selection
    and usage hints are shown.
    """
    message = update.effective_message
    if message is None or not message.text:
        return

    pieces = message.text.split(maxsplit=1)
    current = _get_selection(update, context)

    if len(pieces) < 2:
        usage_help = "\n".join(
            [
                f"{format_selection(current)}",
                "Usage: /model gpt-5.4-mini",
                "Usage: /model openrouter:anthropic/claude-sonnet-4.5",
            ]
        )
        await message.reply_text(usage_help, parse_mode="Markdown")
        return

    wanted = pieces[1].strip()
    resolved_model, alias_name = _resolve_user_model_alias(context, update.effective_user, wanted)

    try:
        selection = resolve_selection(model=resolved_model, current=current)
    except ModelSelectionError as problem:
        await message.reply_text(f"❌ {problem}")
        return

    _apply_selection(update, context, selection)

    # Mention the alias only when one was actually used.
    if alias_name:
        alias_note = f" via alias `{alias_name}`"
    else:
        alias_note = ""

    confirmation = f"Switched to provider `{selection.provider}` with model `{selection.model}`{alias_note}"
    await message.reply_text(confirmation, parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_provider(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the current provider, or switch provider while keeping the current model name."""
    message = update.effective_message
    if message is None or not message.text:
        return

    current = _get_selection(update, context)
    pieces = message.text.split(maxsplit=1)

    if len(pieces) < 2:
        hint = f"Current provider: `{current.provider}`\nUsage: /provider openai\nUsage: /provider openrouter"
        await message.reply_text(hint, parse_mode="Markdown")
        return

    wanted_provider = pieces[1].strip()
    try:
        selection = resolve_selection(model=current.model, provider=wanted_provider, current=current)
    except ModelSelectionError as problem:
        await message.reply_text(f"❌ {problem}")
        return

    _apply_selection(update, context, selection)
    confirmation = f"Provider switched to `{selection.provider}`. Active model: `{selection.model}`"
    await message.reply_text(confirmation, parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_models(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List the known models, optionally restricted to the provider given as argument."""
    message = update.effective_message
    if message is None or not message.text:
        return

    pieces = message.text.split(maxsplit=1)
    if len(pieces) > 1:
        requested_provider = pieces[1].strip()
    else:
        requested_provider = None

    try:
        listing = format_known_models(
            current=_get_selection(update, context),
            provider=requested_provider,
        )
    except ModelSelectionError as problem:
        await message.reply_text(f"❌ {problem}")
    else:
        await message.reply_text(listing, parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_system(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show, clear or replace the persistent system instruction for this chat/topic.

    ``/system`` alone prints the saved instruction, ``/system clear`` removes it,
    and ``/system <text>`` stores the normalized text. Nothing is changed while
    the lock returned by ``_current_lock`` is held; the busy message is sent instead.
    """
    message = update.effective_message
    if message is None or message.text is None:
        return

    session_lock = _current_lock(update, context)
    if session_lock is not None and session_lock.locked():
        await message.reply_text(busy_message(surface="telegram"))
        return

    store = _store(context)
    pieces = message.text.split(maxsplit=1)
    raw_value = pieces[1].strip() if len(pieces) >= 2 else ""

    # No argument: report what is currently saved.
    if not raw_value:
        saved = store.get_topic_system_prompt(_session_key(update))
        if saved:
            await _send_long(
                update,
                "Current persistent system instruction:\n\n" + _format_system_prompt(saved["prompt"]),
            )
        else:
            await message.reply_text("No persistent system instruction is saved for this chat/topic.")
        return

    # "clear" (any casing) removes the saved instruction.
    if raw_value.lower() == "clear":
        if store.clear_topic_system_prompt(_session_key(update)):
            outcome = "Cleared the persistent system instruction for this chat/topic."
        else:
            outcome = "No persistent system instruction was set for this chat/topic."
        await message.reply_text(outcome)
        return

    try:
        normalized_prompt = _normalize_system_prompt_value(raw_value)
    except ValueError as problem:
        await message.reply_text(f"❌ {problem}")
        return

    store.set_topic_system_prompt(_session_key(update), normalized_prompt)
    saved_notice = "Saved persistent system instruction for this chat/topic. It will be reused after /new.\n\n"
    await _send_long(update, saved_notice + _format_system_prompt(normalized_prompt))
|
|
|
|
|
|
async def cmd_new(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Start a fresh session thread for this chat/topic using the current model selection."""
    message = update.effective_message
    if message is None:
        return

    active = _get_selection(update, context)
    thread_options = {
        "session_key": _session_key(update),
        "title": "New chat",
        "model": active.model,
        "provider": active.provider,
        "source": "telegram",
        "session_label": _session_label(update),
    }
    _store(context).start_new_session_thread(**thread_options)

    await message.reply_text("Started a new session. Older sessions stay available in the web UI.")
|
|
|
|
|
|
async def cmd_current(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the formatted current provider/model selection."""
    target = update.effective_message
    if target is not None:
        description = format_selection(_get_selection(update, context))
        await target.reply_text(description, parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with a MarkdownV2 status report for the current session.

    The report covers the current thread (label, model, message count,
    busy/idle state), the threads that have an active run, the background-task
    status for this thread, and the five most recent saved sessions.

    The message is sent with ``parse_mode="MarkdownV2"``, so every reserved
    character in literal text (``(``, ``)``, ``.``) is backslash-escaped here
    and all dynamic text goes through ``helpers.escape_markdown``. Telegram
    rejects the whole message if a single reserved character is left unescaped.
    """
    message = update.effective_message
    if message is None:
        return
    thread = _current_thread(update, context)
    if thread is None:
        await message.reply_text("No active session in this chat.")
        return

    def _esc(value: Any) -> str:
        # Escape dynamic text so it is safe inside a MarkdownV2 message.
        return helpers.escape_markdown(str(value), version=2)

    # Pre-escaped literal: parentheses are reserved characters in MarkdownV2.
    here_marker = " \\(here\\)"

    store = _store(context)
    bg_manager = _bg_manager(context)
    active_runs = _active_runs(context)
    current_run = active_runs.get(thread["id"])
    all_threads = store.list_threads(limit=500)

    current_session_key = _session_key(update)
    current_label = thread.get("session_label") or _session_label(update)
    active_thread_ids = set(active_runs.keys())
    active_threads = [item for item in all_threads if item.get("id") in active_thread_ids]
    current_chat_threads = [item for item in all_threads if item.get("session_key") == current_session_key]
    recent_threads = all_threads[:5]

    status_lines = [
        "📊 *System Status*",
        f"Active runs now: `{len(active_runs)}`",
        f"Saved sessions: `{len(all_threads)}`",
        f"Current session: *{_esc(current_label)}*",
        f"Current model: `{_esc(thread.get('model') or 'unknown')}` via `{_esc(thread.get('provider') or 'unknown')}`",
        f"Messages in current session: `{len(thread.get('messages', []))}`",
        f"Current chat/topic sessions: `{len(current_chat_threads)}`",
    ]

    if current_run:
        started_by = str(current_run.get("started_by") or "").strip()
        if started_by:
            # Show a single-line preview of the prompt, capped at 120 characters.
            preview_source = started_by.replace(chr(10), " ")
            preview = preview_source[:120]
            if len(preview_source) > 120:
                preview += "..."
            status_lines.append(f"Current run prompt: _{_esc(preview)}_")
        status_lines.append("Current session state: `busy`")
    else:
        status_lines.append("Current session state: `idle`")

    status_lines.extend(
        [
            "",
            "🏃 *Active Runs*",
            f"Concurrent active runs: `{len(active_threads)}`",
        ]
    )

    if active_threads:
        for item in active_threads[:5]:
            title = item.get("title") or item.get("session_label") or item.get("id") or "Untitled"
            marker = here_marker if item.get("id") == thread["id"] else ""
            status_lines.append(f"• {_esc(title)}{marker}")
        if len(active_threads) > 5:
            # "(s)" must be escaped: parentheses are reserved in MarkdownV2.
            status_lines.append(f"• _and {len(active_threads) - 5} more active run\\(s\\)_")
    else:
        # The trailing "." is reserved in MarkdownV2 and must be escaped.
        status_lines.append("No active runs\\.")

    status_lines.extend(
        [
            "",
            "🕒 *Background Tasks*",
            _esc(bg_manager.format_status(thread["id"])),
        ]
    )

    latest_bg = bg_manager.get_latest(thread["id"])
    if latest_bg and latest_bg.started_at:
        # Inside a code span only "`" and "\" need escaping, so the timestamp is safe as-is.
        status_lines.append(f"Last background update: `{latest_bg.started_at.strftime('%Y-%m-%d %H:%M:%SZ')}`")

    if recent_threads:
        status_lines.extend(
            [
                "",
                "🗂️ *Recent Sessions*",
            ]
        )
        for item in recent_threads:
            title = item.get("title") or item.get("session_label") or item.get("id") or "Untitled"
            updated_at = str(item.get("updated_at") or "")
            marker = here_marker if item.get("id") == thread["id"] else ""
            line = f"• {_esc(title)}"
            if updated_at:
                line += f" — `{_esc(updated_at)}`"
            line += marker
            status_lines.append(line)

    await message.reply_text("\n".join(status_lines), parse_mode="MarkdownV2")
|
|
|
|
|
|
async def cmd_usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the token and cost totals recorded for the current thread."""
    message = update.effective_message
    if message is None:
        return

    thread = _current_thread(update, context)
    if thread is None:
        await message.reply_text("No usage yet in this chat.")
        return

    recorded = thread.get("usage") if isinstance(thread, dict) else None
    summary = summarize_usage(recorded)
    if not summary:
        await message.reply_text("No usage recorded yet in this chat.")
        return

    report = [
        f"Usage so far: {summary['total_tokens']:,} tok across {summary['request_count']} request(s)",
        f"Input: {summary['prompt_tokens']:,} · Output: {summary['completion_tokens']:,}",
    ]
    # Optional counters are listed only when they are non-zero.
    for key, label in (("cached_tokens", "Cached"), ("reasoning_tokens", "Reasoning")):
        if summary[key]:
            report.append(f"{label}: {summary[key]:,}")
    report.append(f"Cost: ${format_cost_value(summary['cost_usd'])}")

    await message.reply_text("\n".join(report))
|
|
|
|
|
|
async def cmd_learnings(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show project and global learnings for this thread.

    Global learnings are always listed; project learnings are listed when a
    thread exists for this chat. The reply is sent as MarkdownV2, so category
    and fact text are escaped and the surrounding brackets are written as
    ``\\[`` / ``\\]`` (both are reserved characters in MarkdownV2).
    """
    message = update.effective_message
    if message is None:
        return

    store = _store(context)
    thread = _current_thread(update, context)

    def _render(items: Any) -> list[str]:
        # Format each learning as an escaped "[category] fact" line.
        rendered: list[str] = []
        for item in items:
            cat = helpers.escape_markdown(str(item.get("category", "general")), version=2)
            fact = helpers.escape_markdown(str(item.get("fact", "")), version=2)
            rendered.append(f"  \\[{cat}\\] {fact}")
        return rendered

    lines: list[str] = []

    # Global learnings
    global_learnings = store.get_global_learnings()
    if global_learnings:
        lines.append("🌐 *Global Learnings*")
        lines.extend(_render(global_learnings))

    # Project learnings
    if thread:
        project_learnings = store.get_project_learnings(thread["id"])
        if project_learnings:
            # Blank separator only when a global section precedes this one.
            if lines:
                lines.append("")
            lines.append("📁 *Project Learnings*")
            lines.extend(_render(project_learnings))

    if not lines:
        await message.reply_text("No learnings recorded yet.")
        return

    await message.reply_text("\n".join(lines), parse_mode="MarkdownV2")
|
|
|
|
|
|
async def cmd_stop(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Set the stop event of the run that is active in this chat, if there is one."""
    message = update.effective_message
    if message is None:
        return

    thread, run_state = _get_active_run(update, context)

    if thread is None:
        notice = "No active session in this chat."
    elif not isinstance(run_state, dict):
        notice = "Nothing is currently running here."
    else:
        stopper = run_state.get("stop_event")
        if stopper is None:
            notice = "This run cannot be stopped cleanly."
        else:
            # Signal the running turn first, then tell the user.
            stopper.set()
            notice = "Stopping the current run…"

    await message.reply_text(notice)
|
|
|
|
|
|
async def handle_prompt_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Apply or discard a pending system-prompt change after an inline-button press.

    The callback data is split as ``prompt_app:<answer>:<session_key>``; the
    change is approved only when ``<answer>`` is ``yes``. The pending prompt is
    looked up in ``bot_data["pending_prompts"]`` by session key and removed
    from there once the press has been handled (approved, declined, or failed
    with ``ValueError``).

    Updates without a callback query or without callback data are ignored
    rather than raising ``AttributeError``.
    """
    query = update.callback_query
    if query is None:
        return
    await query.answer()

    data = (query.data or "").split(":", 2)
    if len(data) < 3 or data[0] != "prompt_app":
        return

    async def _show(text: str) -> None:
        # Best-effort edit of the prompt message; Telegram failures are ignored
        # so that the pending entry is still cleaned up below.
        try:
            await query.edit_message_text(text)
        except (RetryAfter, TelegramError):
            pass

    approved = data[1] == "yes"
    session_key = data[2]

    pending = context.application.bot_data.get("pending_prompts", {})
    new_prompt = pending.get(session_key)

    if not new_prompt:
        await _show("❌ Error: Pending prompt not found or expired.")
        return

    if approved:
        store = _store(context)
        try:
            store.set_topic_system_prompt(session_key, new_prompt)
        except ValueError as error:
            await _show(f"❌ Error: {error}")
        else:
            await _show("✅ System prompt updated successfully!")
    else:
        await _show("❌ System prompt change declined.")

    # Cleanup pending
    pending.pop(session_key, None)
    context.application.bot_data["pending_prompts"] = pending
|
|
|
|
|
|
async def cmd_topic(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Rename the current forum topic and record the new name as the thread title."""
    message = update.effective_message
    if message is None or not message.text:
        return
    chat = update.effective_chat

    pieces = message.text.split(maxsplit=1)
    if len(pieces) < 2:
        await message.reply_text("Usage: /topic New topic name")
        return

    # Only meaningful for a message posted inside a topic of a forum chat.
    in_forum = chat is not None and getattr(chat, "is_forum", False)
    in_topic = getattr(message, "is_topic_message", False)
    if not (in_forum and in_topic):
        await message.reply_text("❌ This command only works inside a forum topic")
        return

    try:
        topic_name = _normalize_topic_name(pieces[1])
    except ValueError as problem:
        await message.reply_text(f"❌ {problem}")
        return

    try:
        thread = _ensure_thread(update, context)
    except RuntimeError as problem:
        await message.reply_text(f"❌ {problem}")
        return

    try:
        await _rename_forum_topic(update, topic_name)
    except (RuntimeError, TelegramError) as problem:
        await message.reply_text(f"❌ {problem}")
        return

    # The rename succeeded; store the title and mark it as manually set.
    _store(context).update_thread(thread["id"], title=topic_name, title_source="manual")
    await message.reply_text(f"Renamed this topic to: {topic_name}")
|
|
|
|
|
|
async def cmd_newtopic(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Create a new forum topic in the current group and a session thread for it."""
    message = update.effective_message
    if message is None or not message.text:
        return
    chat = update.effective_chat

    pieces = message.text.split(maxsplit=1)
    if len(pieces) < 2:
        await message.reply_text("Usage: /newtopic Topic name")
        return

    is_forum_chat = chat is not None and getattr(chat, "is_forum", False)
    if not is_forum_chat:
        await message.reply_text("❌ This command only works in a forum supergroup")
        return

    try:
        topic_name = _normalize_topic_name(pieces[1])
    except ValueError as problem:
        await message.reply_text(f"❌ {problem}")
        return

    try:
        created = await chat.create_forum_topic(name=topic_name)
        new_thread_id = int(created.message_thread_id)
        active = _get_selection(update, context)
        # Register a session thread keyed by the new topic.
        _store(context).get_or_create_session_thread(
            session_key=_topic_session_key(chat.id, new_thread_id),
            title=topic_name,
            model=active.model,
            provider=active.provider,
            source="telegram",
            session_label=f"Telegram topic {topic_name}",
        )
        await message.reply_text(f"Created new topic: {topic_name}")
    except TelegramError as problem:
        await message.reply_text(f"❌ Failed to create topic: {problem}")
    except RuntimeError as problem:
        await message.reply_text(f"❌ {problem}")
|
|
|
|
|
|
def _build_user_parts(text: str, uploads: list[dict]) -> list[dict]:
|
|
parts: list[dict] = []
|
|
if text:
|
|
parts.append({"type": "input_text", "text": text})
|
|
|
|
for upload in uploads:
|
|
parts.append(
|
|
{
|
|
"type": "input_image",
|
|
"file_id": upload["id"],
|
|
"name": upload.get("name") or "image",
|
|
"mime_type": upload.get("mime_type") or "image/jpeg",
|
|
"detail": "auto",
|
|
}
|
|
)
|
|
|
|
return parts
|
|
|
|
|
|
def _build_background_tools(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    thread: dict,
    selection: ModelSelection,
    system_message: str,
) -> list[Any]:
    """Return the ``ask_agent`` and ``check_agent`` tools bound to this chat and thread.

    ``ask_agent`` starts a background agent through the background-task manager
    (refusing while one is already active for the thread); ``check_agent``
    returns the manager's formatted status for the thread.
    """
    history_store = _store(context)
    manager = _bg_manager(context)
    tg_bot = context.bot
    chat_id = update.effective_chat.id
    tg_thread_id = _effective_topic_thread_id(update)
    is_general = _is_general_topic(update)

    class StartBgParams(BaseModel):
        task: str = Field(description="Detailed description of what the background agent should do.")

    async def _start_bg_handler(params: StartBgParams, invocation: ToolInvocation) -> str:
        running = manager.get_active(thread["id"])
        if running:
            return f"A background agent is already running: {running.description}. Wait for it to finish first."

        async def _on_complete(bg_task) -> None:
            # Record the outcome in the thread, then post it to the originating chat.
            output = bg_task.result or f"Background agent failed: {bg_task.error}"
            history_store.add_message(thread["id"], "assistant", f"[Background agent]\n\n{output}")
            # Track background agent usage
            if bg_task.usage:
                history_store.add_usage(thread["id"], bg_task.usage)

            # Reply inside the originating topic, except for the general topic.
            send_kwargs: dict[str, Any] = {}
            if tg_thread_id and not is_general:
                send_kwargs["message_thread_id"] = tg_thread_id

            # Split long results across messages; only the first chunk carries the
            # prefix, and every chunk leaves room for it.
            prefix = "🔄 Background agent finished:\n\n"
            step = TG_MESSAGE_LIMIT - len(prefix)
            for offset in range(0, len(output), step):
                piece = output[offset : offset + step]
                if offset == 0:
                    piece = prefix + piece
                await tg_bot.send_message(chat_id, piece, **send_kwargs)

        manager.start(
            thread_id=thread["id"],
            description=params.task,
            selection=selection,
            system_message=system_message,
            on_complete=_on_complete,
        )
        return "Background agent started. Results will be sent to this chat when done."

    class CheckStatusParams(BaseModel):
        pass

    async def _check_status_handler(params: CheckStatusParams, invocation: ToolInvocation) -> str:
        return manager.format_status(thread["id"])

    ask_description = (
        "Start a background agent for a long-running task (research, multi-file changes, deep investigation). "
        "The agent runs in a separate session and posts results to the chat when done. "
        "Only one background agent per thread at a time."
    )
    start_tool = define_tool(
        "ask_agent",
        description=ask_description,
        handler=_start_bg_handler,
        params_type=StartBgParams,
    )
    status_tool = define_tool(
        "check_agent",
        description="Check the status of the background agent in this thread.",
        handler=_check_status_handler,
        params_type=CheckStatusParams,
    )
    return [start_tool, status_tool]
|
|
|
|
|
|
# Module-level backoff tracker: the earliest event-loop time (``loop.time()``)
# at which we're allowed to edit a Telegram message again after hitting flood
# control. Being module-level, the value is shared by every caller in this
# process. The initial 0.0 means no backoff is in effect.
_flood_backoff_until: float = 0.0
|
|
|
|
|
|
async def _edit_status_message(message: Any, text: str) -> None:
    """Edit a Telegram status message unless a flood-control backoff is in effect.

    A ``RetryAfter`` response opens a backoff window (the requested delay, at
    least one second, plus half a second of slack) during which later calls
    return without editing. Any other ``TelegramError`` is ignored.
    """
    global _flood_backoff_until
    event_loop = asyncio.get_running_loop()

    # Still inside a backoff window: drop this edit entirely.
    if event_loop.time() < _flood_backoff_until:
        return

    try:
        await message.edit_text(text)
    except RetryAfter as flood:
        # Telegram says "retry in N seconds" — honour it and add a small buffer.
        delay = 0.5 + max(float(flood.retry_after), 1.0)
        _flood_backoff_until = event_loop.time() + delay
        logger.warning("Telegram flood control: backing off %.1fs", delay)
    except TelegramError:
        pass
|
|
|
|
|
|
async def _run_user_turn(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    *,
    text: str,
    uploads: list[dict] | None = None,
) -> None:
    """Run one user turn: record the message, stream the agent session, send the reply.

    While the session streams, a background task keeps editing a placeholder
    message with tool counts, elapsed time and running cost. The turn is
    refused with the busy message when the thread's lock is already held.

    Args:
        update: The Telegram update that triggered the turn.
        context: The PTB callback context.
        text: The user's text; may be empty when ``uploads`` is given.
        uploads: Stored upload records (each with an ``id``) to attach as images.
    """
    message = update.effective_message
    if message is None:
        return

    normalized_text = str(text or "").strip()
    normalized_uploads = uploads or []
    if not normalized_text and not normalized_uploads:
        return

    # Resolve user identity (T019)
    from user_store import get_store as _get_user_store

    tg_user = update.effective_user
    if tg_user:
        user_store = _get_user_store()
        current_user = user_store.resolve_or_create_user(
            provider="telegram",
            external_id=str(tg_user.id),
            display_name=tg_user.full_name or str(tg_user.id),
        )
    else:
        current_user = None

    store = _store(context)
    selection = _get_selection(update, context)
    thread = _ensure_thread(update, context, selection=selection)
    lock = store.lock(thread["id"])
    if lock.locked():
        await message.reply_text(busy_message(surface="telegram"))
        return

    async with lock:
        thinking = await message.reply_text(working_message(surface="telegram"))
        latest_status = working_message(surface="telegram")
        tool_counts: dict[str, int] = {}
        stop_status = asyncio.Event()
        stop_run = asyncio.Event()
        run_state = {"stop_event": stop_run, "status_message": thinking, "started_by": normalized_text}
        _register_active_run(context, thread["id"], run_state)

        # Bound up front so the status loop, the cancel handler and the
        # learning extraction in ``finally`` can always read them, even when
        # the turn fails before streaming starts.
        collected_events: list[Any] = []
        reply = ""

        # Grab the running session cost so far (before this turn)
        try:
            _pre_usage = store.get_total_usage(thread["id"])
        except Exception:
            _pre_usage = {}
        _pre_cost = Decimal(str(_pre_usage.get("cost_usd", "0") or "0"))
        _pre_tokens = int(_pre_usage.get("total_tokens", 0) or 0)

        def _cost_line() -> str:
            """Format session cost line for the status bubble.

            Computes a *live* running cost from collected_events so far
            (which accumulate ASSISTANT_USAGE events during streaming),
            adds it to the pre-existing session cost, and returns the
            combined figure.  This keeps the 💰 line fresh as tool calls
            stream in, rather than showing a stale number.
            """
            try:
                turn_usage = extract_usage_and_cost(
                    selection.model,
                    selection.provider,
                    collected_events,
                )
            except Exception:
                turn_usage = {}
            turn_cost = Decimal(str(turn_usage.get("cost_usd", "0") or "0"))
            turn_tokens = int(turn_usage.get("total_tokens", 0) or 0)
            cost = _pre_cost + turn_cost
            tokens = _pre_tokens + turn_tokens
            if not tokens and not cost:
                return ""
            return f"💰 Session: ${cost:.8f} · {tokens:,} tok"

        status_changed = asyncio.Event()
        pending_statuses: list[str] = []
        pending_status_keys: set[str] = set()
        turn_started_at = asyncio.get_running_loop().time()

        def _queue_status(text: str | None) -> None:
            # Remember the newest status and queue it for the next edit,
            # skipping case-insensitive duplicates already in the queue.
            nonlocal latest_status

            normalized = str(text or "").strip()
            if not normalized:
                return

            latest_status = normalized
            dedupe_key = normalized.casefold()
            if dedupe_key in pending_status_keys:
                return

            pending_status_keys.add(dedupe_key)
            pending_statuses.append(normalized)
            status_changed.set()

        def _drain_status_batch() -> str:
            # Return all queued statuses as one block and empty the queue;
            # with nothing queued, fall back to the latest known status.
            if not pending_statuses:
                return latest_status

            batch: str = "...\n".join(pending_statuses)
            pending_statuses.clear()
            pending_status_keys.clear()
            return batch

        async def _status_loop() -> None:
            """Edit the Telegram placeholder when status changes, throttled to avoid rate-limits."""
            last_sent = ""
            last_edit = 0.0
            last_elapsed_prefix = ""
            loop = asyncio.get_running_loop()
            while not stop_status.is_set():
                if not pending_statuses:
                    # Nothing queued: sleep until a status arrives, the turn
                    # stops, or the edit interval elapses.
                    status_changed.clear()
                    status_wait = asyncio.create_task(status_changed.wait())
                    stop_wait = asyncio.create_task(stop_status.wait())
                    pending_waits = {status_wait, stop_wait}
                    wait_timeout = (
                        None if last_edit == 0.0 else max(0.0, TG_STATUS_EDIT_INTERVAL - (loop.time() - last_edit))
                    )
                    try:
                        done, _ = await asyncio.wait(
                            pending_waits,
                            timeout=wait_timeout,
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        if stop_wait in done or stop_status.is_set():
                            break
                    finally:
                        for task in pending_waits:
                            if not task.done():
                                task.cancel()
                        await asyncio.gather(*pending_waits, return_exceptions=True)

                # Throttle: wait out the rest of the edit interval (or stop).
                elapsed = loop.time() - last_edit
                if pending_statuses and last_edit and elapsed < TG_STATUS_EDIT_INTERVAL:
                    try:
                        await asyncio.wait_for(stop_status.wait(), timeout=TG_STATUS_EDIT_INTERVAL - elapsed)
                        break
                    except asyncio.TimeoutError:
                        pass

                # If flood backoff is active, wait it out before attempting an edit.
                backoff_remaining = _flood_backoff_until - loop.time()
                if backoff_remaining > 0:
                    try:
                        await asyncio.wait_for(stop_status.wait(), timeout=backoff_remaining)
                        break
                    except asyncio.TimeoutError:
                        pass

                display = format_tool_counts(tool_counts, current_status=_drain_status_batch())
                display = append_elapsed_status(display, elapsed_seconds=loop.time() - turn_started_at)
                cost = _cost_line()
                if cost:
                    display = f"{display}\n{cost}" if display else cost

                # Skip edit if only the elapsed timer changed (round to 10s to
                # avoid trivial edits that trigger rate-limits during idle runs).
                # Build a comparison key that ignores the exact seconds count.
                elapsed_now = int(loop.time() - turn_started_at)
                elapsed_bucket = elapsed_now // 10  # only changes every 10s
                content_key = (display.split("\n")[0] if display else "", cost, elapsed_bucket)
                if content_key == last_elapsed_prefix and not pending_statuses:
                    continue

                if display and display != last_sent:
                    await _edit_status_message(thinking, display)
                    last_sent = display
                    last_edit = loop.time()
                    last_elapsed_prefix = content_key

        status_task = asyncio.create_task(_status_loop())
        try:
            store.add_message(
                thread["id"],
                "user",
                normalized_text,
                parts=_build_user_parts(normalized_text, normalized_uploads),
            )
            # Log to identity-linked conversation store (T024)
            _us_session = None
            if current_user:
                _us = _get_user_store()
                _topic = str(_effective_topic_thread_id(update) or "")
                _us_session = _us.get_or_create_session(current_user.id, "telegram", topic_id=_topic or None)
                _us.log_message(_us_session.id, "user", normalized_text)
            _queue_status(f"Inspecting request in {_session_label(update)}")

            history = store.build_agent_history(thread["id"], limit=MAX_HISTORY * 2)
            prompt = format_prompt_with_history(history, normalized_text)

            # Build BlobAttachment dicts directly from the current Telegram uploads so
            # the SDK receives the image bytes for this turn regardless of how prompt
            # formatting collapses history into text.
            current_attachments: list[dict[str, Any]] | None = None
            if normalized_uploads and not selection.likely_supports_vision:
                _queue_status(f"⚠️ {selection.model} doesn't support images — sending text only")
                normalized_uploads = []
            if normalized_uploads:
                blob_attachments: list[dict[str, Any]] = []
                for upload in normalized_uploads:
                    try:
                        data_url = store._media_store.build_data_url(upload["id"])
                    except KeyError:
                        continue
                    # Split the data URL into its MIME type and base64 payload.
                    m = re.match(r"data:([^;]+);base64,(.+)", data_url, re.DOTALL)
                    if not m:
                        continue
                    blob_attachments.append(
                        {
                            "type": "blob",
                            "mimeType": m.group(1),
                            "data": m.group(2),
                            "displayName": str(upload.get("name") or "image.jpg"),
                        }
                    )
                if blob_attachments:
                    current_attachments = blob_attachments

            topic_tools = _build_telegram_topic_tools(update, context)
            system_message = _compose_system_message(update, context, user=current_user)

            bg_tools = _build_background_tools(update, context, thread, selection, system_message)
            bg_context = _bg_manager(context).context_summary(thread["id"])
            if bg_context:
                system_message = _compose_system_message(
                    update, context, background_summary=bg_context, user=current_user
                )

            all_tools = (
                (topic_tools or [])
                + bg_tools
                + build_integration_tools(current_user, thread_id=thread["id"], system_prompt=system_message)
            )

            latest_status = "Thinking ..."

            async for event in stream_session(
                copilot,
                model=selection.model,
                provider_config=build_provider_config(selection),
                system_message=system_message,
                prompt=prompt,
                tools=all_tools or None,
                attachments=current_attachments,
                thread_id=thread["id"],
            ):
                if stop_run.is_set():
                    raise asyncio.CancelledError("Stopped by /stop")

                collected_events.append(event)

                # Handle session errors
                if event.type == SessionEventType.SESSION_ERROR:
                    error_msg: str = (
                        event.data.message
                        if event.data and isinstance(event.data.message, str)
                        else "Unknown session error"
                    )
                    raise RuntimeError(f"Session error: {error_msg}")

                # Track tool calls for collapsed count display
                trace = stream_trace_event(event)
                if trace:
                    tool_name = trace.get("tool_name") or ""
                    category = trace.get("category", "")
                    if category == "tool_call" and tool_name:
                        # Don't count report_intent in tool counts
                        if tool_name != "report_intent" and not trace.get("output_detail"):
                            tool_counts[tool_name] = tool_counts.get(tool_name, 0) + 1
                    elif category == "subagent":
                        tool_counts["handoffs"] = tool_counts.get("handoffs", 0) + 1

                # Update current status text from stream
                status_updates = stream_status_updates(event)
                if status_updates:
                    for status_text in status_updates:
                        _queue_status(status_text)
                elif trace:
                    status_changed.set()

            _queue_status("Sending final reply")
            reply = extract_final_text(collected_events)
            if not reply:
                reply = (
                    "I finished that run but did not get a usable final reply back. "
                    "Please send the request again, or narrow it to one repo, file, or command."
                )
            usage = extract_usage_and_cost(
                selection.model,
                selection.provider,
                collected_events,
                advisor_usage=_get_advisor_usage(thread["id"]),
            )
            total_usage = store.add_usage(thread["id"], usage)
            cost_footer = ""
            if total_usage:
                total_cost = total_usage.get("cost_usd", "0")
                total_tokens = total_usage.get("total_tokens", 0)
                cost_footer = f"\n\n---\nTotal Session Cost: ${total_cost}\nTotal Session Tokens: {total_tokens:,}"

            stop_status.set()
            # await safe_delete_message(thinking)
            store.add_message(thread["id"], "assistant", reply)
            if _us_session and current_user:
                _get_user_store().log_message(_us_session.id, "assistant", reply)

            await _send_long(update, reply + cost_footer)
        except asyncio.CancelledError:
            stop_status.set()
            # await safe_delete_message(thinking)
            # Record whatever usage was accumulated before the stop.
            partial_usage = extract_usage_and_cost(
                selection.model,
                selection.provider,
                collected_events,
                advisor_usage=_get_advisor_usage(thread["id"]),
            )
            if (
                partial_usage.get("request_count")
                or partial_usage.get("total_tokens")
                or partial_usage.get("cost_usd") != "0"
            ):
                store.add_usage(thread["id"], partial_usage)
            await _send_long(update, "Stopped current run.")
        except Exception as error:
            logger.exception("Agent error for session %s", _session_key(update))
            stop_status.set()
            # await safe_delete_message(thinking)
            detail = format_session_error(surface="telegram", error=error)
            store.add_message(thread["id"], "assistant", detail)
            await _send_long(update, detail)
        finally:
            _clear_active_run(context, thread["id"], run_state)
            stop_status.set()

            # Extract learnings from this turn (runs even on error so user
            # facts are never lost — assistant_message may be empty on failure)
            try:
                project_learnings, global_learnings = extract_learnings_from_turn(
                    normalized_text,
                    reply,
                )
                for fact, category in project_learnings:
                    store.add_project_learning(thread["id"], fact, category=category)
                for fact, category in global_learnings:
                    store.add_global_learning(fact, category=category)
            except Exception:
                logger.warning("Learning extraction failed", exc_info=True)

            # Cancelling a still-running status task makes ``await status_task``
            # raise asyncio.CancelledError, which is a BaseException and would
            # slip past ``except Exception``. gather(return_exceptions=True)
            # collects the task's outcome, whatever it is, without raising it.
            status_task.cancel()
            await asyncio.gather(status_task, return_exceptions=True)
|
|
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route a plain-text Telegram message into the user-turn runner.

    Updates with no effective message, or whose message has no text, are
    logged at INFO level and ignored.
    """
    incoming = update.effective_message
    body = None if incoming is None else incoming.text
    if not body:
        logger.info("Ignoring non-text Telegram update: %s", update)
        return

    await _run_user_turn(update, context, text=body)
|
|
|
|
|
|
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Save an incoming Telegram photo as an upload and run a user turn with it.

    The last entry of ``message.photo`` is downloaded (presumably the
    highest-resolution variant — confirm against the Telegram API), stored
    through the runtime store as a JPEG upload, and handed to the turn runner
    together with the caption (empty string when there is none).

    Any failure while downloading or storing is logged with its traceback and
    reported back to the user as a formatted session error; no turn is run in
    that case.
    """
    incoming = update.effective_message
    if incoming is None or not incoming.photo:
        logger.info("Ignoring Telegram photo update without image payload: %s", update)
        return

    try:
        remote_file = await incoming.photo[-1].get_file()
        image_bytes = bytes(await remote_file.download_as_bytearray())
        saved_upload = _store(context).save_upload(
            name=f"telegram-photo-{incoming.message_id}.jpg",
            data=image_bytes,
            mime_type="image/jpeg",
        )
    except Exception as error:
        logger.exception("Failed to download Telegram photo for session %s", _session_key(update))
        await incoming.reply_text(format_session_error(surface="telegram", error=error))
        return

    caption_text = incoming.caption or ""
    await _run_user_turn(update, context, text=caption_text, uploads=[saved_upload])
|
|
|
|
|
|
async def _send_long(update: Update, text: str) -> None:
    """Send *text* to the update's chat in pieces, preferring Telegram HTML.

    The text is cut into consecutive slices of at most ``TG_MESSAGE_LIMIT``
    characters. Each slice is first sent as HTML produced by
    ``markdown_to_telegram_html``; if that send raises ``TelegramError`` the
    same slice is re-sent as plain text. An empty *text* sends nothing.

    When the update belongs to a forum topic other than the general one, the
    messages are addressed to that topic via ``message_thread_id``.
    """
    send_options: dict[str, Any] = {}
    topic_id = _effective_topic_thread_id(update)
    if topic_id and not _is_general_topic(update):
        send_options["message_thread_id"] = topic_id

    # Slicing happens on the raw text, before the markdown -> HTML conversion.
    pieces = [text[start : start + TG_MESSAGE_LIMIT] for start in range(0, len(text), TG_MESSAGE_LIMIT)]
    for piece in pieces:
        try:
            await update.effective_chat.send_message(
                markdown_to_telegram_html(piece),
                parse_mode="HTML",
                **send_options,
            )
        except TelegramError:
            # HTML rejected — send as plain text instead
            await update.effective_chat.send_message(piece, **send_options)
|
|
|
|
|
|
async def handle_error(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Application-level error handler: log the error held by *context*.

    The update that triggered the error is accepted but not used.
    """
    failure = context.error
    logger.exception("Telegram application error", exc_info=failure)
|
|
|
|
|
|
def _telegram_persistence_path() -> Path:
    """Return the path of the pickle file used for Telegram bot persistence.

    The directory is ``settings.TG_PERSISTENCE_DIR`` when set (truthy),
    otherwise ``settings.DATA_DIR``; a leading ``~`` is expanded.
    """
    configured_dir = settings.TG_PERSISTENCE_DIR or settings.DATA_DIR
    state_dir = Path(configured_dir).expanduser()
    return state_dir / "telegram-bot-state.pkl"
|
|
|
|
|
|
def build_telegram_app(store: WebFallbackStore) -> Application:
    """Build the Telegram ``Application`` with persistence and all handlers.

    Args:
        store: Runtime store attached to the application as
            ``app._runtime_store`` during post-init.

    Returns:
        The built ``Application`` with the access gate, command, callback,
        photo and text handlers plus the error handler registered.

    Raises:
        RuntimeError: If ``settings.TG_BOT_TOKEN`` is not configured.
    """
    if not settings.TG_BOT_TOKEN:
        raise RuntimeError("TG_BOT_TOKEN is not configured")

    persistence_path = _telegram_persistence_path()
    persistence_path.parent.mkdir(parents=True, exist_ok=True)
    persistence = PicklePersistence(filepath=str(persistence_path), update_interval=10)

    async def _combined_post_init(app: Application) -> None:
        """Attach runtime objects, restore aliases, then run ``_post_init``."""
        # Store runtime objects as app attributes — NOT in bot_data.
        # bot_data is persisted via PicklePersistence, and these objects
        # contain unpicklable items (locks, asyncio state) which would
        # corrupt the pickle file and wipe persisted aliases.
        app._runtime_store = store  # type: ignore[attr-defined]
        app._runtime_bg_manager = BackgroundTaskManager()  # type: ignore[attr-defined]

        # Restore aliases from durable JSON file if bot_data lost them
        file_aliases = _load_alias_file()
        if file_aliases:
            bd_aliases = app.bot_data.get(_USER_MODEL_ALIASES_KEY)
            if not isinstance(bd_aliases, dict) or not bd_aliases:
                app.bot_data[_USER_MODEL_ALIASES_KEY] = file_aliases
                logger.info("Restored %d alias owner(s) from JSON file", len(file_aliases))
            else:
                # Merge: file fills in missing owners/keys. Persisted values
                # win over the file for aliases present in both.
                for owner, owner_aliases in file_aliases.items():
                    persisted_aliases = bd_aliases.get(owner)
                    if not isinstance(persisted_aliases, dict):
                        # Owner missing (or its entry is unusable): take the
                        # file's copy wholesale.
                        bd_aliases[owner] = owner_aliases
                        continue
                    # Fill gaps inside THIS owner's alias map. Writing to the
                    # top-level map here would register alias names as owners.
                    for alias_name, alias_target in owner_aliases.items():
                        persisted_aliases.setdefault(alias_name, alias_target)
                app.bot_data[_USER_MODEL_ALIASES_KEY] = bd_aliases

        await _post_init(app)

    app = (
        Application.builder()
        .token(settings.TG_BOT_TOKEN)
        .persistence(persistence)
        .post_init(_combined_post_init)
        .build()
    )

    # Access gate lives in group -1 so it is evaluated ahead of the default
    # group's handlers (presumably to reject unauthorized updates — verify
    # against _gate_telegram_access).
    app.add_handler(TypeHandler(Update, _gate_telegram_access), group=-1)

    # Slash commands.
    app.add_handler(CommandHandler(["start", "help"], cmd_start))
    app.add_handler(CommandHandler("models", cmd_models))
    app.add_handler(CommandHandler("alias", cmd_alias))
    app.add_handler(CommandHandler("alias_export", cmd_alias_export))
    app.add_handler(CommandHandler("model", cmd_model))
    app.add_handler(CommandHandler("provider", cmd_provider))
    app.add_handler(CommandHandler(["topic", "rename"], cmd_topic))
    app.add_handler(CommandHandler("newtopic", cmd_newtopic))
    app.add_handler(CommandHandler("skills", cmd_skills))
    app.add_handler(CommandHandler("system", cmd_system))
    app.add_handler(CommandHandler("new", cmd_new))
    app.add_handler(CommandHandler("current", cmd_current))
    app.add_handler(CommandHandler("status", cmd_status))
    app.add_handler(CommandHandler("usage", cmd_usage))
    app.add_handler(CommandHandler("learnings", cmd_learnings))
    app.add_handler(CommandHandler("stop", cmd_stop))

    # Inline-keyboard callbacks, dispatched by callback-data prefix.
    app.add_handler(CallbackQueryHandler(handle_prompt_callback, pattern="^prompt_app:"))
    app.add_handler(CallbackQueryHandler(_handle_approval_callback, pattern="^tg_approve:"))
    app.add_handler(CallbackQueryHandler(handle_skill_menu_callback, pattern=f"^{SKILL_MENU_CALLBACK_PREFIX}"))

    # Content handlers: photos first, then non-command text.
    app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    app.add_error_handler(handle_error)
    return app
|
|
|
|
|
|
def webhook_secret() -> str:
    """Return the first 32 hex characters of the SHA-256 of the bot token.

    The value is derived solely from ``settings.TG_BOT_TOKEN``, so it is
    stable for as long as the token is unchanged.
    """
    token_digest = hashlib.sha256(settings.TG_BOT_TOKEN.encode())
    return token_digest.hexdigest()[:32]
|
|
|
|
|
|
async def _post_init(app: Application) -> None:
    """Publish the bot's command menu for the default, group and admin scopes.

    For each scope the existing command list is deleted first and the full
    list is then registered again.
    """
    command_table = (
        ("start", "Show help"),
        ("help", "Show help"),
        ("models", "List model IDs"),
        ("alias", "Save personal model alias"),
        ("alias_export", "Download personal alias backup"),
        ("model", "Switch model"),
        ("provider", "Switch provider"),
        ("topic", "Rename current topic"),
        ("newtopic", "Create a new forum topic"),
        ("skills", "Browse available skills"),
        ("system", "Set or show topic instruction"),
        ("new", "Start a new session"),
        ("current", "Show current model"),
        ("status", "Show system and task status"),
        ("usage", "Show accumulated usage"),
        ("learnings", "Show project and global learnings"),
        ("stop", "Stop the current run"),
    )
    commands = [BotCommand(name, description) for name, description in command_table]

    # Clear stale commands for all scopes, then re-register
    scopes = (
        BotCommandScopeDefault(),
        BotCommandScopeAllGroupChats(),
        BotCommandScopeAllChatAdministrators(),
    )
    for scope in scopes:
        await app.bot.delete_my_commands(scope=scope)
        await app.bot.set_my_commands(commands, scope=scope)
|