Zion Boggan
repos/Oversight/tests/test_siem_unit.py
zionboggan.com ↗
199 lines · python
History for this file →
1
"""Focused tests for the SIEM export formatters and registry-row mapping."""
2
 
3
import base64
4
import json
5
import os
6
import sqlite3
7
import sys
8
 
9
# Parent directory of the folder holding this test file; the path is left
# unnormalized, so it literally ends in "..".
ROOT = os.path.join(os.path.dirname(__file__), "..")
10
# Prepend ROOT to the import search path before the project import that
# follows (presumably so `oversight_core` resolves without being installed).
sys.path.insert(0, ROOT)
11
 
12
from oversight_core import siem
13
 
14
 
15
# Fixed 64-character registry identifier ("deadbeef" repeated 8 times) that
# the tests pass wherever a registry id is required.
REGISTRY_ID = "deadbeef" * 8
16
 
17
 
18
def _sample_event(**overrides) -> siem.OversightEvent:
    """Build a ``siem.OversightEvent`` filled with fixed sample values.

    Any keyword argument replaces the sample value of the same name (or adds
    a new field) before the event is constructed.
    """
    occurred = 1_735_000_000
    fields = {
        "event_id": "42",
        "event_kind": "dns",
        "occurred_unix": occurred,
        "occurred_at": siem.iso8601(occurred),
        "registry_id": REGISTRY_ID,
        "token_id": "tok_abc",
        "file_id": "file_xyz",
        "recipient_id": "rcpt_alice",
        "issuer_id": "issuer_zion",
        "source_ip": "198.51.100.42",
        "user_agent": "Mozilla/5.0",
        "qualified_timestamp": "2024-12-24T01:06:40Z",
        "tlog_index": 7,
        "extra": {"qname": "abc.t.example.com", "qtype": "A"},
        # Caller-supplied values win over the samples above.
        **overrides,
    }
    return siem.OversightEvent(**fields)
37
 
38
 
39
def test_splunk_envelope_carries_time_host_event_and_fields():
    """The Splunk HEC output exposes the envelope keys, the event payload and fields."""
    envelope = siem.to_splunk_hec(
        _sample_event(), source="s", sourcetype="st", index="main", host="h"
    )

    # Top-level envelope keys, checked in a fixed order.
    expected_envelope = (
        ("time", 1_735_000_000.0),
        ("host", "h"),
        ("source", "s"),
        ("sourcetype", "st"),
        ("index", "main"),
    )
    for key, expected in expected_envelope:
        assert envelope[key] == expected

    payload = envelope["event"]
    assert payload["kind"] == "dns"
    assert payload["action"] == "beacon-dns-callback"
    assert payload["token_id"] == "tok_abc"
    assert payload["tlog_index"] == 7

    fields = envelope["fields"]
    assert fields["file_id"] == "file_xyz"
    assert fields["beacon_kind"] == "dns"
54
 
55
 
56
def test_splunk_drops_empty_optional_fields():
    """Optional fields set to ``None`` must not appear in the Splunk event payload."""
    optional = ("user_agent", "source_ip", "qualified_timestamp")
    # dict.fromkeys maps every optional field name to None, in tuple order.
    envelope = siem.to_splunk_hec(_sample_event(**dict.fromkeys(optional)))
    for name in optional:
        assert name not in envelope["event"]
62
 
63
 
64
def test_ecs_document_has_canonical_fields():
    """The ECS document carries the timestamp, version, event, source, UA and oversight data."""
    doc = siem.to_ecs(_sample_event())

    assert doc["@timestamp"] == siem.iso8601(1_735_000_000)
    assert doc["ecs"]["version"] == siem.ECS_VERSION

    event_section = doc["event"]
    assert event_section["kind"] == "event"
    assert "network" in event_section["category"]
    assert event_section["dataset"] == "oversight.beacon"
    assert event_section["action"] == "beacon-dns-callback"

    assert doc["source"]["ip"] == "198.51.100.42"
    assert doc["user_agent"]["original"] == "Mozilla/5.0"
    assert doc["labels"]["oversight_token_id"] == "tok_abc"

    oversight_section = doc["oversight"]
    assert oversight_section["registry_id"] == REGISTRY_ID
    assert oversight_section["tlog_index"] == 7
78
 
79
 
80
def test_ecs_ua_and_source_absent_when_empty():
    """With no source IP or user agent, the ECS document omits both top-level keys."""
    doc = siem.to_ecs(_sample_event(user_agent=None, source_ip=None))
    for section in ("source", "user_agent"):
        assert section not in doc
85
 
86
 
87
def test_sentinel_flat_row_kql_friendly():
    """The Sentinel row is flat: plain column names plus a JSON-encoded ``ExtraJson``."""
    evt = _sample_event()
    out = siem.to_sentinel(evt)
    assert out["TimeGenerated"] == siem.iso8601(1_735_000_000)
    assert out["BeaconKind"] == "dns"
    assert out["TokenId"] == "tok_abc"
    assert out["SourceIp"] == "198.51.100.42"
    assert out["TlogIndex"] == 7
    # Check presence before subscripting, so a missing column fails as an
    # assertion instead of a KeyError (after the subscript the check is dead).
    assert "ExtraJson" in out
    assert json.loads(out["ExtraJson"])["qname"] == "abc.t.example.com"
    # No key may start with "@" (unlike the ECS "@timestamp" key).
    assert all(not k.startswith("@") for k in out)
98
 
99
 
100
def test_from_registry_row_reads_sqlite_row(tmp_path):
    """Map a real ``sqlite3.Row`` to an event, then read the same DB via the iterator.

    Creates a throwaway ``events`` table under ``tmp_path``, inserts one row,
    converts it with ``siem.from_registry_row`` and finally checks that
    ``siem.iter_registry_events`` yields that single event from the file.
    """
    db_path = tmp_path / "events.db"
    schema = """
        CREATE TABLE events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            token_id TEXT NOT NULL,
            file_id TEXT,
            recipient_id TEXT,
            issuer_id TEXT,
            kind TEXT NOT NULL,
            source_ip TEXT,
            user_agent TEXT,
            extra TEXT,
            timestamp INTEGER NOT NULL,
            qualified_timestamp TEXT,
            tlog_index INTEGER
        );
        """
    con = sqlite3.connect(db_path)
    # try/finally guarantees the connection is released even when setup or the
    # row mapping raises; otherwise the handle would leak on failure.
    try:
        con.row_factory = sqlite3.Row
        con.executescript(schema)
        con.execute(
            "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
            "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            ("tok", "file", "rcpt", "iss", "dns",
             "203.0.113.9", "curl/8", json.dumps({"qtype": "A"}),
             1_735_000_000, "2024-12-24T01:06:40Z", 11),
        )
        con.commit()

        row = con.execute("SELECT * FROM events WHERE id=1").fetchone()
        evt = siem.from_registry_row(row, registry_id=REGISTRY_ID)
    finally:
        con.close()

    assert evt.event_kind == "dns"
    assert evt.token_id == "tok"
    assert evt.source_ip == "203.0.113.9"
    assert evt.tlog_index == 11
    assert evt.extra == {"qtype": "A"}

    # The connection above is closed; the iterator is given only the path.
    events = list(siem.iter_registry_events(str(db_path), registry_id=REGISTRY_ID))
    assert len(events) == 1
    assert events[0].token_id == "tok"
145
 
146
 
147
def test_sentinel_authorization_matches_microsoft_recipe():
    """The Sentinel authorization header is deterministic and ``SharedKey <id>:<sig>`` shaped.

    Only determinism, the prefix and a minimum signature length are checked;
    the signature value itself is not compared against a reference.
    """
    workspace = "00000000-0000-0000-0000-000000000001"
    shared_key_b64 = base64.b64encode(b"\x01" * 32).decode("utf-8")
    signing_inputs = {
        "workspace_id": workspace,
        "shared_key_b64": shared_key_b64,
        "content_length": 1234,
        "date_rfc1123": "Mon, 22 Apr 2026 12:00:00 GMT",
    }

    # Same inputs twice: the header must not depend on hidden state.
    first = siem.sentinel_authorization(**signing_inputs)
    second = siem.sentinel_authorization(**signing_inputs)

    assert first == second
    assert first.startswith(f"SharedKey {workspace}:")
    signature = first.split(":")[-1]
    assert len(signature) >= 40
169
 
170
 
171
def test_filesink_and_stdoutsink_write_jsonl(tmp_path):
    """Exporting three events through a ``FileSink`` yields three JSON lines.

    Despite the name, only ``siem.FileSink`` is exercised here.
    """
    sample_events = [_sample_event(event_id=str(idx)) for idx in range(3)]
    sink_path = tmp_path / "events.jsonl"
    sink = siem.FileSink(str(sink_path), mode="w")
    try:
        written = siem.export_events(events=iter(sample_events), fmt="ecs", sink=sink)
    finally:
        # Close the sink before reading the file back.
        sink.close()
    assert written == 3

    records = []
    for raw_line in sink_path.read_text().splitlines():
        if raw_line.strip():  # skip blank lines
            records.append(json.loads(raw_line))
    assert len(records) == 3
    assert records[0]["event"]["action"] == "beacon-dns-callback"
183
 
184
 
185
def test_unknown_format_raises():
    """An unrecognised format name raises ``ValueError`` that mentions the name."""
    try:
        siem.format_event(_sample_event(), "wazuh")
    except ValueError as exc:
        assert "wazuh" in str(exc)
    else:
        # No exception at all means the unknown format was silently accepted.
        raise AssertionError("expected ValueError for unknown SIEM format")
192
 
193
 
194
def test_action_names_cover_all_beacon_kinds():
    """Every beacon kind gets a ``beacon-`` prefixed action in all three formats."""
    prefix = "beacon-"
    for kind in ("dns", "http_img", "ocsp", "license"):
        evt = _sample_event(event_kind=kind)

        splunk_action = siem.to_splunk_hec(evt)["event"]["action"]
        assert splunk_action.startswith(prefix)

        ecs_action = siem.to_ecs(evt)["event"]["action"]
        assert ecs_action.startswith(prefix)

        sentinel_action = siem.to_sentinel(evt)["Action"]
        assert sentinel_action.startswith(prefix)