| 1 | """Focused tests for the SIEM export formatters and registry-row mapping.""" |
| 2 | |
| 3 | import base64 |
| 4 | import json |
| 5 | import os |
| 6 | import sqlite3 |
| 7 | import sys |
| 8 | |
# Parent directory of this test file, placed first on sys.path ahead of the
# oversight_core import below.
# NOTE(review): presumably this lets the tests run from a source checkout
# without installing the package — confirm.
ROOT = os.path.join(os.path.dirname(__file__), "..")
sys.path.insert(0, ROOT)
| 11 | |
| 12 | from oversight_core import siem |
| 13 | |
| 14 | |
# Placeholder registry id used by the tests: "deadbeef" repeated 8 times
# (64 hex characters).
REGISTRY_ID = "deadbeef" * 8
| 16 | |
| 17 | |
def _sample_event(**overrides) -> siem.OversightEvent:
    """Build a representative DNS beacon event for the formatter tests.

    Any keyword argument replaces the default value of the same-named
    field before the ``siem.OversightEvent`` is constructed.
    """
    occurred = 1_735_000_000
    defaults = {
        "event_id": "42",
        "event_kind": "dns",
        "occurred_unix": occurred,
        "occurred_at": siem.iso8601(occurred),
        "registry_id": REGISTRY_ID,
        "token_id": "tok_abc",
        "file_id": "file_xyz",
        "recipient_id": "rcpt_alice",
        "issuer_id": "issuer_zion",
        "source_ip": "198.51.100.42",
        "user_agent": "Mozilla/5.0",
        "qualified_timestamp": "2024-12-24T01:06:40Z",
        "tlog_index": 7,
        "extra": {"qname": "abc.t.example.com", "qtype": "A"},
    }
    # Caller-supplied values win over the defaults.
    return siem.OversightEvent(**{**defaults, **overrides})
| 37 | |
| 38 | |
def test_splunk_envelope_carries_time_host_event_and_fields():
    """The HEC envelope echoes the routing arguments and nests the event."""
    envelope = siem.to_splunk_hec(
        _sample_event(), source="s", sourcetype="st", index="main", host="h"
    )

    # Top-level keys: timestamp plus the routing values passed above.
    expected_top_level = {
        "time": 1_735_000_000.0,
        "host": "h",
        "source": "s",
        "sourcetype": "st",
        "index": "main",
    }
    for key, wanted in expected_top_level.items():
        assert envelope[key] == wanted

    # Nested event payload.
    payload = envelope["event"]
    assert payload["kind"] == "dns"
    assert payload["action"] == "beacon-dns-callback"
    assert payload["token_id"] == "tok_abc"
    assert payload["tlog_index"] == 7

    # Entries under the "fields" key.
    indexed = envelope["fields"]
    assert indexed["file_id"] == "file_xyz"
    assert indexed["beacon_kind"] == "dns"
| 54 | |
| 55 | |
def test_splunk_drops_empty_optional_fields():
    """Optional fields set to ``None`` must not appear in the HEC event."""
    blanked = ("user_agent", "source_ip", "qualified_timestamp")
    # dict.fromkeys maps every listed field to None.
    evt = _sample_event(**dict.fromkeys(blanked))
    payload = siem.to_splunk_hec(evt)["event"]
    for field_name in blanked:
        assert field_name not in payload
| 62 | |
| 63 | |
def test_ecs_document_has_canonical_fields():
    """The ECS document exposes timestamp, event, source and oversight data."""
    doc = siem.to_ecs(_sample_event())

    assert doc["@timestamp"] == siem.iso8601(1_735_000_000)
    assert doc["ecs"]["version"] == siem.ECS_VERSION

    event_section = doc["event"]
    assert event_section["kind"] == "event"
    assert "network" in event_section["category"]
    assert event_section["dataset"] == "oversight.beacon"
    assert event_section["action"] == "beacon-dns-callback"

    assert doc["source"]["ip"] == "198.51.100.42"
    assert doc["user_agent"]["original"] == "Mozilla/5.0"
    assert doc["labels"]["oversight_token_id"] == "tok_abc"

    oversight_section = doc["oversight"]
    assert oversight_section["registry_id"] == REGISTRY_ID
    assert oversight_section["tlog_index"] == 7
| 78 | |
| 79 | |
def test_ecs_ua_and_source_absent_when_empty():
    """With no source IP or user agent, ECS omits both top-level sections."""
    doc = siem.to_ecs(_sample_event(user_agent=None, source_ip=None))
    for section in ("source", "user_agent"):
        assert section not in doc
| 85 | |
| 86 | |
def test_sentinel_flat_row_kql_friendly():
    """The Sentinel row is flat, with the extra payload as a JSON string."""
    evt = _sample_event()
    out = siem.to_sentinel(evt)
    assert out["TimeGenerated"] == siem.iso8601(1_735_000_000)
    assert out["BeaconKind"] == "dns"
    assert out["TokenId"] == "tok_abc"
    assert out["SourceIp"] == "198.51.100.42"
    assert out["TlogIndex"] == 7
    # Check presence before indexing so a missing column is reported as an
    # assertion failure instead of a KeyError from the lookup below.
    assert "ExtraJson" in out
    assert json.loads(out["ExtraJson"])["qname"] == "abc.t.example.com"
    # No key may start with "@" (unlike the ECS "@timestamp" field).
    assert all(not k.startswith("@") for k in out)
| 98 | |
| 99 | |
def test_from_registry_row_reads_sqlite_row(tmp_path):
    """Map a ``sqlite3.Row`` to an event, then read the same DB by path.

    Creates an ``events`` table in a temporary database, inserts one row,
    converts it with ``siem.from_registry_row`` and finally checks that
    ``siem.iter_registry_events`` yields that single event from the file.
    """
    db_path = tmp_path / "events.db"
    con = sqlite3.connect(db_path)
    # try/finally guarantees the connection is released even when the setup
    # or the row conversion raises.
    try:
        con.row_factory = sqlite3.Row
        con.executescript(
            """
            CREATE TABLE events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                token_id TEXT NOT NULL,
                file_id TEXT,
                recipient_id TEXT,
                issuer_id TEXT,
                kind TEXT NOT NULL,
                source_ip TEXT,
                user_agent TEXT,
                extra TEXT,
                timestamp INTEGER NOT NULL,
                qualified_timestamp TEXT,
                tlog_index INTEGER
            );
            """
        )
        con.execute(
            "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
            "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            ("tok", "file", "rcpt", "iss", "dns",
             "203.0.113.9", "curl/8", json.dumps({"qtype": "A"}),
             1_735_000_000, "2024-12-24T01:06:40Z", 11),
        )
        con.commit()

        row = con.execute("SELECT * FROM events WHERE id=1").fetchone()
        assert row is not None, "inserted event row was not found"
        evt = siem.from_registry_row(row, registry_id=REGISTRY_ID)
    finally:
        con.close()

    assert evt.event_kind == "dns"
    assert evt.token_id == "tok"
    assert evt.source_ip == "203.0.113.9"
    assert evt.tlog_index == 11
    # The row stores `extra` as JSON text; the event exposes it as a dict.
    assert evt.extra == {"qtype": "A"}

    # The connection above is already closed; this call is given only the
    # database path.
    events = list(siem.iter_registry_events(str(db_path), registry_id=REGISTRY_ID))
    assert len(events) == 1
    assert events[0].token_id == "tok"
| 145 | |
| 146 | |
def test_sentinel_authorization_matches_microsoft_recipe():
    """The SharedKey header is deterministic and carries a SHA-256-sized MAC.

    Checks that identical inputs give identical headers, that the header
    starts with ``SharedKey <workspace>:``, and that the trailing signature
    is valid base64 decoding to 32 bytes.
    """
    workspace = "00000000-0000-0000-0000-000000000001"
    key_bytes = b"\x01" * 32
    shared_key_b64 = base64.b64encode(key_bytes).decode("utf-8")
    # 22 Apr 2026 is a Wednesday; keep the RFC 1123 weekday consistent with
    # the date so the fixture is a valid timestamp.
    date = "Wed, 22 Apr 2026 12:00:00 GMT"
    body_len = 1234

    call_args = dict(
        workspace_id=workspace,
        shared_key_b64=shared_key_b64,
        content_length=body_len,
        date_rfc1123=date,
    )
    header1 = siem.sentinel_authorization(**call_args)
    header2 = siem.sentinel_authorization(**call_args)

    # Same inputs must produce the same header.
    assert header1 == header2

    prefix = f"SharedKey {workspace}:"
    assert header1.startswith(prefix)

    # NOTE(review): assumes the signature is base64(HMAC-SHA256(...)) as in
    # Microsoft's documented recipe, i.e. a 32-byte digest — confirm against
    # siem.sentinel_authorization.
    signature = header1[len(prefix):]
    assert len(base64.b64decode(signature, validate=True)) == 32
| 169 | |
| 170 | |
def test_filesink_and_stdoutsink_write_jsonl(tmp_path):
    """Exporting three events through ``FileSink`` writes three JSON lines.

    NOTE(review): despite the name, only ``siem.FileSink`` is exercised
    here; no stdout sink is used in this test.
    """
    evts = [_sample_event(event_id=str(i)) for i in range(3)]
    sink_path = tmp_path / "events.jsonl"
    sink = siem.FileSink(str(sink_path), mode="w")
    try:
        n = siem.export_events(events=iter(evts), fmt="ecs", sink=sink)
    finally:
        # Close the sink before reading the file back.
        sink.close()
    assert n == 3

    # Explicit encoding keeps the read independent of the platform locale.
    written = sink_path.read_text(encoding="utf-8")
    lines = [json.loads(line) for line in written.splitlines() if line.strip()]
    assert len(lines) == 3
    assert lines[0]["event"]["action"] == "beacon-dns-callback"
| 184 | |
def test_unknown_format_raises():
    """An unsupported format name raises ``ValueError`` naming that format."""
    try:
        siem.format_event(_sample_event(), "wazuh")
    except ValueError as exc:
        assert "wazuh" in str(exc)
    else:
        # Reached only when no exception was raised at all.
        raise AssertionError("expected ValueError for unknown SIEM format")
| 192 | |
| 193 | |
def test_action_names_cover_all_beacon_kinds():
    """Each beacon kind gets a ``beacon-`` action name in all three formats."""
    prefix = "beacon-"
    for kind in ("dns", "http_img", "ocsp", "license"):
        evt = _sample_event(event_kind=kind)

        splunk_action = siem.to_splunk_hec(evt)["event"]["action"]
        assert splunk_action.startswith(prefix)

        ecs_action = siem.to_ecs(evt)["event"]["action"]
        assert ecs_action.startswith(prefix)

        sentinel_action = siem.to_sentinel(evt)["Action"]
        assert sentinel_action.startswith(prefix)