454 lines
16 KiB
Python
454 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""BetterBot — a Telegram bot that edits the Better Life SG website via LLM."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import subprocess
|
|
|
|
from openai import OpenAI
|
|
import telegram
|
|
from telegram import BotCommand, Update
|
|
from telegram.error import BadRequest, TimedOut
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
)
|
|
|
|
# Process-wide logging: timestamped records at INFO level and above.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Module logger used by every function in this file.
log = logging.getLogger("betterbot")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
# Telegram bot token — required; a missing variable raises KeyError at import.
TG_BOT_TOKEN = os.environ["TG_BOT_TOKEN"]
# LLM credentials: OPENAI_API_KEY is used when set and non-empty, otherwise
# VERCEL_AI_GATEWAY_KEY must be present (KeyError if it is not).
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") or os.environ["VERCEL_AI_GATEWAY_KEY"]
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
MODEL = os.environ.get("MODEL", "gpt-4.1")

# Site directories
SITE_DIR = pathlib.Path(os.environ.get("SITE_DIR", "/site"))
MEMORAIZ_DIR = pathlib.Path(os.environ.get("MEMORAIZ_DIR", "/memoraiz"))

# Authorized users (Telegram user IDs), parsed from a comma-separated list.
# Blank entries are skipped; a non-numeric entry raises ValueError at import.
_raw = os.environ.get("ALLOWED_USERS", "876499264,417471802")
ALLOWED_USERS: set[int] = {
    int(_id) for _id in map(str.strip, _raw.split(",")) if _id
}
log.info("Authorized users: %s", ALLOWED_USERS)

# Shared chat-completions client used by the message handler.
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Project definitions
|
|
# ---------------------------------------------------------------------------
|
|
# Registry of editable projects, keyed by the identifier the LLM passes as the
# "project" tool argument. Per entry:
#   label      — human-readable name
#   dir        — root directory the file tools operate in
#   git_repo   — repository that is committed and pushed after a write
#   deploy_cmd — optional shell command run after a successful push
PROJECTS = {
    "betterlifesg": {
        "label": "Better Life SG website",
        "dir": SITE_DIR,
        "git_repo": SITE_DIR.parent,  # /repo/betterlifesg/site -> /repo/betterlifesg
        "deploy_cmd": None,  # static files served directly by Caddy
    },
    "memoraiz": {
        "label": "Memoraiz app (React frontend)",
        "dir": MEMORAIZ_DIR,
        "git_repo": MEMORAIZ_DIR.parent,  # /repo/memoraiz/frontend -> /repo/memoraiz
        "deploy_cmd": None,  # deploy triggered externally after push
    },
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tools — read / write / list site files + deploy
|
|
# ---------------------------------------------------------------------------
|
|
def _function_tool(name, description, properties, required):
    """Wrap a JSON-schema parameter spec in the function-tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def _project_param(description):
    """Schema for the 'project' argument, restricted to the known project keys."""
    return {
        "type": "string",
        "enum": list(PROJECTS),
        "description": description,
    }


# Tool schemas passed to the chat-completions API on every request.
TOOLS = [
    _function_tool(
        "list_files",
        "List all files in a project directory.",
        {
            "project": _project_param("Which project to list files for."),
            "subdirectory": {
                "type": "string",
                "description": "Optional subdirectory to list, e.g. 'src/pages'. Defaults to root.",
                "default": "",
            },
        },
        ["project"],
    ),
    _function_tool(
        "read_file",
        "Read the full contents of a project file.",
        {
            "project": _project_param("Which project the file belongs to."),
            "path": {
                "type": "string",
                "description": "Relative path inside the project directory.",
            },
        },
        ["project", "path"],
    ),
    _function_tool(
        "write_file",
        "Write (create or overwrite) a text file in a project directory.",
        {
            "project": _project_param("Which project the file belongs to."),
            "path": {
                "type": "string",
                "description": "Relative path inside the project directory.",
            },
            "content": {
                "type": "string",
                "description": "The full file content to write.",
            },
        },
        ["project", "path", "content"],
    ),
]
|
|
|
|
SYSTEM_PROMPT = """\
|
|
You are BetterBot, a helpful assistant that manages two projects:
|
|
|
|
1. **Better Life SG website** (project: "betterlifesg")
|
|
- Static HTML site using Tailwind CSS (loaded via CDN)
|
|
- Key files: index.html, fresh-grads.html, prenatal.html, retirement.html, \
|
|
legacy.html, team.html, contact.html, images/ folder
|
|
- Brand color: teal (#00b49a)
|
|
- Changes go live immediately after writing
|
|
|
|
2. **Memoraiz app** (project: "memoraiz")
|
|
- React 19 + Vite 6 + Tailwind CSS 4 frontend
|
|
- Source code is under frontend/src/ (pages in frontend/src/pages/, \
|
|
components in frontend/src/components/)
|
|
- Changes require a rebuild to go live (handled automatically after you write)
|
|
|
|
When the user asks you to change something:
|
|
1. Ask which project if unclear (default to betterlifesg for website questions)
|
|
2. First read the relevant file(s) to understand the current state
|
|
3. Make the requested changes
|
|
4. Write the updated file back
|
|
5. Confirm what you changed
|
|
|
|
When writing a file, always write the COMPLETE file content — never partial.
|
|
Keep your responses concise and friendly. Always confirm changes after making them.
|
|
Do NOT change the overall page structure unless explicitly asked.
|
|
"""
|
|
|
|
|
|
def _resolve(base: pathlib.Path, path: str) -> pathlib.Path:
    """Resolve a relative path inside a base dir, preventing path traversal.

    Args:
        base: Directory the result must stay inside.
        path: Caller-supplied path, interpreted relative to ``base``.

    Returns:
        The fully resolved path (``..`` segments and symlinks are resolved).

    Raises:
        ValueError: If the resolved path is not ``base`` itself or below it.
    """
    root = base.resolve()
    resolved = (root / path).resolve()
    # Compare path components rather than string prefixes: with a plain
    # startswith() check, "/site-evil/x" would pass for base "/site".
    try:
        resolved.relative_to(root)
    except ValueError:
        raise ValueError(f"Path traversal blocked: {path}") from None
    return resolved
|
|
|
|
|
|
def _git_push(project_key: str, changed_file: str) -> str:
    """Commit and push changes in the project's git repo.

    Stages everything in the repository (``git add -A``), commits with a
    message naming ``changed_file``, pushes ``HEAD`` to ``origin`` and, when
    the project defines a ``deploy_cmd``, runs it afterwards.

    Args:
        project_key: Key into ``PROJECTS``.
        changed_file: Path used only in the commit message.

    Returns:
        A short human-readable status string; failures are reported in the
        string rather than raised.
    """
    proj = PROJECTS[project_key]
    repo = proj["git_repo"]
    if not (repo / ".git").exists():
        return "(no git repo — skipped push)"

    # text=True keeps captured output as str, so error messages are not
    # rendered as b'...'; the timeout stops a stuck git/deploy process from
    # hanging the bot indefinitely.
    run_opts = {"check": True, "capture_output": True, "text": True, "timeout": 120}
    try:
        subprocess.run(["git", "add", "-A"], cwd=repo, **run_opts)

        # `git commit` exits non-zero when nothing is staged (e.g. the file
        # was rewritten with identical content); treat that as a no-op
        # instead of reporting a failed push.
        status = subprocess.run(["git", "status", "--porcelain"], cwd=repo, **run_opts)
        if not status.stdout.strip():
            return "(no changes to commit — skipped push)"

        subprocess.run(
            ["git", "commit", "-m", f"betterbot: update {changed_file}"],
            cwd=repo,
            **run_opts,
        )
        result = subprocess.run(["git", "push", "origin", "HEAD"], cwd=repo, **run_opts)
        log.info("Git push for %s: %s", project_key, result.stderr.strip())

        # Trigger deploy if needed. deploy_cmd comes from the PROJECTS table
        # in this file, not from user input, so running it via the shell is
        # limited to operator-defined commands.
        deploy_cmd = proj.get("deploy_cmd")
        if deploy_cmd:
            log.info("Running deploy: %s", deploy_cmd)
            subprocess.run(deploy_cmd, shell=True, **run_opts)
            return f"Pushed and deployed {project_key}"
        return f"Pushed {project_key} to git"
    except subprocess.CalledProcessError as e:
        log.error("Git/deploy error: %s\nstdout: %s\nstderr: %s", e, e.stdout, e.stderr)
        return f"Push failed: {e.stderr or e.stdout or str(e)}"
    except subprocess.TimeoutExpired as e:
        log.error("Git/deploy timed out: %s", e)
        return f"Push failed: timed out after {e.timeout:g}s"
|
|
|
|
|
|
def handle_tool_call(name: str, args: dict) -> str:
    """Execute a tool call and return the result as a string.

    Args:
        name: Tool name: ``list_files``, ``read_file`` or ``write_file``.
        args: Decoded tool arguments. ``project`` defaults to
            ``"betterlifesg"`` when absent.

    Returns:
        The tool output, or a human-readable error message for unknown
        projects/tools, missing arguments and unreadable files.

    Raises:
        ValueError: If a supplied path escapes the project directory
            (raised by ``_resolve``).
    """
    project_key = args.get("project", "betterlifesg")
    if project_key not in PROJECTS:
        return f"Unknown project: {project_key}"
    base = PROJECTS[project_key]["dir"]

    if name == "list_files":
        # Work from the resolved root so relative_to() below also succeeds
        # when the configured directory contains symlinks.
        root = base.resolve()
        subdir = args.get("subdirectory", "")
        target = _resolve(base, subdir) if subdir else root
        ignored = {".git", "node_modules", "__pycache__"}
        files = []
        for p in sorted(target.rglob("*")):
            rel = p.relative_to(root)
            # Test only the project-relative components, so a parent
            # directory that happens to be named e.g. "node_modules" does
            # not hide the whole project.
            if p.is_file() and ignored.isdisjoint(rel.parts):
                files.append(str(rel))
        if not files:
            return "(no files found)"
        shown = files[:200]
        if len(files) > len(shown):
            # Tell the model the listing is incomplete instead of cutting
            # it off silently.
            shown.append(
                f"... ({len(files) - len(shown)} more files not shown; "
                "pass a subdirectory to narrow the listing)"
            )
        return "\n".join(shown)

    if name == "read_file":
        rel_path = args.get("path")
        if not rel_path:
            return "Error: 'path' is required."
        path = _resolve(base, rel_path)
        if not path.exists():
            return f"Error: {rel_path} does not exist."
        if not path.is_file():
            return f"Error: {rel_path} is not a file."
        # Compare the suffix case-insensitively so "LOGO.PNG" is also
        # recognised as an image.
        if path.suffix.lower() in (".png", ".jpg", ".jpeg", ".gif", ".webp", ".ico"):
            return f"[Binary image file: {rel_path}, {path.stat().st_size} bytes]"
        try:
            return path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return f"[Binary file: {rel_path}, {path.stat().st_size} bytes]"

    if name == "write_file":
        rel_path = args.get("path")
        content = args.get("content")
        if not rel_path or content is None:
            return "Error: 'path' and 'content' are required."
        path = _resolve(base, rel_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")
        push_result = _git_push(project_key, rel_path)
        return f"OK — wrote {len(content)} chars to {rel_path}. {push_result}"

    return f"Unknown tool: {name}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Conversation state (in-memory, per-chat)
|
|
# ---------------------------------------------------------------------------
|
|
# chat_id -> message history in chat-completions format. Held in memory only.
conversations: dict[int, list[dict]] = {}


def get_messages(chat_id: int) -> list[dict]:
    """Return the mutable history for ``chat_id``, creating it on first use.

    A new history starts with a single system message built from
    ``SYSTEM_PROMPT``. The returned list is the stored object, so appending
    to it updates the conversation.
    """
    try:
        return conversations[chat_id]
    except KeyError:
        history = [{"role": "system", "content": SYSTEM_PROMPT}]
        conversations[chat_id] = history
        return history
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth check
|
|
# ---------------------------------------------------------------------------
|
|
def _is_authorized(user_id: int | None) -> bool:
    """Return True when ``user_id`` may use the bot.

    NOTE(review): an empty ``ALLOWED_USERS`` set authorizes everyone — this
    looks deliberate, but confirm it is acceptable for a bot that writes
    files and pushes to git.
    """
    return not ALLOWED_USERS or user_id in ALLOWED_USERS
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Telegram handlers
|
|
# ---------------------------------------------------------------------------
|
|
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: send the welcome text, or a refusal if unauthorized."""
    # Command handlers can also fire for edited messages, where
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    user = update.effective_user
    if message is None:
        return
    if not _is_authorized(user.id if user else None):
        await message.reply_text("Sorry, you're not authorized to use this bot.")
        return
    await message.reply_text(
        "Hi! I'm BetterBot 🤖\n\n"
        "I manage two projects:\n"
        "• **Better Life SG** website\n"
        "• **Memoraiz** app\n\n"
        "Just tell me what you'd like to change!\n\n"
        "Examples:\n"
        '• "Change the WhatsApp number to 91234567"\n'
        '• "Update Hendri\'s title to Senior Consultant"\n'
        '• "Update the login page text in Memoraiz"\n\n'
        "Type /reset to start a fresh conversation.",
        parse_mode="Markdown",
    )
|
|
|
|
|
|
async def cmd_reset(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /reset: drop the stored conversation for this chat.

    Unauthorized callers are ignored without a reply.
    """
    # Command handlers can also fire for edited messages, where
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    if message is None or chat is None:
        return
    if not _is_authorized(user.id if user else None):
        return
    conversations.pop(chat.id, None)
    await message.reply_text("Conversation reset! Send me a new request.")
|
|
|
|
|
|
async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a text message by running the LLM tool-calling loop.

    Appends the user's text to the chat history, posts a "Thinking ..."
    placeholder, then alternates between model calls and tool execution
    (at most ``max_rounds`` model calls) until the model produces a text
    reply, which replaces the placeholder.

    In group chats the bot only reacts when it is @-mentioned or when the
    message is a reply to one of its own messages.
    """
    message = update.message
    user = update.effective_user
    chat = update.effective_chat
    # The text filter can also match edited messages and channel posts, for
    # which update.message (or the sender) is None. Ignore those instead of
    # crashing on attribute access.
    if message is None or user is None or chat is None:
        return
    if not _is_authorized(user.id):
        return
    user_text = message.text
    if not user_text:
        return

    # In group chats, only respond if bot is mentioned or replied to
    if chat.type in ("group", "supergroup"):
        bot_username = ctx.bot.username
        is_mentioned = bool(bot_username) and f"@{bot_username}" in user_text
        replied_to = message.reply_to_message
        is_reply = bool(
            replied_to
            and replied_to.from_user
            and replied_to.from_user.id == ctx.bot.id
        )
        if not (is_mentioned or is_reply):
            return
        # Strip the @mention from the text
        if bot_username:
            user_text = user_text.replace(f"@{bot_username}", "").strip()
        if not user_text:
            return

    messages = get_messages(chat.id)
    messages.append({"role": "user", "content": user_text})

    # Send immediate "Thinking ..." placeholder so user knows the bot read their message
    thinking = await _safe_reply(update, "Thinking ...")

    # Run the LLM loop (with tool calls)
    max_rounds = 10
    last_status = None
    for _ in range(max_rounds):
        try:
            response = client.chat.completions.create(
                model=MODEL,
                messages=messages,
                tools=TOOLS,
                tool_choice="auto",
            )
        except Exception as e:
            log.error("OpenAI API error: %s", e)
            await _safe_edit(thinking, update, f"Sorry, I hit an error: {e}")
            return

        msg = response.choices[0].message

        # Append assistant message
        messages.append(msg.model_dump(exclude_none=True))

        if msg.tool_calls:
            # Every tool call must be answered with a "tool" message, or the
            # stored history is left with an unanswered tool_calls entry.
            # Nothing inside this loop is therefore allowed to raise.
            for tc in msg.tool_calls:
                tool_name = tc.function.name
                try:
                    args = json.loads(tc.function.arguments) if tc.function.arguments else {}
                    if not isinstance(args, dict):
                        raise ValueError("tool arguments must be a JSON object")
                except ValueError as e:  # includes json.JSONDecodeError
                    log.warning("Bad arguments for tool %s: %s", tool_name, e)
                    result = f"Error: invalid tool arguments: {e}"
                else:
                    log.info("Tool call: %s(%s)", tool_name, list(args.keys()))
                    # Update placeholder with current tool status. Skip the
                    # edit when the text is unchanged, and treat the status
                    # line as best-effort.
                    status = f"Working ... ({tool_name})"
                    if status != last_status:
                        last_status = status
                        try:
                            await _safe_edit(thinking, update, status)
                        except telegram.error.TelegramError as e:
                            log.warning("Could not update status message: %s", e)
                    try:
                        result = handle_tool_call(tool_name, args)
                    except Exception as e:
                        result = f"Error: {e}"
                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result,
                    }
                )
            # Continue the loop for the LLM to process tool results
            continue

        # No tool calls — we have a final text reply
        if msg.content:
            # Trim conversation to prevent unbounded growth
            if len(messages) > 60:
                tail = messages[-40:]
                # Start the kept window at a user message so it never opens
                # with a "tool" message whose assistant tool_calls message
                # was cut off.
                start = next(
                    (i for i, m in enumerate(tail) if m.get("role") == "user"),
                    len(tail),
                )
                messages[:] = messages[:1] + tail[start:]

            await _safe_edit(thinking, update, msg.content)
            return

    await _safe_edit(
        thinking, update, "I ran out of steps — please try again with a simpler request."
    )
|
|
|
|
|
|
async def _safe_reply(update: Update, text: str, **kwargs) -> "telegram.Message | None":
    """Reply to the incoming message; on BadRequest, post to the chat instead.

    Extra keyword arguments are forwarded unchanged to whichever send call
    is used. Returns the message object produced by that call.
    """
    try:
        sent = await update.message.reply_text(text, **kwargs)
    except BadRequest:
        log.warning("Could not reply to message, sending without reply")
        sent = await update.effective_chat.send_message(text, **kwargs)
    return sent
|
|
|
|
|
|
async def _safe_edit(
    thinking: "telegram.Message | None",
    update: Update,
    text: str,
) -> None:
    """Edit the placeholder message, falling back to a new message if it fails.

    Args:
        thinking: The placeholder message to overwrite, or None if none was
            sent.
        update: The update being handled; its chat receives fallback and
            overflow messages.
        text: The text to show. Text longer than one message is split: the
            first part goes into the placeholder, the rest is sent as
            follow-up messages.
    """
    # Telegram caps message text at 4096 characters; stay a little below
    # that and split longer replies instead of failing.
    limit = 4000
    chunks = [text[i : i + limit] for i in range(0, len(text), limit)] or [text]
    first, rest = chunks[0], chunks[1:]

    delivered = False
    if thinking:
        try:
            await thinking.edit_text(first)
            delivered = True
        except BadRequest as e:
            if "not modified" in str(e).lower():
                # The placeholder already shows this exact text; sending a
                # new message would only duplicate it.
                delivered = True
            else:
                log.warning("Could not edit placeholder, sending new message")
    if not delivered:
        await update.effective_chat.send_message(first)
    for chunk in rest:
        await update.effective_chat.send_message(chunk)
|
|
|
|
|
|
async def _error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log an error raised while handling an update, without re-raising.

    Telegram timeouts are logged as a warning; anything else is logged as
    an error with its traceback.
    """
    err = context.error
    if not isinstance(err, TimedOut):
        log.error("Unhandled exception: %s", err, exc_info=err)
        return
    log.warning("Telegram request timed out")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
def main() -> None:
    """Build the Telegram application, register handlers and start polling."""

    async def post_init(application) -> None:
        # Register bot commands for the / menu
        commands = [
            BotCommand("start", "Show welcome message"),
            BotCommand("reset", "Reset conversation"),
        ]
        await application.bot.set_my_commands(commands)
        log.info("Bot commands registered")

    app = ApplicationBuilder().token(TG_BOT_TOKEN).build()

    handlers = (
        CommandHandler("start", cmd_start),
        CommandHandler("reset", cmd_reset),
        # Plain text only; commands are routed to the handlers above.
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
    )
    for handler in handlers:
        app.add_handler(handler)
    app.add_error_handler(_error_handler)

    app.post_init = post_init
    log.info("BetterBot starting (model=%s, sites=%s)", MODEL, list(PROJECTS.keys()))
    # Updates queued before startup are dropped rather than processed.
    app.run_polling(drop_pending_updates=True, allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()
|