betterbot/main.py

415 lines
15 KiB
Python

#!/usr/bin/env python3
"""BetterBot — a Telegram bot that edits the Better Life SG website and the Memoraiz frontend via an LLM."""
from __future__ import annotations
import asyncio
import json
import logging
import os
import pathlib
import subprocess

from openai import OpenAI
from telegram import BotCommand, Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)
# Configure the root logger once at import time (INFO and above, timestamped),
# then grab the module-wide logger used throughout this file.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("betterbot")
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Required secrets: a missing variable raises KeyError at import time.
TG_BOT_TOKEN = os.environ["TG_BOT_TOKEN"]
# OPENAI_API_KEY wins when set to a non-empty value; otherwise the gateway key is required.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or os.environ["VERCEL_AI_GATEWAY_KEY"]
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
MODEL = os.getenv("MODEL", "gpt-4.1")

# Directories the bot is allowed to read and write (one per project).
SITE_DIR = pathlib.Path(os.getenv("SITE_DIR", "/site"))
MEMORAIZ_DIR = pathlib.Path(os.getenv("MEMORAIZ_DIR", "/memoraiz"))

# Telegram user IDs allowed to talk to the bot, parsed from a comma-separated
# list; blank entries are ignored and a non-numeric entry raises ValueError.
_raw = os.getenv("ALLOWED_USERS", "876499264,417471802")
ALLOWED_USERS: set[int] = {
    int(token) for token in map(str.strip, _raw.split(",")) if token
}
log.info("Authorized users: %s", ALLOWED_USERS)

# Shared OpenAI-compatible client used for every chat completion request.
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
# ---------------------------------------------------------------------------
# Project definitions
# ---------------------------------------------------------------------------
def _project(directory: pathlib.Path, label: str, deploy_cmd: str | None = None) -> dict:
    """Build one PROJECTS entry.

    The git repository is taken to be the parent of the editable directory
    (e.g. /repo/betterlifesg/site -> /repo/betterlifesg).
    """
    return {
        "dir": directory,
        "label": label,
        "git_repo": directory.parent,
        "deploy_cmd": deploy_cmd,
    }


# Registry of editable projects, keyed by the identifier the LLM tools use.
# Neither project has a deploy command: the website's static files are served
# directly by Caddy, and the Memoraiz deploy is triggered externally after push.
PROJECTS = {
    "betterlifesg": _project(SITE_DIR, "Better Life SG website"),
    "memoraiz": _project(MEMORAIZ_DIR, "Memoraiz app (React frontend)"),
}
# ---------------------------------------------------------------------------
# Tools — read / write / list site files + deploy
# ---------------------------------------------------------------------------
def _project_param(description: str) -> dict:
    """Schema for the 'project' argument: a string restricted to known project keys."""
    return {
        "type": "string",
        "enum": list(PROJECTS),
        "description": description,
    }


def _string_param(description: str, **extra) -> dict:
    """Schema for a free-form string argument, with optional extra schema keys."""
    return {"type": "string", "description": description, **extra}


def _function_tool(name: str, description: str, properties: dict, required: list) -> dict:
    """Wrap a parameter schema in the chat-completions function-tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool definitions advertised to the model; the names must match the branches
# in handle_tool_call.
TOOLS = [
    _function_tool(
        "list_files",
        "List all files in a project directory.",
        {
            "project": _project_param("Which project to list files for."),
            "subdirectory": _string_param(
                "Optional subdirectory to list, e.g. 'src/pages'. Defaults to root.",
                default="",
            ),
        },
        ["project"],
    ),
    _function_tool(
        "read_file",
        "Read the full contents of a project file.",
        {
            "project": _project_param("Which project the file belongs to."),
            "path": _string_param("Relative path inside the project directory."),
        },
        ["project", "path"],
    ),
    _function_tool(
        "write_file",
        "Write (create or overwrite) a text file in a project directory.",
        {
            "project": _project_param("Which project the file belongs to."),
            "path": _string_param("Relative path inside the project directory."),
            "content": _string_param("The full file content to write."),
        },
        ["project", "path", "content"],
    ),
]
# System prompt placed as the first message of every new conversation
# (see get_messages). The text is sent to the model verbatim, so any edit here
# changes the bot's behaviour.
# NOTE(review): the Memoraiz section points at paths under "frontend/src/",
# while the list_files tool description uses 'src/pages' as its example —
# confirm which prefix is correct relative to the Memoraiz project directory.
SYSTEM_PROMPT = """\
You are BetterBot, a helpful assistant that manages two projects:
1. **Better Life SG website** (project: "betterlifesg")
- Static HTML site using Tailwind CSS (loaded via CDN)
- Key files: index.html, fresh-grads.html, prenatal.html, retirement.html, \
legacy.html, team.html, contact.html, images/ folder
- Brand color: teal (#00b49a)
- Changes go live immediately after writing
2. **Memoraiz app** (project: "memoraiz")
- React 19 + Vite 6 + Tailwind CSS 4 frontend
- Source code is under frontend/src/ (pages in frontend/src/pages/, \
components in frontend/src/components/)
- Changes require a rebuild to go live (handled automatically after you write)
When the user asks you to change something:
1. Ask which project if unclear (default to betterlifesg for website questions)
2. First read the relevant file(s) to understand the current state
3. Make the requested changes
4. Write the updated file back
5. Confirm what you changed
When writing a file, always write the COMPLETE file content — never partial.
Keep your responses concise and friendly. Always confirm changes after making them.
Do NOT change the overall page structure unless explicitly asked.
"""
def _resolve(base: pathlib.Path, path: str) -> pathlib.Path:
"""Resolve a relative path inside a base dir, preventing path traversal."""
resolved = (base / path).resolve()
if not str(resolved).startswith(str(base.resolve())):
raise ValueError(f"Path traversal blocked: {path}")
return resolved
def _git_push(project_key: str, changed_file: str) -> str:
    """Commit and push changes in the project's git repo, then deploy if configured.

    Args:
        project_key: Key into PROJECTS identifying which repo to push.
        changed_file: Path of the file that was just written; only used in
            the commit message.

    Returns:
        A short human-readable status string. Failures are reported in the
        returned text rather than raised, so the caller can relay them.
    """
    proj = PROJECTS[project_key]
    repo = proj["git_repo"]
    if not (repo / ".git").exists():
        return "(no git repo — skipped push)"

    def _git(*argv: str, check: bool = True) -> subprocess.CompletedProcess:
        # text=True so stdout/stderr are str and read cleanly in error messages.
        return subprocess.run(
            ["git", *argv], cwd=repo, check=check, capture_output=True, text=True
        )

    try:
        _git("add", "-A")
        # `git diff --cached --quiet` exits 0 when nothing is staged. In that
        # case skip the commit (which would fail with "nothing to commit")
        # but still push, so commits left over from an earlier failed push
        # are delivered.
        staged = _git("diff", "--cached", "--quiet", check=False)
        if staged.returncode != 0:
            _git("commit", "-m", f"betterbot: update {changed_file}")
        else:
            log.info("Nothing new to commit for %s", project_key)
        result = _git("push", "origin", "HEAD")
        log.info("Git push for %s: %s", project_key, result.stderr.strip())
        # Trigger deploy if needed
        deploy_cmd = proj.get("deploy_cmd")
        if deploy_cmd:
            log.info("Running deploy: %s", deploy_cmd)
            # deploy_cmd comes from the PROJECTS table in this file, not from
            # user input, so running it through the shell is acceptable here.
            subprocess.run(
                deploy_cmd, shell=True, check=True, capture_output=True, text=True
            )
            return f"Pushed and deployed {project_key}"
        return f"Pushed {project_key} to git"
    except subprocess.CalledProcessError as e:
        log.error("Git/deploy error: %s\nstdout: %s\nstderr: %s", e, e.stdout, e.stderr)
        detail = (e.stderr or e.stdout or str(e)).strip()
        return f"Push failed: {detail}"
    except OSError as e:
        # e.g. the git executable is not installed or the repo dir vanished.
        log.error("Git/deploy could not be started: %s", e)
        return f"Push failed: {e}"
def handle_tool_call(name: str, args: dict) -> str:
    """Execute one LLM tool call and return its result as text for the model.

    Args:
        name: Tool name: "list_files", "read_file" or "write_file".
        args: Decoded tool arguments. "project" defaults to "betterlifesg";
            "path" (and "content" for writes) are required by the file tools.

    Returns:
        The tool output, or a human-readable error/status message.

    Raises:
        ValueError: If a path escapes the project directory (from _resolve).
        KeyError: If a required argument is missing.
    """
    project_key = args.get("project", "betterlifesg")
    if project_key not in PROJECTS:
        return f"Unknown project: {project_key}"
    base = PROJECTS[project_key]["dir"]

    if name == "list_files":
        max_listed = 200
        ignored_dirs = {".git", "node_modules", "__pycache__"}
        # _resolve() returns fully resolved paths, so measure relative paths
        # against the resolved root as well; otherwise relative_to() fails
        # whenever the configured base is a symlink or a relative path.
        root = base.resolve()
        subdir = args.get("subdirectory", "")
        target = _resolve(base, subdir) if subdir else root
        files = []
        for p in sorted(target.rglob("*")):
            if not p.is_file():
                continue
            rel = p.relative_to(root)
            # Filter on the project-relative parts only, so the location of
            # the project directory itself can never hide every file.
            if ignored_dirs.isdisjoint(rel.parts):
                files.append(str(rel))
        if not files:
            return "(no files found)"
        listing = "\n".join(files[:max_listed])
        hidden = len(files) - max_listed
        if hidden > 0:
            # Tell the model the list is incomplete instead of truncating silently.
            listing += (
                f"\n… ({hidden} more files not shown; "
                "pass a subdirectory to narrow the listing)"
            )
        return listing

    if name == "read_file":
        path = _resolve(base, args["path"])
        if not path.exists():
            return f"Error: {args['path']} does not exist."
        if not path.is_file():
            return f"Error: {args['path']} is not a file."
        image_suffixes = (".png", ".jpg", ".jpeg", ".gif", ".webp", ".ico")
        if path.suffix.lower() in image_suffixes:
            return f"[Binary image file: {args['path']}, {path.stat().st_size} bytes]"
        try:
            return path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # Not valid UTF-8 text (fonts, archives, ...): describe, don't crash.
            return f"[Binary file: {args['path']}, {path.stat().st_size} bytes]"

    if name == "write_file":
        path = _resolve(base, args["path"])
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(args["content"], encoding="utf-8")
        push_result = _git_push(project_key, args["path"])
        return (
            f"OK — wrote {len(args['content'])} chars to {args['path']}. {push_result}"
        )

    return f"Unknown tool: {name}"
# ---------------------------------------------------------------------------
# Conversation state (in-memory, per-chat)
# ---------------------------------------------------------------------------
# chat_id -> message history; lives only in process memory.
conversations: dict[int, list[dict]] = {}


def get_messages(chat_id: int) -> list[dict]:
    """Return the mutable history for *chat_id*, seeding it with the system prompt on first use."""
    try:
        return conversations[chat_id]
    except KeyError:
        history = [{"role": "system", "content": SYSTEM_PROMPT}]
        conversations[chat_id] = history
        return history
# ---------------------------------------------------------------------------
# Auth check
# ---------------------------------------------------------------------------
def _is_authorized(user_id: int | None) -> bool:
    """Return True if *user_id* may use the bot.

    An empty ALLOWED_USERS set disables the check entirely, so every caller
    (including ``None``) is accepted; otherwise the ID must be in the set.

    NOTE(review): the empty-set case is fail-open — confirm this is intended
    for deployments where ALLOWED_USERS is set to an empty string.
    """
    return (not ALLOWED_USERS) or (user_id in ALLOWED_USERS)
# ---------------------------------------------------------------------------
# Telegram handlers
# ---------------------------------------------------------------------------
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet authorized users, turn away everyone else.

    Args:
        update: Incoming Telegram update.
        ctx: Handler context (unused).
    """
    message = update.message
    if message is None:
        # Updates without a new message (e.g. an edited command) have nothing
        # to reply to; ignore them instead of raising AttributeError.
        return
    user = update.effective_user
    if not _is_authorized(user.id if user else None):
        await message.reply_text("Sorry, you're not authorized to use this bot.")
        return
    # parse_mode="Markdown" is Telegram's legacy Markdown, where bold is
    # written with single asterisks (*text*), not double.
    await message.reply_text(
        "Hi! I'm BetterBot 🤖\n\n"
        "I manage two projects:\n"
        "• *Better Life SG* website\n"
        "• *Memoraiz* app\n\n"
        "Just tell me what you'd like to change!\n\n"
        "Examples:\n"
        '"Change the WhatsApp number to 91234567"\n'
        '"Update Hendri\'s title to Senior Consultant"\n'
        '"Update the login page text in Memoraiz"\n\n'
        "Type /reset to start a fresh conversation.",
        parse_mode="Markdown",
    )
async def cmd_reset(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /reset: drop the stored conversation for this chat.

    Unauthorized users and updates that carry no new message are ignored
    silently.

    Args:
        update: Incoming Telegram update.
        ctx: Handler context (unused).
    """
    message = update.message
    chat = update.effective_chat
    if message is None or chat is None:
        # Nothing to reply to (e.g. an edited command); avoid AttributeError.
        return
    user = update.effective_user
    if not _is_authorized(user.id if user else None):
        return
    conversations.pop(chat.id, None)
    await message.reply_text("Conversation reset! Send me a new request.")
# Telegram rejects message texts longer than 4096 characters; stay a little
# below that so multi-unit characters (emoji) cannot push a chunk over.
_REPLY_CHUNK_CHARS = 4000
# History is trimmed once it exceeds _HISTORY_TRIM_AT entries, keeping the
# system prompt plus roughly the last _HISTORY_KEEP messages.
_HISTORY_TRIM_AT = 60
_HISTORY_KEEP = 40


def _trim_history(messages: list[dict]) -> None:
    """Bound the history in place without orphaning tool results.

    The kept window always starts at a user message, so it can never begin
    with a "tool" message whose originating assistant tool_calls message was
    dropped (the chat API rejects such histories).
    """
    if len(messages) <= _HISTORY_TRIM_AT:
        return
    cut = len(messages) - _HISTORY_KEEP
    user_turns = [
        i for i, m in enumerate(messages) if i > 0 and m.get("role") == "user"
    ]
    # Prefer the first user turn at/after the cut; if the current turn is
    # longer than the window, fall back to the last user turn before it.
    start = next((i for i in user_turns if i >= cut), None)
    if start is None:
        start = max((i for i in user_turns if i < cut), default=cut)
    messages[:] = messages[:1] + messages[start:]


def _split_reply(text: str, limit: int = _REPLY_CHUNK_CHARS) -> list[str]:
    """Split *text* into chunks of at most *limit* chars, preferring line breaks."""
    chunks = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit)
        if cut <= 0:
            cut = limit  # no usable newline: hard split
        chunks.append(text[:cut])
        text = text[cut:].lstrip("\n")
    chunks.append(text)
    # Whitespace-only chunks would be rejected as empty messages.
    return [chunk for chunk in chunks if chunk.strip()]


async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a text message: run the LLM/tool loop and reply with the result.

    In group chats the bot only reacts when it is @mentioned or replied to.
    The conversation history for the chat is updated in place.

    Args:
        update: Incoming Telegram update.
        ctx: Handler context; ``ctx.bot`` supplies the bot's username and id.
    """
    message = update.message
    chat = update.effective_chat
    if message is None or chat is None:
        # Edited messages, channel posts etc. carry no new message; ignore
        # them rather than raising AttributeError.
        return
    user = update.effective_user
    if not _is_authorized(user.id if user else None):
        return
    chat_id = chat.id
    user_text = message.text
    if not user_text:
        return
    # In group chats, only respond if bot is mentioned or replied to
    if chat.type in ("group", "supergroup"):
        bot_username = ctx.bot.username
        is_mentioned = bot_username and f"@{bot_username}" in user_text
        replied_to = message.reply_to_message
        is_reply = (
            replied_to
            and replied_to.from_user
            and replied_to.from_user.id == ctx.bot.id
        )
        if not is_mentioned and not is_reply:
            return
        # Strip the @mention from the text
        if bot_username:
            user_text = user_text.replace(f"@{bot_username}", "").strip()
        if not user_text:
            return
    messages = get_messages(chat_id)
    messages.append({"role": "user", "content": user_text})
    # Send "typing" indicator
    await message.chat.send_action("typing")

    # The OpenAI client and the tools (file I/O, git push) are synchronous;
    # run them in the default executor so the event loop is not blocked.
    loop = asyncio.get_running_loop()

    # Run the LLM loop (with tool calls)
    max_rounds = 10
    for _ in range(max_rounds):
        try:
            response = await loop.run_in_executor(
                None,
                lambda: client.chat.completions.create(
                    model=MODEL,
                    messages=messages,
                    tools=TOOLS,
                    tool_choice="auto",
                ),
            )
        except Exception as e:
            log.error("OpenAI API error: %s", e)
            await message.reply_text(f"Sorry, I hit an error: {e}")
            return
        msg = response.choices[0].message
        # Append assistant message
        messages.append(msg.model_dump(exclude_none=True))
        if msg.tool_calls:
            for tc in msg.tool_calls:
                # Argument decoding lives inside the try block: every tool
                # call must get a "tool" reply, even when the model emitted
                # malformed JSON, or the history becomes invalid.
                try:
                    raw_args = tc.function.arguments
                    args = json.loads(raw_args) if raw_args else {}
                    if not isinstance(args, dict):
                        raise ValueError("tool arguments must be a JSON object")
                    log.info("Tool call: %s(%s)", tc.function.name, list(args.keys()))
                    result = await loop.run_in_executor(
                        None, handle_tool_call, tc.function.name, args
                    )
                except Exception as e:
                    log.warning("Tool call %s failed: %s", tc.function.name, e)
                    result = f"Error: {e}"
                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result,
                    }
                )
            # Continue the loop for the LLM to process tool results
            await message.chat.send_action("typing")
            continue
        # No tool calls — we have a final text reply
        if msg.content:
            # Trim conversation to prevent unbounded growth
            _trim_history(messages)
            for chunk in _split_reply(msg.content):
                await message.reply_text(chunk)
            return
    _trim_history(messages)
    await message.reply_text(
        "I ran out of steps — please try again with a simpler request."
    )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Build the Telegram application, register handlers and start polling."""

    # Register bot commands for the / menu
    async def post_init(application) -> None:
        await application.bot.set_my_commands(
            [
                BotCommand("start", "Show welcome message"),
                BotCommand("reset", "Reset conversation"),
            ]
        )
        log.info("Bot commands registered")

    # Register the startup hook through the builder, which is the documented
    # way to set it, rather than assigning the attribute after build().
    app = ApplicationBuilder().token(TG_BOT_TOKEN).post_init(post_init).build()
    app.add_handler(CommandHandler("start", cmd_start))
    app.add_handler(CommandHandler("reset", cmd_reset))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    log.info("BetterBot starting (model=%s, sites=%s)", MODEL, list(PROJECTS.keys()))
    app.run_polling(drop_pending_updates=True, allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()