Replace standalone Telegram bot with full CodeAnywhere framework fork. BetterBot shares all framework code and customizes only: - instance.py: BetterBot identity, system prompt, feature flags - tools/site_editing/: list_files, read_file, write_file with auto git push - .env: model defaults and site directory paths - compose/: Docker setup with betterlifesg + memoraiz mounts - deploy script: RackNerd with Infisical secrets
616 lines
22 KiB
Python
616 lines
22 KiB
Python
"""User-facing messaging helpers for Telegram and web chat surfaces."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import html
|
|
import json
|
|
import re
|
|
from typing import Any, Literal, cast
|
|
|
|
from copilot.generated.session_events import Data, SessionEvent, SessionEventType, ToolRequest
|
|
|
|
Surface = Literal["telegram", "web"]
|
|
|
|
|
|
def extract_final_text(events: list[SessionEvent]) -> str:
|
|
"""Walk collected events and return the final assistant message text."""
|
|
# Prefer the last ASSISTANT_MESSAGE event
|
|
for event in reversed(events):
|
|
if event.type == SessionEventType.ASSISTANT_MESSAGE and event.data and event.data.content:
|
|
return event.data.content.strip()
|
|
|
|
# Fallback: concatenate deltas
|
|
parts: list[str] = []
|
|
for event in events:
|
|
if event.type == SessionEventType.ASSISTANT_MESSAGE_DELTA and event.data and event.data.delta_content:
|
|
parts.append(event.data.delta_content)
|
|
text: str = "".join(parts).strip()
|
|
return text
|
|
|
|
|
|
def working_message(*, surface: Surface) -> str:
|
|
if surface == "telegram":
|
|
return "Thinking ..."
|
|
return "Working on it"
|
|
|
|
|
|
def busy_message(*, surface: Surface) -> str:
|
|
if surface == "telegram":
|
|
return "Still working on the previous message in this chat. Wait for that reply, or send /new to reset."
|
|
return "Still working on the previous message. Wait for that reply before sending another one."
|
|
|
|
|
|
def format_session_error(*, surface: Surface, error: Exception | str | None = None) -> str:
|
|
parts: list[str] = ["Run failed with an internal exception."]
|
|
detail: str = _format_error_detail(error)
|
|
if detail:
|
|
parts.append(f"Exception: {detail}")
|
|
if _looks_image_unsupported(detail):
|
|
parts.append(
|
|
"This model does not support image inputs. "
|
|
"Switch to a vision model (e.g. gpt-4o, claude-sonnet, gemini-2.5-pro) or resend without the image."
|
|
)
|
|
elif _looks_rate_limited(detail):
|
|
parts.append(
|
|
"The provider is rate-limiting requests (HTTP 429). The SDK already retried several times before giving up."
|
|
)
|
|
elif _looks_retryable(detail):
|
|
parts.append("All automatic retries were exhausted.")
|
|
parts.append(_retry_guidance(surface))
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
def extract_intent_from_tool(event: SessionEvent) -> str | None:
|
|
"""If the event is a report_intent tool call, return the intent text."""
|
|
if event.type != SessionEventType.TOOL_EXECUTION_START:
|
|
return None
|
|
tool_name: str = _event_tool_name(event)
|
|
if tool_name != "report_intent":
|
|
return None
|
|
args = event.data and event.data.arguments
|
|
if not args:
|
|
return None
|
|
if isinstance(args, str):
|
|
try:
|
|
args = json.loads(args)
|
|
except Exception:
|
|
return None
|
|
if isinstance(args, dict):
|
|
args_dict = cast(dict[str, Any], args)
|
|
intent = args_dict.get("intent", "")
|
|
if isinstance(intent, str) and intent.strip():
|
|
return intent.strip()
|
|
return None
|
|
|
|
|
|
def extract_tool_intent_summary(event: SessionEvent) -> str | None:
|
|
"""Extract intent_summary from tool_requests on any event.
|
|
|
|
The Copilot SDK can attach ``tool_requests`` to events (e.g. before tool
|
|
execution starts). Each tool request may carry an ``intent_summary``
|
|
describing *why* the agent wants to call that tool.
|
|
"""
|
|
data: Data | None = getattr(event, "data", None)
|
|
if data is None:
|
|
return None
|
|
tool_requests: list[ToolRequest] = getattr(data, "tool_requests") or []
|
|
if not tool_requests:
|
|
return None
|
|
try:
|
|
summary: str = "\n".join(
|
|
intent_summary
|
|
for request in tool_requests
|
|
if (intent_summary := getattr(request, "intent_summary", "").strip())
|
|
)
|
|
return summary or None
|
|
except (IndexError, TypeError, KeyError):
|
|
return None
|
|
return None
|
|
|
|
|
|
def stream_status_updates(event: Any, *, include_reasoning_status: bool = True) -> list[str]:
|
|
"""Return ordered, deduplicated user-facing status updates for a Copilot SDK event."""
|
|
event_type: SessionEventType = event.type
|
|
data: Data | None = getattr(event, "data", None)
|
|
updates: list[str] = []
|
|
seen: set[str] = set()
|
|
noise_texts: set[str] = {"tool done", "Thinking"}
|
|
ugly_texts: dict[str, str] = {
|
|
"Running view": "Viewing file(s)",
|
|
}
|
|
|
|
def add(text: Any, *, prefix: str | None = None, limit: int = 220) -> None:
|
|
if text in (None, ""):
|
|
return
|
|
elif isinstance(text, str) and text.strip().lower() in noise_texts:
|
|
return
|
|
elif isinstance(text, str):
|
|
text = ugly_texts.get(text.strip(), text)
|
|
|
|
normalized: str = _normalize_status_text(text, prefix=prefix, limit=limit)
|
|
if not normalized:
|
|
return
|
|
dedupe_key = normalized.casefold()
|
|
if dedupe_key in seen:
|
|
return
|
|
seen.add(dedupe_key)
|
|
updates.append(normalized)
|
|
|
|
add(getattr(data, "progress_message", None), limit=240)
|
|
add(extract_tool_intent_summary(event), limit=240)
|
|
|
|
if event_type == SessionEventType.TOOL_EXECUTION_START:
|
|
tool_name: str = _event_tool_name(event)
|
|
intent: str | None = extract_intent_from_tool(event)
|
|
if tool_name == "report_intent":
|
|
pass
|
|
elif not intent and not tool_name:
|
|
pass
|
|
else:
|
|
kwargs: dict = {"text": intent, "prefix": tool_name} if intent else {"text": f"Running {tool_name}"}
|
|
add(limit=160, **kwargs)
|
|
|
|
if event_type == SessionEventType.TOOL_EXECUTION_COMPLETE:
|
|
tool_name: str = _event_tool_name(event)
|
|
if tool_name != "report_intent":
|
|
add(f"{tool_name} done", limit=160)
|
|
|
|
if event_type == SessionEventType.SUBAGENT_SELECTED:
|
|
add(f"Routed to {_event_agent_name(event)}", limit=180)
|
|
|
|
if event_type == SessionEventType.SUBAGENT_STARTED:
|
|
add(f"{_event_agent_name(event)} working", limit=180)
|
|
|
|
if event_type == SessionEventType.SESSION_COMPACTION_START:
|
|
add("Compacting context", limit=120)
|
|
|
|
if event_type == SessionEventType.SESSION_COMPACTION_COMPLETE:
|
|
add("Context compacted", limit=120)
|
|
|
|
# if event_type == SessionEventType.ASSISTANT_TURN_START:
|
|
# add("Thinking", limit=80)
|
|
|
|
if event_type == SessionEventType.ASSISTANT_INTENT:
|
|
add(getattr(data, "intent", None), limit=240)
|
|
|
|
if include_reasoning_status and event_type in {
|
|
SessionEventType.ASSISTANT_REASONING,
|
|
SessionEventType.ASSISTANT_REASONING_DELTA,
|
|
}:
|
|
reasoning = (data and data.reasoning_text) or ""
|
|
if reasoning.strip():
|
|
first_line = reasoning.strip().splitlines()[0].strip()
|
|
if first_line.lower().startswith(("intent:", "intent ")):
|
|
add(first_line, limit=240)
|
|
|
|
add(getattr(data, "message", None), limit=240)
|
|
add(getattr(data, "title", None), prefix="Title", limit=200)
|
|
add(getattr(data, "summary", None), prefix="Summary", limit=240)
|
|
add(getattr(data, "summary_content", None), prefix="Context summary", limit=240)
|
|
add(getattr(data, "warning_type", None), prefix="Warning type", limit=160)
|
|
|
|
for warning in _iter_status_values(getattr(data, "warnings", None)):
|
|
add(warning, prefix="Warning", limit=240)
|
|
|
|
add(getattr(data, "error_reason", None), prefix="Error", limit=240)
|
|
|
|
for error in _iter_status_values(getattr(data, "errors", None)):
|
|
add(error, prefix="Error", limit=240)
|
|
|
|
add(getattr(data, "reason", None), prefix="Stop reason", limit=200)
|
|
add(_format_server_status(getattr(data, "status", None)), prefix="Server", limit=160)
|
|
add(getattr(data, "phase", None), prefix="Phase", limit=120)
|
|
add(getattr(data, "mcp_tool_name", None), prefix="MCP tool", limit=180)
|
|
add(_format_code_changes_status(getattr(data, "code_changes", None)), limit=200)
|
|
# add(_format_cache_status(data), limit=180)
|
|
|
|
# The SDK's `duration` is only a subtotal for the current API round-trip.
|
|
# Total turn runtime is tracked by the caller and surfaced as a live
|
|
# elapsed clock while the overall request is still running.
|
|
|
|
total_premium_requests = getattr(data, "total_premium_requests", None)
|
|
if total_premium_requests not in (None, ""):
|
|
add(f"Premium requests: {_format_metric_number(total_premium_requests)}", limit=140)
|
|
|
|
add(getattr(data, "branch", None), prefix="Branch", limit=160)
|
|
add(getattr(data, "cwd", None), prefix="CWD", limit=220)
|
|
add(getattr(data, "git_root", None), prefix="Git root", limit=220)
|
|
|
|
head_commit = getattr(data, "head_commit", None)
|
|
if head_commit:
|
|
add(f"Head: {str(head_commit).strip()[:12]}", limit=80)
|
|
|
|
if getattr(data, "reasoning_opaque", None):
|
|
add("Encrypted reasoning attached", limit=120)
|
|
|
|
if model := getattr(data, "model", None):
|
|
add(model, prefix="\n🤖", limit=160)
|
|
return updates
|
|
|
|
|
|
def stream_status_text(event: Any) -> str:
|
|
"""Return a single concatenated status string for compatibility call sites."""
|
|
return "\n".join(stream_status_updates(event))
|
|
|
|
|
|
def stream_trace_event(event: Any) -> dict[str, str] | None:
|
|
"""Extract structured trace entries for tool activity and subagent routing."""
|
|
event_type = event.type
|
|
|
|
if event_type == SessionEventType.TOOL_EXECUTION_START:
|
|
tool_name = _event_tool_name(event)
|
|
if tool_name == "report_intent":
|
|
return None # suppress report_intent from trace
|
|
tool_call_id = (event.data and event.data.tool_call_id) or ""
|
|
detail = _stringify_trace_detail(event.data and event.data.arguments)
|
|
return {
|
|
"kind": "trace",
|
|
"category": "tool_call",
|
|
"key": f"tool:{tool_call_id or tool_name}",
|
|
"tool_name": tool_name,
|
|
"title": f"Tool call: {tool_name}",
|
|
"summary": f"Called {tool_name}",
|
|
"text": f"Called {tool_name}",
|
|
"detail": detail or "No arguments exposed.",
|
|
}
|
|
|
|
if event_type == SessionEventType.TOOL_EXECUTION_COMPLETE:
|
|
tool_name = _event_tool_name(event)
|
|
if tool_name == "report_intent":
|
|
return None # suppress report_intent from trace
|
|
tool_call_id = (event.data and event.data.tool_call_id) or ""
|
|
output_detail = _stringify_trace_detail(event.data and event.data.output)
|
|
return {
|
|
"kind": "trace",
|
|
"category": "tool_call",
|
|
"key": f"tool:{tool_call_id or tool_name}",
|
|
"tool_name": tool_name,
|
|
"title": f"Tool call: {tool_name}",
|
|
"summary": f"{tool_name} done",
|
|
"text": f"{tool_name} done",
|
|
"output_detail": output_detail or "Tool finished with no readable output.",
|
|
}
|
|
|
|
if event_type == SessionEventType.SUBAGENT_SELECTED:
|
|
agent_name = _event_agent_name(event)
|
|
return {
|
|
"kind": "trace",
|
|
"category": "subagent",
|
|
"key": f"agent:{agent_name}",
|
|
"title": f"Subagent: {agent_name}",
|
|
"summary": f"Routed to {agent_name}",
|
|
"text": f"Routed to {agent_name}",
|
|
"detail": f"The run is now executing inside the {agent_name} subagent.",
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def stream_reasoning_event(event: Any) -> tuple[str, str, bool] | None:
|
|
"""Extract reasoning text from a Copilot SDK event when available."""
|
|
if event.type == SessionEventType.ASSISTANT_REASONING_DELTA:
|
|
reasoning_id = (event.data and event.data.reasoning_id) or "reasoning"
|
|
text = (event.data and event.data.reasoning_text) or ""
|
|
if text.strip():
|
|
return reasoning_id, text, False
|
|
return None
|
|
|
|
if event.type == SessionEventType.ASSISTANT_REASONING:
|
|
reasoning_id = (event.data and event.data.reasoning_id) or "reasoning"
|
|
text = (event.data and event.data.reasoning_text) or ""
|
|
if text.strip():
|
|
return reasoning_id, text, True
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
def format_tool_counts(tool_counts: dict[str, int], *, current_status: str = "") -> str:
|
|
"""Build a compact one-line summary of tool call counts."""
|
|
if not tool_counts:
|
|
return current_status or ""
|
|
|
|
parts: list[str] = []
|
|
for name, count in sorted(tool_counts.items(), key=lambda kv: -kv[1]):
|
|
if count <= 0:
|
|
continue
|
|
parts.append(f"{count} {name}")
|
|
|
|
lines: list[str] = []
|
|
if current_status:
|
|
lines.append(current_status)
|
|
if parts:
|
|
lines.append(f"🔧 {' · '.join(parts)}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_elapsed_status(elapsed_seconds: float) -> str:
|
|
"""Render a human-friendly turn runtime for live status displays."""
|
|
total_seconds = max(0, int(elapsed_seconds))
|
|
hours, remainder = divmod(total_seconds, 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
|
|
if hours:
|
|
return f"Elapsed: {hours}h {minutes:02d}m {seconds:02d}s"
|
|
if minutes:
|
|
return f"Elapsed: {minutes}m {seconds:02d}s"
|
|
return f"Elapsed: {seconds}s"
|
|
|
|
|
|
def append_elapsed_status(text: Any, *, elapsed_seconds: float) -> str:
|
|
"""Append the current turn runtime to a status line without duplicating it."""
|
|
lines = [line for line in str(text or "").splitlines() if not line.strip().lower().startswith("elapsed:")]
|
|
base = "\n".join(lines).strip()
|
|
elapsed = format_elapsed_status(elapsed_seconds)
|
|
if not base:
|
|
return elapsed
|
|
return f"{base}\n{elapsed}"
|
|
|
|
|
|
async def safe_delete_message(message: Any) -> None:
|
|
if message is None:
|
|
return
|
|
try:
|
|
await message.delete()
|
|
except Exception:
|
|
return
|
|
|
|
|
|
def markdown_to_telegram_html(text: str) -> str:
|
|
"""Convert common Markdown to Telegram-compatible HTML.
|
|
|
|
Handles fenced code blocks, inline code, bold, italic, strikethrough,
|
|
and links. Falls back gracefully — anything it can't convert is
|
|
HTML-escaped and sent as plain text.
|
|
"""
|
|
# Split into fenced code blocks vs everything else
|
|
parts: list[str] = []
|
|
# Match ```lang\n...\n``` (with optional language tag)
|
|
code_block_re = re.compile(r"```(\w*)\n(.*?)```", re.DOTALL)
|
|
last = 0
|
|
for m in code_block_re.finditer(text):
|
|
# Process non-code text before this block
|
|
if m.start() > last:
|
|
parts.append(_md_inline_to_html(text[last : m.start()]))
|
|
lang = m.group(1)
|
|
code = html.escape(m.group(2).rstrip("\n"))
|
|
if lang:
|
|
parts.append(f'<pre><code class="language-{html.escape(lang)}">{code}</code></pre>')
|
|
else:
|
|
parts.append(f"<pre>{code}</pre>")
|
|
last = m.end()
|
|
# Remaining text after last code block
|
|
if last < len(text):
|
|
parts.append(_md_inline_to_html(text[last:]))
|
|
return "".join(parts)
|
|
|
|
|
|
def _md_inline_to_html(text: str) -> str:
|
|
"""Convert inline Markdown (outside code blocks) to Telegram HTML."""
|
|
# First, protect inline code spans so their contents aren't modified
|
|
inline_code_re = re.compile(r"`([^`]+)`")
|
|
placeholder = "\x00CODE\x00"
|
|
codes: list[str] = []
|
|
|
|
def _save_code(m: re.Match) -> str:
|
|
codes.append(html.escape(m.group(1)))
|
|
return f"{placeholder}{len(codes) - 1}{placeholder}"
|
|
|
|
text = inline_code_re.sub(_save_code, text)
|
|
|
|
# Escape HTML entities in the remaining text
|
|
text = html.escape(text)
|
|
|
|
# Bold: **text** or __text__
|
|
text = re.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", text)
|
|
text = re.sub(r"__(.+?)__", r"<b>\1</b>", text)
|
|
|
|
# Italic: *text* or _text_ (but not inside words like foo_bar)
|
|
text = re.sub(r"(?<!\w)\*([^*]+?)\*(?!\w)", r"<i>\1</i>", text)
|
|
text = re.sub(r"(?<!\w)_([^_]+?)_(?!\w)", r"<i>\1</i>", text)
|
|
|
|
# Strikethrough: ~~text~~
|
|
text = re.sub(r"~~(.+?)~~", r"<s>\1</s>", text)
|
|
|
|
# Links: [text](url)
|
|
text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r'<a href="\2">\1</a>', text)
|
|
|
|
# Restore inline code spans
|
|
for i, code_html in enumerate(codes):
|
|
text = text.replace(f"{placeholder}{i}{placeholder}", f"<code>{code_html}</code>")
|
|
|
|
return text
|
|
|
|
|
|
# ── Private helpers ──────────────────────────────────────────────────
|
|
|
|
|
|
def _event_tool_name(event: Any) -> str:
|
|
if event.data:
|
|
return event.data.tool_name or event.data.name or "tool"
|
|
return "tool"
|
|
|
|
|
|
def _event_agent_name(event: Any) -> str:
|
|
if event.data:
|
|
return event.data.agent_name or event.data.agent_display_name or "specialist"
|
|
return "specialist"
|
|
|
|
|
|
def _retry_guidance(surface: Surface) -> str:
|
|
if surface == "telegram":
|
|
return "Reply with a narrower follow-up, switch model/provider, or send /new to reset the session."
|
|
return "Retry with a narrower follow-up, switch model/provider, or start a fresh chat."
|
|
|
|
|
|
def _looks_retryable(detail: str) -> bool:
|
|
"""Check if the error detail matches a transient failure pattern."""
|
|
if not detail:
|
|
return False
|
|
if _looks_rate_limited(detail):
|
|
return False
|
|
lower = detail.lower()
|
|
return any(
|
|
p in lower
|
|
for p in (
|
|
"failed to get response",
|
|
"operation was aborted",
|
|
"timed out",
|
|
"timeout",
|
|
"502",
|
|
"503",
|
|
"504",
|
|
"service unavailable",
|
|
"overloaded",
|
|
)
|
|
)
|
|
|
|
|
|
def _looks_rate_limited(detail: str) -> bool:
|
|
"""Check if the error is specifically a 429 / rate-limit."""
|
|
if not detail:
|
|
return False
|
|
lower = detail.lower()
|
|
return any(p in lower for p in ("429", "rate limit", "rate_limit", "too many requests"))
|
|
|
|
|
|
def _looks_image_unsupported(detail: str) -> bool:
|
|
"""Check if the error indicates the model does not accept image inputs."""
|
|
if not detail:
|
|
return False
|
|
lower = detail.lower()
|
|
return any(
|
|
p in lower
|
|
for p in ("0 image(s) may be provided", "does not support image", "image input is not supported", "images are not supported")
|
|
)
|
|
|
|
|
|
def _format_error_detail(error: Exception | str | None) -> str:
|
|
if error is None:
|
|
return ""
|
|
if isinstance(error, str):
|
|
return error.strip()
|
|
name = type(error).__name__
|
|
message = str(error).strip()
|
|
if not message or message == name:
|
|
return name
|
|
return f"{name}: {message}"
|
|
|
|
|
|
def _normalize_status_text(text: Any, *, prefix: str | None = None, limit: int = 220) -> str:
|
|
if text in (None, ""):
|
|
return ""
|
|
|
|
rendered = " ".join(str(text).split())
|
|
if not rendered:
|
|
return ""
|
|
if prefix:
|
|
rendered = f"{prefix}: {rendered}"
|
|
if limit > 0 and len(rendered) > limit:
|
|
return f"{rendered[: limit - 3].rstrip()}..."
|
|
return rendered
|
|
|
|
|
|
def _format_metric_number(value: Any) -> str:
|
|
try:
|
|
number = float(value)
|
|
except (TypeError, ValueError):
|
|
return str(value).strip()
|
|
|
|
if number.is_integer():
|
|
return f"{int(number):,}"
|
|
return f"{number:,.2f}".rstrip("0").rstrip(".")
|
|
|
|
|
|
def _format_code_changes_status(code_changes: Any) -> str:
|
|
if not code_changes:
|
|
return ""
|
|
|
|
files_modified = getattr(code_changes, "files_modified", None)
|
|
if files_modified is None and isinstance(code_changes, dict):
|
|
files_modified = code_changes.get("files_modified")
|
|
|
|
lines_added = getattr(code_changes, "lines_added", None)
|
|
if lines_added is None and isinstance(code_changes, dict):
|
|
lines_added = code_changes.get("lines_added")
|
|
|
|
lines_removed = getattr(code_changes, "lines_removed", None)
|
|
if lines_removed is None and isinstance(code_changes, dict):
|
|
lines_removed = code_changes.get("lines_removed")
|
|
|
|
parts: list[str] = []
|
|
if isinstance(files_modified, (list, tuple, set)):
|
|
parts.append(f"{len(files_modified)} files")
|
|
elif files_modified:
|
|
parts.append("1 file")
|
|
|
|
if lines_added not in (None, "") or lines_removed not in (None, ""):
|
|
parts.append(f"+{_format_metric_number(lines_added or 0)}/-{_format_metric_number(lines_removed or 0)} lines")
|
|
|
|
if not parts:
|
|
return "Code changes recorded"
|
|
return f"Code changes: {', '.join(parts)}"
|
|
|
|
|
|
def _format_cache_status(data: Data | None) -> str:
|
|
if data is None:
|
|
return ""
|
|
|
|
cache_read_tokens = getattr(data, "cache_read_tokens", None)
|
|
cache_write_tokens = getattr(data, "cache_write_tokens", None)
|
|
if cache_read_tokens in (None, "") and cache_write_tokens in (None, ""):
|
|
return ""
|
|
|
|
parts: list[str] = []
|
|
if cache_read_tokens not in (None, ""):
|
|
parts.append(f"read {_format_metric_number(cache_read_tokens)}")
|
|
if cache_write_tokens not in (None, ""):
|
|
parts.append(f"wrote {_format_metric_number(cache_write_tokens)}")
|
|
return f"Prompt cache: {', '.join(parts)}"
|
|
|
|
|
|
def _format_server_status(status: Any) -> str:
|
|
if status in (None, ""):
|
|
return ""
|
|
if isinstance(status, str):
|
|
return status.strip()
|
|
|
|
for attr in ("value", "status", "name"):
|
|
value = getattr(status, attr, None)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
|
|
return str(status).strip()
|
|
|
|
|
|
def _iter_status_values(value: Any) -> list[Any]:
|
|
if value in (None, ""):
|
|
return []
|
|
if isinstance(value, (list, tuple, set)):
|
|
return list(value)
|
|
return [value]
|
|
|
|
|
|
def _stringify_trace_detail(value: Any, *, limit: int = 1800) -> str:
|
|
if value in (None, ""):
|
|
return ""
|
|
|
|
rendered = ""
|
|
if isinstance(value, str):
|
|
candidate = value.strip()
|
|
if candidate:
|
|
if candidate[:1] in {"{", "["}:
|
|
try:
|
|
rendered = json.dumps(json.loads(candidate), indent=2, ensure_ascii=False)
|
|
except Exception:
|
|
rendered = candidate
|
|
else:
|
|
rendered = candidate
|
|
elif isinstance(value, (dict, list, tuple)):
|
|
rendered = json.dumps(value, indent=2, ensure_ascii=False)
|
|
else:
|
|
rendered = str(value).strip()
|
|
|
|
if len(rendered) <= limit:
|
|
return rendered
|
|
return f"{rendered[: limit - 3].rstrip()}..."
|