betterbot/provisioners/karakeep.py
Andre K e68c84424f
Some checks failed
Deploy BetterBot / deploy (push) Failing after 3s
Deploy BetterBot / notify (push) Successful in 3s
feat: fork from CodeAnywhere framework
Replace standalone Telegram bot with full CodeAnywhere framework fork.
BetterBot shares all framework code and customizes only:
- instance.py: BetterBot identity, system prompt, feature flags
- tools/site_editing/: list_files, read_file, write_file with auto git push
- .env: model defaults and site directory paths
- compose/: Docker setup with betterlifesg + memoraiz mounts
- deploy script: RackNerd with Infisical secrets
2026-04-19 08:01:27 +08:00

75 lines
2.4 KiB
Python

"""Karakeep service account provisioner (T031).
Karakeep does NOT expose an admin user-creation API via its REST surface.
The tRPC admin routes are not accessible through /api/v1/. Therefore
Karakeep remains owner-only: the owner's API key is migrated from env
vars during bootstrap, and no automated provisioning is available for
other users.
This provisioner stub exists so the registry can report Karakeep as
'available but not provisionable' during onboarding.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import httpx
from config import settings
from provisioners.base import ProvisionResult, ServiceProvisioner
if TYPE_CHECKING:
from user_store import ServiceCredential, User, UserStore
# Module-level logger named after this module; used by the health check to
# record failures.
logger = logging.getLogger(__name__)
class KarakeepProvisioner(ServiceProvisioner):
    """Provisioner stub for Karakeep.

    ``provision`` never creates an account: it records a failure entry and
    returns instructions for obtaining an account manually.
    ``health_check`` verifies a stored API key by calling the ``/users/me``
    endpoint under ``settings.KARAKEEP_API_URL``.
    """

    @property
    def service_name(self) -> str:
        """Return the service identifier, ``"karakeep"``."""
        return "karakeep"

    @property
    def capabilities(self) -> list[str]:
        """Return the capability tags offered by this service."""
        return ["bookmarks"]

    @property
    def requires_consent(self) -> bool:
        """Return ``True``; this provisioner always reports consent as required."""
        return True

    @staticmethod
    def _web_base_url() -> str:
        """Return the Karakeep web address derived from the configured API URL.

        Only a *trailing* ``/api/v1`` segment is removed. A plain substring
        replacement would also strip that text from the middle of the URL
        and produce a broken link in the user-facing message.
        """
        api_suffix = "/api/v1"
        api_url = settings.KARAKEEP_API_URL.rstrip("/")
        if api_url.endswith(api_suffix):
            return api_url[: -len(api_suffix)]
        return api_url

    async def provision(self, user: User, store: UserStore) -> ProvisionResult:
        """Report that Karakeep does not support automatic user provisioning.

        Args:
            user: The user who requested provisioning; only ``user.id`` is read.
            store: User store on which the failed attempt is logged.

        Returns:
            A ``ProvisionResult`` with ``success=False`` and an ``error``
            message that points the user at the Karakeep web address.
        """
        store.log_provisioning(
            user.id,
            "karakeep",
            "provision_failed",
            '{"reason": "Karakeep admin API does not support user creation"}',
        )
        return ProvisionResult(
            success=False,
            error=(
                "Karakeep doesn't support automatic account creation. "
                "Ask the admin to create an account for you at "
                f"{self._web_base_url()}, "
                "then share your API key with me."
            ),
        )

    async def health_check(self, credential: ServiceCredential) -> bool:
        """Verify the stored API key still works.

        Args:
            credential: Stored credential whose ``encrypted_token`` is
                decrypted and sent as a bearer token.

        Returns:
            ``True`` only when ``GET {KARAKEEP_API_URL}/users/me`` answers
            with HTTP 200. Any exception raised while decrypting or making
            the request is logged and reported as ``False``.
        """
        # Imported at call time rather than module level -- presumably to
        # avoid an import cycle with user_store (TODO confirm).
        from user_store import decrypt

        base_url = settings.KARAKEEP_API_URL.rstrip("/")
        try:
            token = decrypt(credential.encrypted_token)
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{base_url}/users/me",
                    headers={"Authorization": f"Bearer {token}"},
                )
                return resp.status_code == 200
        except Exception:
            # Best-effort check: a failure of any kind means "not healthy".
            logger.exception("Karakeep health check failed")
            return False