Zion Boggan zionboggan.com ↗
1246 lines · python
History for this file →
1
import os
2
import time
3
import subprocess
4
from datetime import datetime, timedelta
5
from fastapi import FastAPI, Body
6
from fastapi.responses import JSONResponse
7
import json
8
import psycopg2
9
import requests
10
import discord
11
from discord.ext import commands
12
import asyncio
13
import threading
14
import hmac
15
import hashlib
16
import base64
17
import urllib.parse
18
import uuid
19
 
20
 
21
def load_env(path=None):
    """Seed ``os.environ`` from a KEY=VALUE file without overriding existing values.

    Args:
        path: File to read. Defaults to ``.env`` in the directory of this module.

    Blank lines, lines starting with ``#`` and lines without ``=`` are ignored.
    Keys and values are stripped of surrounding whitespace; values additionally
    lose surrounding double and single quote characters. A missing file is a no-op.
    """
    env_file = path
    if env_file is None:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        env_file = os.path.join(module_dir, ".env")
    if not os.path.exists(env_file):
        return
    with open(env_file) as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            name, _, value = entry.partition("=")
            cleaned = value.strip().strip('"').strip("'")
            # setdefault: variables already present in the environment win.
            os.environ.setdefault(name.strip(), cleaned)


load_env()
33
 
34
# --- Credentials and endpoints (read from the environment) -------------------
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
XAI_API_KEY = os.getenv("XAI_API_KEY", "")
BRAVE_API_KEY = os.getenv("BRAVE_API_KEY", "")
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN", "")
# `or "0"` treats a present-but-empty variable (e.g. `DISCORD_CHANNEL_ID=` in
# .env) as unset instead of crashing the import with int("").
DISCORD_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID") or "0")
OWNER_DISCORD_ID = int(os.getenv("OWNER_DISCORD_ID") or "0")

OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3:8b")
# Backend selector read by call_brain: "grok", "gpt", anything else -> Ollama.
ACTIVE_BRAIN = "grok"

X_CONSUMER_KEY = os.getenv("X_CONSUMER_KEY", "")
X_CONSUMER_SECRET = os.getenv("X_CONSUMER_SECRET", "")
X_ACCESS_TOKEN = os.getenv("X_ACCESS_TOKEN", "")
X_ACCESS_TOKEN_SECRET = os.getenv("X_ACCESS_TOKEN_SECRET", "")

ANALYTICS_API_KEY = os.getenv("ANALYTICS_API_KEY", "")
ANALYTICS_BASE = os.getenv("ANALYTICS_BASE_URL", "https://api.example.com")

# --- Cost accounting -----------------------------------------------------------
# USD per one million tokens, keyed by model name.
MODEL_COSTS = {
    "gpt-4o-mini": {"input_per_1m": 0.15, "output_per_1m": 0.60},
    "grok-4-1-fast-reasoning": {"input_per_1m": 2.00, "output_per_1m": 10.00},
    "ollama": {"input_per_1m": 0.0, "output_per_1m": 0.0},
}
# Same empty-value guard as the integer IDs above.
MONTHLY_BUDGET_CEILING = float(os.getenv("MONTHLY_BUDGET") or "10.00")

# --- In-process state ----------------------------------------------------------
RATE_LIMIT = {}            # user id (str) -> time.time() of the last accepted message
RATE_LIMIT_SECONDS = 5
CHANNEL_LAST_RESULT = {}   # channel id -> {"agent": ..., "result": ...} of the last reply
STARTUP_TIME = datetime.utcnow()
PENDING_POSTS = []         # not referenced in the visible part of this file
65
 
66
# ASGI application object for the HTTP API.
app = FastAPI(title="Perseus")

# Keyword arguments passed as psycopg2.connect(**DB_CONFIG).
DB_CONFIG = dict(
    dbname=os.getenv("PG_DBNAME", "perseus"),
    user=os.getenv("PG_USER", "perseus"),
    password=os.getenv("PG_PASSWORD", ""),
    host=os.getenv("PG_HOST", "localhost"),
)
73
 
74
 
75
# Canonical host name -> description of each managed machine. Container entries
# ("lxc") also carry their container id ("ct") and the node that hosts them.
INFRA_MAP = {
    "node-1": dict(
        ip=os.getenv("NODE1_IP", "REDACTED-IP"),
        type="proxmox_host",
        desc="Proxmox host 1",
    ),
    "node-2": dict(
        ip=os.getenv("NODE2_IP", "REDACTED-IP"),
        type="proxmox_host",
        desc="Proxmox host 2 (GPU)",
    ),
    "perseus": dict(
        ip=os.getenv("PERSEUS_IP", "REDACTED-IP"),
        type="lxc",
        ct=120,
        node="node-1",
        desc="Perseus main (FastAPI + Discord)",
    ),
    "vpn": dict(
        ip=os.getenv("VPN_IP", "REDACTED-IP"),
        type="lxc",
        ct=200,
        node="node-1",
        desc="WireGuard VPN",
    ),
    "dns": dict(
        ip=os.getenv("DNS_IP", "REDACTED-IP"),
        type="lxc",
        ct=201,
        node="node-1",
        desc="Pi-hole DNS",
    ),
    "backup": dict(
        ip=os.getenv("BACKUP_IP", "REDACTED-IP"),
        type="lxc",
        ct=203,
        node="node-1",
        desc="Proxmox Backup Server",
    ),
    "gpu": dict(
        ip=os.getenv("GPU_IP", "REDACTED-IP"),
        type="lxc",
        ct=205,
        node="node-2",
        desc="Ollama + GPU inference",
    ),
}
84
 
85
# Friendly name -> canonical INFRA_MAP key, built from (target, aliases) groups.
HOST_ALIASES = {
    alias: target
    for target, aliases in (
        ("node-2", ("gpu node", "compute")),
        ("dns", ("pi-hole", "pihole", "adblock")),
        ("vpn", ("wireguard", "wg")),
        ("backup", ("backup server", "pbs")),
        ("gpu", ("ollama", "llm")),
        ("perseus", ("main", "self", "local")),
    )
    for alias in aliases
}
93
 
94
# Substrings that is_blocked() refuses outright.
BLOCKED_COMMANDS = [
    "rm -rf /", "rm -rf /*", "mkfs", "dd if=/dev/zero",
    ":(){ :|:& };:", "chmod -R 777 /",
    "shutdown", "reboot", "poweroff", "init 0", "init 6", "halt",
]

# Command prefixes that is_safe() lets run without manual approval.
AUTO_APPROVE = [
    # files and text
    "ls", "cat", "grep", "find", "df", "du",
    # system state
    "free", "top", "uptime", "hostname", "whoami", "id", "ps",
    "systemctl status", "journalctl",
    # networking
    "ip a", "ip addr", "ifconfig", "ping",
    # GPU / LLM / virtualisation
    "nvidia-smi", "ollama list", "ollama ps", "pct list", "qm list", "pvesh",
    "pihole", "docker ps",
    # misc read-only helpers
    "head", "tail", "wc", "sort", "date", "cal", "echo", "pwd", "env",
]
101
 
102
# Agent name -> {"model": backend label, "local": whether it runs locally}.
# Built from (name, model, local) rows; order of rows is the dict order.
AGENT_REGISTRY = {
    name: {"model": model, "local": local}
    for name, model, local in (
        ("scraper", "langgraph", True),
        ("content_curator", "ollama", True),
        ("trend_hunter", "ollama", True),
        ("social_publisher", "ollama", True),
        ("compliance_scanner", "gpt4o-mini", False),
        ("ab_testing_orchestrator", "gpt4o-mini", False),
        ("analytics_reporter", "langgraph", True),
        ("backup_manager", "langgraph", True),
        ("competitor_intelligence", "grok-fast", False),
        ("content_rewriter", "ollama", True),
        ("engagement_analyst", "ollama", True),
        ("funnel_optimizer", "claude", False),
        ("traffic_router", "langgraph", True),
        ("scheduler_intelligence", "langgraph", True),
        ("model_router", "langgraph", True),
        ("rate_limit_resilience", "langgraph", True),
        ("infra_monitor", "langgraph", True),
        ("compliance_approval", "langgraph", True),
        ("capability_builder", "langgraph", True),
        ("revenue_tracker", "grok-fast", False),
    )
}
124
 
125
 
126
# Discord client setup. message_content is enabled because on_message reads
# message.content for both "!" commands and free-form chat.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)

# Event loop the bot runs on; assigned in on_ready and used by send_alert_sync
# to schedule coroutines from other threads.
bot_loop = None
130
 
131
def allowed_ctx(ctx):
    """Return True when a command comes from the group channel or an owner DM.

    Args:
        ctx: Command context; only ``ctx.channel`` and ``ctx.author.id`` are read.
    """
    channel = ctx.channel
    return channel.id == DISCORD_CHANNEL_ID or (
        isinstance(channel, discord.DMChannel) and ctx.author.id == OWNER_DISCORD_ID
    )
138
 
139
@bot.event
async def on_ready():
    """Record the bot's event loop once the Discord client is ready.

    The loop is stored in the module-level ``bot_loop`` so that synchronous
    code on other threads (see send_alert_sync) can schedule coroutines on it.
    """
    global bot_loop
    # get_running_loop() is the supported way to obtain the loop from inside a
    # coroutine; get_event_loop() is deprecated for this use.
    bot_loop = asyncio.get_running_loop()
    print(f"Discord bot logged in as {bot.user}")
144
 
145
async def send_discord_alert(msg):
    """Post *msg* to the configured Discord channel, best effort.

    Does nothing when the channel cannot be resolved; any exception is printed
    rather than raised.
    """
    try:
        channel = bot.get_channel(DISCORD_CHANNEL_ID)
        if not channel:
            return
        await channel.send(msg)
    except Exception as exc:
        print(f"Alert fail: {exc}")
152
 
153
def send_alert_sync(msg):
    """Schedule send_discord_alert(msg) on the bot loop from synchronous code.

    The alert is dropped when the bot loop has not been recorded yet or is not
    running.
    """
    loop = bot_loop
    if not loop or not loop.is_running():
        return
    asyncio.run_coroutine_threadsafe(send_discord_alert(msg), loop)
156
 
157
def _split_for_discord(text, limit=1990):
    """Split *text* into non-empty chunks of at most *limit* characters.

    A chunk is cut at the last newline inside the window; if that newline sits
    before position 500 (or there is none) the text is cut hard at *limit*.
    Chunks that are empty or whitespace-only are dropped because Discord
    rejects empty messages.
    """
    chunks = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit)
        if cut < 500:
            cut = limit
        chunks.append(text[:cut])
        text = text[cut:].lstrip("\n")
    chunks.append(text)
    return [chunk for chunk in chunks if chunk.strip()]


@bot.event
async def on_message(message):
    """Relay free-form chat messages to the local /chat endpoint.

    Only messages in the configured group channel or in a DM from the owner are
    handled. Messages starting with "!" are passed to the command framework.
    Everything else is rate limited per author, forwarded to
    ``http://localhost:3002/chat`` together with the channel's previous result,
    and the answer is posted back in chunks that fit Discord's message limit.
    """
    if message.author == bot.user:
        return
    is_group = message.channel.id == DISCORD_CHANNEL_ID
    is_owner_dm = isinstance(message.channel, discord.DMChannel) and message.author.id == OWNER_DISCORD_ID
    if not is_group and not is_owner_dm:
        return
    if message.content.startswith("!"):
        await bot.process_commands(message)
        return
    if not message.content.strip():
        return

    uid = str(message.author.id)
    now = time.time()
    if uid in RATE_LIMIT and (now - RATE_LIMIT[uid]) < RATE_LIMIT_SECONDS:
        await message.reply(f"Rate limited -- wait {RATE_LIMIT_SECONDS - (now - RATE_LIMIT[uid]):.0f}s")
        return
    RATE_LIMIT[uid] = now

    async with message.channel.typing():
        try:
            payload = {"command": message.content}
            last = CHANNEL_LAST_RESULT.get(message.channel.id)
            if last:
                payload["previous_result"] = last
            # requests is blocking; run it in the default executor so the
            # Discord event loop stays responsive for up to 120 s.
            loop = asyncio.get_running_loop()
            resp = await loop.run_in_executor(
                None,
                lambda: requests.post("http://localhost:3002/chat", json=payload, timeout=120),
            )
            data = resp.json()
            agent = data.get("agent", "unknown")
            result = data.get("result", "No result")
            model = data.get("model", "")
            CHANNEL_LAST_RESULT[message.channel.id] = {"agent": agent, "result": result}
            # The endpoint may return structured data; render it as text
            # instead of failing on str + non-str concatenation.
            body = result if isinstance(result, str) else json.dumps(result, indent=2)
            model_sig = f" . `{model}`" if model else ""
            header = f"**[{agent}]{model_sig}**\n"
            for i, chunk in enumerate(_split_for_discord(header + body)):
                if i == 0:
                    await message.reply(chunk)
                else:
                    await message.channel.send(chunk)
        except Exception as e:
            # Keep the error reply itself within the message size limit.
            await message.reply(f"[WARN] {str(e)}"[:1990])
207
 
208
def run_discord_bot():
    """Run the Discord client on its own event loop; blocks until it stops.

    Intended as a thread target. When no token is configured the bot is skipped,
    and a failure to start or a crash is printed instead of surfacing as an
    unhandled traceback from the background thread.
    """
    if not DISCORD_TOKEN:
        print("Discord bot disabled: DISCORD_TOKEN is not set")
        return
    try:
        asyncio.run(bot.start(DISCORD_TOKEN))
    except Exception as e:
        print(f"Discord bot stopped: {e}")


# Started at import time; daemon=True so the thread never blocks interpreter exit.
threading.Thread(target=run_discord_bot, daemon=True).start()
212
 
213
 
214
def save_to_db(agent, command, result, cost=0.0, model_used=""):
    """Insert one row into the ``metadata`` table, best effort.

    Args:
        agent: Agent name stored in the ``agent`` column.
        command: Command text stored in the ``command`` column.
        result: Any JSON-serialisable value; stored as a JSON string.
        cost: Cost in USD for the ``cost_usd`` column.
        model_used: Accepted for caller compatibility; not written by this query.

    Errors are printed, never raised. The connection is always closed, also
    when the insert or commit fails.
    """
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO metadata (agent, timestamp, command, result, cost_usd) VALUES (%s, %s, %s, %s, %s)",
                (agent, datetime.utcnow().isoformat(), command, json.dumps(result), cost),
            )
        conn.commit()
    except Exception as e:
        print(f"DB: {e}")
    finally:
        if conn is not None:
            conn.close()
225
 
226
def get_cost_summary():
    """Summarise spend recorded in the ``metadata`` table.

    Returns:
        On success a dict with ``today``, ``month`` and ``total`` (USD floats),
        ``budget_remaining`` (MONTHLY_BUDGET_CEILING minus month spend) and
        ``agents`` (up to 8 ``(agent, count)`` rows for today, busiest first).
        On any failure ``{"error": <message>}``.

    The connection is closed on every path, including query failures.
    """
    since_sql = "SELECT COALESCE(SUM(cost_usd),0) FROM metadata WHERE timestamp::text >= %s"
    conn = None
    try:
        # One clock read so the day and month boundaries cannot straddle midnight.
        now = datetime.utcnow()
        today = now.strftime('%Y-%m-%d')
        month_start = now.strftime('%Y-%m-01')
        conn = psycopg2.connect(**DB_CONFIG)
        with conn.cursor() as cur:
            cur.execute(since_sql, (today,))
            td = float(cur.fetchone()[0])
            cur.execute(since_sql, (month_start,))
            mo = float(cur.fetchone()[0])
            cur.execute("SELECT COALESCE(SUM(cost_usd),0) FROM metadata")
            tot = float(cur.fetchone()[0])
            cur.execute(
                "SELECT agent, COUNT(*) FROM metadata WHERE timestamp::text >= %s GROUP BY agent ORDER BY COUNT(*) DESC LIMIT 8",
                (today,),
            )
            ac = cur.fetchall()
        return {"today": td, "month": mo, "total": tot, "budget_remaining": MONTHLY_BUDGET_CEILING - mo, "agents": ac}
    except Exception as e:
        return {"error": str(e)}
    finally:
        if conn is not None:
            conn.close()
245
 
246
def get_recent_commands(n=5):
    """Return the *n* most recent ``(agent, command, timestamp)`` rows.

    Returns an empty list when the database cannot be queried. The connection
    is closed on every path.
    """
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        with conn.cursor() as cur:
            cur.execute("SELECT agent, command, timestamp FROM metadata ORDER BY timestamp DESC LIMIT %s", (n,))
            return cur.fetchall()
    except Exception:
        # Best effort: callers treat "no data" and "database down" the same way.
        return []
    finally:
        if conn is not None:
            conn.close()
257
 
258
 
259
def resolve_host(name):
    """Map a host name or alias to its INFRA_MAP key.

    Matching is case-insensitive and ignores surrounding whitespace.
    Returns None when the name is neither a known host nor a known alias.
    """
    key = name.lower().strip()
    return key if key in INFRA_MAP else HOST_ALIASES.get(key)
264
 
265
def is_blocked(cmd, blocklist=None):
    """Return True when *cmd* contains any forbidden pattern.

    Args:
        cmd: Shell command to inspect.
        blocklist: Patterns to look for; defaults to BLOCKED_COMMANDS.

    Both the command and the patterns are lower-cased and have runs of
    whitespace collapsed before the substring test, so patterns written with
    capitals (e.g. ``chmod -R 777 /``) and commands padded with extra spaces
    are still caught.
    """
    patterns = BLOCKED_COMMANDS if blocklist is None else blocklist
    normalized = " ".join(cmd.lower().split())
    return any(" ".join(p.lower().split()) in normalized for p in patterns)
270
 
271
# Shell syntax that chains, substitutes or redirects; a command containing any
# of these is never auto-approved because only its first word was vetted.
_SHELL_CONTROL_TOKENS = (";", "&", "`", "$(", ">", "<", "\n", "\r")


def is_safe(cmd, allowlist=None):
    """Return True when *cmd* may run without manual approval.

    Args:
        cmd: Shell command to inspect.
        allowlist: Approved command prefixes; defaults to AUTO_APPROVE.

    A command qualifies only if it contains no shell control syntax and every
    ``|``-separated pipeline stage equals an approved entry or starts with an
    approved entry followed by a space. Matching on whole words keeps ``lsblk``
    from passing as ``ls``, and rejecting control syntax keeps
    ``ls; rm -rf ~`` from riding on an approved prefix.

    This is a coarse gate, not a sandbox: approved programs that can themselves
    run other programs are not analysed.
    """
    approved = AUTO_APPROVE if allowlist is None else allowlist
    if any(token in cmd for token in _SHELL_CONTROL_TOKENS):
        return False
    prefixes = [" ".join(entry.lower().split()) for entry in approved]
    for stage in cmd.lower().split("|"):
        words = " ".join(stage.split())
        if not words:
            # Empty command, or an empty stage such as the middle of "a || b".
            return False
        if not any(words == p or words.startswith(p + " ") for p in prefixes):
            return False
    return True
274
 
275
def run_ssh(host, cmd, timeout=30):
    """Run *cmd* on *host* through the ``ssh`` client and capture its output.

    Returns a dict with ``ok`` (exit status was 0), ``out`` (stripped stdout,
    else stripped stderr, else ``"(empty)"``) and ``host``. A timeout or any
    other failure is reported as ``ok: False`` with the reason in ``out``.
    """
    argv = ["ssh", "-o", "ConnectTimeout=10", host, cmd]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return {"ok": False, "out": f"Timeout ({timeout}s)", "host": host}
    except Exception as exc:
        return {"ok": False, "out": str(exc), "host": host}
    output = proc.stdout.strip() or proc.stderr.strip() or "(empty)"
    return {"ok": proc.returncode == 0, "out": output, "host": host}
284
 
285
def run_local(cmd, timeout=30):
    """Run *cmd* through the local shell and capture its output.

    Returns a dict with ``ok`` (exit status was 0), ``out`` (stripped stdout,
    else stripped stderr, else ``"(empty)"``) and ``host`` fixed to
    ``"perseus"``. A timeout or any other failure is reported as ``ok: False``
    with the reason in ``out``.
    """
    here = "perseus"
    try:
        proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return {"ok": False, "out": f"Timeout ({timeout}s)", "host": here}
    except Exception as exc:
        return {"ok": False, "out": str(exc), "host": here}
    output = proc.stdout.strip() or proc.stderr.strip() or "(empty)"
    return {"ok": proc.returncode == 0, "out": output, "host": here}
293
 
294
 
295
def brave_search(query, count=3):
    """Query the Brave web search API.

    Args:
        query: Search text.
        count: Number of results requested.

    Returns:
        A list of ``{"title", "url", "desc"}`` dicts (``desc`` cut to 200
        characters). Returns an empty list on a non-200 response, a network
        error or an unexpected response shape.
    """
    try:
        resp = requests.get(
            "https://api.search.brave.com/res/v1/web/search",
            headers={"X-Subscription-Token": BRAVE_API_KEY, "Accept": "application/json"},
            params={"q": query, "count": count},
            timeout=10,
        )
        if resp.status_code != 200:
            return []
        hits = resp.json().get("web", {}).get("results", [])
        # `or ""` also covers fields that are present but null.
        return [
            {
                "title": hit.get("title") or "",
                "url": hit.get("url") or "",
                "desc": (hit.get("description") or "")[:200],
            }
            for hit in hits
        ]
    except (requests.RequestException, ValueError, AttributeError, TypeError):
        # Search is best effort; callers render "no results" for an empty list.
        return []
306
 
307
 
308
# System prompt sent with chat requests. It defines the EXECUTE:host:command
# and SEARCH:query directives that process_ai_response() parses out of replies.
SYSTEM_PROMPT = """You are Perseus, the command AI for a Proxmox homelab infrastructure. You are direct, efficient, and technically competent.

Infrastructure you control (all have SSH access):
- node-1: Proxmox host 1 (primary compute)
- node-2: Proxmox host 2 (GPU compute node)
- CT 120 / perseus: Your home -- FastAPI, PostgreSQL, Discord bot
- CT 200 / vpn: WireGuard VPN
- CT 201 / dns: Pi-hole DNS/ad blocking
- CT 203 / backup: Proxmox Backup Server
- CT 205 / gpu: Ollama + local LLM inference on GPU

When asked to DO something on infrastructure, include commands as:
EXECUTE:hostname:command (one per line)

Examples:
"check disk on dns" -> EXECUTE:dns:df -h
"GPU status" -> EXECUTE:gpu:nvidia-smi
"whitelist google.com on pihole" -> EXECUTE:dns:pihole -w google.com

When asked about weather, news, or current events, you have web search. Include:
SEARCH:query
and the results will be appended.

For general questions, just answer. Keep responses concise."""
332
 
333
def call_grok(prompt, sys=None):
    """Send a chat completion request to the xAI API.

    Args:
        prompt: User message.
        sys: Optional system message placed before the user message.

    Returns:
        ``{"ok", "result", "cost", "model"}``. On success ``result`` is the
        reply text and ``cost`` is computed from the reported token usage and
        the MODEL_COSTS rates. On failure ``ok`` is False, ``result`` describes
        the error, ``cost`` is 0 and ``model`` is ``"grok"``.
    """
    model = "grok-4-1-fast-reasoning"
    msgs = []
    if sys:
        msgs.append({"role": "system", "content": sys})
    msgs.append({"role": "user", "content": prompt})
    try:
        r = requests.post(
            "https://api.x.ai/v1/chat/completions",
            headers={"Authorization": f"Bearer {XAI_API_KEY}", "Content-Type": "application/json"},
            json={"model": model, "messages": msgs, "temperature": 0.7, "max_tokens": 1500},
            timeout=60,
        )
        if r.status_code != 200:
            return {"ok": False, "result": f"Grok HTTP {r.status_code}", "cost": 0, "model": "grok"}
        d = r.json()
        txt = d["choices"][0]["message"]["content"].strip()
        # A missing or null usage block must not turn a good reply into an error.
        u = d.get("usage") or {}
        # Single source of truth for pricing instead of repeating the rates here.
        rates = MODEL_COSTS[model]
        cost = (u.get("prompt_tokens", 0) / 1e6 * rates["input_per_1m"]) + (
            u.get("completion_tokens", 0) / 1e6 * rates["output_per_1m"]
        )
        return {"ok": True, "result": txt, "cost": cost, "model": model}
    except Exception as e:
        return {"ok": False, "result": f"Grok: {e}", "cost": 0, "model": "grok"}
351
 
352
def call_gpt(prompt, sys=None):
    """Send a chat completion request to the OpenAI API.

    Args:
        prompt: User message.
        sys: Optional system message placed before the user message.

    Returns:
        ``{"ok", "result", "cost", "model"}``. On success ``result`` is the
        reply text and ``cost`` is computed from the reported token usage and
        the MODEL_COSTS rates. On failure ``ok`` is False, ``result`` describes
        the error, ``cost`` is 0 and ``model`` is ``"gpt"``.
    """
    model = "gpt-4o-mini"
    msgs = []
    if sys:
        msgs.append({"role": "system", "content": sys})
    msgs.append({"role": "user", "content": prompt})
    try:
        r = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json={"model": model, "messages": msgs, "temperature": 0.7, "max_tokens": 1500},
            timeout=60,
        )
        if r.status_code != 200:
            return {"ok": False, "result": f"GPT HTTP {r.status_code}", "cost": 0, "model": "gpt"}
        d = r.json()
        txt = d["choices"][0]["message"]["content"].strip()
        # A missing or null usage block must not turn a good reply into an error.
        u = d.get("usage") or {}
        # Single source of truth for pricing instead of repeating the rates here.
        rates = MODEL_COSTS[model]
        cost = (u.get("prompt_tokens", 0) / 1e6 * rates["input_per_1m"]) + (
            u.get("completion_tokens", 0) / 1e6 * rates["output_per_1m"]
        )
        return {"ok": True, "result": txt, "cost": cost, "model": model}
    except Exception as e:
        return {"ok": False, "result": f"GPT: {e}", "cost": 0, "model": "gpt"}
370
 
371
def call_ollama(prompt):
    """Send a single-message, non-streaming chat request to the Ollama server.

    Returns:
        ``{"ok", "result", "cost", "model"}`` with ``cost`` always 0 and
        ``model`` set to OLLAMA_MODEL. A non-200 response or any exception is
        reported as ``ok: False`` with the reason in ``result``.
    """
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/chat",
            json={
                "model": OLLAMA_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "stream": False,
                "options": {"temperature": 0.7},
            },
            timeout=90,
        )
        # Without this check an error response would be returned as a
        # successful call with an empty result.
        if r.status_code != 200:
            return {"ok": False, "result": f"Ollama HTTP {r.status_code}", "cost": 0, "model": OLLAMA_MODEL}
        text = r.json().get("message", {}).get("content", "").strip()
        return {"ok": True, "result": text, "cost": 0, "model": OLLAMA_MODEL}
    except Exception as e:
        return {"ok": False, "result": f"Ollama: {e}", "cost": 0, "model": OLLAMA_MODEL}
379
 
380
def call_brain(prompt, sys=None):
    """Route a prompt to the backend selected by ACTIVE_BRAIN.

    ``"grok"`` and ``"gpt"`` receive the system message separately. Any other
    value goes to Ollama, which takes a single prompt, so the system message is
    prepended as ``"<sys>\\n\\nUser: <prompt>"``.
    """
    remote = {"grok": call_grok, "gpt": call_gpt}.get(ACTIVE_BRAIN)
    if remote is not None:
        return remote(prompt, sys)
    merged = prompt if not sys else f"{sys}\n\nUser: {prompt}"
    return call_ollama(merged)
388
 
389
def process_ai_response(text):
    """Handle EXECUTE: and SEARCH: directives in AI responses.

    Each line of *text* is inspected:

    * ``EXECUTE:host:command`` - the host is resolved, then the command is
      refused (blocked), run immediately (auto-approved) or reported as
      pending manual approval. A directive without a second colon is kept as
      ordinary text.
    * ``SEARCH:query`` - a web search is run and its results are listed.
    * anything else is kept as ordinary text.

    Returns:
        The ordinary text, followed (after a blank line) by the directive
        outcomes, one per line.
    """
    exec_tag = "EXECUTE:"
    search_tag = "SEARCH:"
    out_lines = []
    exec_results = []

    for line in text.split("\n"):
        s = line.strip()
        if s.startswith(exec_tag):
            # Remove only the leading tag. str.replace would also delete any
            # later "EXECUTE:" that is part of the command itself.
            raw_host, sep, raw_cmd = s[len(exec_tag):].partition(":")
            if not sep:
                out_lines.append(line)
                continue
            host = resolve_host(raw_host.strip())
            cmd = raw_cmd.strip()
            if not host:
                exec_results.append(f"[FAIL] Unknown host: `{raw_host}`")
            elif is_blocked(cmd):
                exec_results.append(f"[BLOCKED] `{cmd}`")
            elif is_safe(cmd):
                r = run_local(cmd) if host == "perseus" else run_ssh(host, cmd)
                st = "[OK]" if r["ok"] else "[FAIL]"
                exec_results.append(f"{st} **{r['host']}** `{cmd}`\n```\n{r['out'][:800]}\n```")
            else:
                exec_results.append(f"[PENDING] **Needs approval:** `{cmd}` on `{host}`\nRun: `!exec {host} {cmd}`")
        elif s.startswith(search_tag):
            # Same reasoning as above: strip the prefix only.
            query = s[len(search_tag):].strip()
            results = brave_search(query)
            if results:
                search_text = "\n".join(f"- **{r['title']}**: {r['desc']} ([link]({r['url']}))" for r in results)
                exec_results.append(f"**Search: {query}**\n{search_text}")
            else:
                exec_results.append(f"No results for: {query}")
        else:
            out_lines.append(line)

    final = "\n".join(out_lines).strip()
    if exec_results:
        final += "\n\n" + "\n".join(exec_results)
    return final
429
 
430
 
431
def x_sign(method, url, params, cs, ts):
    """Compute an OAuth 1.0a HMAC-SHA1 signature.

    Args:
        method: HTTP method; upper-cased for the signature base string.
        url: Request URL without query string.
        params: Mapping of parameters to sign; values are converted with str().
        cs: Consumer secret.
        ts: Token secret.

    Returns:
        The base64-encoded signature as text.

    Parameters are percent-encoded first and sorted afterwards, which is the
    order RFC 5849 prescribes; sorting raw names can differ once a name
    contains characters that get encoded.
    """
    def enc(value):
        return urllib.parse.quote(str(value), safe="")

    pairs = sorted((enc(k), enc(v)) for k, v in params.items())
    sp = "&".join(f"{k}={v}" for k, v in pairs)
    bs = "&".join((method.upper(), enc(url), enc(sp)))
    sk = f"{enc(cs)}&{enc(ts)}"
    digest = hmac.new(sk.encode(), bs.encode(), hashlib.sha1).digest()
    return base64.b64encode(digest).decode()
436
 
437
def post_tweet(text):
    """Publish *text* as a post via the X API v2 using OAuth 1.0a user auth.

    Returns:
        ``{"ok": True, "url": <link to the post>}`` on HTTP 200/201, otherwise
        ``{"ok": False, "error": <reason>}``. Exceptions are reported the same
        way, never raised.
    """
    url = "https://api.x.com/2/tweets"
    op = {
        "oauth_consumer_key": X_CONSUMER_KEY,
        "oauth_token": X_ACCESS_TOKEN,
        "oauth_signature_method": "HMAC-SHA1",
        "oauth_timestamp": str(int(time.time())),
        "oauth_nonce": uuid.uuid4().hex,
        "oauth_version": "1.0",
    }
    # The JSON body is not part of the signature; only the oauth_* fields are signed.
    op["oauth_signature"] = x_sign("POST", url, op, X_CONSUMER_SECRET, X_ACCESS_TOKEN_SECRET)
    ah = "OAuth " + ", ".join(
        f'{urllib.parse.quote(k, safe="")}="{urllib.parse.quote(v, safe="")}"' for k, v in sorted(op.items())
    )
    try:
        r = requests.post(
            url,
            headers={"Authorization": ah, "Content-Type": "application/json"},
            json={"text": text},
            timeout=30,
        )
        if r.status_code in [200, 201]:
            tid = r.json().get("data", {}).get("id", "?")
            # User-agnostic permalink form; a bare /status/<id> path has no
            # user segment and does not address a post.
            return {"ok": True, "url": f"https://x.com/i/web/status/{tid}"}
        return {"ok": False, "error": f"HTTP {r.status_code}: {r.text[:300]}"}
    except Exception as e:
        return {"ok": False, "error": str(e)}
453
 
454
 
455
def analytics_get(endpoint, extra_params=None):
    """Issue a GET against the analytics/ad platform API.

    Args:
        endpoint: Path appended to ANALYTICS_BASE.
        extra_params: Optional query parameters merged over the API key.

    Returns:
        ``{"ok": True, "result": ...}`` when the response is HTTP 200 and its
        JSON ``status`` is ``"success"``; otherwise ``{"ok": False, "error": ...}``.
    """
    query = {"key": ANALYTICS_API_KEY}
    query.update(extra_params or {})
    try:
        resp = requests.get(f"{ANALYTICS_BASE}{endpoint}", params=query, timeout=15)
        if resp.status_code != 200:
            return {"ok": False, "error": f"HTTP {resp.status_code}"}
        data = resp.json()
        if data.get("status") != "success":
            return {"ok": False, "error": f"API error: {data}"}
        return {"ok": True, "result": data.get("result", {})}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
470
 
471
def analytics_balance():
    """Fetch the publisher balance; returns the analytics_get() result dict."""
    endpoint = "/publisher/balance"
    return analytics_get(endpoint)
473
 
474
def analytics_stats(date=None, date2=None, group="date"):
    """Fetch publisher statistics; returns the analytics_get() result dict.

    Args:
        date: Optional value for the ``date`` query parameter.
        date2: Optional value for the ``date2`` query parameter.
        group: Value for the ``group`` query parameter.

    Falsy ``date`` / ``date2`` values are left out of the request.
    """
    query = {"group": group}
    for key, value in (("date", date), ("date2", date2)):
        if value:
            query[key] = value
    return analytics_get("/publisher/listStats", query)
481
 
482
def analytics_full_report():
    """Pull balance + yesterday stats + 7-day stats.

    Returns:
        A dict with ``balance``, ``yesterday`` and ``weekly``. Each value is
        the API result on success or ``{"error": <reason>}`` on failure. The
        weekly range runs from seven days ago through yesterday (UTC).
    """
    # One clock read so both dates derive from the same instant.
    now = datetime.utcnow()
    yesterday = (now - timedelta(days=1)).strftime('%Y-%m-%d')
    week_ago = (now - timedelta(days=7)).strftime('%Y-%m-%d')

    def unwrap(resp):
        return resp.get("result", {}) if resp.get("ok") else {"error": resp.get("error")}

    balance = analytics_balance()
    daily = analytics_stats(date=yesterday)
    weekly = analytics_stats(date=week_ago, date2=yesterday, group="date")
    return {
        "balance": unwrap(balance),
        "yesterday": unwrap(daily),
        "weekly": unwrap(weekly),
    }
495
 
496
def _walk_revenue_rows(obj, date_key=None):
    """Yield ``(date_key, row)`` for every dict under *obj* that has a "revenue" key.

    ``date_key`` is the nearest enclosing dict key that looks like a date
    (10 characters containing "-"), or None when there is none.
    """
    if isinstance(obj, dict):
        if "revenue" in obj:
            yield date_key, obj
        else:
            for key, value in obj.items():
                looks_like_date = isinstance(key, str) and len(key) == 10 and "-" in key
                yield from _walk_revenue_rows(value, key if looks_like_date else date_key)
    elif isinstance(obj, list):
        for item in obj:
            yield from _walk_revenue_rows(item, date_key)


def _section_failed(section):
    """Return True when *section* is an ``{"error": ...}`` placeholder."""
    return isinstance(section, dict) and "error" in section


def format_analytics_report(report):
    """Format analytics data for Discord display.

    Args:
        report: Dict with optional ``balance``, ``yesterday`` and ``weekly``
            sections, each either API data or ``{"error": <reason>}``.

    Returns:
        A multi-line text report. Figures that are unavailable, failed to load
        or cannot be parsed as numbers are shown as ``N/A``; a section with
        unparseable numbers contributes no partial figures.
    """
    bal = report.get("balance", {})
    if _section_failed(bal):
        bal_str = f"[FAIL] {bal['error']}"
    else:
        # Unexpected shapes are treated like an empty balance.
        bal = bal if isinstance(bal, dict) else {}
        bal_str = f"${bal.get('balance', 'N/A')} {bal.get('currency', '')}"

    yest = report.get("yesterday", {})
    y_revenue = y_impressions = y_clicks = y_cpm = "N/A"
    if not _section_failed(yest):
        total_rev = 0.0
        total_imp = 0
        total_clicks = 0
        found = False
        try:
            for _, row in _walk_revenue_rows(yest):
                total_rev += float(row.get("revenue", 0))
                total_imp += int(row.get("impressions", 0))
                total_clicks += int(row.get("clicks", 0))
                found = True
        except (TypeError, ValueError):
            found = False
        if found:
            y_revenue = f"${total_rev:.4f}"
            y_impressions = f"{total_imp:,}"
            y_clicks = f"{total_clicks:,}"
            y_cpm = f"${(total_rev / total_imp * 1000):.4f}" if total_imp > 0 else "N/A"

    week = report.get("weekly", {})
    w_revenue = "N/A"
    w_days = []
    if not _section_failed(week):
        total_rev_w = 0.0
        days = []
        try:
            for date_key, row in _walk_revenue_rows(week):
                rev = float(row.get("revenue", 0))
                total_rev_w += rev
                if date_key:
                    days.append((date_key, rev))
        except (TypeError, ValueError):
            pass
        else:
            w_revenue = f"${total_rev_w:.4f}"
            w_days = days

    txt = (
        f"**Revenue Report**\n--------------------\n"
        f"**Balance:** {bal_str}\n--------------------\n"
        f"**Yesterday:**\n"
        f"  Revenue: {y_revenue}\n"
        f"  Impressions: {y_impressions}\n"
        f"  Clicks: {y_clicks}\n"
        f"  CPM: {y_cpm}\n--------------------\n"
        f"**Last 7 Days:** {w_revenue} total\n"
    )
    for d, r in sorted(w_days):
        txt += f"  {d}: ${r:.4f}\n"
    return txt
569
 
570
 
571
# Backup jobs by name. "vzdump" targets are container snapshots taken on a
# Proxmox node; "pg_dump" targets are SQL dumps written next to this module.
# "keep" is the number of dumps retained by the pg_dump rotation in run_backup.
BACKUP_TARGETS = {
    "perseus_ct": dict(
        type="vzdump",
        ct=120,
        node="node-1",
        desc="Perseus container (FastAPI, bot, configs)",
        storage="pbs-main",
        keep=7,
    ),
    "postgres": dict(
        type="pg_dump",
        host="perseus",
        db="perseus",
        dest=os.path.join(os.path.dirname(os.path.abspath(__file__)), "backups"),
        keep=7,
    ),
}
588
 
589
def run_backup(target_name):
    """Execute a specific backup target. Returns status dict.

    Args:
        target_name: Key into BACKUP_TARGETS.

    Returns:
        ``{"ok", "target", "type", "output"}`` for a job that was attempted
        (``output`` cut to 500 characters), or ``{"ok": False, "error": ...}``
        for an unknown target or backup type.

    For ``pg_dump`` targets a dump named ``perseus_<UTC timestamp>.sql`` is
    written to the target directory; after a successful dump only the newest
    ``keep`` ``.sql`` files are retained. A failed dump's partial file is
    removed so it does not take a slot in later rotations.
    """
    import shlex  # local import: only this function builds shell strings from config

    target = BACKUP_TARGETS.get(target_name)
    if not target:
        return {"ok": False, "error": f"Unknown target: {target_name}"}
    ts = datetime.utcnow().strftime('%Y-%m-%d_%H%M')

    if target["type"] == "pg_dump":
        # Quote everything interpolated into the shell command so paths with
        # spaces or shell metacharacters cannot break or alter it.
        dest = shlex.quote(target["dest"])
        dump_file = shlex.quote(f"{target['dest']}/perseus_{ts}.sql")
        db = shlex.quote(target["db"])
        cmd = f"mkdir -p {dest} && pg_dump {db} > {dump_file} && ls -la {dest}/"
        r = run_local(cmd)
        if r["ok"]:
            keep = int(target["keep"])
            run_local(f"cd {dest} && ls -t *.sql | tail -n +{keep + 1} | xargs -r rm")
        else:
            # The redirect creates the file even when pg_dump fails.
            run_local(f"rm -f {dump_file}")
        return {"ok": r["ok"], "target": target_name, "type": "pg_dump", "output": r["out"][:500]}

    if target["type"] == "vzdump":
        ct = int(target["ct"])
        storage = shlex.quote(target["storage"])
        cmd = f"vzdump {ct} --storage {storage} --compress zstd --mode snapshot --quiet 1"
        r = run_ssh(target["node"], cmd, timeout=300)
        return {"ok": r["ok"], "target": target_name, "type": "vzdump", "output": r["out"][:500]}

    return {"ok": False, "error": "Unknown backup type"}
608
 
609
def run_all_backups():
    """Run every job in BACKUP_TARGETS in order and return their status dicts."""
    return [run_backup(job_name) for job_name in BACKUP_TARGETS]
615
 
616
def format_backup_report(results):
    """Render run_backup() status dicts as a text report for Discord.

    Each job gets an ``[OK]`` / ``[FAIL]`` line; failed jobs add an error line
    taken from ``error``, else ``output``, cut to 200 characters. The footer
    shows the number of configured targets and the current UTC time.
    """
    pieces = ["**Backup Report**\n--------------------\n"]
    for entry in results:
        succeeded = entry.get("ok")
        badge = "[OK]" if succeeded else "[FAIL]"
        pieces.append(f"{badge} **{entry.get('target', '?')}** ({entry.get('type', '?')})\n")
        if not succeeded:
            detail = entry.get('error', entry.get('output', 'unknown'))[:200]
            pieces.append(f"  Error: {detail}\n")
    stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')
    pieces.append(f"--------------------\n**Targets:** {len(BACKUP_TARGETS)} | **Time:** {stamp}")
    return "".join(pieces)
625
 
626
 
627
# Alert limits used by the health checks; a reading strictly above the limit
# raises an alert.
INFRA_THRESHOLDS = dict(
    cpu_pct=90,
    ram_pct=90,
    disk_pct=85,
    gpu_temp_c=80,
)
633
 
634
def check_host_health(host_name):
    """Check CPU, RAM, disk on a host. Returns health dict.

    Args:
        host_name: ``"perseus"`` runs the probes locally; any other name is
            passed to run_ssh as the host.

    Returns:
        ``{"host", "ok", "alerts", "metrics"}``. ``ok`` turns False and an
        alert is added when a reading exceeds its INFRA_THRESHOLDS limit.
        A probe that fails to run or whose output cannot be parsed is skipped
        and leaves its metrics absent.
    """
    health = {"host": host_name, "ok": True, "alerts": [], "metrics": {}}
    metrics = health["metrics"]
    # Only output-shape problems are tolerated; anything else should surface.
    parse_errors = (ValueError, IndexError, ZeroDivisionError)

    def runner(cmd):
        return run_local(cmd) if host_name == "perseus" else run_ssh(host_name, cmd)

    def critical(message):
        health["alerts"].append(message)
        health["ok"] = False

    # CPU: 1-minute load average relative to the number of cores.
    r = runner("nproc && cat /proc/loadavg")
    if r["ok"]:
        try:
            lines = r["out"].split("\n")
            cores = int(lines[0].strip())
            load1 = float(lines[1].split()[0])
            cpu_pct = round((load1 / cores) * 100, 1)
            metrics["cpu_pct"] = cpu_pct
            metrics["load"] = load1
            metrics["cores"] = cores
            if cpu_pct > INFRA_THRESHOLDS["cpu_pct"]:
                critical(f"[CRIT] CPU {cpu_pct}% (load {load1}/{cores} cores)")
        except parse_errors:
            pass

    # RAM: "used" over "total" from the Mem line of free -m.
    r = runner("free -m | grep Mem")
    if r["ok"]:
        try:
            parts = r["out"].split()
            total = int(parts[1])
            used = int(parts[2])
            ram_pct = round((used / total) * 100, 1)
            metrics["ram_total_mb"] = total
            metrics["ram_used_mb"] = used
            metrics["ram_pct"] = ram_pct
            if ram_pct > INFRA_THRESHOLDS["ram_pct"]:
                critical(f"[CRIT] RAM {ram_pct}% ({used}/{total} MB)")
        except parse_errors:
            pass

    # Disk: use% of the root filesystem.
    r = runner("df -h / | tail -1")
    if r["ok"]:
        try:
            parts = r["out"].split()
            disk_pct = int(parts[4].replace("%", ""))
            metrics["disk_pct"] = disk_pct
            metrics["disk_size"] = parts[1]
            metrics["disk_used"] = parts[2]
            metrics["disk_avail"] = parts[3]
            if disk_pct > INFRA_THRESHOLDS["disk_pct"]:
                critical(f"[CRIT] Disk {disk_pct}% ({parts[2]}/{parts[1]})")
        except parse_errors:
            pass

    return health
686
 
687
def check_gpu_health():
    """Query nvidia-smi on the "gpu" host and report utilisation, VRAM and temperature.

    Returns:
        ``{"host": "gpu", "ok", "alerts", "metrics"}``. ``ok`` is False when
        the host cannot be reached or the temperature exceeds the
        ``gpu_temp_c`` threshold. Output that cannot be parsed adds a warning
        alert but leaves ``ok`` unchanged.
    """
    health = {"host": "gpu", "ok": True, "alerts": [], "metrics": {}}
    r = run_ssh("gpu", "nvidia-smi --query-gpu=utilization.gpu,memory.used,memory.total,temperature.gpu --format=csv,noheader,nounits")
    if not r["ok"]:
        health["ok"] = False
        health["alerts"].append(f"[FAIL] Can't reach GPU: {r['out'][:100]}")
        return health
    try:
        parts = [x.strip() for x in r["out"].split(",")]
        gpu_util = int(parts[0])
        mem_used = int(parts[1])
        mem_total = int(parts[2])
        temp = int(parts[3])
        health["metrics"]["gpu_util_pct"] = gpu_util
        health["metrics"]["vram_used_mb"] = mem_used
        health["metrics"]["vram_total_mb"] = mem_total
        health["metrics"]["gpu_temp_c"] = temp
        if temp > INFRA_THRESHOLDS["gpu_temp_c"]:
            health["alerts"].append(f"[CRIT] GPU Temp {temp}C")
            health["ok"] = False
    except (ValueError, IndexError):
        # Wrong field count or non-numeric fields.
        health["alerts"].append(f"[WARN] GPU parse error: {r['out'][:100]}")
    return health
710
 
711
def check_ollama_health():
    """Check that the Ollama server answers and list its models.

    Queries ``{OLLAMA_URL}/api/tags`` for the installed models and
    ``{OLLAMA_URL}/api/ps`` for the currently loaded ones, each with a
    5-second timeout.

    Returns:
        dict: ``host``, ``ok``, ``alerts`` and ``metrics`` keys. A failed
        ``/api/tags`` request (network error, HTTP error status or an
        unexpected payload) sets ``ok`` to ``False`` and adds a ``[FAIL]``
        alert. A failed ``/api/ps`` request is ignored, leaving
        ``loaded_models`` unset.
    """
    health = {"host": "ollama", "ok": True, "alerts": [], "metrics": {}}
    probe_errors = (requests.RequestException, ValueError, KeyError, TypeError, AttributeError)

    try:
        resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
        # An HTTP error with a JSON body must not be mistaken for "no models".
        resp.raise_for_status()
        models = [m["name"] for m in resp.json().get("models", [])]
    except probe_errors:
        health["ok"] = False
        health["alerts"].append("[FAIL] Ollama unreachable")
    else:
        health["metrics"]["available_models"] = models
        health["metrics"]["model_count"] = len(models)

    try:
        resp = requests.get(f"{OLLAMA_URL}/api/ps", timeout=5)
        resp.raise_for_status()
        health["metrics"]["loaded_models"] = [m["name"] for m in resp.json().get("models", [])]
    except probe_errors:
        # Best effort: the loaded-model list is optional extra detail.
        pass
    return health
728
 
729
def full_infra_check():
    """Run every infrastructure probe and return the results in order.

    Calls ``check_host_health`` for the two nodes and the three containers,
    then appends the GPU and Ollama checks.

    Returns:
        list: One health result per probe, in the order the probes ran.
    """
    machines = ("node-1", "node-2", "perseus", "dns", "gpu")
    report = [check_host_health(name) for name in machines]
    report.append(check_gpu_health())
    report.append(check_ollama_health())
    return report
738
 
739
def format_infra_health(results):
    """Render health-check result dicts as a markdown status report.

    Args:
        results: Iterable of dicts as produced by the ``check_*_health``
            functions. ``host``, ``ok``, ``metrics`` and ``alerts`` are all
            read with defaults, so a partial result is rendered (as not OK)
            instead of raising ``KeyError``.

    Returns:
        str: A header with the overall verdict, one section per result with
        whichever metrics are present plus its alerts, and a UTC timestamp
        footer.
    """
    all_ok = all(r.get("ok", False) for r in results)
    overall = "[OK] All Clear" if all_ok else "[WARN] Issues Detected"
    # Collect fragments and join once instead of repeated string +=.
    out = [f"**Infrastructure Health** -- {overall}\n--------------------\n"]
    for r in results:
        st = "[OK]" if r.get("ok", False) else "[CRIT]"
        m = r.get("metrics", {})
        out.append(f"\n{st} **{r.get('host', '?')}**\n")
        if "cpu_pct" in m:
            out.append(f"  CPU: {m['cpu_pct']}% (load {m.get('load','?')}/{m.get('cores','?')} cores)\n")
        if "ram_pct" in m:
            out.append(f"  RAM: {m['ram_pct']}% ({m.get('ram_used_mb','?')}/{m.get('ram_total_mb','?')} MB)\n")
        if "disk_pct" in m:
            out.append(f"  Disk: {m['disk_pct']}% ({m.get('disk_used','?')}/{m.get('disk_size','?')})\n")
        if "gpu_util_pct" in m:
            out.append(
                f"  GPU: {m['gpu_util_pct']}% | VRAM: {m.get('vram_used_mb','?')}/{m.get('vram_total_mb','?')} MB"
                f" | Temp: {m.get('gpu_temp_c','?')}C\n"
            )
        if "available_models" in m:
            out.append(f"  Models: {', '.join(m['available_models'])}\n")
            loaded = m.get("loaded_models", [])
            if loaded:
                out.append(f"  Loaded: {', '.join(loaded)}\n")
        out.extend(f"  {alert}\n" for alert in r.get("alerts", []))
    out.append(f"\n--------------------\n**Checked:** {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}")
    return "".join(out)
765
 
766
 
767
def build_prompt(agent, state):
    """Build the LLM prompt for a content agent.

    Args:
        agent: Agent name. ``social_publisher``, ``compliance_scanner``,
            ``content_curator`` and ``content_rewriter`` have dedicated
            templates; any other value gets the trend-analysis template.
        state: Dict with a ``command`` string and an optional
            ``previous_result`` dict (``None`` is treated as empty).

    Returns:
        str: The prompt text. The previous result is dropped from the prompt
        when it mentions revenue-style keywords, so revenue figures are not
        carried into content prompts.
    """
    c = state['command']
    # Tolerate an explicit None as well as a missing key.
    p = (state.get('previous_result') or {}).get('result', '')
    if any(kw in str(p).lower() for kw in ("balance", "revenue", "cpm", "impressions")):
        p = ''

    if agent == "social_publisher":
        return f"""You write engaging social media posts for a content platform. Generate professional, high-engagement posts.

Topic requested: {c}

Rules:
- Each post MUST be under 280 characters
- Use hooks, trending hashtags, and strong calls to action
- Write 5 posts in professional tone
- Output ONLY JSON: [{{"post":"text","hashtags":["#tag"]}}]"""
    if agent == "compliance_scanner":
        return f"""Content compliance scan. Analyze: '{c}' | {p}. Check for policy violations, copyright issues, and content quality. Output ONLY JSON: {{"risk_level":"low/medium/high","reason":"brief","confidence":"0-100%","action_recommended":"flag/monitor/approve"}}"""
    if agent == "content_curator":
        return f"""Content curator. Analyze: '{c}' | {p}. Filter quality, remove duplicates, rank by engagement potential. Output JSON array."""
    if agent == "content_rewriter":
        return f"""Rewrite content metadata for SEO: '{c}' | {p}. Generate 3 SEO-friendly variants. Output JSON array."""

    # Trend analysis: strip the trigger phrase so only the topic remains.
    # Longer variants must come before their shorter forms because the first
    # matching prefix wins.
    trigger_prefixes = (
        "find trends on", "find trends in", "find trends for", "find trends about",
        "find trends", "trending niches for", "trending niches in", "trending niches",
        "trend hunt", "viral trends in", "viral trends on", "viral trends",
        "what's trending in", "what's trending on", "what's trending",
        "whats trending in", "whats trending on", "whats trending",
    )
    topic = c
    lowered = topic.lower()
    for prefix in trigger_prefixes:
        if lowered.startswith(prefix):
            topic = topic[len(prefix):].strip() or "general"
            break
    return f"""You are a trend analyst for digital content. Analyze current viral trends related to: {topic}

Find 3-5 trending niches/topics. For each, explain WHY it's trending, name relevant creators if applicable, and estimate engagement potential.

Output ONLY JSON: [{{"niche":"","reason":"","creators":[],"engagement_potential":"high/medium/low"}}]"""
802
 
803
 
804
# Build the JSON envelope shared by every /chat response. "model" is only
# included when a value is supplied, matching the responses that carry it.
def _chat_reply(agent, result, requires_approval=False, model=None):
    payload = {
        "agent": agent,
        "timestamp": datetime.utcnow().isoformat(),
        "result": result,
        "requires_approval": requires_approval,
    }
    if model is not None:
        payload["model"] = model
    return JSONResponse(payload)


# POST /chat -- single entry point for free-text commands.
#
# Routing order (first match wins): greetings, help, status, costs, infra map,
# post queue, `search ...`, `ssh <host> <cmd>`, revenue keywords, backup
# keywords, infra-health keywords, content agents, then the active brain as
# the general assistant.
#
# NOTE(review): this coroutine calls blocking helpers (requests, SSH, DB)
# directly, so the event loop is stalled while they run -- confirm that is
# acceptable for this deployment.
@app.post("/chat")
async def chat(command: str = Body(...), previous_result: dict = Body(default=None)):
    state = {"command": command, "previous_result": previous_result or {}}
    if state["previous_result"]:
        state["command"] += " | previous data: " + json.dumps(state["previous_result"])

    lc = command.lower().strip()
    clean = lc.rstrip("!?.")

    greetings = {"hey","hi","hello","sup","yo","what's up","whats up","hey perseus","hi perseus",
                 "hello perseus","good morning","good night","gm","gn","wassup","what up","yo perseus"}
    if clean in greetings:
        return _chat_reply("perseus", f"Perseus online. Brain: **{ACTIVE_BRAIN}**\n`help` for commands")

    # A bare "?" is emptied by the rstrip above, so it is matched against lc.
    if clean in {"help","commands","what can you do"} or lc == "?":
        return _chat_reply("perseus",
            "**Perseus Commands**\n--------------------\n"
            "**Talk naturally** -- anything goes to your active brain\n--------------------\n"
            "**System:** `status` . `costs` . `infra` . `queue`\n"
            "**Revenue:** `revenue` . `balance` . `ad stats`\n"
            "**Content:** `post ideas for [x]` . `find trends` . `compliance scan [x]` . `rewrite [x]` . `curate [x]`\n"
            "**Backup:** `backup` (run now) . `backup status` (check last)\n"
            "**Health:** `health check` . `infra health` . `gpu health` . `disk usage`\n"
            "**Search:** `search [query]` or just ask about weather/news\n--------------------\n"
            "**! Commands:**\n"
            "`!model` -- show/switch brain (grok . gpt . ollama)\n"
            "`!exec [host] [cmd]` -- run command on host\n"
            "`!backup [run|status]` -- manage backups\n"
            "`!health` -- full infra health check\n"
            "`!post [#]` . `!postall` . `!clearqueue` -- post queue")

    if clean in {"status","health","system status"}:
        up = datetime.utcnow() - STARTUP_TIME
        h, rem = divmod(int(up.total_seconds()), 3600)
        m, s = divmod(rem, 60)
        try:
            oc = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
            o_models = ", ".join(x["name"] for x in oc.json().get("models", []))
            o_st = "[OK]"
        except Exception:
            # Status must still render when Ollama is down or replies oddly.
            o_st = "[FAIL]"
            o_models = "N/A"
        rt = ""
        for a, c, t in get_recent_commands(5):
            rt += f"  `{str(t)[:16]}` [{a}] {str(c)[:40]}\n"
        rt = rt or "  None\n"
        costs = get_cost_summary()
        return _chat_reply("perseus",
            f"**Perseus Status**\n--------------------\n"
            f"**Uptime:** {h}h {m}m {s}s | **Brain:** `{ACTIVE_BRAIN}`\n"
            f"**FastAPI:** [OK] :3002 | **Discord:** [OK] {bot.user}\n--------------------\n"
            f"**Models:**\n  Grok 4.1: ($2.00/$10.00 per 1M tok)\n  GPT-4o-mini: ($0.15/$0.60 per 1M tok)\n  Ollama: {o_st} ({OLLAMA_MODEL}) -- FREE\n  Available: {o_models}\n"
            f"--------------------\n"
            f"**API Spend:** ${costs.get('today',0):.4f} today . ${costs.get('month',0):.4f}/mo . ${costs.get('budget_remaining',10):.2f} left\n"
            f"**Agents:** {len(AGENT_REGISTRY)} | **Queue:** {len(PENDING_POSTS)} posts\n--------------------\n**Recent:**\n{rt}")

    if clean in {"costs","cost","spend","budget","spending"}:
        costs = get_cost_summary()
        if "error" in costs:
            return _chat_reply("perseus", f"[WARN] {costs['error']}")
        used = costs['month'] / MONTHLY_BUDGET_CEILING
        bf = min(int(used * 20), 20)  # filled cells of a 20-cell bar
        bar = "#" * bf + "-" * (20 - bf)
        pct = used * 100
        ab = ""
        for a, c in costs.get('agents', []):
            ab += f"  {a}: {c} calls\n"
        ab = ab or "  None today\n"
        msg = (f"**Cost Tracker**\n--------------------\n"
            f"**Today:** ${costs['today']:.4f} | **Month:** ${costs['month']:.4f} | **All Time:** ${costs['total']:.4f}\n"
            f"**Budget:** [{bar}] {pct:.1f}% -- ${costs['budget_remaining']:.2f} left\n--------------------\n"
            f"**API Pricing:**\n"
            f"  Grok 4.1 Fast: $2.00 in / $10.00 out per 1M tokens\n"
            f"  GPT-4o-mini: $0.15 in / $0.60 out per 1M tokens\n"
            f"  Ollama (local): FREE\n--------------------\n"
            f"**Today's Usage:**\n{ab}")
        if costs['budget_remaining'] < 2.0:
            msg += "\n[WARN] **Under $2 left!**"
        return _chat_reply("perseus", msg)

    if clean in {"infra","infrastructure","hosts","nodes","map"}:
        txt = "**Infrastructure**\n--------------------\n"
        for n, i in INFRA_MAP.items():
            ct = f" (CT {i['ct']})" if 'ct' in i else ""
            txt += f"**{n}**{ct} . `{i['ip']}` . {i['desc']}\n"
        return _chat_reply("perseus", txt)

    if clean in {"queue","post queue","pending","pending posts"}:
        if not PENDING_POSTS:
            msg = "Queue empty. Try `post ideas for [topic]`"
        else:
            msg = "**Post Queue:**\n--------------------\n"
            for i, tw in enumerate(PENDING_POSTS, 1):
                msg += f"**{i}.** {tw['text'][:200]}\n"
            msg += "\n`!post [#]` . `!postall` . `!clearqueue`"
        return _chat_reply("perseus", msg)

    if lc.startswith("search "):
        # Slice the stripped command: lc was stripped, so slicing the raw
        # command would be off by any leading whitespace.
        query = command.strip()[7:].strip()
        results = brave_search(query, 5)
        if results:
            txt = f"**Search: {query}**\n--------------------\n"
            for r in results:
                txt += f"- **{r['title']}**\n  {r['desc']}\n  {r['url']}\n\n"
        else:
            txt = f"No results for: {query}"
        return _chat_reply("perseus", txt)

    if lc.startswith("ssh "):
        parts = command.strip().split(None, 2)
        # With fewer than three parts the message falls through to the
        # routes below, as before.
        if len(parts) >= 3:
            host = resolve_host(parts[1])
            cmd = parts[2]
            if not host:
                return _chat_reply("command_executor", f"[FAIL] Unknown host: `{parts[1]}`. Type `infra`.")
            if is_blocked(cmd):
                return _chat_reply("command_executor", "[BLOCKED]")
            r = run_local(cmd) if host == "perseus" else run_ssh(host, cmd)
            st = "[OK]" if r["ok"] else "[FAIL]"
            save_to_db("command_executor", f"ssh {host} {cmd}", r["out"])
            return _chat_reply("command_executor", f"{st} **{r['host']}** `{cmd}`\n```\n{r['out'][:1500]}\n```")

    revenue_triggers = ["revenue", "balance", "ad stats", "analytics", "ad revenue",
                        "earnings", "ad earnings", "publisher stats",
                        "how much did we make", "how much money", "cpm", "ad performance"]
    if any(t in lc for t in revenue_triggers):
        report = analytics_full_report()
        raw_display = format_analytics_report(report)
        grok_prompt = f"""You are Perseus, a revenue analyst. Here is the latest analytics data:
{json.dumps(report, indent=2, default=str)}

Provide a brief, actionable analysis:
1. How is revenue trending? (up/down/flat vs prior days)
2. What's the CPM looking like?
3. Any red flags (zero impressions, drops, zones not performing)?
4. One concrete recommendation to increase revenue.

Keep it under 200 words. Be direct -- facts and actions, not fluff."""
        brain = call_grok(grok_prompt)
        analysis = brain.get("result", "Analysis unavailable")
        cost = brain.get("cost", 0)
        full_result = raw_display + f"\n--------------------\n**Analysis:**\n{analysis}"
        save_to_db("revenue_tracker", command, full_result, cost)
        return _chat_reply("revenue_tracker", full_result, model="grok + analytics-api")

    backup_triggers = ["backup", "run backup", "backup now", "backup status", "backups",
                       "last backup", "check backups", "backup report"]
    if any(t in lc for t in backup_triggers):
        if any(t in lc for t in ["status", "last", "check"]):
            result = "**Backup Status**\n--------------------\nUse `!backup status` for detailed check."
        else:
            results = run_all_backups()
            result = format_backup_report(results)
            failed = [r for r in results if not r.get("ok")]
            if failed:
                send_alert_sync(f"[WARN] **Backup failures:** {len(failed)}/{len(results)}\n{result}")
        save_to_db("backup_manager", command, result)
        return _chat_reply("backup_manager", result, model="local")

    infra_health_triggers = ["health check", "infra health", "system health", "check health",
                             "monitor", "infra check", "are hosts ok", "host health",
                             "check all hosts", "diagnostics", "gpu health", "ram usage",
                             "disk usage", "cpu usage"]
    if any(t in lc for t in infra_health_triggers):
        results = full_infra_check()
        result = format_infra_health(results)
        alerts = []
        for r in results:
            alerts.extend(r.get("alerts", []))
        if alerts:
            send_alert_sync(f"[WARN] **Infra Alerts:**\n" + "\n".join(alerts))
        save_to_db("infra_monitor", command, result)
        return _chat_reply("infra_monitor", result, model="local")

    agent_name = None
    if any(t in lc for t in ["post ideas", "post idea", "generate post", "social post"]):
        agent_name = "social_publisher"
    elif any(t in lc for t in ["find trends", "trending niches", "trend hunt", "viral trends", "what's trending", "whats trending"]):
        agent_name = "trend_hunter"
    elif any(t in lc for t in ["curate content", "filter content", "dedup", "deduplicate"]):
        agent_name = "content_curator"
    elif any(t in lc for t in ["rewrite this", "rephrase this", "rewrite caption", "generate variant"]):
        agent_name = "content_rewriter"
    elif any(t in lc for t in ["compliance scan", "policy scan", "content scan", "legal scan"]):
        agent_name = "compliance_scanner"

    if agent_name:
        prompt = build_prompt(agent_name, state)
        local = AGENT_REGISTRY.get(agent_name, {}).get("local", True)
        cost = 0.0
        req_approval = False
        if local:
            try:
                result_data = call_ollama(prompt)
                result = result_data.get("result", "")
                if agent_name == "social_publisher":
                    # Pull the JSON array out of the reply and queue each post.
                    js = result.find("[")
                    je = result.rfind("]") + 1
                    if js >= 0 and je > js:
                        try:
                            posts = json.loads(result[js:je])
                            queued = 0
                            for p in posts:
                                t = p.get("post", p.get("tweet", ""))
                                if t:
                                    PENDING_POSTS.append({"text": t, "by": "discord", "ts": datetime.utcnow().isoformat()})
                                    queued += 1
                            # Report what was actually queued, not the array length.
                            result += f"\n\n[OK] **{queued} queued.** `queue` . `!post [#]`"
                        except (ValueError, AttributeError, TypeError):
                            # The reply was not the expected JSON; return the raw text.
                            pass
            except Exception as e:
                result = f"Ollama: {e}"
        else:
            if agent_name == "compliance_scanner":
                try:
                    gr = requests.post("https://api.openai.com/v1/chat/completions",
                        headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
                        json={"model": "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}],
                            "temperature": 0.5, "max_tokens": 200}, timeout=30)
                    # Report HTTP failures as such rather than as a missing "choices" key.
                    gr.raise_for_status()
                    rj = gr.json()
                    result = rj["choices"][0]["message"]["content"].strip()
                    u = rj.get("usage", {})
                    cost = (u.get("prompt_tokens", 0) / 1e6 * 0.15) + (u.get("completion_tokens", 0) / 1e6 * 0.60)
                    # Whitespace- and case-insensitive: the prompt's own example
                    # is written without a space after the colon.
                    if '"risk_level":"high"' in "".join(result.split()).lower():
                        req_approval = True
                        send_alert_sync(f"[WARN] **HIGH RISK**\n`{command}`\n```{result}```\n`!approve` / `!deny`")
                        result += " | APPROVAL PENDING"
                except Exception as e:
                    result = f"GPT: {e}"
            else:
                result = f"External ({agent_name}): {command}"
        save_to_db(agent_name, command, result, cost)
        agent_model = OLLAMA_MODEL if local else ("gpt-4o-mini" if agent_name == "compliance_scanner" else "external")
        return _chat_reply(agent_name, result, requires_approval=req_approval, model=agent_model)

    # General assistant: optional search context plus the previous exchange.
    prev = previous_result or {}
    context_block = ""
    if prev:
        prev_agent = prev.get("agent", "")
        prev_result = str(prev.get("result", ""))[:500]
        context_block = f"\n\n[Previous exchange -- agent: {prev_agent}, response: {prev_result}]\nThe user may be following up on this."

    search_triggers = ["weather", "news", "price of", "stock", "score", "who won", "what happened"]
    enhanced_prompt = command
    if any(t in lc for t in search_triggers):
        results = brave_search(command, 3)
        if results:
            context = "\n".join(f"- {r['title']}: {r['desc']}" for r in results)
            enhanced_prompt = f"{command}\n\nRecent search results:\n{context}\n\nUse these to answer accurately."

    enhanced_prompt += context_block
    brain = call_brain(enhanced_prompt, SYSTEM_PROMPT)
    result = brain["result"]
    cost = brain.get("cost", 0)
    model_used = brain.get("model", ACTIVE_BRAIN)

    if ACTIVE_BRAIN == "ollama":
        # Cut the reply where the local model starts inventing further turns.
        for marker in ["\nUser:", "\nuser:", "\nHuman:", "\nhuman:", "\nAssistant:", "\nassistant:"]:
            idx = result.find(marker)
            if idx > 0:
                result = result[:idx].strip()

    if "EXECUTE:" in result or "SEARCH:" in result:
        result = process_ai_response(result)

    save_to_db("general_assistant", command, result, cost)
    return _chat_reply("general_assistant", result, model=model_used)
1088
 
1089
 
1090
# POST /agents/{agent_name} -- log the JSON payload for the named agent via
# save_to_db and acknowledge with a fixed "executed" result.
@app.post("/agents/{agent_name}")
async def run_agent(agent_name: str, payload: dict = Body(...)):
    serialized = json.dumps(payload)
    save_to_db(agent_name, serialized, "executed")
    ack = {
        "agent": agent_name,
        "result": "executed",
        "timestamp": datetime.utcnow().isoformat(),
    }
    return JSONResponse(ack)
1094
 
1095
 
1096
# !approve -- acknowledge an approval when the context passes allowed_ctx.
# Only a confirmation message is sent; nothing else is recorded here.
@bot.command(name="approve")
async def approve_cmd(ctx):
    if not allowed_ctx(ctx):
        return
    await ctx.send("[OK] Action **approved**.")
1100
 
1101
# !deny -- acknowledge a denial when the context passes allowed_ctx.
# Only a confirmation message is sent; nothing else is recorded here.
@bot.command(name="deny")
async def deny_cmd(ctx):
    if not allowed_ctx(ctx):
        return
    await ctx.send("[DENY] Action **denied**.")
1105
 
1106
# Return the model names listed by Ollama's /api/tags endpoint. Raises on
# network errors, HTTP error statuses and unexpected payloads.
def _ollama_model_names():
    resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
    resp.raise_for_status()
    return [m["name"] for m in resp.json().get("models", [])]


# !model [name] -- show or switch the active brain.
#   no argument          -> show the current brain and the available Ollama models
#   grok / gpt / ollama  -> switch ACTIVE_BRAIN (several aliases accepted)
#   ollama <name>        -> pick the first Ollama model whose name contains
#                           <name>, store it in OLLAMA_MODEL and switch to ollama
# Only the Ollama lookup is inside the try blocks, so a failed Discord send
# is no longer reported as "Can't reach Ollama".
@bot.command(name="model")
async def switch_model(ctx, *, name: str = None):
    global ACTIVE_BRAIN, OLLAMA_MODEL
    if not allowed_ctx(ctx):
        return
    lookup_errors = (requests.RequestException, ValueError, KeyError, TypeError, AttributeError)

    if not name:
        try:
            om = ", ".join(f"`{m}`" for m in _ollama_model_names())
        except lookup_errors:
            om = "can't reach"
        await ctx.send(f"**Brain:** `{ACTIVE_BRAIN}` | **Ollama:** `{OLLAMA_MODEL}`\n--------------------\n"
            f"Switch: `!model grok` . `!model gpt` . `!model ollama`\n"
            f"Ollama: `!model ollama [name]`\nAvailable: {om}")
        return

    n = name.lower().strip()
    if n == "grok":
        ACTIVE_BRAIN = "grok"
        await ctx.send("Brain -> **Grok 4.1 Fast** ($2/$10 per 1M tok)")
    elif n in {"gpt", "gpt4", "gpt-4o-mini", "openai", "mini"}:
        ACTIVE_BRAIN = "gpt"
        await ctx.send("Brain -> **GPT-4o-mini** ($0.15/$0.60 per 1M tok)")
    elif n in {"ollama", "local"}:
        ACTIVE_BRAIN = "ollama"
        await ctx.send(f"Brain -> **Ollama** (`{OLLAMA_MODEL}`) -- FREE")
    elif n.startswith("ollama "):
        target = n[7:].strip()
        try:
            avail = _ollama_model_names()
        except lookup_errors:
            await ctx.send("[FAIL] Can't reach Ollama")
            return
        # Substring match against the lower-cased model names.
        matched = next((m for m in avail if target in m.lower()), None)
        if matched:
            old = OLLAMA_MODEL
            OLLAMA_MODEL = matched
            ACTIVE_BRAIN = "ollama"
            await ctx.send(f"`{old}` -> `{matched}` | Brain -> **ollama**")
        else:
            await ctx.send(f"[FAIL] `{target}` not found. Available: {', '.join(f'`{m}`' for m in avail)}")
    else:
        await ctx.send("Use: `!model grok` . `!model gpt` . `!model ollama` . `!model ollama [name]`")
1148
 
1149
# !exec <host> <cmd> -- run a shell command on a known host and post its output.
# Unknown hosts and commands rejected by is_blocked are refused. The command
# runs in the default executor so a slow command does not stall the bot's
# event loop while it waits.
@bot.command(name="exec")
async def exec_cmd(ctx, host: str = None, *, cmd: str = None):
    if not allowed_ctx(ctx):
        return
    if not host or not cmd:
        await ctx.send("Usage: `!exec [host] [command]`\nHosts: " + ", ".join(f"`{h}`" for h in INFRA_MAP.keys()))
        return
    h = resolve_host(host)
    if not h:
        await ctx.send(f"[FAIL] Unknown host `{host}`. Hosts: " + ", ".join(f"`{k}`" for k in INFRA_MAP.keys()))
        return
    if is_blocked(cmd):
        await ctx.send("[BLOCKED]")
        return
    async with ctx.typing():
        loop = asyncio.get_running_loop()
        if h == "perseus":
            r = await loop.run_in_executor(None, run_local, cmd)
        else:
            r = await loop.run_in_executor(None, run_ssh, h, cmd)
        st = "[OK]" if r["ok"] else "[FAIL]"
        save_to_db("command_executor", f"!exec {h} {cmd}", r["out"])
        # Output is capped at 1500 characters before it is sent.
        await ctx.send(f"{st} **{r['host']}** `{cmd}`\n```\n{r['out'][:1500]}\n```")
1168
 
1169
# !post <n> -- publish entry n (1-based) from the pending-post queue.
# The entry is removed before posting and put back in the same slot when
# post_tweet reports a failure.
@bot.command(name="post")
async def post_cmd(ctx, number: int = None):
    if not allowed_ctx(ctx):
        return
    if not PENDING_POSTS:
        await ctx.send("Queue empty.")
        return
    if not number or not (1 <= number <= len(PENDING_POSTS)):
        await ctx.send(f"Pick 1-{len(PENDING_POSTS)}")
        return

    slot = number - 1
    tw = PENDING_POSTS.pop(slot)
    await ctx.send(f"Posting #{number}...")
    outcome = post_tweet(tw["text"])
    if not outcome["ok"]:
        await ctx.send(f"[FAIL] {outcome['error']}")
        PENDING_POSTS.insert(slot, tw)
        return
    await ctx.send(f"[OK] {outcome['url']}")
    save_to_db("social_publisher", "posted", tw["text"])
1184
 
1185
# !postall -- publish every queued post, pausing 30 seconds between posts.
# Posts that fail are held aside and put back on the queue at the end, in
# line with !post, instead of being dropped. The [i/n] progress counter is
# the position in the run for both outcomes. The loop re-checks the live
# queue, so `!clearqueue` during a pause still stops the run.
@bot.command(name="postall")
async def postall_cmd(ctx):
    if not allowed_ctx(ctx):
        return
    if not PENDING_POSTS:
        await ctx.send("Queue empty.")
        return
    n = len(PENDING_POSTS)
    await ctx.send(f"Posting {n}...")
    ok = fail = 0
    failed_posts = []
    while PENDING_POSTS:
        tw = PENDING_POSTS.pop(0)
        r = post_tweet(tw["text"])
        if r["ok"]:
            ok += 1
            await ctx.send(f"[OK] [{ok + fail}/{n}] {r['url']}")
            save_to_db("social_publisher", "posted", tw["text"])
        else:
            fail += 1
            failed_posts.append(tw)
            await ctx.send(f"[FAIL] [{ok + fail}/{n}] {r['error']}")
        if PENDING_POSTS:
            await asyncio.sleep(30)
    summary = f"Done: {ok} posted, {fail} failed."
    if failed_posts:
        PENDING_POSTS.extend(failed_posts)
        summary += " Failed posts were re-queued."
    await ctx.send(summary)
1204
 
1205
# !clearqueue -- empty the pending-post queue and report how many entries
# were removed.
@bot.command(name="clearqueue")
async def clearq(ctx):
    if not allowed_ctx(ctx):
        return
    cleared = len(PENDING_POSTS)
    PENDING_POSTS.clear()
    await ctx.send(f"Cleared {cleared}.")
1211
 
1212
# !backup [run|status] -- run all backups (default) and post the report.
# run_all_backups executes in the default executor so a long backup does not
# stall the bot's event loop. The reply is capped at 1990 characters plus an
# ellipsis.
# NOTE(review): the status branch points to `backup status` in chat, while the
# chat handler points back to `!backup status`; neither shows a real status.
@bot.command(name="backup")
async def backup_cmd(ctx, action: str = "run"):
    if not allowed_ctx(ctx):
        return
    async with ctx.typing():
        if action.lower() in ["status", "check", "last"]:
            result = "Use `backup status` in chat for detailed check."
        else:
            await ctx.send("Running all backups...")
            loop = asyncio.get_running_loop()
            results = await loop.run_in_executor(None, run_all_backups)
            result = format_backup_report(results)
        if len(result) > 1990:
            result = result[:1990] + "..."
        await ctx.send(result)
1226
 
1227
# !health -- run the full infrastructure check and post the report.
# full_infra_check executes in the default executor so its SSH and HTTP
# probes do not stall the bot's event loop. The report is sent in chunks of
# at most 1990 characters, split at a newline where one exists past
# character 500.
@bot.command(name="health")
async def health_cmd(ctx):
    if not allowed_ctx(ctx):
        return
    async with ctx.typing():
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, full_infra_check)
        result = format_infra_health(results)
        chunks = []
        while len(result) > 1990:
            split_at = result.rfind("\n", 0, 1990)
            if split_at < 500:
                # No usable newline (or only a very early one): hard split.
                split_at = 1990
            chunks.append(result[:split_at])
            result = result[split_at:].lstrip("\n")
        chunks.append(result)
        for chunk in chunks:
            # Skip blank chunks; sending an empty message fails.
            if chunk.strip():
                await ctx.send(chunk)
1244
 
1245
 
1246
# Startup banner: active brain, Ollama model, and host/agent counts.
print(
    "Perseus ready"
    f" | Brain: {ACTIVE_BRAIN}"
    f" | Ollama: {OLLAMA_MODEL}"
    f" | Hosts: {len(INFRA_MAP)}"
    f" | Agents: {len(AGENT_REGISTRY)}"
    " | Search: Brave"
)