Zion Boggan zionboggan.com ↗

Initial commit: Perseus multi-agent AI orchestration platform

20-agent task routing, multi-LLM integration (Grok/GPT/Ollama),
SSH infrastructure management, Discord command and control,
PostgreSQL analytics, Brave web search, and health monitoring.
fd00e30   Zion Boggan committed on Mar 23, 2026 (3 months ago)
.env.example +47 -0
@@ -0,0 +1,47 @@
+# Perseus AI Platform - Environment Variables
+# Copy this to .env and fill in your values
+
+# Discord
+DISCORD_TOKEN=your_discord_bot_token_here
+DISCORD_CHANNEL_ID=your_channel_id_here
+OWNER_DISCORD_ID=your_discord_user_id_here
+
+# AI Models
+XAI_API_KEY=your_xai_api_key_here
+OPENAI_API_KEY=your_openai_api_key_here
+BRAVE_API_KEY=your_brave_search_api_key_here
+
+# Ollama (local LLM)
+OLLAMA_URL=http://localhost:11434
+OLLAMA_MODEL=qwen3:8b
+
+# X/Twitter (OAuth 1.0a)
+X_CONSUMER_KEY=your_consumer_key_here
+X_CONSUMER_SECRET=your_consumer_secret_here
+X_ACCESS_TOKEN=your_access_token_here
+X_ACCESS_TOKEN_SECRET=your_access_token_secret_here
+
+# Analytics / Ad Platform API
+ANALYTICS_API_KEY=your_analytics_api_key_here
+ANALYTICS_BASE_URL=https://api.your-ad-platform.com
+
+# PostgreSQL
+PG_DBNAME=perseus
+PG_USER=perseus
+PG_PASSWORD=your_postgres_password_here
+PG_HOST=localhost
+
+# Budget
+MONTHLY_BUDGET=10.00
+
+# Infrastructure IPs (customize for your homelab)
+NODE1_IP=REDACTED-IP
+NODE2_IP=REDACTED-IP
+PERSEUS_IP=REDACTED-IP
+VPN_IP=REDACTED-IP
+DNS_IP=REDACTED-IP
+BACKUP_IP=REDACTED-IP
+GPU_IP=REDACTED-IP
+
+# Video API
+JWT_SECRET=your_jwt_secret_here
.gitignore +11 -0
@@ -0,0 +1,11 @@
+.env
+*.pem
+*.key
+*.db
+__pycache__/
+*.pyc
+backups/
+data/
+node_modules/
+*.log
+.claude/
README.md +150 -0
@@ -0,0 +1,150 @@
+# Perseus -- Multi-Agent AI Orchestration Platform
+
+> 20-agent task routing system with multi-LLM integration, infrastructure management, and Discord command and control
+
+## Overview
+
+Perseus is an AI orchestration platform that routes natural language commands to specialized agents across a self-hosted Proxmox homelab. It integrates multiple LLM providers (Grok/xAI, OpenAI, and local Ollama) with runtime model switching, SSH-based infrastructure management, web search, social media posting, and real-time health monitoring -- all controllable via Discord.
+
+The system acts as a centralized command layer for homelab operations: from GPU monitoring and database backups to content generation and competitive analysis.
+
+## Architecture
+
+```
++------------------+ +------------------+
+| Discord Bot | | FastAPI REST |
+| (DM + Channel) | | (Port 3002) |
++--------+---------+ +--------+---------+
+ | |
+ +------------+-------------+
+ |
+ +-------v--------+
+ | Chat Router |
+ | (Intent Match) |
+ +-------+--------+
+ |
+ +----------------+----------------+
+ | | | | |
++----v--+ +---v--+ +--v---+ +-v-----+ +v---------+
+|System | |Infra | |Agent | |Search | |General |
+|Cmds | |SSH | |Route | |Brave | |Assistant |
++-------+ +------+ +--+---+ +------+ +----------+
+ |
+ +------------+------------+
+ | | | | |
+ social trend content comply revenue
+ post hunt curator scan track
+```
+
+## Features
+
+### 20-Agent Task Registry
+Each agent has a designated LLM backend (cloud or local) and trigger phrases:
+- **social_publisher** -- Social media content generation with post queue
+- **trend_hunter** -- Market trend analysis and niche discovery
+- **content_curator** -- Content filtering, deduplication, and ranking
+- **content_rewriter** -- SEO-optimized metadata generation
+- **compliance_scanner** -- Policy and copyright risk assessment (GPT-4o-mini)
+- **revenue_tracker** -- Ad platform analytics with Grok analysis
+- **infra_monitor** -- Full-stack health monitoring across all nodes
+- **backup_manager** -- Automated vzdump, pg_dump, and config archival
+- **command_executor** -- SSH command execution with safety controls
+- **general_assistant** -- Catchall brain with web search augmentation
+
+### Multi-LLM Integration
+- **Grok/xAI** -- Primary analysis engine ($2/$10 per 1M tokens)
+- **GPT-4o-mini** -- Compliance scanning ($0.15/$0.60 per 1M tokens)
+- **Ollama (local)** -- Free inference for content agents, trend analysis
+- Runtime switching via `!model grok` / `!model gpt` / `!model ollama [name]`
+- Per-request cost tracking against a monthly budget ceiling
+
+### Infrastructure Management
+- **SSH execution** across all homelab nodes with three safety tiers:
+ - Blocklist: destructive commands rejected immediately
+ - Auto-approve: read-only operations execute without confirmation
+ - Manual approval: everything else requires explicit `!exec`
+- **Health monitoring**: CPU, RAM, disk, GPU utilization, temperature, Ollama model status
+- **Backup system**: Proxmox vzdump snapshots, PostgreSQL dumps, config archives with retention policies
+- **Infrastructure map** with aliases (e.g., "gpu" resolves to the correct node)
+
+### AI Response Processing
+The AI can emit structured directives that Perseus interprets and executes:
+- `EXECUTE:hostname:command` -- runs SSH commands inline during conversation
+- `SEARCH:query` -- triggers Brave web search and injects results
+
+### Discord Command and Control
+- Natural language chat routed to the active brain
+- DM-based admin control with owner-only authentication
+- Rate limiting per user
+- Long message chunking (auto-splits at 2000 chars)
+- Conversation context carried across exchanges
+
+## Tech Stack
+
+| Component | Purpose |
+|-----------|---------|
+| Python 3.11 | Core runtime |
+| FastAPI | REST API (port 3002) |
+| Discord.py | Bot interface with DM admin |
+| PostgreSQL | Agent activity logging, cost analytics |
+| Grok/xAI API | Primary AI brain with web + X search |
+| OpenAI API (GPT-4o-mini) | Compliance scanning |
+| Ollama | Free local LLM inference on GPU |
+| Brave Search API | Web search integration |
+| X/Twitter API | OAuth 1.0a social posting |
+| Node.js + Express | Video library microservice API |
+| SSH + subprocess | Cross-node command execution |
+
+## Setup
+
+### Prerequisites
+- Python 3.10+
+- PostgreSQL
+- Ollama with at least one model pulled
+- Discord bot token
+- SSH key access to your infrastructure nodes
+
+### Installation
+
+```bash
+git clone https://github.com/YOUR_USERNAME/perseus-ai-platform.git
+cd perseus-ai-platform
+
+pip install fastapi uvicorn requests discord.py psycopg2-binary
+
+# Set up PostgreSQL
+createdb perseus
+psql perseus -c "CREATE TABLE metadata (id SERIAL PRIMARY KEY, agent TEXT, timestamp TEXT, command TEXT, result TEXT, cost_usd REAL DEFAULT 0);"
+
+# Configure environment
+cp .env.example .env
+# Edit .env with your API keys, IPs, and credentials
+
+# For video API (optional)
+cd video-api && npm install && cd ..
+```
+
+### Running
+
+```bash
+# Start Perseus (Discord + FastAPI + all agents)
+uvicorn main:app --host 0.0.0.0 --port 3002
+
+# Start video API (optional, separate process)
+node video-api.js
+```
+
+## Configuration
+
+See `.env.example` for all environment variables. Key settings:
+
+| Variable | Description |
+|----------|-------------|
+| `XAI_API_KEY` | Grok/xAI API key (primary brain) |
+| `OPENAI_API_KEY` | OpenAI API key (compliance scanner) |
+| `BRAVE_API_KEY` | Brave Search API key |
+| `DISCORD_TOKEN` | Discord bot token |
+| `OLLAMA_URL` | Ollama server URL |
+| `PG_*` | PostgreSQL connection details |
+| `NODE*_IP` | Infrastructure node IP addresses |
+| `MONTHLY_BUDGET` | API spend ceiling (default $10) |
main.py +1299 -0
@@ -0,0 +1,1299 @@
+import os
+import time
+import subprocess
+from datetime import datetime, timedelta
+from fastapi import FastAPI, Body
+from fastapi.responses import JSONResponse
+import json
+import psycopg2
+import requests
+import discord
+from discord.ext import commands
+import asyncio
+import threading
+import hmac
+import hashlib
+import base64
+import urllib.parse
+import uuid
+
+# =============================================================================
+# CONFIGURATION - loaded from environment variables
+# =============================================================================
+
+def load_env(path=None):
+ if path is None:
+ path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
+ if os.path.exists(path):
+ with open(path) as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith("#") and "=" in line:
+ k, _, v = line.partition("=")
+ os.environ.setdefault(k.strip(), v.strip().strip('"').strip("'"))
+
+load_env()
+
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
+XAI_API_KEY = os.getenv("XAI_API_KEY", "")
+BRAVE_API_KEY = os.getenv("BRAVE_API_KEY", "")
+DISCORD_TOKEN = os.getenv("DISCORD_TOKEN", "")
+DISCORD_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID", "0"))
+OWNER_DISCORD_ID = int(os.getenv("OWNER_DISCORD_ID", "0"))
+
+OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
+OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3:8b")
+ACTIVE_BRAIN = "grok" # grok | gpt | ollama
+
+X_CONSUMER_KEY = os.getenv("X_CONSUMER_KEY", "")
+X_CONSUMER_SECRET = os.getenv("X_CONSUMER_SECRET", "")
+X_ACCESS_TOKEN = os.getenv("X_ACCESS_TOKEN", "")
+X_ACCESS_TOKEN_SECRET = os.getenv("X_ACCESS_TOKEN_SECRET", "")
+
+ANALYTICS_API_KEY = os.getenv("ANALYTICS_API_KEY", "")
+ANALYTICS_BASE = os.getenv("ANALYTICS_BASE_URL", "https://api.example.com")
+
+MODEL_COSTS = {
+ "gpt-4o-mini": {"input_per_1m": 0.15, "output_per_1m": 0.60},
+ "grok-4-1-fast-reasoning": {"input_per_1m": 2.00, "output_per_1m": 10.00},
+ "ollama": {"input_per_1m": 0.0, "output_per_1m": 0.0},
+}
+MONTHLY_BUDGET_CEILING = float(os.getenv("MONTHLY_BUDGET", "10.00"))
+
+RATE_LIMIT = {}
+RATE_LIMIT_SECONDS = 5
+CHANNEL_LAST_RESULT = {}
+STARTUP_TIME = datetime.utcnow()
+PENDING_POSTS = []
+
+app = FastAPI(title="Perseus")
+DB_CONFIG = {
+ "dbname": os.getenv("PG_DBNAME", "perseus"),
+ "user": os.getenv("PG_USER", "perseus"),
+ "password": os.getenv("PG_PASSWORD", ""),
+ "host": os.getenv("PG_HOST", "localhost"),
+}
+
+# =============================================================================
+# INFRASTRUCTURE MAP - configure via environment or edit this section
+# =============================================================================
+
+INFRA_MAP = {
+ "node-1": {"ip": os.getenv("NODE1_IP", "REDACTED-IP"), "type": "proxmox_host", "desc": "Proxmox host 1"},
+ "node-2": {"ip": os.getenv("NODE2_IP", "REDACTED-IP"), "type": "proxmox_host", "desc": "Proxmox host 2 (GPU)"},
+ "perseus": {"ip": os.getenv("PERSEUS_IP", "REDACTED-IP"), "type": "lxc", "ct": 120, "node": "node-1", "desc": "Perseus main (FastAPI + Discord)"},
+ "vpn": {"ip": os.getenv("VPN_IP", "REDACTED-IP"), "type": "lxc", "ct": 200, "node": "node-1", "desc": "WireGuard VPN"},
+ "dns": {"ip": os.getenv("DNS_IP", "REDACTED-IP"), "type": "lxc", "ct": 201, "node": "node-1", "desc": "Pi-hole DNS"},
+ "backup": {"ip": os.getenv("BACKUP_IP", "REDACTED-IP"), "type": "lxc", "ct": 203, "node": "node-1", "desc": "Proxmox Backup Server"},
+ "gpu": {"ip": os.getenv("GPU_IP", "REDACTED-IP"), "type": "lxc", "ct": 205, "node": "node-2", "desc": "Ollama + GPU inference"},
+}
+
+HOST_ALIASES = {
+ "gpu node": "node-2", "compute": "node-2",
+ "pi-hole": "dns", "pihole": "dns", "adblock": "dns",
+ "wireguard": "vpn", "wg": "vpn",
+ "backup server": "backup", "pbs": "backup",
+ "ollama": "gpu", "llm": "gpu",
+ "main": "perseus", "self": "perseus", "local": "perseus",
+}
+
+BLOCKED_COMMANDS = ["rm -rf /", "rm -rf /*", "mkfs", "dd if=/dev/zero", ":(){ :|:& };:",
+ "chmod -R 777 /", "shutdown", "reboot", "poweroff", "init 0", "init 6", "halt"]
+
+AUTO_APPROVE = ["ls", "cat", "grep", "find", "df", "du", "free", "top", "uptime", "hostname",
+ "whoami", "id", "ps", "systemctl status", "journalctl", "ip a", "ip addr", "ifconfig",
+ "ping", "nvidia-smi", "ollama list", "ollama ps", "pct list", "qm list", "pvesh",
+ "pihole", "docker ps", "head", "tail", "wc", "sort", "date", "cal", "echo", "pwd", "env"]
+
+AGENT_REGISTRY = {
+ "scraper": {"model": "langgraph", "local": True},
+ "content_curator": {"model": "ollama", "local": True},
+ "trend_hunter": {"model": "ollama", "local": True},
+ "social_publisher": {"model": "ollama", "local": True},
+ "compliance_scanner": {"model": "gpt4o-mini", "local": False},
+ "ab_testing_orchestrator": {"model": "gpt4o-mini", "local": False},
+ "analytics_reporter": {"model": "langgraph", "local": True},
+ "backup_manager": {"model": "langgraph", "local": True},
+ "competitor_intelligence": {"model": "grok-fast", "local": False},
+ "content_rewriter": {"model": "ollama", "local": True},
+ "engagement_analyst": {"model": "ollama", "local": True},
+ "funnel_optimizer": {"model": "claude", "local": False},
+ "traffic_router": {"model": "langgraph", "local": True},
+ "scheduler_intelligence": {"model": "langgraph", "local": True},
+ "model_router": {"model": "langgraph", "local": True},
+ "rate_limit_resilience": {"model": "langgraph", "local": True},
+ "infra_monitor": {"model": "langgraph", "local": True},
+ "compliance_approval": {"model": "langgraph", "local": True},
+ "capability_builder": {"model": "langgraph", "local": True},
+ "revenue_tracker": {"model": "grok-fast", "local": False},
+}
+
+# =============================================================================
+# DISCORD BOT
+# =============================================================================
+
+intents = discord.Intents.default()
+intents.message_content = True
+bot = commands.Bot(command_prefix='!', intents=intents)
+bot_loop = None
+
+def allowed_ctx(ctx):
+ """Check if command is from group channel or owner DM."""
+ if ctx.channel.id == DISCORD_CHANNEL_ID:
+ return True
+ if isinstance(ctx.channel, discord.DMChannel) and ctx.author.id == OWNER_DISCORD_ID:
+ return True
+ return False
+
+@bot.event
+async def on_ready():
+ global bot_loop
+ bot_loop = asyncio.get_event_loop()
+ print(f"Discord bot logged in as {bot.user}")
+
+async def send_discord_alert(msg):
+ try:
+ ch = bot.get_channel(DISCORD_CHANNEL_ID)
+ if ch:
+ await ch.send(msg)
+ except Exception as e:
+ print(f"Alert fail: {e}")
+
+def send_alert_sync(msg):
+ if bot_loop and bot_loop.is_running():
+ asyncio.run_coroutine_threadsafe(send_discord_alert(msg), bot_loop)
+
+@bot.event
+async def on_message(message):
+ if message.author == bot.user:
+ return
+ is_group = message.channel.id == DISCORD_CHANNEL_ID
+ is_owner_dm = isinstance(message.channel, discord.DMChannel) and message.author.id == OWNER_DISCORD_ID
+ if not is_group and not is_owner_dm:
+ return
+ if message.content.startswith("!"):
+ await bot.process_commands(message)
+ return
+ if not message.content.strip():
+ return
+ uid = str(message.author.id)
+ now = time.time()
+ if uid in RATE_LIMIT and (now - RATE_LIMIT[uid]) < RATE_LIMIT_SECONDS:
+ await message.reply(f"Rate limited -- wait {RATE_LIMIT_SECONDS - (now - RATE_LIMIT[uid]):.0f}s")
+ return
+ RATE_LIMIT[uid] = now
+
+ async with message.channel.typing():
+ try:
+ payload = {"command": message.content}
+ last = CHANNEL_LAST_RESULT.get(message.channel.id)
+ if last:
+ payload["previous_result"] = last
+ resp = requests.post("http://localhost:3002/chat", json=payload, timeout=120)
+ data = resp.json()
+ agent = data.get("agent", "unknown")
+ result = data.get("result", "No result")
+ model = data.get("model", "")
+ CHANNEL_LAST_RESULT[message.channel.id] = {"agent": agent, "result": result}
+ model_sig = f" . `{model}`" if model else ""
+ header = f"**[{agent}]{model_sig}**\n"
+ full = header + result
+ chunks = []
+ while len(full) > 1990:
+ split_at = full.rfind("\n", 0, 1990)
+ if split_at < 500:
+ split_at = 1990
+ chunks.append(full[:split_at])
+ full = full[split_at:].lstrip("\n")
+ chunks.append(full)
+ for i, chunk in enumerate(chunks):
+ if i == 0:
+ await message.reply(chunk)
+ else:
+ await message.channel.send(chunk)
+ except Exception as e:
+ await message.reply(f"[WARN] {str(e)}")
+
+def run_discord_bot():
+ asyncio.run(bot.start(DISCORD_TOKEN))
+
+threading.Thread(target=run_discord_bot, daemon=True).start()
+
+# =============================================================================
+# DATABASE - PostgreSQL for agent activity logging and cost analytics
+# =============================================================================
+
+def save_to_db(agent, command, result, cost=0.0, model_used=""):
+ try:
+ conn = psycopg2.connect(**DB_CONFIG)
+ cur = conn.cursor()
+ cur.execute("INSERT INTO metadata (agent, timestamp, command, result, cost_usd) VALUES (%s, %s, %s, %s, %s)",
+ (agent, datetime.utcnow().isoformat(), command, json.dumps(result), cost))
+ conn.commit()
+ cur.close()
+ conn.close()
+ except Exception as e:
+ print(f"DB: {e}")
+
+def get_cost_summary():
+ try:
+ conn = psycopg2.connect(**DB_CONFIG)
+ cur = conn.cursor()
+ today = datetime.utcnow().strftime('%Y-%m-%d')
+ month_start = datetime.utcnow().strftime('%Y-%m-01')
+ cur.execute("SELECT COALESCE(SUM(cost_usd),0) FROM metadata WHERE timestamp::text >= %s", (today,))
+ td = float(cur.fetchone()[0])
+ cur.execute("SELECT COALESCE(SUM(cost_usd),0) FROM metadata WHERE timestamp::text >= %s", (month_start,))
+ mo = float(cur.fetchone()[0])
+ cur.execute("SELECT COALESCE(SUM(cost_usd),0) FROM metadata")
+ tot = float(cur.fetchone()[0])
+ cur.execute("SELECT agent, COUNT(*) FROM metadata WHERE timestamp::text >= %s GROUP BY agent ORDER BY COUNT(*) DESC LIMIT 8", (today,))
+ ac = cur.fetchall()
+ cur.close()
+ conn.close()
+ return {"today": td, "month": mo, "total": tot, "budget_remaining": MONTHLY_BUDGET_CEILING - mo, "agents": ac}
+ except Exception as e:
+ return {"error": str(e)}
+
+def get_recent_commands(n=5):
+ try:
+ conn = psycopg2.connect(**DB_CONFIG)
+ cur = conn.cursor()
+ cur.execute("SELECT agent, command, timestamp FROM metadata ORDER BY timestamp DESC LIMIT %s", (n,))
+ rows = cur.fetchall()
+ cur.close()
+ conn.close()
+ return rows
+ except:
+ return []
+
+# =============================================================================
+# SSH - remote command execution with safety controls
+# =============================================================================
+
+def resolve_host(name):
+ n = name.lower().strip()
+ if n in INFRA_MAP:
+ return n
+ return HOST_ALIASES.get(n, None)
+
+def is_blocked(cmd):
+ for b in BLOCKED_COMMANDS:
+ if b in cmd.lower():
+ return True
+ return False
+
+def is_safe(cmd):
+ first = cmd.lower().split()[0] if cmd.split() else ""
+ return any(cmd.lower().startswith(s) or first == s for s in AUTO_APPROVE)
+
+def run_ssh(host, cmd, timeout=30):
+ try:
+ r = subprocess.run(["ssh", "-o", "ConnectTimeout=10", host, cmd],
+ capture_output=True, text=True, timeout=timeout)
+ return {"ok": r.returncode == 0, "out": (r.stdout.strip() or r.stderr.strip() or "(empty)"), "host": host}
+ except subprocess.TimeoutExpired:
+ return {"ok": False, "out": f"Timeout ({timeout}s)", "host": host}
+ except Exception as e:
+ return {"ok": False, "out": str(e), "host": host}
+
+def run_local(cmd, timeout=30):
+ try:
+ r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
+ return {"ok": r.returncode == 0, "out": (r.stdout.strip() or r.stderr.strip() or "(empty)"), "host": "perseus"}
+ except subprocess.TimeoutExpired:
+ return {"ok": False, "out": f"Timeout ({timeout}s)", "host": "perseus"}
+ except Exception as e:
+ return {"ok": False, "out": str(e), "host": "perseus"}
+
+# =============================================================================
+# BRAVE SEARCH - web search integration
+# =============================================================================
+
+def brave_search(query, count=3):
+ try:
+ resp = requests.get("https://api.search.brave.com/res/v1/web/search",
+ headers={"X-Subscription-Token": BRAVE_API_KEY, "Accept": "application/json"},
+ params={"q": query, "count": count}, timeout=10)
+ if resp.status_code == 200:
+ results = resp.json().get("web", {}).get("results", [])
+ return [{"title": r.get("title", ""), "url": r.get("url", ""), "desc": r.get("description", "")[:200]} for r in results]
+ return []
+ except:
+ return []
+
+# =============================================================================
+# AI MODELS - multi-LLM routing with runtime switching
+# =============================================================================
+
+SYSTEM_PROMPT = """You are Perseus, the command AI for a Proxmox homelab infrastructure. You are direct, efficient, and technically competent.
+
+Infrastructure you control (all have SSH access):
+- node-1: Proxmox host 1 (primary compute)
+- node-2: Proxmox host 2 (GPU compute node)
+- CT 120 / perseus: Your home -- FastAPI, PostgreSQL, Discord bot
+- CT 200 / vpn: WireGuard VPN
+- CT 201 / dns: Pi-hole DNS/ad blocking
+- CT 203 / backup: Proxmox Backup Server
+- CT 205 / gpu: Ollama + local LLM inference on GPU
+
+When asked to DO something on infrastructure, include commands as:
+EXECUTE:hostname:command (one per line)
+
+Examples:
+"check disk on dns" -> EXECUTE:dns:df -h
+"GPU status" -> EXECUTE:gpu:nvidia-smi
+"whitelist google.com on pihole" -> EXECUTE:dns:pihole -w google.com
+
+When asked about weather, news, or current events, you have web search. Include:
+SEARCH:query
+and the results will be appended.
+
+For general questions, just answer. Keep responses concise."""
+
+def call_grok(prompt, sys=None):
+ msgs = []
+ if sys:
+ msgs.append({"role": "system", "content": sys})
+ msgs.append({"role": "user", "content": prompt})
+ try:
+ r = requests.post("https://api.x.ai/v1/chat/completions",
+ headers={"Authorization": f"Bearer {XAI_API_KEY}", "Content-Type": "application/json"},
+ json={"model": "grok-4-1-fast-reasoning", "messages": msgs, "temperature": 0.7, "max_tokens": 1500}, timeout=60)
+ if r.status_code == 200:
+ d = r.json()
+ txt = d["choices"][0]["message"]["content"].strip()
+ u = d.get("usage", {})
+ cost = (u.get("prompt_tokens", 0) / 1e6 * 2.0) + (u.get("completion_tokens", 0) / 1e6 * 10.0)
+ return {"ok": True, "result": txt, "cost": cost, "model": "grok-4-1-fast-reasoning"}
+ return {"ok": False, "result": f"Grok HTTP {r.status_code}", "cost": 0, "model": "grok"}
+ except Exception as e:
+ return {"ok": False, "result": f"Grok: {e}", "cost": 0, "model": "grok"}
+
+def call_gpt(prompt, sys=None):
+ msgs = []
+ if sys:
+ msgs.append({"role": "system", "content": sys})
+ msgs.append({"role": "user", "content": prompt})
+ try:
+ r = requests.post("https://api.openai.com/v1/chat/completions",
+ headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
+ json={"model": "gpt-4o-mini", "messages": msgs, "temperature": 0.7, "max_tokens": 1500}, timeout=60)
+ if r.status_code == 200:
+ d = r.json()
+ txt = d["choices"][0]["message"]["content"].strip()
+ u = d.get("usage", {})
+ cost = (u.get("prompt_tokens", 0) / 1e6 * 0.15) + (u.get("completion_tokens", 0) / 1e6 * 0.60)
+ return {"ok": True, "result": txt, "cost": cost, "model": "gpt-4o-mini"}
+ return {"ok": False, "result": f"GPT HTTP {r.status_code}", "cost": 0, "model": "gpt"}
+ except Exception as e:
+ return {"ok": False, "result": f"GPT: {e}", "cost": 0, "model": "gpt"}
+
+def call_ollama(prompt):
+ try:
+ r = requests.post(f"{OLLAMA_URL}/api/chat",
+ json={"model": OLLAMA_MODEL, "messages": [{"role": "user", "content": prompt}],
+ "stream": False, "options": {"temperature": 0.7}}, timeout=90)
+ return {"ok": True, "result": r.json().get("message", {}).get("content", "").strip(), "cost": 0, "model": OLLAMA_MODEL}
+ except Exception as e:
+ return {"ok": False, "result": f"Ollama: {e}", "cost": 0, "model": OLLAMA_MODEL}
+
+def call_brain(prompt, sys=None):
+ if ACTIVE_BRAIN == "grok":
+ return call_grok(prompt, sys)
+ elif ACTIVE_BRAIN == "gpt":
+ return call_gpt(prompt, sys)
+ else:
+ full = f"{sys}\n\nUser: {prompt}" if sys else prompt
+ return call_ollama(full)
+
+def process_ai_response(text):
+ """Handle EXECUTE: and SEARCH: directives in AI responses."""
+ lines = text.split("\n")
+ out_lines = []
+ exec_results = []
+
+ for line in lines:
+ s = line.strip()
+ if s.startswith("EXECUTE:"):
+ parts = s.replace("EXECUTE:", "").split(":", 1)
+ if len(parts) == 2:
+ host = resolve_host(parts[0].strip())
+ cmd = parts[1].strip()
+ if not host:
+ exec_results.append(f"[FAIL] Unknown host: `{parts[0]}`")
+ elif is_blocked(cmd):
+ exec_results.append(f"[BLOCKED] `{cmd}`")
+ elif is_safe(cmd):
+ r = run_local(cmd) if host == "perseus" else run_ssh(host, cmd)
+ st = "[OK]" if r["ok"] else "[FAIL]"
+ exec_results.append(f"{st} **{r['host']}** `{cmd}`\n```\n{r['out'][:800]}\n```")
+ else:
+ exec_results.append(f"[PENDING] **Needs approval:** `{cmd}` on `{host}`\nRun: `!exec {host} {cmd}`")
+ else:
+ out_lines.append(line)
+ elif s.startswith("SEARCH:"):
+ query = s.replace("SEARCH:", "").strip()
+ results = brave_search(query)
+ if results:
+ search_text = "\n".join(f"- **{r['title']}**: {r['desc']} ([link]({r['url']}))" for r in results)
+ exec_results.append(f"**Search: {query}**\n{search_text}")
+ else:
+ exec_results.append(f"No results for: {query}")
+ else:
+ out_lines.append(line)
+
+ final = "\n".join(out_lines).strip()
+ if exec_results:
+ final += "\n\n" + "\n".join(exec_results)
+ return final
+
+# =============================================================================
+# X/TWITTER - OAuth 1.0a posting
+# =============================================================================
+
+def x_sign(method, url, params, cs, ts):
+ sp = "&".join(f"{urllib.parse.quote(k,safe='')}={urllib.parse.quote(v,safe='')}" for k, v in sorted(params.items()))
+ bs = f"{method}&{urllib.parse.quote(url,safe='')}&{urllib.parse.quote(sp,safe='')}"
+ sk = f"{urllib.parse.quote(cs,safe='')}&{urllib.parse.quote(ts,safe='')}"
+ return base64.b64encode(hmac.new(sk.encode(), bs.encode(), hashlib.sha1).digest()).decode()
+
+def post_tweet(text):
+ url = "https://api.x.com/2/tweets"
+ op = {"oauth_consumer_key": X_CONSUMER_KEY, "oauth_token": X_ACCESS_TOKEN,
+ "oauth_signature_method": "HMAC-SHA1", "oauth_timestamp": str(int(time.time())),
+ "oauth_nonce": uuid.uuid4().hex, "oauth_version": "1.0"}
+ op["oauth_signature"] = x_sign("POST", url, op, X_CONSUMER_SECRET, X_ACCESS_TOKEN_SECRET)
+ ah = "OAuth " + ", ".join(f'{urllib.parse.quote(k,safe="")}="{urllib.parse.quote(v,safe="")}"' for k, v in sorted(op.items()))
+ try:
+ r = requests.post(url, headers={"Authorization": ah, "Content-Type": "application/json"},
+ json={"text": text}, timeout=30)
+ if r.status_code in [200, 201]:
+ tid = r.json().get("data", {}).get("id", "?")
+ return {"ok": True, "url": f"https://x.com/status/{tid}"}
+ return {"ok": False, "error": f"HTTP {r.status_code}: {r.text[:300]}"}
+ except Exception as e:
+ return {"ok": False, "error": str(e)}
+
+# =============================================================================
+# ANALYTICS API - publisher revenue tracking
+# =============================================================================
+
+def analytics_get(endpoint, extra_params=None):
+ """Generic GET for analytics/ad platform API."""
+ params = {"key": ANALYTICS_API_KEY}
+ if extra_params:
+ params.update(extra_params)
+ try:
+ r = requests.get(f"{ANALYTICS_BASE}{endpoint}", params=params, timeout=15)
+ if r.status_code == 200:
+ data = r.json()
+ if data.get("status") == "success":
+ return {"ok": True, "result": data.get("result", {})}
+ return {"ok": False, "error": f"API error: {data}"}
+ return {"ok": False, "error": f"HTTP {r.status_code}"}
+ except Exception as e:
+ return {"ok": False, "error": str(e)}
+
+def analytics_balance():
+ return analytics_get("/publisher/balance")
+
+def analytics_stats(date=None, date2=None, group="date"):
+ params = {"group": group}
+ if date:
+ params["date"] = date
+ if date2:
+ params["date2"] = date2
+ return analytics_get("/publisher/listStats", params)
+
+def analytics_full_report():
+ """Pull balance + yesterday stats + 7-day stats."""
+ today = datetime.utcnow().strftime('%Y-%m-%d')
+ yesterday = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
+ week_ago = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%d')
+ balance = analytics_balance()
+ daily = analytics_stats(date=yesterday)
+ weekly = analytics_stats(date=week_ago, date2=yesterday, group="date")
+ return {
+ "balance": balance.get("result", {}) if balance.get("ok") else {"error": balance.get("error")},
+ "yesterday": daily.get("result", {}) if daily.get("ok") else {"error": daily.get("error")},
+ "weekly": weekly.get("result", {}) if weekly.get("ok") else {"error": weekly.get("error")},
+ }
+
+def format_analytics_report(report):
+ """Format analytics data for Discord display."""
+ bal = report.get("balance", {})
+ bal_str = f"${bal.get('balance', 'N/A')} {bal.get('currency', '')}" if "error" not in bal else f"[FAIL] {bal['error']}"
+ yest = report.get("yesterday", {})
+ y_revenue = y_impressions = y_clicks = y_cpm = "N/A"
+ if "error" not in yest:
+ try:
+ total_rev = 0.0
+ total_imp = 0
+ total_clicks = 0
+ found = False
+ def walk_stats(obj):
+ nonlocal total_rev, total_imp, total_clicks, found
+ if isinstance(obj, dict):
+ if "revenue" in obj:
+ found = True
+ total_rev += float(obj.get("revenue", 0))
+ total_imp += int(obj.get("impressions", 0))
+ total_clicks += int(obj.get("clicks", 0))
+ else:
+ for v in obj.values():
+ walk_stats(v)
+ elif isinstance(obj, list):
+ for item in obj:
+ walk_stats(item)
+ walk_stats(yest)
+ if found:
+ y_revenue = f"${total_rev:.4f}"
+ y_impressions = f"{total_imp:,}"
+ y_clicks = f"{total_clicks:,}"
+ y_cpm = f"${(total_rev / total_imp * 1000):.4f}" if total_imp > 0 else "N/A"
+ except:
+ pass
+ week = report.get("weekly", {})
+ w_revenue = "N/A"
+ w_days = []
+ if "error" not in week:
+ try:
+ total_rev_w = 0.0
+ def walk_weekly(obj, date_key=None):
+ nonlocal total_rev_w, w_days
+ if isinstance(obj, dict):
+ if "revenue" in obj:
+ rev = float(obj.get("revenue", 0))
+ total_rev_w += rev
+ if date_key:
+ w_days.append((date_key, rev))
+ else:
+ for k, v in obj.items():
+ walk_weekly(v, date_key=k if len(k) == 10 and "-" in k else date_key)
+ elif isinstance(obj, list):
+ for item in obj:
+ walk_weekly(item, date_key)
+ walk_weekly(week)
+ w_revenue = f"${total_rev_w:.4f}"
+ except:
+ pass
+ txt = (
+ f"**Revenue Report**\n--------------------\n"
+ f"**Balance:** {bal_str}\n--------------------\n"
+ f"**Yesterday:**\n"
+ f" Revenue: {y_revenue}\n"
+ f" Impressions: {y_impressions}\n"
+ f" Clicks: {y_clicks}\n"
+ f" CPM: {y_cpm}\n--------------------\n"
+ f"**Last 7 Days:** {w_revenue} total\n"
+ )
+ if w_days:
+ w_days.sort()
+ for d, r in w_days:
+ txt += f" {d}: ${r:.4f}\n"
+ return txt
+
+# =============================================================================
+# BACKUP MANAGER - vzdump, pg_dump, config archive
+# =============================================================================
+
+BACKUP_TARGETS = {
+ "perseus_ct": {
+ "type": "vzdump",
+ "ct": 120,
+ "node": "node-1",
+ "desc": "Perseus container (FastAPI, bot, configs)",
+ "storage": "pbs-main",
+ "keep": 7,
+ },
+ "postgres": {
+ "type": "pg_dump",
+ "host": "perseus",
+ "db": "perseus",
+ "dest": os.path.join(os.path.dirname(os.path.abspath(__file__)), "backups"),
+ "keep": 7,
+ },
+}
+
+def run_backup(target_name):
+ """Execute a specific backup target. Returns status dict."""
+ target = BACKUP_TARGETS.get(target_name)
+ if not target:
+ return {"ok": False, "error": f"Unknown target: {target_name}"}
+ ts = datetime.utcnow().strftime('%Y-%m-%d_%H%M')
+ if target["type"] == "pg_dump":
+ dest = target["dest"]
+ cmd = f"mkdir -p {dest} && pg_dump {target['db']} > {dest}/perseus_{ts}.sql && ls -la {dest}/"
+ r = run_local(cmd)
+ if r["ok"]:
+ run_local(f"cd {dest} && ls -t *.sql | tail -n +{target['keep'] + 1} | xargs -r rm")
+ return {"ok": r["ok"], "target": target_name, "type": "pg_dump", "output": r["out"][:500]}
+ elif target["type"] == "vzdump":
+ ct = target["ct"]
+ cmd = f"vzdump {ct} --storage {target['storage']} --compress zstd --mode snapshot --quiet 1"
+ r = run_ssh(target["node"], cmd, timeout=300)
+ return {"ok": r["ok"], "target": target_name, "type": "vzdump", "output": r["out"][:500]}
+ return {"ok": False, "error": "Unknown backup type"}
+
+def run_all_backups():
+ results = []
+ for name in BACKUP_TARGETS:
+ r = run_backup(name)
+ results.append(r)
+ return results
+
+def format_backup_report(results):
+ txt = "**Backup Report**\n--------------------\n"
+ for r in results:
+ st = "[OK]" if r.get("ok") else "[FAIL]"
+ txt += f"{st} **{r.get('target', '?')}** ({r.get('type', '?')})\n"
+ if not r.get("ok"):
+ txt += f" Error: {r.get('error', r.get('output', 'unknown'))[:200]}\n"
+ txt += f"--------------------\n**Targets:** {len(BACKUP_TARGETS)} | **Time:** {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"
+ return txt
+
+# =============================================================================
+# INFRASTRUCTURE MONITOR - CPU, RAM, disk, GPU health checks
+# =============================================================================
+
+INFRA_THRESHOLDS = {
+ "cpu_pct": 90,
+ "ram_pct": 90,
+ "disk_pct": 85,
+ "gpu_temp_c": 80,
+}
+
+def check_host_health(host_name):
+ """Check CPU, RAM, disk on a host. Returns health dict."""
+ health = {"host": host_name, "ok": True, "alerts": [], "metrics": {}}
+ if host_name == "perseus":
+ runner = run_local
+ else:
+ runner = lambda cmd: run_ssh(host_name, cmd)
+ r = runner("nproc && cat /proc/loadavg")
+ if r["ok"]:
+ try:
+ lines = r["out"].split("\n")
+ cores = int(lines[0].strip())
+ load1 = float(lines[1].split()[0])
+ cpu_pct = round((load1 / cores) * 100, 1)
+ health["metrics"]["cpu_pct"] = cpu_pct
+ health["metrics"]["load"] = load1
+ health["metrics"]["cores"] = cores
+ if cpu_pct > INFRA_THRESHOLDS["cpu_pct"]:
+ health["alerts"].append(f"[CRIT] CPU {cpu_pct}% (load {load1}/{cores} cores)")
+ health["ok"] = False
+ except:
+ pass
+ r = runner("free -m | grep Mem")
+ if r["ok"]:
+ try:
+ parts = r["out"].split()
+ total = int(parts[1])
+ used = int(parts[2])
+ ram_pct = round((used / total) * 100, 1)
+ health["metrics"]["ram_total_mb"] = total
+ health["metrics"]["ram_used_mb"] = used
+ health["metrics"]["ram_pct"] = ram_pct
+ if ram_pct > INFRA_THRESHOLDS["ram_pct"]:
+ health["alerts"].append(f"[CRIT] RAM {ram_pct}% ({used}/{total} MB)")
+ health["ok"] = False
+ except:
+ pass
+ r = runner("df -h / | tail -1")
+ if r["ok"]:
+ try:
+ parts = r["out"].split()
+ disk_pct = int(parts[4].replace("%", ""))
+ health["metrics"]["disk_pct"] = disk_pct
+ health["metrics"]["disk_size"] = parts[1]
+ health["metrics"]["disk_used"] = parts[2]
+ health["metrics"]["disk_avail"] = parts[3]
+ if disk_pct > INFRA_THRESHOLDS["disk_pct"]:
+ health["alerts"].append(f"[CRIT] Disk {disk_pct}% ({parts[2]}/{parts[1]})")
+ health["ok"] = False
+ except:
+ pass
+ return health
+
+def check_gpu_health():
+ health = {"host": "gpu", "ok": True, "alerts": [], "metrics": {}}
+ r = run_ssh("gpu", "nvidia-smi --query-gpu=utilization.gpu,memory.used,memory.total,temperature.gpu --format=csv,noheader,nounits")
+ if r["ok"]:
+ try:
+ parts = [x.strip() for x in r["out"].split(",")]
+ gpu_util = int(parts[0])
+ mem_used = int(parts[1])
+ mem_total = int(parts[2])
+ temp = int(parts[3])
+ health["metrics"]["gpu_util_pct"] = gpu_util
+ health["metrics"]["vram_used_mb"] = mem_used
+ health["metrics"]["vram_total_mb"] = mem_total
+ health["metrics"]["gpu_temp_c"] = temp
+ if temp > INFRA_THRESHOLDS["gpu_temp_c"]:
+ health["alerts"].append(f"[CRIT] GPU Temp {temp}C")
+ health["ok"] = False
+ except:
+ health["alerts"].append(f"[WARN] GPU parse error: {r['out'][:100]}")
+ else:
+ health["ok"] = False
+ health["alerts"].append(f"[FAIL] Can't reach GPU: {r['out'][:100]}")
+ return health
+
+def check_ollama_health():
+ health = {"host": "ollama", "ok": True, "alerts": [], "metrics": {}}
+ try:
+ r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
+ models = [m["name"] for m in r.json().get("models", [])]
+ health["metrics"]["available_models"] = models
+ health["metrics"]["model_count"] = len(models)
+ except:
+ health["ok"] = False
+ health["alerts"].append("[FAIL] Ollama unreachable")
+ try:
+ r = requests.get(f"{OLLAMA_URL}/api/ps", timeout=5)
+ running = [m["name"] for m in r.json().get("models", [])]
+ health["metrics"]["loaded_models"] = running
+ except:
+ pass
+ return health
+
+def full_infra_check():
+ results = []
+ for host in ["node-1", "node-2"]:
+ results.append(check_host_health(host))
+ for ct in ["perseus", "dns", "gpu"]:
+ results.append(check_host_health(ct))
+ results.append(check_gpu_health())
+ results.append(check_ollama_health())
+ return results
+
+def format_infra_health(results):
+ all_ok = all(r.get("ok", False) for r in results)
+ overall = "[OK] All Clear" if all_ok else "[WARN] Issues Detected"
+ txt = f"**Infrastructure Health** -- {overall}\n--------------------\n"
+ for r in results:
+ host = r["host"]
+ st = "[OK]" if r["ok"] else "[CRIT]"
+ m = r.get("metrics", {})
+ txt += f"\n{st} **{host}**\n"
+ if "cpu_pct" in m:
+ txt += f" CPU: {m['cpu_pct']}% (load {m.get('load','?')}/{m.get('cores','?')} cores)\n"
+ if "ram_pct" in m:
+ txt += f" RAM: {m['ram_pct']}% ({m.get('ram_used_mb','?')}/{m.get('ram_total_mb','?')} MB)\n"
+ if "disk_pct" in m:
+ txt += f" Disk: {m['disk_pct']}% ({m.get('disk_used','?')}/{m.get('disk_size','?')})\n"
+ if "gpu_util_pct" in m:
+ txt += f" GPU: {m['gpu_util_pct']}% | VRAM: {m.get('vram_used_mb','?')}/{m.get('vram_total_mb','?')} MB | Temp: {m.get('gpu_temp_c','?')}C\n"
+ if "available_models" in m:
+ txt += f" Models: {', '.join(m['available_models'])}\n"
+ loaded = m.get("loaded_models", [])
+ if loaded:
+ txt += f" Loaded: {', '.join(loaded)}\n"
+ for alert in r.get("alerts", []):
+ txt += f" {alert}\n"
+ txt += f"\n--------------------\n**Checked:** {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"
+ return txt
+
+# =============================================================================
+# AGENT PROMPT BUILDER - content and analysis agents
+# =============================================================================
+
+def build_prompt(agent, state):
+ c = state['command']
+ p = state.get('previous_result', {}).get('result', '')
+ if any(kw in str(p).lower() for kw in ["balance", "revenue", "cpm", "impressions"]):
+ p = ''
+ if agent == "social_publisher":
+ return f"""You write engaging social media posts for a content platform. Generate professional, high-engagement posts.
+
+Topic requested: {c}
+
+Rules:
+- Each post MUST be under 280 characters
+- Use hooks, trending hashtags, and strong calls to action
+- Write 5 posts in professional tone
+- Output ONLY JSON: [{{"post":"text","hashtags":["#tag"]}}]"""
+ elif agent == "compliance_scanner":
+ return f"""Content compliance scan. Analyze: '{c}' | {p}. Check for policy violations, copyright issues, and content quality. Output ONLY JSON: {{"risk_level":"low/medium/high","reason":"brief","confidence":"0-100%","action_recommended":"flag/monitor/approve"}}"""
+ elif agent == "content_curator":
+ return f"""Content curator. Analyze: '{c}' | {p}. Filter quality, remove duplicates, rank by engagement potential. Output JSON array."""
+ elif agent == "content_rewriter":
+ return f"""Rewrite content metadata for SEO: '{c}' | {p}. Generate 3 SEO-friendly variants. Output JSON array."""
+ else:
+ topic = c
+ for prefix in ["find trends on", "find trends in", "find trends for", "find trends about",
+ "find trends", "trending niches for", "trending niches in", "trending niches",
+ "trend hunt", "viral trends in", "viral trends on", "viral trends",
+ "what's trending in", "what's trending on", "what's trending", "whats trending"]:
+ if topic.lower().startswith(prefix):
+ topic = topic[len(prefix):].strip() or "general"
+ break
+ return f"""You are a trend analyst for digital content. Analyze current viral trends related to: {topic}
+
+Find 3-5 trending niches/topics. For each, explain WHY it's trending, name relevant creators if applicable, and estimate engagement potential.
+
+Output ONLY JSON: [{{"niche":"","reason":"","creators":[],"engagement_potential":"high/medium/low"}}]"""
+
+# =============================================================================
+# MAIN CHAT ENDPOINT - intent classification and agent routing
+# =============================================================================
+
+@app.post("/chat")
+async def chat(command: str = Body(...), previous_result: dict = Body(default=None)):
+ state = {"command": command, "previous_result": previous_result or {}}
+ if state["previous_result"]:
+ state["command"] += " | previous data: " + json.dumps(state["previous_result"])
+
+ lc = command.lower().strip()
+ clean = lc.rstrip("!?.")
+
+ greetings = {"hey","hi","hello","sup","yo","what's up","whats up","hey perseus","hi perseus",
+ "hello perseus","good morning","good night","gm","gn","wassup","what up","yo perseus"}
+ if clean in greetings:
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": f"Perseus online. Brain: **{ACTIVE_BRAIN}**\n`help` for commands",
+ "requires_approval": False})
+
+ if clean in {"help","commands","what can you do","?"}:
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": "**Perseus Commands**\n--------------------\n"
+ "**Talk naturally** -- anything goes to your active brain\n--------------------\n"
+ "**System:** `status` . `costs` . `infra` . `queue`\n"
+ "**Revenue:** `revenue` . `balance` . `ad stats`\n"
+ "**Content:** `post ideas for [x]` . `find trends` . `compliance scan [x]` . `rewrite [x]` . `curate [x]`\n"
+ "**Backup:** `backup` (run now) . `backup status` (check last)\n"
+ "**Health:** `health check` . `infra health` . `gpu health` . `disk usage`\n"
+ "**Search:** `search [query]` or just ask about weather/news\n--------------------\n"
+ "**! Commands:**\n"
+ "`!model` -- show/switch brain (grok . gpt . ollama)\n"
+ "`!exec [host] [cmd]` -- run command on host\n"
+ "`!backup [run|status]` -- manage backups\n"
+ "`!health` -- full infra health check\n"
+ "`!post [#]` . `!postall` . `!clearqueue` -- post queue",
+ "requires_approval": False})
+
+ if clean in {"status","health","system status"}:
+ up = datetime.utcnow() - STARTUP_TIME
+ h, rem = divmod(int(up.total_seconds()), 3600)
+ m, s = divmod(rem, 60)
+ try:
+ oc = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
+ o_st = "[OK]"
+ o_models = ", ".join(x["name"] for x in oc.json().get("models", []))
+ except:
+ o_st = "[FAIL]"; o_models = "N/A"
+ recent = get_recent_commands(5)
+ rt = ""
+ for r in recent:
+ a, c, t = r
+ rt += f" `{str(t)[:16]}` [{a}] {str(c)[:40]}\n"
+ rt = rt or " None\n"
+ costs = get_cost_summary()
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": f"**Perseus Status**\n--------------------\n"
+ f"**Uptime:** {h}h {m}m {s}s | **Brain:** `{ACTIVE_BRAIN}`\n"
+ f"**FastAPI:** [OK] :3002 | **Discord:** [OK] {bot.user}\n--------------------\n"
+ f"**Models:**\n Grok 4.1: ($2.00/$10.00 per 1M tok)\n GPT-4o-mini: ($0.15/$0.60 per 1M tok)\n Ollama: {o_st} ({OLLAMA_MODEL}) -- FREE\n Available: {o_models}\n"
+ f"--------------------\n"
+ f"**API Spend:** ${costs.get('today',0):.4f} today . ${costs.get('month',0):.4f}/mo . ${costs.get('budget_remaining',10):.2f} left\n"
+ f"**Agents:** {len(AGENT_REGISTRY)} | **Queue:** {len(PENDING_POSTS)} posts\n--------------------\n**Recent:**\n{rt}",
+ "requires_approval": False})
+
+ if clean in {"costs","cost","spend","budget","spending"}:
+ costs = get_cost_summary()
+ if "error" in costs:
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": f"[WARN] {costs['error']}", "requires_approval": False})
+ bf = min(int((costs['month'] / MONTHLY_BUDGET_CEILING) * 20), 20)
+ bar = "#" * bf + "-" * (20 - bf)
+ pct = (costs['month'] / MONTHLY_BUDGET_CEILING) * 100
+ ab = ""
+ for a, c in costs.get('agents', []):
+ ab += f" {a}: {c} calls\n"
+ ab = ab or " None today\n"
+ msg = (f"**Cost Tracker**\n--------------------\n"
+ f"**Today:** ${costs['today']:.4f} | **Month:** ${costs['month']:.4f} | **All Time:** ${costs['total']:.4f}\n"
+ f"**Budget:** [{bar}] {pct:.1f}% -- ${costs['budget_remaining']:.2f} left\n--------------------\n"
+ f"**API Pricing:**\n"
+ f" Grok 4.1 Fast: $2.00 in / $10.00 out per 1M tokens\n"
+ f" GPT-4o-mini: $0.15 in / $0.60 out per 1M tokens\n"
+ f" Ollama (local): FREE\n--------------------\n"
+ f"**Today's Usage:**\n{ab}")
+ if costs['budget_remaining'] < 2.0:
+ msg += "\n[WARN] **Under $2 left!**"
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": msg, "requires_approval": False})
+
+ if clean in {"infra","infrastructure","hosts","nodes","map"}:
+ txt = "**Infrastructure**\n--------------------\n"
+ for n, i in INFRA_MAP.items():
+ ct = f" (CT {i['ct']})" if 'ct' in i else ""
+ txt += f"**{n}**{ct} . `{i['ip']}` . {i['desc']}\n"
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": txt, "requires_approval": False})
+
+ if clean in {"queue","post queue","pending","pending posts"}:
+ if not PENDING_POSTS:
+ msg = "Queue empty. Try `post ideas for [topic]`"
+ else:
+ msg = "**Post Queue:**\n--------------------\n"
+ for i, tw in enumerate(PENDING_POSTS, 1):
+ msg += f"**{i}.** {tw['text'][:200]}\n"
+ msg += "\n`!post [#]` . `!postall` . `!clearqueue`"
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": msg, "requires_approval": False})
+
+ if lc.startswith("search "):
+ query = command[7:].strip()
+ results = brave_search(query, 5)
+ if results:
+ txt = f"**Search: {query}**\n--------------------\n"
+ for r in results:
+ txt += f"- **{r['title']}**\n {r['desc']}\n {r['url']}\n\n"
+ else:
+ txt = f"No results for: {query}"
+ return JSONResponse({"agent": "perseus", "timestamp": datetime.utcnow().isoformat(),
+ "result": txt, "requires_approval": False})
+
+ if lc.startswith("ssh "):
+ parts = command.strip().split(None, 2)
+ if len(parts) >= 3:
+ host = resolve_host(parts[1])
+ cmd = parts[2]
+ if not host:
+ return JSONResponse({"agent": "command_executor", "timestamp": datetime.utcnow().isoformat(),
+ "result": f"[FAIL] Unknown host: `{parts[1]}`. Type `infra`.", "requires_approval": False})
+ if is_blocked(cmd):
+ return JSONResponse({"agent": "command_executor", "timestamp": datetime.utcnow().isoformat(),
+ "result": "[BLOCKED]", "requires_approval": False})
+ r = run_local(cmd) if host == "perseus" else run_ssh(host, cmd)
+ st = "[OK]" if r["ok"] else "[FAIL]"
+ save_to_db("command_executor", f"ssh {host} {cmd}", r["out"])
+ return JSONResponse({"agent": "command_executor", "timestamp": datetime.utcnow().isoformat(),
+ "result": f"{st} **{r['host']}** `{cmd}`\n```\n{r['out'][:1500]}\n```", "requires_approval": False})
+
+ # Revenue / analytics triggers
+ revenue_triggers = ["revenue", "balance", "ad stats", "analytics", "ad revenue",
+ "earnings", "ad earnings", "publisher stats",
+ "how much did we make", "how much money", "cpm", "ad performance"]
+ if any(t in lc for t in revenue_triggers):
+ report = analytics_full_report()
+ raw_display = format_analytics_report(report)
+ grok_prompt = f"""You are Perseus, a revenue analyst. Here is the latest analytics data:
+{json.dumps(report, indent=2, default=str)}
+
+Provide a brief, actionable analysis:
+1. How is revenue trending? (up/down/flat vs prior days)
+2. What's the CPM looking like?
+3. Any red flags (zero impressions, drops, zones not performing)?
+4. One concrete recommendation to increase revenue.
+
+Keep it under 200 words. Be direct -- facts and actions, not fluff."""
+ brain = call_grok(grok_prompt)
+ analysis = brain.get("result", "Analysis unavailable")
+ cost = brain.get("cost", 0)
+ full_result = raw_display + f"\n--------------------\n**Analysis:**\n{analysis}"
+ save_to_db("revenue_tracker", command, full_result, cost)
+ return JSONResponse({"agent": "revenue_tracker", "timestamp": datetime.utcnow().isoformat(),
+ "result": full_result, "requires_approval": False, "model": "grok + analytics-api"})
+
+ # Backup triggers
+ backup_triggers = ["backup", "run backup", "backup now", "backup status", "backups",
+ "last backup", "check backups", "backup report"]
+ if any(t in lc for t in backup_triggers):
+ if any(t in lc for t in ["status", "last", "check"]):
+ result = "**Backup Status**\n--------------------\nUse `!backup status` for detailed check."
+ save_to_db("backup_manager", command, result)
+ return JSONResponse({"agent": "backup_manager", "timestamp": datetime.utcnow().isoformat(),
+ "result": result, "requires_approval": False, "model": "local"})
+ else:
+ results = run_all_backups()
+ result = format_backup_report(results)
+ failed = [r for r in results if not r.get("ok")]
+ if failed:
+ send_alert_sync(f"[WARN] **Backup failures:** {len(failed)}/{len(results)}\n{result}")
+ save_to_db("backup_manager", command, result)
+ return JSONResponse({"agent": "backup_manager", "timestamp": datetime.utcnow().isoformat(),
+ "result": result, "requires_approval": False, "model": "local"})
+
+ # Infrastructure health triggers
+ infra_health_triggers = ["health check", "infra health", "system health", "check health",
+ "monitor", "infra check", "are hosts ok", "host health",
+ "check all hosts", "diagnostics", "gpu health", "ram usage",
+ "disk usage", "cpu usage"]
+ if any(t in lc for t in infra_health_triggers):
+ results = full_infra_check()
+ result = format_infra_health(results)
+ alerts = []
+ for r in results:
+ alerts.extend(r.get("alerts", []))
+ if alerts:
+ send_alert_sync(f"[WARN] **Infra Alerts:**\n" + "\n".join(alerts))
+ save_to_db("infra_monitor", command, result)
+ return JSONResponse({"agent": "infra_monitor", "timestamp": datetime.utcnow().isoformat(),
+ "result": result, "requires_approval": False, "model": "local"})
+
+ # Content agent triggers
+ agent_name = None
+ if any(t in lc for t in ["post ideas", "post idea", "generate post", "social post"]):
+ agent_name = "social_publisher"
+ elif any(t in lc for t in ["find trends", "trending niches", "trend hunt", "viral trends", "what's trending", "whats trending"]):
+ agent_name = "trend_hunter"
+ elif any(t in lc for t in ["curate content", "filter content", "dedup", "deduplicate"]):
+ agent_name = "content_curator"
+ elif any(t in lc for t in ["rewrite this", "rephrase this", "rewrite caption", "generate variant"]):
+ agent_name = "content_rewriter"
+ elif any(t in lc for t in ["compliance scan", "policy scan", "content scan", "legal scan"]):
+ agent_name = "compliance_scanner"
+
+ if agent_name:
+ prompt = build_prompt(agent_name, state)
+ local = AGENT_REGISTRY.get(agent_name, {}).get("local", True)
+ cost = 0.0
+ req_approval = False
+ if local:
+ try:
+ result_data = call_ollama(prompt)
+ result = result_data.get("result", "")
+ if agent_name == "social_publisher":
+ try:
+ js = result.find("["); je = result.rfind("]") + 1
+ if js >= 0 and je > js:
+ posts = json.loads(result[js:je])
+ for p in posts:
+ t = p.get("post", p.get("tweet", ""))
+ if t:
+ PENDING_POSTS.append({"text": t, "by": "discord", "ts": datetime.utcnow().isoformat()})
+ result += f"\n\n[OK] **{len(posts)} queued.** `queue` . `!post [#]`"
+ except:
+ pass
+ except Exception as e:
+ result = f"Ollama: {e}"
+ else:
+ if agent_name == "compliance_scanner":
+ try:
+ gr = requests.post("https://api.openai.com/v1/chat/completions",
+ headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
+ json={"model": "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}],
+ "temperature": 0.5, "max_tokens": 200}, timeout=30)
+ rj = gr.json()
+ result = rj["choices"][0]["message"]["content"].strip()
+ u = rj.get("usage", {})
+ cost = (u.get("prompt_tokens", 0) / 1e6 * 0.15) + (u.get("completion_tokens", 0) / 1e6 * 0.60)
+ if '"risk_level": "high"' in result:
+ req_approval = True
+ send_alert_sync(f"[WARN] **HIGH RISK**\n`{command}`\n```{result}```\n`!approve` / `!deny`")
+ result += " | APPROVAL PENDING"
+ except Exception as e:
+ result = f"GPT: {e}"
+ else:
+ result = f"External ({agent_name}): {command}"
+ save_to_db(agent_name, command, result, cost)
+ agent_model = OLLAMA_MODEL if local else ("gpt-4o-mini" if agent_name == "compliance_scanner" else "external")
+ return JSONResponse({"agent": agent_name, "timestamp": datetime.utcnow().isoformat(),
+ "result": result, "requires_approval": req_approval, "model": agent_model})
+
+ # General assistant -- everything else goes to the active brain
+ prev = previous_result or {}
+ context_block = ""
+ if prev:
+ prev_agent = prev.get("agent", "")
+ prev_result = str(prev.get("result", ""))[:500]
+ context_block = f"\n\n[Previous exchange -- agent: {prev_agent}, response: {prev_result}]\nThe user may be following up on this."
+
+ search_triggers = ["weather", "news", "price of", "stock", "score", "who won", "what happened"]
+ enhanced_prompt = command
+ if any(t in lc for t in search_triggers):
+ results = brave_search(command, 3)
+ if results:
+ context = "\n".join(f"- {r['title']}: {r['desc']}" for r in results)
+ enhanced_prompt = f"{command}\n\nRecent search results:\n{context}\n\nUse these to answer accurately."
+
+ enhanced_prompt += context_block
+ brain = call_brain(enhanced_prompt, SYSTEM_PROMPT)
+ result = brain["result"]
+ cost = brain.get("cost", 0)
+ model_used = brain.get("model", ACTIVE_BRAIN)
+
+ if ACTIVE_BRAIN == "ollama":
+ for marker in ["\nUser:", "\nuser:", "\nHuman:", "\nhuman:", "\nAssistant:", "\nassistant:"]:
+ idx = result.find(marker)
+ if idx > 0:
+ result = result[:idx].strip()
+
+ if "EXECUTE:" in result or "SEARCH:" in result:
+ result = process_ai_response(result)
+
+ save_to_db("general_assistant", command, result, cost)
+ return JSONResponse({"agent": "general_assistant", "timestamp": datetime.utcnow().isoformat(),
+ "result": result, "requires_approval": False, "model": model_used})
+
+# =============================================================================
+# API ENDPOINTS
+# =============================================================================
+
+@app.post("/agents/{agent_name}")
+async def run_agent(agent_name: str, payload: dict = Body(...)):
+ save_to_db(agent_name, json.dumps(payload), "executed")
+ return JSONResponse({"agent": agent_name, "result": "executed", "timestamp": datetime.utcnow().isoformat()})
+
+# =============================================================================
+# DISCORD ! COMMANDS
+# =============================================================================
+
+@bot.command(name="approve")
+async def approve_cmd(ctx):
+ if allowed_ctx(ctx):
+ await ctx.send("[OK] Action **approved**.")
+
+@bot.command(name="deny")
+async def deny_cmd(ctx):
+ if allowed_ctx(ctx):
+ await ctx.send("[DENY] Action **denied**.")
+
+@bot.command(name="model")
+async def switch_model(ctx, *, name: str = None):
+ global ACTIVE_BRAIN, OLLAMA_MODEL
+ if not allowed_ctx(ctx):
+ return
+ if not name:
+ try:
+ oc = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
+ om = ", ".join(f"`{m['name']}`" for m in oc.json().get("models", []))
+ except:
+ om = "can't reach"
+ await ctx.send(f"**Brain:** `{ACTIVE_BRAIN}` | **Ollama:** `{OLLAMA_MODEL}`\n--------------------\n"
+ f"Switch: `!model grok` . `!model gpt` . `!model ollama`\n"
+ f"Ollama: `!model ollama [name]`\nAvailable: {om}")
+ return
+ n = name.lower().strip()
+ if n == "grok":
+ ACTIVE_BRAIN = "grok"
+ await ctx.send("Brain -> **Grok 4.1 Fast** ($2/$10 per 1M tok)")
+ elif n in {"gpt", "gpt4", "gpt-4o-mini", "openai", "mini"}:
+ ACTIVE_BRAIN = "gpt"
+ await ctx.send("Brain -> **GPT-4o-mini** ($0.15/$0.60 per 1M tok)")
+ elif n in {"ollama", "local"}:
+ ACTIVE_BRAIN = "ollama"
+ await ctx.send(f"Brain -> **Ollama** (`{OLLAMA_MODEL}`) -- FREE")
+ elif n.startswith("ollama "):
+ target = n[7:].strip()
+ try:
+ oc = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
+ avail = [m["name"] for m in oc.json().get("models", [])]
+ matched = next((m for m in avail if target in m.lower()), None)
+ if matched:
+ old = OLLAMA_MODEL
+ OLLAMA_MODEL = matched
+ ACTIVE_BRAIN = "ollama"
+ await ctx.send(f"`{old}` -> `{matched}` | Brain -> **ollama**")
+ else:
+ await ctx.send(f"[FAIL] `{target}` not found. Available: {', '.join(f'`{m}`' for m in avail)}")
+ except:
+ await ctx.send("[FAIL] Can't reach Ollama")
+ else:
+ await ctx.send("Use: `!model grok` . `!model gpt` . `!model ollama` . `!model ollama [name]`")
+
+@bot.command(name="exec")
+async def exec_cmd(ctx, host: str = None, *, cmd: str = None):
+ if not allowed_ctx(ctx):
+ return
+ if not host or not cmd:
+ await ctx.send("Usage: `!exec [host] [command]`\nHosts: " + ", ".join(f"`{h}`" for h in INFRA_MAP.keys()))
+ return
+ h = resolve_host(host)
+ if not h:
+ await ctx.send(f"[FAIL] Unknown host `{host}`. Hosts: " + ", ".join(f"`{k}`" for k in INFRA_MAP.keys()))
+ return
+ if is_blocked(cmd):
+ await ctx.send("[BLOCKED]")
+ return
+ async with ctx.typing():
+ r = run_local(cmd) if h == "perseus" else run_ssh(h, cmd)
+ st = "[OK]" if r["ok"] else "[FAIL]"
+ save_to_db("command_executor", f"!exec {h} {cmd}", r["out"])
+ await ctx.send(f"{st} **{r['host']}** `{cmd}`\n```\n{r['out'][:1500]}\n```")
+
+@bot.command(name="post")
+async def post_cmd(ctx, number: int = None):
+ if not allowed_ctx(ctx):
+ return
+ if not PENDING_POSTS:
+ await ctx.send("Queue empty."); return
+ if not number or number < 1 or number > len(PENDING_POSTS):
+ await ctx.send(f"Pick 1-{len(PENDING_POSTS)}"); return
+ tw = PENDING_POSTS.pop(number - 1)
+ await ctx.send(f"Posting #{number}...")
+ r = post_tweet(tw["text"])
+ if r["ok"]:
+ await ctx.send(f"[OK] {r['url']}"); save_to_db("social_publisher", "posted", tw["text"])
+ else:
+ await ctx.send(f"[FAIL] {r['error']}"); PENDING_POSTS.insert(number - 1, tw)
+
+@bot.command(name="postall")
+async def postall_cmd(ctx):
+ if not allowed_ctx(ctx):
+ return
+ if not PENDING_POSTS:
+ await ctx.send("Queue empty."); return
+ n = len(PENDING_POSTS)
+ await ctx.send(f"Posting {n}...")
+ ok = fail = 0
+ while PENDING_POSTS:
+ tw = PENDING_POSTS.pop(0)
+ r = post_tweet(tw["text"])
+ if r["ok"]:
+ ok += 1; await ctx.send(f"[OK] [{ok}/{n}] {r['url']}"); save_to_db("social_publisher", "posted", tw["text"])
+ else:
+ fail += 1; await ctx.send(f"[FAIL] [{ok+fail}/{n}] {r['error']}")
+ if PENDING_POSTS:
+ await asyncio.sleep(30)
+ await ctx.send(f"Done: {ok} posted, {fail} failed.")
+
+@bot.command(name="clearqueue")
+async def clearq(ctx):
+ if not allowed_ctx(ctx):
+ return
+ n = len(PENDING_POSTS); PENDING_POSTS.clear()
+ await ctx.send(f"Cleared {n}.")
+
+@bot.command(name="backup")
+async def backup_cmd(ctx, action: str = "run"):
+ if not allowed_ctx(ctx):
+ return
+ async with ctx.typing():
+ if action.lower() in ["status", "check", "last"]:
+ result = "Use `backup status` in chat for detailed check."
+ else:
+ await ctx.send("Running all backups...")
+ results = run_all_backups()
+ result = format_backup_report(results)
+ if len(result) > 1990:
+ result = result[:1990] + "..."
+ await ctx.send(result)
+
+@bot.command(name="health")
+async def health_cmd(ctx):
+ if not allowed_ctx(ctx):
+ return
+ async with ctx.typing():
+ results = full_infra_check()
+ result = format_infra_health(results)
+ chunks = []
+ while len(result) > 1990:
+ split_at = result.rfind("\n", 0, 1990)
+ if split_at < 500:
+ split_at = 1990
+ chunks.append(result[:split_at])
+ result = result[split_at:].lstrip("\n")
+ chunks.append(result)
+ for chunk in chunks:
+ await ctx.send(chunk)
+
+# =============================================================================
+# STARTUP
+# =============================================================================
+
+print(f"Perseus ready | Brain: {ACTIVE_BRAIN} | Ollama: {OLLAMA_MODEL} | Hosts: {len(INFRA_MAP)} | Agents: {len(AGENT_REGISTRY)} | Search: Brave")
video-api.js +127 -0
@@ -0,0 +1,127 @@
+const express = require('express');
+const Database = require('better-sqlite3');
+const Joi = require('joi');
+const cors = require('cors');
+const helmet = require('helmet');
+const morgan = require('morgan');
+const swaggerUi = require('swagger-ui-express');
+const jwt = require('jsonwebtoken');
+const bcrypt = require('bcryptjs');
+const rateLimit = require('express-rate-limit');
+
+const app = express();
+const PORT = process.env.PORT || 3000;
+const JWT_SECRET = process.env.JWT_SECRET || 'your-super-secret-jwt-key-change-this';
+
+// Middleware
+app.use(helmet());
+app.use(cors());
+app.use(express.json());
+app.use(morgan('combined'));
+const limiter = rateLimit({ windowMs: 15 * 60 * 1000, max: 100 });
+app.use('/api/', limiter);
+
+// SQLite setup
+const db = new Database(':memory:');
+db.exec(`CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE, password TEXT)`);
+db.exec(`CREATE TABLE videos (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, description TEXT, url TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)`);
+const hashed = bcrypt.hashSync('admin', 10);
+db.prepare(`INSERT INTO users (username, password) VALUES ('admin', ?)`).run(hashed);
+
+// Auth middleware
+const authenticateToken = (req, res, next) => {
+ const token = req.header('Authorization')?.replace('Bearer ', '');
+ if (!token) return res.status(401).json({ message: 'Access token required' });
+ jwt.verify(token, JWT_SECRET, (err, user) => {
+ if (err) return res.status(403).json({ message: 'Invalid token' });
+ req.user = user;
+ next();
+ });
+};
+
+// Joi schemas
+const videoSchema = Joi.object({
+ title: Joi.string().min(1).max(255).required(),
+ description: Joi.string().min(1).max(1000).required(),
+ url: Joi.string().uri().required()
+});
+
+// Login endpoint
+app.post('/api/login', (req, res) => {
+ const { username, password } = req.body;
+ const user = db.prepare('SELECT * FROM users WHERE username = ?').get(username);
+ if (!user || !bcrypt.compareSync(password, user.password)) {
+ return res.status(401).json({ message: 'Invalid credentials' });
+ }
+ const token = jwt.sign({ id: user.id }, JWT_SECRET, { expiresIn: '24h' });
+ res.json({ token });
+});
+
+// CRUD Videos
+app.get('/api/videos', authenticateToken, (req, res) => {
+ const page = parseInt(req.query.page) || 1;
+ const limit = parseInt(req.query.limit) || 10;
+ const offset = (page - 1) * limit;
+ const rows = db.prepare('SELECT * FROM videos ORDER BY created_at DESC LIMIT ? OFFSET ?').all(limit, offset);
+ const count = db.prepare('SELECT COUNT(*) as count FROM videos').get();
+ res.json({ data: rows, pagination: { page, limit, total: count.count, pages: Math.ceil(count.count / limit) } });
+});
+
+app.get('/api/videos/:id', authenticateToken, (req, res) => {
+ const row = db.prepare('SELECT * FROM videos WHERE id = ?').get(req.params.id);
+ if (!row) return res.status(404).json({ message: 'Video not found' });
+ res.json(row);
+});
+
+app.post('/api/videos', authenticateToken, (req, res) => {
+ const { error } = videoSchema.validate(req.body);
+ if (error) return res.status(400).json({ message: error.details[0].message });
+ const result = db.prepare('INSERT INTO videos (title, description, url) VALUES (?, ?, ?)').run(req.body.title, req.body.description, req.body.url);
+ res.status(201).json({ id: result.lastInsertRowid });
+});
+
+app.put('/api/videos/:id', authenticateToken, (req, res) => {
+ const { error } = videoSchema.validate(req.body);
+ if (error) return res.status(400).json({ message: error.details[0].message });
+ const result = db.prepare('UPDATE videos SET title = ?, description = ?, url = ? WHERE id = ?').run(req.body.title, req.body.description, req.body.url, req.params.id);
+ if (result.changes === 0) return res.status(404).json({ message: 'Video not found' });
+ res.json({ updated: true });
+});
+
+app.delete('/api/videos/:id', authenticateToken, (req, res) => {
+ const result = db.prepare('DELETE FROM videos WHERE id = ?').run(req.params.id);
+ if (result.changes === 0) return res.status(404).json({ message: 'Video not found' });
+ res.status(204).send();
+});
+
+// Search endpoint
+app.get('/api/search', authenticateToken, (req, res) => {
+ const q = req.query.q;
+ if (!q) return res.status(400).json({ message: 'Query required' });
+ const rows = db.prepare('SELECT * FROM videos WHERE title LIKE ? OR description LIKE ?').all(`%${q}%`, `%${q}%`);
+ res.json(rows);
+});
+
+// Swagger docs
+app.use('/docs', swaggerUi.serve, swaggerUi.setup({
+ openapi: '3.0.0',
+ info: { title: 'Video Library API', version: '1.0.0' },
+ paths: {
+ '/api/login': {
+ post: {
+ summary: 'Login',
+ requestBody: { content: { 'application/json': { schema: { type: 'object', properties: { username: { type: 'string' }, password: { type: 'string' } } } } } },
+ responses: { '200': { description: 'Token' } }
+ }
+ },
+ '/api/videos': {
+ get: { summary: 'List videos', responses: { '200': { description: 'Videos' } } },
+ post: { summary: 'Create video', responses: { '201': { description: 'Created' } } }
+ }
+ }
+}));
+
+// Error handler
+app.use((err, req, res, next) => res.status(500).json({ message: err.message }));
+
+app.listen(PORT, () => console.log(`Video API running on http://localhost:${PORT}`));