betterbot/ux.py
Andre K e68c84424f
Some checks failed
Deploy BetterBot / deploy (push) Failing after 3s
Deploy BetterBot / notify (push) Successful in 3s
feat: fork from CodeAnywhere framework
Replace standalone Telegram bot with full CodeAnywhere framework fork.
BetterBot shares all framework code and customizes only:
- instance.py: BetterBot identity, system prompt, feature flags
- tools/site_editing/: list_files, read_file, write_file with auto git push
- .env: model defaults and site directory paths
- compose/: Docker setup with betterlifesg + memoraiz mounts
- deploy script: RackNerd with Infisical secrets
2026-04-19 08:01:27 +08:00

616 lines
22 KiB
Python

"""User-facing messaging helpers for Telegram and web chat surfaces."""
from __future__ import annotations
import html
import json
import re
from typing import Any, Literal, cast
from copilot.generated.session_events import Data, SessionEvent, SessionEventType, ToolRequest
Surface = Literal["telegram", "web"]
def extract_final_text(events: list[SessionEvent]) -> str:
    """Return the final assistant message text from a collected event stream."""
    # A complete ASSISTANT_MESSAGE wins; scan from the newest event backwards.
    for ev in events[::-1]:
        if ev.type == SessionEventType.ASSISTANT_MESSAGE and ev.data and ev.data.content:
            return ev.data.content.strip()
    # Otherwise stitch the streamed deltas together in arrival order.
    deltas = [
        ev.data.delta_content
        for ev in events
        if ev.type == SessionEventType.ASSISTANT_MESSAGE_DELTA and ev.data and ev.data.delta_content
    ]
    return "".join(deltas).strip()
def working_message(*, surface: Surface) -> str:
    """Short placeholder text shown while a reply is being generated."""
    return "Thinking ..." if surface == "telegram" else "Working on it"
def busy_message(*, surface: Surface) -> str:
    """Tell the user the previous turn is still running on this surface."""
    if surface != "telegram":
        return "Still working on the previous message. Wait for that reply before sending another one."
    return "Still working on the previous message in this chat. Wait for that reply, or send /new to reset."
def format_session_error(*, surface: Surface, error: Exception | str | None = None) -> str:
    """Compose a user-facing failure message with targeted troubleshooting hints."""
    lines: list[str] = ["Run failed with an internal exception."]
    detail = _format_error_detail(error)
    if detail:
        lines.append(f"Exception: {detail}")
    # Classification order matters: image errors are most specific,
    # then explicit 429s, then generic transient failures.
    if _looks_image_unsupported(detail):
        lines.append(
            "This model does not support image inputs. "
            "Switch to a vision model (e.g. gpt-4o, claude-sonnet, gemini-2.5-pro) or resend without the image."
        )
    elif _looks_rate_limited(detail):
        lines.append(
            "The provider is rate-limiting requests (HTTP 429). The SDK already retried several times before giving up."
        )
    elif _looks_retryable(detail):
        lines.append("All automatic retries were exhausted.")
    lines.append(_retry_guidance(surface))
    return "\n\n".join(lines)
def extract_intent_from_tool(event: SessionEvent) -> str | None:
    """If the event is a report_intent tool call, return the intent text."""
    if event.type != SessionEventType.TOOL_EXECUTION_START:
        return None
    if _event_tool_name(event) != "report_intent":
        return None
    raw = event.data.arguments if event.data else None
    if not raw:
        return None
    if isinstance(raw, str):
        # Arguments may arrive as a JSON string; unparseable payloads are ignored.
        try:
            raw = json.loads(raw)
        except Exception:
            return None
    if not isinstance(raw, dict):
        return None
    intent = cast(dict[str, Any], raw).get("intent", "")
    cleaned = intent.strip() if isinstance(intent, str) else ""
    return cleaned or None
def extract_tool_intent_summary(event: SessionEvent) -> str | None:
    """Extract intent_summary from tool_requests on any event.

    The Copilot SDK can attach ``tool_requests`` to events (e.g. before tool
    execution starts). Each tool request may carry an ``intent_summary``
    describing *why* the agent wants to call that tool.

    Returns the non-empty summaries joined with newlines, or ``None`` when
    the event carries no usable summaries.
    """
    data: Data | None = getattr(event, "data", None)
    if data is None:
        return None
    # Default to None: not every Data payload carries tool_requests.
    tool_requests: list[ToolRequest] = getattr(data, "tool_requests", None) or []
    if not tool_requests:
        return None
    try:
        # `or ""` guards against an intent_summary attribute that exists but
        # is None, which would otherwise raise AttributeError on .strip().
        summary: str = "\n".join(
            text
            for request in tool_requests
            if (text := (getattr(request, "intent_summary", "") or "").strip())
        )
    except (IndexError, TypeError, KeyError):
        return None
    return summary or None
def stream_status_updates(event: Any, *, include_reasoning_status: bool = True) -> list[str]:
    """Return ordered, deduplicated user-facing status updates for a Copilot SDK event.

    Scans every status-bearing field the SDK may attach to an event (progress
    messages, tool intents, warnings, errors, git context, metrics, ...) and
    emits them as normalized one-line strings in a stable order, with noisy
    or duplicate entries filtered out.
    """
    event_type: SessionEventType = event.type
    data: Data | None = getattr(event, "data", None)
    updates: list[str] = []
    seen: set[str] = set()
    # Entries are matched against text.strip().lower(), so they MUST be
    # lowercase. (Bug fix: "Thinking" was previously capitalized here and
    # the filter never suppressed it.)
    noise_texts: set[str] = {"tool done", "thinking"}
    # Raw SDK phrasing → friendlier user-facing phrasing.
    ugly_texts: dict[str, str] = {
        "Running view": "Viewing file(s)",
    }

    def add(text: Any, *, prefix: str | None = None, limit: int = 220) -> None:
        # Normalize, de-noise, and deduplicate before appending.
        if text in (None, ""):
            return
        elif isinstance(text, str) and text.strip().lower() in noise_texts:
            return
        elif isinstance(text, str):
            text = ugly_texts.get(text.strip(), text)
        normalized: str = _normalize_status_text(text, prefix=prefix, limit=limit)
        if not normalized:
            return
        dedupe_key = normalized.casefold()
        if dedupe_key in seen:
            return
        seen.add(dedupe_key)
        updates.append(normalized)

    add(getattr(data, "progress_message", None), limit=240)
    add(extract_tool_intent_summary(event), limit=240)
    if event_type == SessionEventType.TOOL_EXECUTION_START:
        tool_name: str = _event_tool_name(event)
        intent: str | None = extract_intent_from_tool(event)
        if tool_name == "report_intent":
            # report_intent is surfaced via its intent text, not as a tool run.
            pass
        elif not intent and not tool_name:
            pass
        else:
            kwargs: dict = {"text": intent, "prefix": tool_name} if intent else {"text": f"Running {tool_name}"}
            add(limit=160, **kwargs)
    if event_type == SessionEventType.TOOL_EXECUTION_COMPLETE:
        tool_name: str = _event_tool_name(event)
        if tool_name != "report_intent":
            add(f"{tool_name} done", limit=160)
    if event_type == SessionEventType.SUBAGENT_SELECTED:
        add(f"Routed to {_event_agent_name(event)}", limit=180)
    if event_type == SessionEventType.SUBAGENT_STARTED:
        add(f"{_event_agent_name(event)} working", limit=180)
    if event_type == SessionEventType.SESSION_COMPACTION_START:
        add("Compacting context", limit=120)
    if event_type == SessionEventType.SESSION_COMPACTION_COMPLETE:
        add("Context compacted", limit=120)
    if event_type == SessionEventType.ASSISTANT_INTENT:
        add(getattr(data, "intent", None), limit=240)
    if include_reasoning_status and event_type in {
        SessionEventType.ASSISTANT_REASONING,
        SessionEventType.ASSISTANT_REASONING_DELTA,
    }:
        reasoning = (data and data.reasoning_text) or ""
        if reasoning.strip():
            # Only surface explicit "Intent:"-style opening lines from reasoning.
            first_line = reasoning.strip().splitlines()[0].strip()
            if first_line.lower().startswith(("intent:", "intent ")):
                add(first_line, limit=240)
    add(getattr(data, "message", None), limit=240)
    add(getattr(data, "title", None), prefix="Title", limit=200)
    add(getattr(data, "summary", None), prefix="Summary", limit=240)
    add(getattr(data, "summary_content", None), prefix="Context summary", limit=240)
    add(getattr(data, "warning_type", None), prefix="Warning type", limit=160)
    for warning in _iter_status_values(getattr(data, "warnings", None)):
        add(warning, prefix="Warning", limit=240)
    add(getattr(data, "error_reason", None), prefix="Error", limit=240)
    for error in _iter_status_values(getattr(data, "errors", None)):
        add(error, prefix="Error", limit=240)
    add(getattr(data, "reason", None), prefix="Stop reason", limit=200)
    add(_format_server_status(getattr(data, "status", None)), prefix="Server", limit=160)
    add(getattr(data, "phase", None), prefix="Phase", limit=120)
    add(getattr(data, "mcp_tool_name", None), prefix="MCP tool", limit=180)
    add(_format_code_changes_status(getattr(data, "code_changes", None)), limit=200)
    # The SDK's `duration` is only a subtotal for the current API round-trip.
    # Total turn runtime is tracked by the caller and surfaced as a live
    # elapsed clock while the overall request is still running.
    total_premium_requests = getattr(data, "total_premium_requests", None)
    if total_premium_requests not in (None, ""):
        add(f"Premium requests: {_format_metric_number(total_premium_requests)}", limit=140)
    add(getattr(data, "branch", None), prefix="Branch", limit=160)
    add(getattr(data, "cwd", None), prefix="CWD", limit=220)
    add(getattr(data, "git_root", None), prefix="Git root", limit=220)
    head_commit = getattr(data, "head_commit", None)
    if head_commit:
        # Abbreviated 12-char hash, matching git's short-commit display.
        add(f"Head: {str(head_commit).strip()[:12]}", limit=80)
    if getattr(data, "reasoning_opaque", None):
        add("Encrypted reasoning attached", limit=120)
    if model := getattr(data, "model", None):
        add(model, prefix="\n🤖", limit=160)
    return updates
def stream_status_text(event: Any) -> str:
    """Return a single concatenated status string for compatibility call sites."""
    updates = stream_status_updates(event)
    return "\n".join(updates)
def stream_trace_event(event: Any) -> dict[str, str] | None:
    """Extract structured trace entries for tool activity and subagent routing."""
    kind = event.type
    if kind == SessionEventType.TOOL_EXECUTION_START:
        name = _event_tool_name(event)
        if name == "report_intent":
            return None  # suppress report_intent from trace
        call_id = (event.data and event.data.tool_call_id) or ""
        args_detail = _stringify_trace_detail(event.data and event.data.arguments)
        return {
            "kind": "trace",
            "category": "tool_call",
            "key": f"tool:{call_id or name}",
            "tool_name": name,
            "title": f"Tool call: {name}",
            "summary": f"Called {name}",
            "text": f"Called {name}",
            "detail": args_detail or "No arguments exposed.",
        }
    if kind == SessionEventType.TOOL_EXECUTION_COMPLETE:
        name = _event_tool_name(event)
        if name == "report_intent":
            return None  # suppress report_intent from trace
        call_id = (event.data and event.data.tool_call_id) or ""
        result_detail = _stringify_trace_detail(event.data and event.data.output)
        return {
            "kind": "trace",
            "category": "tool_call",
            "key": f"tool:{call_id or name}",
            "tool_name": name,
            "title": f"Tool call: {name}",
            "summary": f"{name} done",
            "text": f"{name} done",
            "output_detail": result_detail or "Tool finished with no readable output.",
        }
    if kind == SessionEventType.SUBAGENT_SELECTED:
        subagent = _event_agent_name(event)
        return {
            "kind": "trace",
            "category": "subagent",
            "key": f"agent:{subagent}",
            "title": f"Subagent: {subagent}",
            "summary": f"Routed to {subagent}",
            "text": f"Routed to {subagent}",
            "detail": f"The run is now executing inside the {subagent} subagent.",
        }
    return None
def stream_reasoning_event(event: Any) -> tuple[str, str, bool] | None:
    """Extract reasoning text from a Copilot SDK event when available.

    Returns ``(reasoning_id, text, is_final)`` — deltas report is_final=False,
    complete reasoning events report is_final=True — or ``None`` when the
    event carries no non-blank reasoning.
    """
    final_by_type = {
        SessionEventType.ASSISTANT_REASONING_DELTA: False,
        SessionEventType.ASSISTANT_REASONING: True,
    }
    if event.type not in final_by_type:
        return None
    rid = (event.data and event.data.reasoning_id) or "reasoning"
    body = (event.data and event.data.reasoning_text) or ""
    if not body.strip():
        return None
    return rid, body, final_by_type[event.type]
def format_tool_counts(tool_counts: dict[str, int], *, current_status: str = "") -> str:
    """Build a compact one-line summary of tool call counts."""
    if not tool_counts:
        return current_status or ""
    # Most-used tools first; zero/negative counts are dropped.
    ranked = sorted(tool_counts.items(), key=lambda kv: -kv[1])
    fragments = [f"{count} {name}" for name, count in ranked if count > 0]
    pieces: list[str] = []
    if current_status:
        pieces.append(current_status)
    if fragments:
        pieces.append(f"🔧 {' · '.join(fragments)}")
    return "\n".join(pieces)
def format_elapsed_status(elapsed_seconds: float) -> str:
    """Render a human-friendly turn runtime for live status displays."""
    # Negative inputs clamp to zero; sub-second precision is dropped.
    total = max(0, int(elapsed_seconds))
    h = total // 3600
    m = (total % 3600) // 60
    s = total % 60
    if h:
        return f"Elapsed: {h}h {m:02d}m {s:02d}s"
    if m:
        return f"Elapsed: {m}m {s:02d}s"
    return f"Elapsed: {s}s"
def append_elapsed_status(text: Any, *, elapsed_seconds: float) -> str:
    """Append the current turn runtime to a status line without duplicating it."""
    # Strip any stale "Elapsed:" line so repeated updates don't stack.
    kept: list[str] = []
    for line in str(text or "").splitlines():
        if line.strip().lower().startswith("elapsed:"):
            continue
        kept.append(line)
    body = "\n".join(kept).strip()
    clock = format_elapsed_status(elapsed_seconds)
    return f"{body}\n{clock}" if body else clock
async def safe_delete_message(message: Any) -> None:
    """Delete a chat message, swallowing any failure (already gone, no rights, ...)."""
    if message is None:
        return
    try:
        await message.delete()
    except Exception:
        # Best-effort cleanup: deletion failures are never worth surfacing.
        pass
def markdown_to_telegram_html(text: str) -> str:
    """Convert common Markdown to Telegram-compatible HTML.

    Handles fenced code blocks, inline code, bold, italic, strikethrough,
    and links. Falls back gracefully — anything it can't convert is
    HTML-escaped and sent as plain text.
    """
    # Fenced blocks: ```lang\n...\n``` with an optional language tag.
    fence_re = re.compile(r"```(\w*)\n(.*?)```", re.DOTALL)
    chunks: list[str] = []
    cursor = 0
    for match in fence_re.finditer(text):
        # Everything between the previous fence and this one is inline markdown.
        if match.start() > cursor:
            chunks.append(_md_inline_to_html(text[cursor : match.start()]))
        language = match.group(1)
        escaped = html.escape(match.group(2).rstrip("\n"))
        if language:
            chunks.append(f'<pre><code class="language-{html.escape(language)}">{escaped}</code></pre>')
        else:
            chunks.append(f"<pre>{escaped}</pre>")
        cursor = match.end()
    # Trailing text after the final fence (or the whole input when no fences).
    if cursor < len(text):
        chunks.append(_md_inline_to_html(text[cursor:]))
    return "".join(chunks)
def _md_inline_to_html(text: str) -> str:
"""Convert inline Markdown (outside code blocks) to Telegram HTML."""
# First, protect inline code spans so their contents aren't modified
inline_code_re = re.compile(r"`([^`]+)`")
placeholder = "\x00CODE\x00"
codes: list[str] = []
def _save_code(m: re.Match) -> str:
codes.append(html.escape(m.group(1)))
return f"{placeholder}{len(codes) - 1}{placeholder}"
text = inline_code_re.sub(_save_code, text)
# Escape HTML entities in the remaining text
text = html.escape(text)
# Bold: **text** or __text__
text = re.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", text)
text = re.sub(r"__(.+?)__", r"<b>\1</b>", text)
# Italic: *text* or _text_ (but not inside words like foo_bar)
text = re.sub(r"(?<!\w)\*([^*]+?)\*(?!\w)", r"<i>\1</i>", text)
text = re.sub(r"(?<!\w)_([^_]+?)_(?!\w)", r"<i>\1</i>", text)
# Strikethrough: ~~text~~
text = re.sub(r"~~(.+?)~~", r"<s>\1</s>", text)
# Links: [text](url)
text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r'<a href="\2">\1</a>', text)
# Restore inline code spans
for i, code_html in enumerate(codes):
text = text.replace(f"{placeholder}{i}{placeholder}", f"<code>{code_html}</code>")
return text
# ── Private helpers ──────────────────────────────────────────────────
def _event_tool_name(event: Any) -> str:
if event.data:
return event.data.tool_name or event.data.name or "tool"
return "tool"
def _event_agent_name(event: Any) -> str:
if event.data:
return event.data.agent_name or event.data.agent_display_name or "specialist"
return "specialist"
def _retry_guidance(surface: Surface) -> str:
if surface == "telegram":
return "Reply with a narrower follow-up, switch model/provider, or send /new to reset the session."
return "Retry with a narrower follow-up, switch model/provider, or start a fresh chat."
def _looks_retryable(detail: str) -> bool:
    """Check if the error detail matches a transient failure pattern."""
    if not detail:
        return False
    # 429s get dedicated messaging elsewhere; don't double-classify them.
    if _looks_rate_limited(detail):
        return False
    haystack = detail.lower()
    transient_markers = (
        "failed to get response",
        "operation was aborted",
        "timed out",
        "timeout",
        "502",
        "503",
        "504",
        "service unavailable",
        "overloaded",
    )
    return any(marker in haystack for marker in transient_markers)
def _looks_rate_limited(detail: str) -> bool:
"""Check if the error is specifically a 429 / rate-limit."""
if not detail:
return False
lower = detail.lower()
return any(p in lower for p in ("429", "rate limit", "rate_limit", "too many requests"))
def _looks_image_unsupported(detail: str) -> bool:
"""Check if the error indicates the model does not accept image inputs."""
if not detail:
return False
lower = detail.lower()
return any(
p in lower
for p in ("0 image(s) may be provided", "does not support image", "image input is not supported", "images are not supported")
)
def _format_error_detail(error: Exception | str | None) -> str:
if error is None:
return ""
if isinstance(error, str):
return error.strip()
name = type(error).__name__
message = str(error).strip()
if not message or message == name:
return name
return f"{name}: {message}"
def _normalize_status_text(text: Any, *, prefix: str | None = None, limit: int = 220) -> str:
if text in (None, ""):
return ""
rendered = " ".join(str(text).split())
if not rendered:
return ""
if prefix:
rendered = f"{prefix}: {rendered}"
if limit > 0 and len(rendered) > limit:
return f"{rendered[: limit - 3].rstrip()}..."
return rendered
def _format_metric_number(value: Any) -> str:
try:
number = float(value)
except (TypeError, ValueError):
return str(value).strip()
if number.is_integer():
return f"{int(number):,}"
return f"{number:,.2f}".rstrip("0").rstrip(".")
def _format_code_changes_status(code_changes: Any) -> str:
    """Summarize a code_changes payload (attr-style or dict) for status display."""
    if not code_changes:
        return ""

    def _field(name: str) -> Any:
        # Payloads arrive either as objects or plain dicts.
        found = getattr(code_changes, name, None)
        if found is None and isinstance(code_changes, dict):
            found = code_changes.get(name)
        return found

    files_modified = _field("files_modified")
    lines_added = _field("lines_added")
    lines_removed = _field("lines_removed")
    bits: list[str] = []
    if isinstance(files_modified, (list, tuple, set)):
        bits.append(f"{len(files_modified)} files")
    elif files_modified:
        bits.append("1 file")
    if lines_added not in (None, "") or lines_removed not in (None, ""):
        bits.append(f"+{_format_metric_number(lines_added or 0)}/-{_format_metric_number(lines_removed or 0)} lines")
    if not bits:
        return "Code changes recorded"
    return f"Code changes: {', '.join(bits)}"
def _format_cache_status(data: Data | None) -> str:
    """Summarize prompt-cache token reads/writes, or "" when absent."""
    if data is None:
        return ""
    reads = getattr(data, "cache_read_tokens", None)
    writes = getattr(data, "cache_write_tokens", None)
    segments: list[str] = []
    if reads not in (None, ""):
        segments.append(f"read {_format_metric_number(reads)}")
    if writes not in (None, ""):
        segments.append(f"wrote {_format_metric_number(writes)}")
    if not segments:
        return ""
    return f"Prompt cache: {', '.join(segments)}"
def _format_server_status(status: Any) -> str:
if status in (None, ""):
return ""
if isinstance(status, str):
return status.strip()
for attr in ("value", "status", "name"):
value = getattr(status, attr, None)
if isinstance(value, str) and value.strip():
return value.strip()
return str(status).strip()
def _iter_status_values(value: Any) -> list[Any]:
if value in (None, ""):
return []
if isinstance(value, (list, tuple, set)):
return list(value)
return [value]
def _stringify_trace_detail(value: Any, *, limit: int = 1800) -> str:
if value in (None, ""):
return ""
rendered = ""
if isinstance(value, str):
candidate = value.strip()
if candidate:
if candidate[:1] in {"{", "["}:
try:
rendered = json.dumps(json.loads(candidate), indent=2, ensure_ascii=False)
except Exception:
rendered = candidate
else:
rendered = candidate
elif isinstance(value, (dict, list, tuple)):
rendered = json.dumps(value, indent=2, ensure_ascii=False)
else:
rendered = str(value).strip()
if len(rendered) <= limit:
return rendered
return f"{rendered[: limit - 3].rstrip()}..."