Zion Boggan
repos/Oversight/oversight_core/siem.py
zionboggan.com ↗
430 lines · python
History for this file →
1
"""SIEM export formatters for Oversight registry events.
2
 
3
Oversight records beacon callbacks (DNS, HTTP pixel, OCSP, license) in the
4
registry's ``events`` table. Security teams need those events in whichever
5
incident pipeline they already run: Splunk, Microsoft Sentinel, or an
6
Elastic stack following the Elastic Common Schema. This module provides
7
schema-stable formatters for each of the three, a normalized event model,
8
and minimal file/HTTP sinks so operators can stream live or stage to a
9
forwarder.
10
 
11
Formatters are pure. They do not perform network I/O and they do not
12
access the database. Transport lives in the sink classes and is
13
optional. The default workflow is to emit JSON lines and let an existing
14
site forwarder (Splunk Universal Forwarder, Azure Monitor Agent,
15
Filebeat) deliver them, so Oversight does not need to carry SIEM
16
credentials in the default deployment.
17
 
18
Event semantics match the registry ``events`` table exactly. See
19
``docs/SIEM.md`` for the field dictionary, the Sentinel HMAC signing
20
recipe, and example Splunk / Elastic dashboards.
21
"""
22
 
23
from __future__ import annotations
24
 
25
import base64
26
import hashlib
27
import hmac
28
import json
29
import sqlite3
30
import time
31
from dataclasses import dataclass, field, asdict
32
from datetime import datetime, timezone
33
from typing import Any, Iterable, Iterator, Mapping, Optional
34
 
35
 
36
# Elastic Common Schema version reported under ``ecs.version`` by ``to_ecs``.
ECS_VERSION = "8.11.0"
# Schema tag stamped on every record produced by the formatters in this module.
SCHEMA_VERSION = "oversight-siem-1"

# Action name reported for each known beacon kind. Formatters fall back to
# ``beacon-<kind>`` for kinds that are not listed here.
ACTION_BY_KIND = dict(
    dns="beacon-dns-callback",
    http_img="beacon-http-pixel",
    ocsp="beacon-ocsp-callback",
    license="beacon-license-check",
)
# The beacon kinds this module knows about.
BEACON_KINDS = {
    "dns",
    "http_img",
    "ocsp",
    "license",
}
46
 
47
 
48
def iso8601(unix_ts: int | float) -> str:
    """Render a Unix timestamp as an RFC 3339 UTC string, e.g. ``2023-11-14T22:13:20Z``.

    The output has second precision (any fractional part is not printed),
    which is the form used for the ECS ``@timestamp`` field.
    """
    moment = datetime.fromtimestamp(float(unix_ts), tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
53
 
54
 
55
@dataclass
class OversightEvent:
    """One row of the registry ``events`` table in normalized form.

    ``registry_id`` carries the registry's ed25519 public key hex (or a
    short fingerprint of it) rather than an operator-chosen hostname, so
    SIEM consumers can distinguish federated registries from one another.
    """

    # Required identity and timing fields.
    event_id: str
    event_kind: str
    occurred_unix: int
    occurred_at: str
    registry_id: str

    # Optional attribution fields; ``None`` when the row does not carry them.
    token_id: Optional[str] = None
    file_id: Optional[str] = None
    recipient_id: Optional[str] = None
    issuer_id: Optional[str] = None

    # Optional request metadata.
    source_ip: Optional[str] = None
    user_agent: Optional[str] = None

    # Optional timestamping / log-index fields.
    qualified_timestamp: Optional[str] = None
    tlog_index: Optional[int] = None

    # Free-form additional data; every instance gets its own dict.
    extra: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the event as a plain ``dict`` via :func:`dataclasses.asdict`."""
        return asdict(self)
81
 
82
 
83
def from_registry_row(
    row: Mapping[str, Any] | sqlite3.Row,
    *,
    registry_id: str,
) -> OversightEvent:
    """Map a ``SELECT * FROM events`` row into an :class:`OversightEvent`.

    Args:
        row: A ``sqlite3.Row`` or any mapping keyed by column name. The
            columns read are ``id``, ``kind``, ``timestamp``, ``token_id``,
            ``file_id``, ``recipient_id``, ``issuer_id``, ``source_ip``,
            ``user_agent``, ``qualified_timestamp``, ``tlog_index`` and
            ``extra``; missing columns are treated as absent values.
        registry_id: Identifier of the registry the row came from; copied
            onto the event unchanged.

    Returns:
        The normalized event. A missing or zero ``timestamp`` yields
        ``occurred_unix == 0`` and an empty ``occurred_at``. A missing or
        NULL ``id`` yields an empty ``event_id``.

    Raises:
        ValueError: If ``timestamp`` is a non-numeric string.
        TypeError: If ``timestamp`` is of a type ``int()`` cannot convert.
    """
    if isinstance(row, sqlite3.Row):
        data = {key: row[key] for key in row.keys()}
    else:
        data = dict(row)

    raw_extra = data.get("extra")
    if isinstance(raw_extra, Mapping):
        # Already decoded (e.g. a caller-built mapping row): use it directly
        # instead of feeding a dict to json.loads and wrapping it as "raw".
        extra = dict(raw_extra)
    elif not raw_extra:
        extra = {}
    else:
        try:
            decoded = json.loads(raw_extra)
        except (TypeError, ValueError):
            # ValueError covers JSONDecodeError and undecodable bytes;
            # keep the original payload rather than losing it.
            decoded = raw_extra
        # Valid JSON that is not an object (list, number, null, ...) is
        # preserved under "raw" so ``extra`` is always a dict.
        extra = decoded if isinstance(decoded, dict) else {"raw": decoded}

    raw_id = data.get("id")
    occurred_unix = int(data.get("timestamp") or 0)
    return OversightEvent(
        # A NULL id must not turn into the literal string "None".
        event_id="" if raw_id is None else str(raw_id),
        event_kind=str(data.get("kind") or ""),
        occurred_unix=occurred_unix,
        occurred_at=iso8601(occurred_unix) if occurred_unix else "",
        registry_id=registry_id,
        token_id=data.get("token_id"),
        file_id=data.get("file_id"),
        recipient_id=data.get("recipient_id"),
        issuer_id=data.get("issuer_id"),
        source_ip=data.get("source_ip"),
        # Empty user-agent strings are normalized to None.
        user_agent=data.get("user_agent") or None,
        qualified_timestamp=data.get("qualified_timestamp"),
        tlog_index=data.get("tlog_index"),
        extra=extra,
    )
117
 
118
 
119
def _clean(d: dict) -> dict:
120
    """Drop keys whose values are ``None`` or empty string."""
121
    return {k: v for k, v in d.items() if v not in (None, "")}
122
 
123
 
124
def to_splunk_hec(
    evt: OversightEvent,
    *,
    source: str = "oversight:registry",
    sourcetype: str = "oversight:beacon",
    index: Optional[str] = None,
    host: Optional[str] = None,
) -> dict:
    """Format an event as a Splunk HTTP Event Collector envelope.

    The envelope is meant to be posted one-per-line (JSONL) or
    one-per-request against ``/services/collector/event``.

    Args:
        evt: The event to format.
        source: Value for the envelope's ``source`` key.
        sourcetype: Value for the envelope's ``sourcetype`` key.
        index: Target index; the ``index`` key is only emitted when set.
        host: Value for the envelope's ``host`` key; defaults to the
            event's ``registry_id``.

    Returns:
        A dict with ``host``, ``source``, ``sourcetype``, ``event`` and
        ``fields`` keys, plus ``time`` (epoch seconds as a float) when the
        event has a known timestamp and ``index`` when requested. Keys
        with ``None`` or empty values are dropped from ``event`` and
        ``fields``.
    """
    action = ACTION_BY_KIND.get(evt.event_kind, f"beacon-{evt.event_kind}")
    envelope: dict = {
        "host": host or evt.registry_id,
        "source": source,
        "sourcetype": sourcetype,
        "event": _clean({
            "schema": SCHEMA_VERSION,
            "kind": evt.event_kind,
            "action": action,
            "event_id": evt.event_id,
            "occurred_at": evt.occurred_at,
            "token_id": evt.token_id,
            "file_id": evt.file_id,
            "recipient_id": evt.recipient_id,
            "issuer_id": evt.issuer_id,
            "source_ip": evt.source_ip,
            "user_agent": evt.user_agent,
            "qualified_timestamp": evt.qualified_timestamp,
            "tlog_index": evt.tlog_index,
            "registry_id": evt.registry_id,
            "extra": evt.extra or None,
        }),
        "fields": _clean({
            "file_id": evt.file_id,
            "recipient_id": evt.recipient_id,
            "issuer_id": evt.issuer_id,
            "beacon_kind": evt.event_kind,
        }),
    }
    if evt.occurred_unix:
        # ``occurred_unix == 0`` marks an unknown timestamp (``occurred_at``
        # is empty and dropped in that case). Sending ``time: 0.0`` would
        # place the event at the Unix epoch, so the key is left out and the
        # collector assigns its own time. ``time`` stays the first key.
        envelope = {"time": float(evt.occurred_unix), **envelope}
    if index:
        envelope["index"] = index
    return envelope
170
 
171
 
172
def to_ecs(evt: OversightEvent) -> dict:
    """Format an event as an Elastic Common Schema 8.x document.

    Standard ECS field sets (``event``, ``source``, ``user_agent``,
    ``labels``) carry the common attributes, while the custom
    ``oversight`` namespace carries the protocol-native fields that have
    no canonical ECS home. Keys whose value is ``None`` or empty are
    dropped; ``source`` and ``user_agent`` are omitted entirely when the
    event has no source IP or user agent.
    """
    action = ACTION_BY_KIND.get(evt.event_kind, f"beacon-{evt.event_kind}")

    event_fields = _clean({
        "kind": "event",
        "category": ["network"],
        "type": ["access", "info"],
        "dataset": "oversight.beacon",
        "module": "oversight",
        "provider": "oversight-registry",
        "action": action,
        "id": evt.event_id,
        "outcome": "success",
    })
    # An empty sub-document collapses to None so the outer clean removes it.
    source_fields = _clean({"ip": evt.source_ip}) or None
    user_agent_fields = _clean({"original": evt.user_agent}) or None
    label_fields = _clean({
        "oversight_token_id": evt.token_id,
        "oversight_file_id": evt.file_id,
        "oversight_recipient_id": evt.recipient_id,
        "oversight_issuer_id": evt.issuer_id,
        "oversight_beacon_kind": evt.event_kind,
    })
    oversight_fields = _clean({
        "schema": SCHEMA_VERSION,
        "registry_id": evt.registry_id,
        "token_id": evt.token_id,
        "file_id": evt.file_id,
        "recipient_id": evt.recipient_id,
        "issuer_id": evt.issuer_id,
        "beacon_kind": evt.event_kind,
        "tlog_index": evt.tlog_index,
        "qualified_timestamp": evt.qualified_timestamp,
        "extra": evt.extra or None,
    })

    document = {
        "@timestamp": evt.occurred_at,
        "ecs": {"version": ECS_VERSION},
        "event": event_fields,
        "source": source_fields,
        "user_agent": user_agent_fields,
        "labels": label_fields,
        "oversight": oversight_fields,
    }
    return _clean(document)
218
 
219
 
220
def to_sentinel(evt: OversightEvent) -> dict:
    """Format an event as a flat record for Microsoft Sentinel custom logs.

    The record is deliberately flat: nested structures are accepted by
    the Data Collector API but turn into dynamic columns that are harder
    to query with KQL. ``extra`` is therefore serialized into the
    ``ExtraJson`` string column. Keys whose value is ``None`` or empty
    are dropped.
    """
    extra_json = None
    if evt.extra:
        extra_json = json.dumps(evt.extra, separators=(",", ":"))

    record = {
        "TimeGenerated": evt.occurred_at,
        "Schema": SCHEMA_VERSION,
        "RegistryId": evt.registry_id,
        "EventId": evt.event_id,
        "BeaconKind": evt.event_kind,
        "Action": ACTION_BY_KIND.get(evt.event_kind, f"beacon-{evt.event_kind}"),
        "TokenId": evt.token_id,
        "FileId": evt.file_id,
        "RecipientId": evt.recipient_id,
        "IssuerId": evt.issuer_id,
        "SourceIp": evt.source_ip,
        "UserAgent": evt.user_agent,
        "QualifiedTimestamp": evt.qualified_timestamp,
        "TlogIndex": evt.tlog_index,
        "ExtraJson": extra_json,
    }
    return _clean(record)
244
 
245
 
246
# Registry of formatter functions keyed by the SIEM format name that
# ``format_event`` and ``export_events`` accept.
FORMATTERS = dict(
    splunk=to_splunk_hec,
    ecs=to_ecs,
    sentinel=to_sentinel,
)
251
 
252
 
253
def format_event(evt: OversightEvent, fmt: str, **kwargs) -> dict:
    """Format ``evt`` with the formatter registered under ``fmt``.

    Extra keyword arguments are forwarded only to the ``"splunk"``
    formatter; for every other format they are ignored.

    Raises:
        ValueError: If ``fmt`` is not a key of ``FORMATTERS``.
    """
    formatter = FORMATTERS.get(fmt)
    if formatter is None:
        raise ValueError(f"unknown SIEM format: {fmt!r} (choices: {sorted(FORMATTERS)})")
    if fmt == "splunk":
        return formatter(evt, **kwargs)
    return formatter(evt)
257
 
258
 
259
 
260
 
261
def sentinel_authorization(
    *,
    workspace_id: str,
    shared_key_b64: str,
    content_length: int,
    date_rfc1123: str,
    method: str = "POST",
    content_type: str = "application/json",
    resource: str = "/api/logs",
) -> str:
    """Build the ``Authorization`` header value for the Sentinel Data Collector API.

    The canonical string is the request method, content length, content
    type, ``x-ms-date:<date>`` and resource, joined by newlines. It is
    signed with HMAC-SHA256 using the base64-decoded shared key.

    Args:
        workspace_id: Log Analytics workspace identifier.
        shared_key_b64: Workspace shared key, base64-encoded.
        content_length: Length of the request body.
        date_rfc1123: The value that will be sent as ``x-ms-date``.
        method: HTTP method of the request.
        content_type: ``Content-Type`` of the request.
        resource: API resource path.

    Returns:
        ``SharedKey {workspace_id}:{base64_hmac}``.
    """
    canonical_parts = [
        f"{method}",
        f"{content_length}",
        f"{content_type}",
        f"x-ms-date:{date_rfc1123}",
        f"{resource}",
    ]
    message = "\n".join(canonical_parts).encode("utf-8")
    secret = base64.b64decode(shared_key_b64)
    mac = hmac.new(secret, message, hashlib.sha256)
    signature = base64.b64encode(mac.digest()).decode("utf-8")
    return f"SharedKey {workspace_id}:{signature}"
288
 
289
 
290
 
291
 
292
class Sink:
    """Base interface for record sinks.

    Subclasses override ``send``; ``close`` is optional and does nothing
    unless overridden.
    """

    def send(self, records: Iterable[dict]) -> int:
        """Emit ``records`` and return how many were emitted. Must be overridden."""
        raise NotImplementedError

    def close(self) -> None:
        """Release resources held by the sink. The base implementation is a no-op."""
        return None
300
 
301
 
302
class FileSink(Sink):
    """Write records to a file as JSON lines.

    The file is opened once in the constructor (append by default) and
    stays open until ``close`` is called. Each batch is flushed after it
    has been written.
    """

    def __init__(self, path: str, *, mode: str = "a"):
        """Open ``path`` as UTF-8 text in append (``"a"``) or truncate (``"w"``) mode.

        Raises:
            ValueError: If ``mode`` is anything other than ``"a"`` or ``"w"``.
        """
        if mode != "a" and mode != "w":
            raise ValueError("FileSink mode must be 'a' or 'w'")
        self.path = path
        self._fh = open(path, mode, encoding="utf-8")

    def send(self, records: Iterable[dict]) -> int:
        """Write one compact JSON document per record and return the record count."""
        written = 0
        for written, record in enumerate(records, start=1):
            line = json.dumps(record, separators=(",", ":"))
            self._fh.write(line + "\n")
        self._fh.flush()
        return written

    def close(self) -> None:
        """Close the file handle, ignoring any error raised while closing."""
        try:
            self._fh.close()
        except Exception:
            # Best-effort close: failures here are deliberately ignored.
            pass
324
 
325
 
326
class StdoutSink(Sink):
    """Write records to standard output as JSON lines, e.g. for piping into a forwarder."""

    def __init__(self):
        """Capture the ``sys.stdout`` object that is current at construction time."""
        import sys
        self._out = sys.stdout

    def send(self, records: Iterable[dict]) -> int:
        """Write one compact JSON document per record, flush, and return the record count."""
        emitted = 0
        for emitted, record in enumerate(records, start=1):
            line = json.dumps(record, separators=(",", ":"))
            self._out.write(line + "\n")
        self._out.flush()
        return emitted
340
 
341
 
342
class HTTPJSONSink(Sink):
    """POST record batches to a generic HTTP endpoint.

    By default each batch is sent as a single JSON array, which suits
    in-house collectors that accept one. Splunk HEC
    (``Authorization: Splunk <token>``) expects batched events as JSON
    objects stacked one after another rather than wrapped in an array,
    and Elastic ``_bulk`` expects newline-delimited JSON (with the
    caller pre-formatting the action/document lines); pass
    ``ndjson=True`` for those endpoints. This sink does not retry;
    callers retry.
    """

    def __init__(
        self,
        url: str,
        *,
        headers: Optional[Mapping[str, str]] = None,
        timeout: float = 10.0,
        verify: bool = True,
        ndjson: bool = False,
    ):
        """Create the underlying ``httpx.Client``.

        Args:
            url: Endpoint every batch is POSTed to.
            headers: Extra request headers (for example ``Authorization``).
            timeout: Passed to ``httpx.Client`` as ``timeout``.
            verify: Passed to ``httpx.Client`` as ``verify``.
            ndjson: When true, send one compact JSON document per line
                instead of a JSON array.
        """
        import httpx
        self._client = httpx.Client(timeout=timeout, verify=verify)
        self.url = url
        self._headers = dict(headers or {})
        self._ndjson = ndjson

    def send(self, records: Iterable[dict]) -> int:
        """POST ``records`` as one request and return the number sent.

        An empty batch sends nothing and returns ``0``. A non-success
        HTTP status raises the error produced by ``raise_for_status``.
        """
        batch = list(records)
        if not batch:
            return 0
        if self._ndjson:
            body = "".join(
                json.dumps(rec, separators=(",", ":")) + "\n" for rec in batch
            )
            headers = dict(self._headers)
            # Respect a caller-supplied Content-Type (header names are
            # case-insensitive); otherwise label the body as NDJSON.
            if not any(name.lower() == "content-type" for name in headers):
                headers["Content-Type"] = "application/x-ndjson"
            resp = self._client.post(
                self.url, content=body.encode("utf-8"), headers=headers
            )
        else:
            resp = self._client.post(self.url, json=batch, headers=self._headers)
        resp.raise_for_status()
        return len(batch)

    def close(self) -> None:
        """Close the HTTP client, ignoring any error raised while closing."""
        try:
            self._client.close()
        except Exception:
            # Best-effort close: failures here are deliberately ignored.
            pass
376
 
377
 
378
 
379
 
380
def iter_registry_events(
    db_path: str,
    *,
    since_unix: Optional[int] = None,
    limit: Optional[int] = None,
    registry_id: str,
) -> Iterator[OversightEvent]:
    """Yield :class:`OversightEvent` records from a registry SQLite database.

    The database is opened through a ``mode=ro`` URI and additionally
    switched to ``PRAGMA query_only=ON``, so the function never writes
    to the file. Rows are yielded in ascending ``timestamp`` then ``id``
    order. The connection is closed when the generator is exhausted or
    closed.

    Args:
        db_path: Filesystem path of the registry database.
        since_unix: When given, only rows with ``timestamp >= since_unix``
            are returned.
        limit: When given, the maximum number of rows to return.
        registry_id: The registry's own public identifier (typically
            ``IDENTITY['ed25519_pub']``), stamped on every event.

    Raises:
        sqlite3.OperationalError: If the database cannot be opened
            read-only or has no ``events`` table.
    """
    from urllib.parse import quote

    # Percent-encode the path so characters that are special in a URI
    # ("?", "#", "%") stay part of the file name instead of starting the
    # query/fragment and silently dropping ``mode=ro``.
    con = sqlite3.connect(f"file:{quote(db_path)}?mode=ro", uri=True)
    try:
        con.row_factory = sqlite3.Row
        con.execute("PRAGMA query_only=ON")
        sql = "SELECT * FROM events"
        params: list = []
        if since_unix is not None:
            sql += " WHERE timestamp >= ?"
            params.append(int(since_unix))
        sql += " ORDER BY timestamp ASC, id ASC"
        if limit is not None:
            sql += " LIMIT ?"
            params.append(int(limit))
        for row in con.execute(sql, params):
            yield from_registry_row(row, registry_id=registry_id)
    finally:
        con.close()
411
 
412
 
413
def export_events(
    *,
    events: Iterable[OversightEvent],
    fmt: str,
    sink: Sink,
    splunk_kwargs: Optional[dict] = None,
) -> int:
    """Format ``events`` lazily and hand them to ``sink``.

    Args:
        events: Events to export; consumed only as the sink iterates.
        fmt: A key of ``FORMATTERS``.
        sink: Destination; its ``send`` return value is returned.
        splunk_kwargs: Keyword arguments for ``to_splunk_hec``; only used
            when ``fmt`` is ``"splunk"``.

    Returns:
        The count reported by ``sink.send``.

    Raises:
        ValueError: If ``fmt`` is not a key of ``FORMATTERS``.
    """
    if fmt not in FORMATTERS:
        raise ValueError(f"unknown SIEM format: {fmt!r}")
    hec_options = splunk_kwargs or {}

    def _formatted():
        # Pick the renderer once the sink starts pulling records.
        if fmt == "splunk":
            for evt in events:
                yield to_splunk_hec(evt, **hec_options)
        else:
            for evt in events:
                yield FORMATTERS[fmt](evt)

    return sink.send(_formatted())