Zion Boggan
repos/SOC Automation Lab/integrations/custom-thehive.py
zionboggan.com ↗
64 lines · python
History for this file →
1
import json
2
import sys
3
from pathlib import Path
4
from urllib import request, error
5
 
6
TIMEOUT = 10
7
 
8
 
9
def load_alert(alert_path):
10
    with open(alert_path, "r", encoding="utf-8") as handle:
11
        return json.load(handle)
12
 
13
 
14
def build_payload(alert):
15
    rule = alert.get("rule", {})
16
    agent = alert.get("agent", {})
17
    data = alert.get("data", {})
18
    return {
19
        "source": "wazuh",
20
        "rule_id": rule.get("id"),
21
        "rule_level": rule.get("level"),
22
        "rule_description": rule.get("description"),
23
        "mitre": rule.get("mitre", {}),
24
        "agent_id": agent.get("id"),
25
        "agent_name": agent.get("name"),
26
        "agent_ip": agent.get("ip"),
27
        "src_ip": data.get("srcip"),
28
        "dst_ip": data.get("dstip"),
29
        "full_log": alert.get("full_log"),
30
        "timestamp": alert.get("timestamp"),
31
        "raw": alert,
32
    }
33
 
34
 
35
def post(hook_url, payload):
36
    body = json.dumps(payload).encode("utf-8")
37
    req = request.Request(
38
        hook_url,
39
        data=body,
40
        headers={"Content-Type": "application/json"},
41
        method="POST",
42
    )
43
    with request.urlopen(req, timeout=TIMEOUT) as response:
44
        return response.status
45
 
46
 
47
def main(argv):
48
    if len(argv) < 4:
49
        return 1
50
    alert_path = argv[1]
51
    hook_url = argv[3]
52
    if not Path(alert_path).exists():
53
        return 1
54
    alert = load_alert(alert_path)
55
    payload = build_payload(alert)
56
    try:
57
        status = post(hook_url, payload)
58
    except (error.URLError, error.HTTPError):
59
        return 1
60
    return 0 if status < 400 else 1
61
 
62
 
63
if __name__ == "__main__":
64
    sys.exit(main(sys.argv))