"""User-facing messaging helpers for Telegram and web chat surfaces.""" from __future__ import annotations import html import json import re from typing import Any, Literal, cast from copilot.generated.session_events import Data, SessionEvent, SessionEventType, ToolRequest Surface = Literal["telegram", "web"] def extract_final_text(events: list[SessionEvent]) -> str: """Walk collected events and return the final assistant message text.""" # Prefer the last ASSISTANT_MESSAGE event for event in reversed(events): if event.type == SessionEventType.ASSISTANT_MESSAGE and event.data and event.data.content: return event.data.content.strip() # Fallback: concatenate deltas parts: list[str] = [] for event in events: if event.type == SessionEventType.ASSISTANT_MESSAGE_DELTA and event.data and event.data.delta_content: parts.append(event.data.delta_content) text: str = "".join(parts).strip() return text def working_message(*, surface: Surface) -> str: if surface == "telegram": return "Thinking ..." return "Working on it" def busy_message(*, surface: Surface) -> str: if surface == "telegram": return "Still working on the previous message in this chat. Wait for that reply, or send /new to reset." return "Still working on the previous message. Wait for that reply before sending another one." def format_session_error(*, surface: Surface, error: Exception | str | None = None) -> str: parts: list[str] = ["Run failed with an internal exception."] detail: str = _format_error_detail(error) if detail: parts.append(f"Exception: {detail}") if _looks_image_unsupported(detail): parts.append( "This model does not support image inputs. " "Switch to a vision model (e.g. gpt-4o, claude-sonnet, gemini-2.5-pro) or resend without the image." ) elif _looks_rate_limited(detail): parts.append( "The provider is rate-limiting requests (HTTP 429). The SDK already retried several times before giving up." 
) elif _looks_retryable(detail): parts.append("All automatic retries were exhausted.") parts.append(_retry_guidance(surface)) return "\n\n".join(parts) def extract_intent_from_tool(event: SessionEvent) -> str | None: """If the event is a report_intent tool call, return the intent text.""" if event.type != SessionEventType.TOOL_EXECUTION_START: return None tool_name: str = _event_tool_name(event) if tool_name != "report_intent": return None args = event.data and event.data.arguments if not args: return None if isinstance(args, str): try: args = json.loads(args) except Exception: return None if isinstance(args, dict): args_dict = cast(dict[str, Any], args) intent = args_dict.get("intent", "") if isinstance(intent, str) and intent.strip(): return intent.strip() return None def extract_tool_intent_summary(event: SessionEvent) -> str | None: """Extract intent_summary from tool_requests on any event. The Copilot SDK can attach ``tool_requests`` to events (e.g. before tool execution starts). Each tool request may carry an ``intent_summary`` describing *why* the agent wants to call that tool. 
""" data: Data | None = getattr(event, "data", None) if data is None: return None tool_requests: list[ToolRequest] = getattr(data, "tool_requests") or [] if not tool_requests: return None try: summary: str = "\n".join( intent_summary for request in tool_requests if (intent_summary := getattr(request, "intent_summary", "").strip()) ) return summary or None except (IndexError, TypeError, KeyError): return None return None def stream_status_updates(event: Any, *, include_reasoning_status: bool = True) -> list[str]: """Return ordered, deduplicated user-facing status updates for a Copilot SDK event.""" event_type: SessionEventType = event.type data: Data | None = getattr(event, "data", None) updates: list[str] = [] seen: set[str] = set() noise_texts: set[str] = {"tool done", "Thinking"} ugly_texts: dict[str, str] = { "Running view": "Viewing file(s)", } def add(text: Any, *, prefix: str | None = None, limit: int = 220) -> None: if text in (None, ""): return elif isinstance(text, str) and text.strip().lower() in noise_texts: return elif isinstance(text, str): text = ugly_texts.get(text.strip(), text) normalized: str = _normalize_status_text(text, prefix=prefix, limit=limit) if not normalized: return dedupe_key = normalized.casefold() if dedupe_key in seen: return seen.add(dedupe_key) updates.append(normalized) add(getattr(data, "progress_message", None), limit=240) add(extract_tool_intent_summary(event), limit=240) if event_type == SessionEventType.TOOL_EXECUTION_START: tool_name: str = _event_tool_name(event) intent: str | None = extract_intent_from_tool(event) if tool_name == "report_intent": pass elif not intent and not tool_name: pass else: kwargs: dict = {"text": intent, "prefix": tool_name} if intent else {"text": f"Running {tool_name}"} add(limit=160, **kwargs) if event_type == SessionEventType.TOOL_EXECUTION_COMPLETE: tool_name: str = _event_tool_name(event) if tool_name != "report_intent": add(f"{tool_name} done", limit=160) if event_type == 
SessionEventType.SUBAGENT_SELECTED: add(f"Routed to {_event_agent_name(event)}", limit=180) if event_type == SessionEventType.SUBAGENT_STARTED: add(f"{_event_agent_name(event)} working", limit=180) if event_type == SessionEventType.SESSION_COMPACTION_START: add("Compacting context", limit=120) if event_type == SessionEventType.SESSION_COMPACTION_COMPLETE: add("Context compacted", limit=120) # if event_type == SessionEventType.ASSISTANT_TURN_START: # add("Thinking", limit=80) if event_type == SessionEventType.ASSISTANT_INTENT: add(getattr(data, "intent", None), limit=240) if include_reasoning_status and event_type in { SessionEventType.ASSISTANT_REASONING, SessionEventType.ASSISTANT_REASONING_DELTA, }: reasoning = (data and data.reasoning_text) or "" if reasoning.strip(): first_line = reasoning.strip().splitlines()[0].strip() if first_line.lower().startswith(("intent:", "intent ")): add(first_line, limit=240) add(getattr(data, "message", None), limit=240) add(getattr(data, "title", None), prefix="Title", limit=200) add(getattr(data, "summary", None), prefix="Summary", limit=240) add(getattr(data, "summary_content", None), prefix="Context summary", limit=240) add(getattr(data, "warning_type", None), prefix="Warning type", limit=160) for warning in _iter_status_values(getattr(data, "warnings", None)): add(warning, prefix="Warning", limit=240) add(getattr(data, "error_reason", None), prefix="Error", limit=240) for error in _iter_status_values(getattr(data, "errors", None)): add(error, prefix="Error", limit=240) add(getattr(data, "reason", None), prefix="Stop reason", limit=200) add(_format_server_status(getattr(data, "status", None)), prefix="Server", limit=160) add(getattr(data, "phase", None), prefix="Phase", limit=120) add(getattr(data, "mcp_tool_name", None), prefix="MCP tool", limit=180) add(_format_code_changes_status(getattr(data, "code_changes", None)), limit=200) # add(_format_cache_status(data), limit=180) # The SDK's `duration` is only a subtotal for the 
current API round-trip. # Total turn runtime is tracked by the caller and surfaced as a live # elapsed clock while the overall request is still running. total_premium_requests = getattr(data, "total_premium_requests", None) if total_premium_requests not in (None, ""): add(f"Premium requests: {_format_metric_number(total_premium_requests)}", limit=140) add(getattr(data, "branch", None), prefix="Branch", limit=160) add(getattr(data, "cwd", None), prefix="CWD", limit=220) add(getattr(data, "git_root", None), prefix="Git root", limit=220) head_commit = getattr(data, "head_commit", None) if head_commit: add(f"Head: {str(head_commit).strip()[:12]}", limit=80) if getattr(data, "reasoning_opaque", None): add("Encrypted reasoning attached", limit=120) if model := getattr(data, "model", None): add(model, prefix="\n๐ค", limit=160) return updates def stream_status_text(event: Any) -> str: """Return a single concatenated status string for compatibility call sites.""" return "\n".join(stream_status_updates(event)) def stream_trace_event(event: Any) -> dict[str, str] | None: """Extract structured trace entries for tool activity and subagent routing.""" event_type = event.type if event_type == SessionEventType.TOOL_EXECUTION_START: tool_name = _event_tool_name(event) if tool_name == "report_intent": return None # suppress report_intent from trace tool_call_id = (event.data and event.data.tool_call_id) or "" detail = _stringify_trace_detail(event.data and event.data.arguments) return { "kind": "trace", "category": "tool_call", "key": f"tool:{tool_call_id or tool_name}", "tool_name": tool_name, "title": f"Tool call: {tool_name}", "summary": f"Called {tool_name}", "text": f"Called {tool_name}", "detail": detail or "No arguments exposed.", } if event_type == SessionEventType.TOOL_EXECUTION_COMPLETE: tool_name = _event_tool_name(event) if tool_name == "report_intent": return None # suppress report_intent from trace tool_call_id = (event.data and event.data.tool_call_id) or "" 
output_detail = _stringify_trace_detail(event.data and event.data.output) return { "kind": "trace", "category": "tool_call", "key": f"tool:{tool_call_id or tool_name}", "tool_name": tool_name, "title": f"Tool call: {tool_name}", "summary": f"{tool_name} done", "text": f"{tool_name} done", "output_detail": output_detail or "Tool finished with no readable output.", } if event_type == SessionEventType.SUBAGENT_SELECTED: agent_name = _event_agent_name(event) return { "kind": "trace", "category": "subagent", "key": f"agent:{agent_name}", "title": f"Subagent: {agent_name}", "summary": f"Routed to {agent_name}", "text": f"Routed to {agent_name}", "detail": f"The run is now executing inside the {agent_name} subagent.", } return None def stream_reasoning_event(event: Any) -> tuple[str, str, bool] | None: """Extract reasoning text from a Copilot SDK event when available.""" if event.type == SessionEventType.ASSISTANT_REASONING_DELTA: reasoning_id = (event.data and event.data.reasoning_id) or "reasoning" text = (event.data and event.data.reasoning_text) or "" if text.strip(): return reasoning_id, text, False return None if event.type == SessionEventType.ASSISTANT_REASONING: reasoning_id = (event.data and event.data.reasoning_id) or "reasoning" text = (event.data and event.data.reasoning_text) or "" if text.strip(): return reasoning_id, text, True return None return None def format_tool_counts(tool_counts: dict[str, int], *, current_status: str = "") -> str: """Build a compact one-line summary of tool call counts.""" if not tool_counts: return current_status or "" parts: list[str] = [] for name, count in sorted(tool_counts.items(), key=lambda kv: -kv[1]): if count <= 0: continue parts.append(f"{count} {name}") lines: list[str] = [] if current_status: lines.append(current_status) if parts: lines.append(f"๐ง {' ยท '.join(parts)}") return "\n".join(lines) def format_elapsed_status(elapsed_seconds: float) -> str: """Render a human-friendly turn runtime for live status displays.""" 
total_seconds = max(0, int(elapsed_seconds)) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) if hours: return f"Elapsed: {hours}h {minutes:02d}m {seconds:02d}s" if minutes: return f"Elapsed: {minutes}m {seconds:02d}s" return f"Elapsed: {seconds}s" def append_elapsed_status(text: Any, *, elapsed_seconds: float) -> str: """Append the current turn runtime to a status line without duplicating it.""" lines = [line for line in str(text or "").splitlines() if not line.strip().lower().startswith("elapsed:")] base = "\n".join(lines).strip() elapsed = format_elapsed_status(elapsed_seconds) if not base: return elapsed return f"{base}\n{elapsed}" async def safe_delete_message(message: Any) -> None: if message is None: return try: await message.delete() except Exception: return def markdown_to_telegram_html(text: str) -> str: """Convert common Markdown to Telegram-compatible HTML. Handles fenced code blocks, inline code, bold, italic, strikethrough, and links. Falls back gracefully โ anything it can't convert is HTML-escaped and sent as plain text. """ # Split into fenced code blocks vs everything else parts: list[str] = [] # Match ```lang\n...\n``` (with optional language tag) code_block_re = re.compile(r"```(\w*)\n(.*?)```", re.DOTALL) last = 0 for m in code_block_re.finditer(text): # Process non-code text before this block if m.start() > last: parts.append(_md_inline_to_html(text[last : m.start()])) lang = m.group(1) code = html.escape(m.group(2).rstrip("\n")) if lang: parts.append(f'
{code}')
else:
parts.append(f"{code}")
last = m.end()
# Remaining text after last code block
if last < len(text):
parts.append(_md_inline_to_html(text[last:]))
return "".join(parts)
def _md_inline_to_html(text: str) -> str:
"""Convert inline Markdown (outside code blocks) to Telegram HTML."""
# First, protect inline code spans so their contents aren't modified
inline_code_re = re.compile(r"`([^`]+)`")
placeholder = "\x00CODE\x00"
codes: list[str] = []
def _save_code(m: re.Match) -> str:
codes.append(html.escape(m.group(1)))
return f"{placeholder}{len(codes) - 1}{placeholder}"
text = inline_code_re.sub(_save_code, text)
# Escape HTML entities in the remaining text
text = html.escape(text)
# Bold: **text** or __text__
text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)
text = re.sub(r"__(.+?)__", r"\1", text)
# Italic: *text* or _text_ (but not inside words like foo_bar)
text = re.sub(r"(?\1", text)
text = re.sub(r"(?\1", text)
# Strikethrough: ~~text~~
text = re.sub(r"~~(.+?)~~", r"{code_html}")
return text
# ── Private helpers ──────────────────────────────────────────────────
def _event_tool_name(event: Any) -> str:
if event.data:
return event.data.tool_name or event.data.name or "tool"
return "tool"
def _event_agent_name(event: Any) -> str:
if event.data:
return event.data.agent_name or event.data.agent_display_name or "specialist"
return "specialist"
def _retry_guidance(surface: Surface) -> str:
if surface == "telegram":
return "Reply with a narrower follow-up, switch model/provider, or send /new to reset the session."
return "Retry with a narrower follow-up, switch model/provider, or start a fresh chat."
def _looks_retryable(detail: str) -> bool:
"""Check if the error detail matches a transient failure pattern."""
if not detail:
return False
if _looks_rate_limited(detail):
return False
lower = detail.lower()
return any(
p in lower
for p in (
"failed to get response",
"operation was aborted",
"timed out",
"timeout",
"502",
"503",
"504",
"service unavailable",
"overloaded",
)
)
def _looks_rate_limited(detail: str) -> bool:
"""Check if the error is specifically a 429 / rate-limit."""
if not detail:
return False
lower = detail.lower()
return any(p in lower for p in ("429", "rate limit", "rate_limit", "too many requests"))
def _looks_image_unsupported(detail: str) -> bool:
"""Check if the error indicates the model does not accept image inputs."""
if not detail:
return False
lower = detail.lower()
return any(
p in lower
for p in ("0 image(s) may be provided", "does not support image", "image input is not supported", "images are not supported")
)
def _format_error_detail(error: Exception | str | None) -> str:
if error is None:
return ""
if isinstance(error, str):
return error.strip()
name = type(error).__name__
message = str(error).strip()
if not message or message == name:
return name
return f"{name}: {message}"
def _normalize_status_text(text: Any, *, prefix: str | None = None, limit: int = 220) -> str:
if text in (None, ""):
return ""
rendered = " ".join(str(text).split())
if not rendered:
return ""
if prefix:
rendered = f"{prefix}: {rendered}"
if limit > 0 and len(rendered) > limit:
return f"{rendered[: limit - 3].rstrip()}..."
return rendered
def _format_metric_number(value: Any) -> str:
try:
number = float(value)
except (TypeError, ValueError):
return str(value).strip()
if number.is_integer():
return f"{int(number):,}"
return f"{number:,.2f}".rstrip("0").rstrip(".")
def _format_code_changes_status(code_changes: Any) -> str:
if not code_changes:
return ""
files_modified = getattr(code_changes, "files_modified", None)
if files_modified is None and isinstance(code_changes, dict):
files_modified = code_changes.get("files_modified")
lines_added = getattr(code_changes, "lines_added", None)
if lines_added is None and isinstance(code_changes, dict):
lines_added = code_changes.get("lines_added")
lines_removed = getattr(code_changes, "lines_removed", None)
if lines_removed is None and isinstance(code_changes, dict):
lines_removed = code_changes.get("lines_removed")
parts: list[str] = []
if isinstance(files_modified, (list, tuple, set)):
parts.append(f"{len(files_modified)} files")
elif files_modified:
parts.append("1 file")
if lines_added not in (None, "") or lines_removed not in (None, ""):
parts.append(f"+{_format_metric_number(lines_added or 0)}/-{_format_metric_number(lines_removed or 0)} lines")
if not parts:
return "Code changes recorded"
return f"Code changes: {', '.join(parts)}"
def _format_cache_status(data: Data | None) -> str:
if data is None:
return ""
cache_read_tokens = getattr(data, "cache_read_tokens", None)
cache_write_tokens = getattr(data, "cache_write_tokens", None)
if cache_read_tokens in (None, "") and cache_write_tokens in (None, ""):
return ""
parts: list[str] = []
if cache_read_tokens not in (None, ""):
parts.append(f"read {_format_metric_number(cache_read_tokens)}")
if cache_write_tokens not in (None, ""):
parts.append(f"wrote {_format_metric_number(cache_write_tokens)}")
return f"Prompt cache: {', '.join(parts)}"
def _format_server_status(status: Any) -> str:
if status in (None, ""):
return ""
if isinstance(status, str):
return status.strip()
for attr in ("value", "status", "name"):
value = getattr(status, attr, None)
if isinstance(value, str) and value.strip():
return value.strip()
return str(status).strip()
def _iter_status_values(value: Any) -> list[Any]:
if value in (None, ""):
return []
if isinstance(value, (list, tuple, set)):
return list(value)
return [value]
def _stringify_trace_detail(value: Any, *, limit: int = 1800) -> str:
if value in (None, ""):
return ""
rendered = ""
if isinstance(value, str):
candidate = value.strip()
if candidate:
if candidate[:1] in {"{", "["}:
try:
rendered = json.dumps(json.loads(candidate), indent=2, ensure_ascii=False)
except Exception:
rendered = candidate
else:
rendered = candidate
elif isinstance(value, (dict, list, tuple)):
rendered = json.dumps(value, indent=2, ensure_ascii=False)
else:
rendered = str(value).strip()
if len(rendered) <= limit:
return rendered
return f"{rendered[: limit - 3].rstrip()}..."