| 1 | """SIEM export formatters for Oversight registry events. |
| 2 | |
| 3 | Oversight records beacon callbacks (DNS, HTTP pixel, OCSP, license) in the |
| 4 | registry's ``events`` table. Security teams need those events in whichever |
| 5 | incident pipeline they already run: Splunk, Microsoft Sentinel, or an |
| 6 | Elastic stack following the Elastic Common Schema. This module provides |
| 7 | schema-stable formatters for each of the three, a normalized event model, |
| 8 | and minimal file/HTTP sinks so operators can stream live or stage to a |
| 9 | forwarder. |
| 10 | |
| 11 | Formatters are pure. They do not perform network I/O and they do not |
| 12 | access the database. Transport lives in the sink classes and is |
| 13 | optional. The default workflow is to emit JSON lines and let an existing |
| 14 | site forwarder (Splunk Universal Forwarder, Azure Monitor Agent, |
| 15 | Filebeat) deliver them, so Oversight does not need to carry SIEM |
| 16 | credentials in the default deployment. |
| 17 | |
| 18 | Event semantics match the registry ``events`` table exactly. See |
| 19 | ``docs/SIEM.md`` for the field dictionary, the Sentinel HMAC signing |
| 20 | recipe, and example Splunk / Elastic dashboards. |
| 21 | """ |
| 22 | |
| 23 | from __future__ import annotations |
| 24 | |
| 25 | import base64 |
| 26 | import hashlib |
| 27 | import hmac |
| 28 | import json |
| 29 | import sqlite3 |
| 30 | import time |
| 31 | from dataclasses import dataclass, field, asdict |
| 32 | from datetime import datetime, timezone |
| 33 | from typing import Any, Iterable, Iterator, Mapping, Optional |
| 34 | |
| 35 | |
# Version strings stamped into exported records by the formatters.
SCHEMA_VERSION = "oversight-siem-1"
ECS_VERSION = "8.11.0"

# Beacon kind -> action name reported by the formatters. Kinds missing from
# this table fall back to ``beacon-<kind>`` at the call sites.
ACTION_BY_KIND = dict(
    dns="beacon-dns-callback",
    http_img="beacon-http-pixel",
    ocsp="beacon-ocsp-callback",
    license="beacon-license-check",
)

# The known beacon kinds: exactly the keys of ACTION_BY_KIND.
BEACON_KINDS = set(ACTION_BY_KIND)
| 46 | |
| 47 | |
def iso8601(unix_ts: int | float) -> str:
    """Render a Unix timestamp as an RFC 3339 UTC string.

    The output has whole-second precision and a literal ``Z`` suffix
    (for example ``1970-01-01T00:00:00Z``), which is the shape used for the
    ECS ``@timestamp`` field. Any fractional part of *unix_ts* is not shown.
    """
    moment = datetime.fromtimestamp(float(unix_ts), tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
| 53 | |
| 54 | |
@dataclass
class OversightEvent:
    """One normalized row of the registry ``events`` table.

    ``registry_id`` identifies the emitting registry by its ed25519 public
    key hex (or a short fingerprint of it) rather than by an operator-chosen
    hostname, so SIEM consumers can tell federated registries apart.
    """

    # --- always present -------------------------------------------------
    event_id: str        # row identifier, as a string
    event_kind: str      # beacon kind, e.g. "dns" or "http_img"
    occurred_unix: int   # event time as Unix seconds
    occurred_at: str     # the same instant as an RFC 3339 string
    registry_id: str     # identifier of the emitting registry

    # --- optional, None when the registry did not record a value --------
    token_id: Optional[str] = None
    file_id: Optional[str] = None
    recipient_id: Optional[str] = None
    issuer_id: Optional[str] = None
    source_ip: Optional[str] = None
    user_agent: Optional[str] = None
    qualified_timestamp: Optional[str] = None
    tlog_index: Optional[int] = None

    # Free-form additional data; every instance gets its own dict.
    extra: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the event as a plain ``dict`` keyed by field name."""
        return asdict(self)
| 81 | |
| 82 | |
def from_registry_row(
    row: Mapping[str, Any] | sqlite3.Row,
    *,
    registry_id: str,
) -> OversightEvent:
    """Map a ``SELECT * FROM events`` row into an :class:`OversightEvent`.

    Args:
        row: A ``sqlite3.Row`` or any mapping keyed by column name. The
            columns read are ``id``, ``kind``, ``timestamp``, ``token_id``,
            ``file_id``, ``recipient_id``, ``issuer_id``, ``source_ip``,
            ``user_agent``, ``qualified_timestamp``, ``tlog_index`` and
            ``extra``; absent columns are treated as NULL.
        registry_id: Identifier of the registry the row came from; copied
            onto the event unchanged.

    Returns:
        The normalized event. ``occurred_at`` is empty when the row has no
        (or a zero) timestamp. ``extra`` is always a dict: a JSON object is
        decoded, an already-decoded mapping is copied, and anything else
        (invalid JSON, a JSON scalar or array) is kept under ``"raw"``.
    """
    # Both sqlite3.Row and ordinary mappings support keys() + item access.
    d = {k: row[k] for k in row.keys()}

    extra_raw = d.get("extra")
    if isinstance(extra_raw, Mapping):
        # Callers that build rows by hand may pass decoded data already;
        # use it as-is instead of burying it under {"raw": ...}.
        extra = dict(extra_raw)
    elif not extra_raw:
        extra = {}
    else:
        try:
            decoded = json.loads(extra_raw)
        except (TypeError, ValueError):
            # ValueError covers JSONDecodeError and the UnicodeDecodeError
            # raised for undecodable bytes.
            decoded = extra_raw
        extra = decoded if isinstance(decoded, dict) else {"raw": decoded}

    # A NULL id must become "", not the literal string "None".
    raw_id = d.get("id")
    event_id = "" if raw_id is None else str(raw_id)

    occurred_unix = int(d.get("timestamp") or 0)
    return OversightEvent(
        event_id=event_id,
        event_kind=str(d.get("kind") or ""),
        occurred_unix=occurred_unix,
        occurred_at=iso8601(occurred_unix) if occurred_unix else "",
        registry_id=registry_id,
        token_id=d.get("token_id"),
        file_id=d.get("file_id"),
        recipient_id=d.get("recipient_id"),
        issuer_id=d.get("issuer_id"),
        source_ip=d.get("source_ip"),
        user_agent=d.get("user_agent") or None,
        qualified_timestamp=d.get("qualified_timestamp"),
        tlog_index=d.get("tlog_index"),
        extra=extra,
    )
| 117 | |
| 118 | |
| 119 | def _clean(d: dict) -> dict: |
| 120 | """Drop keys whose values are ``None`` or empty string.""" |
| 121 | return {k: v for k, v in d.items() if v not in (None, "")} |
| 122 | |
| 123 | |
def to_splunk_hec(
    evt: OversightEvent,
    *,
    source: str = "oversight:registry",
    sourcetype: str = "oversight:beacon",
    index: Optional[str] = None,
    host: Optional[str] = None,
) -> dict:
    """Build a Splunk HTTP Event Collector envelope for *evt*.

    The envelope is meant to be written one per line (JSONL) or posted one
    per request to ``/services/collector/event``. ``time`` is the event
    time in epoch seconds as a float, ``host`` falls back to the registry
    id, and ``index`` is only included when a non-empty value is given.
    Keys whose value is ``None`` or ``""`` are left out of both the
    ``event`` body and the ``fields`` mapping.
    """
    kind = evt.event_kind
    action = ACTION_BY_KIND.get(kind, f"beacon-{kind}")

    # Full event payload.
    body = {
        "schema": SCHEMA_VERSION,
        "kind": kind,
        "action": action,
        "event_id": evt.event_id,
        "occurred_at": evt.occurred_at,
        "token_id": evt.token_id,
        "file_id": evt.file_id,
        "recipient_id": evt.recipient_id,
        "issuer_id": evt.issuer_id,
        "source_ip": evt.source_ip,
        "user_agent": evt.user_agent,
        "qualified_timestamp": evt.qualified_timestamp,
        "tlog_index": evt.tlog_index,
        "registry_id": evt.registry_id,
        "extra": evt.extra or None,
    }
    # Small subset repeated under the envelope's "fields" key.
    indexed = {
        "file_id": evt.file_id,
        "recipient_id": evt.recipient_id,
        "issuer_id": evt.issuer_id,
        "beacon_kind": kind,
    }

    envelope: dict = {
        "time": float(evt.occurred_unix),
        "host": host or evt.registry_id,
        "source": source,
        "sourcetype": sourcetype,
        "event": _clean(body),
        "fields": _clean(indexed),
    }
    if index:
        envelope["index"] = index
    return envelope
| 170 | |
| 171 | |
def to_ecs(evt: OversightEvent) -> dict:
    """Render *evt* as an Elastic Common Schema 8.x document.

    Common fields go to the ECS top-level ``event.*``, ``source.*``,
    ``user_agent.*`` and ``labels.*`` namespaces so that dashboards built
    on the Elastic Security app work without extra mapping. Protocol-native
    fields with no canonical ECS home are carried under the custom
    ``oversight.*`` namespace. Keys whose value is ``None`` or ``""`` are
    omitted at every level, and ``source`` / ``user_agent`` disappear
    entirely when they would be empty.
    """
    kind = evt.event_kind

    event_section = _clean({
        "kind": "event",
        "category": ["network"],
        "type": ["access", "info"],
        "dataset": "oversight.beacon",
        "module": "oversight",
        "provider": "oversight-registry",
        "action": ACTION_BY_KIND.get(kind, f"beacon-{kind}"),
        "id": evt.event_id,
        "outcome": "success",
    })
    # An empty sub-document collapses to None so the outer _clean drops it.
    source_section = _clean({"ip": evt.source_ip}) or None
    agent_section = _clean({"original": evt.user_agent}) or None
    label_section = _clean({
        "oversight_token_id": evt.token_id,
        "oversight_file_id": evt.file_id,
        "oversight_recipient_id": evt.recipient_id,
        "oversight_issuer_id": evt.issuer_id,
        "oversight_beacon_kind": kind,
    })
    native_section = _clean({
        "schema": SCHEMA_VERSION,
        "registry_id": evt.registry_id,
        "token_id": evt.token_id,
        "file_id": evt.file_id,
        "recipient_id": evt.recipient_id,
        "issuer_id": evt.issuer_id,
        "beacon_kind": kind,
        "tlog_index": evt.tlog_index,
        "qualified_timestamp": evt.qualified_timestamp,
        "extra": evt.extra or None,
    })

    document = {
        "@timestamp": evt.occurred_at,
        "ecs": {"version": ECS_VERSION},
        "event": event_section,
        "source": source_section,
        "user_agent": agent_section,
        "labels": label_section,
        "oversight": native_section,
    }
    return _clean(document)
| 218 | |
| 219 | |
def to_sentinel(evt: OversightEvent) -> dict:
    """Render *evt* as a flat record for Microsoft Sentinel custom logs.

    Sentinel's Data Collector API accepts flat JSON objects. Nested
    structures are permitted but turn into dynamic columns that are harder
    to query with KQL, so everything is flattened here and ``extra`` is
    shipped as a compact JSON string in ``ExtraJson``. Keys whose value is
    ``None`` or ``""`` are omitted.
    """
    kind = evt.event_kind
    extra_json = None
    if evt.extra:
        extra_json = json.dumps(evt.extra, separators=(",", ":"))

    record = {
        "TimeGenerated": evt.occurred_at,
        "Schema": SCHEMA_VERSION,
        "RegistryId": evt.registry_id,
        "EventId": evt.event_id,
        "BeaconKind": kind,
        "Action": ACTION_BY_KIND.get(kind, f"beacon-{kind}"),
        "TokenId": evt.token_id,
        "FileId": evt.file_id,
        "RecipientId": evt.recipient_id,
        "IssuerId": evt.issuer_id,
        "SourceIp": evt.source_ip,
        "UserAgent": evt.user_agent,
        "QualifiedTimestamp": evt.qualified_timestamp,
        "TlogIndex": evt.tlog_index,
        "ExtraJson": extra_json,
    }
    return _clean(record)
| 244 | |
| 245 | |
# Format name -> formatter callable; the keys are the accepted ``fmt`` values.
FORMATTERS = dict(
    splunk=to_splunk_hec,
    ecs=to_ecs,
    sentinel=to_sentinel,
)
| 251 | |
| 252 | |
def format_event(evt: OversightEvent, fmt: str, **kwargs) -> dict:
    """Format *evt* with the formatter registered under *fmt*.

    Extra keyword arguments are forwarded only to the Splunk formatter;
    for every other format they are ignored.

    Raises:
        ValueError: if *fmt* is not a key of ``FORMATTERS``.
    """
    formatter = FORMATTERS.get(fmt)
    if formatter is None:
        raise ValueError(f"unknown SIEM format: {fmt!r} (choices: {sorted(FORMATTERS)})")
    if fmt == "splunk":
        return formatter(evt, **kwargs)
    return formatter(evt)
| 257 | |
| 258 | |
| 259 | |
| 260 | |
def sentinel_authorization(
    *,
    workspace_id: str,
    shared_key_b64: str,
    content_length: int,
    date_rfc1123: str,
    method: str = "POST",
    content_type: str = "application/json",
    resource: str = "/api/logs",
) -> str:
    """Compute the ``Authorization`` header for the Sentinel Data Collector API.

    The canonical string is the request method, the content length, the
    content type, the ``x-ms-date`` header and the resource path joined by
    newlines. It is signed with HMAC-SHA256 using the base64-decoded shared
    key, and the base64 signature is returned as
    ``SharedKey {workspace_id}:{signature}``.

    Args:
        workspace_id: Log Analytics workspace id placed in the header.
        shared_key_b64: The workspace shared key, base64-encoded.
        content_length: Length of the request body, as sent in
            ``Content-Length``.
        date_rfc1123: The exact RFC 1123 value sent as ``x-ms-date``.
        method: HTTP method of the request.
        content_type: ``Content-Type`` of the request.
        resource: Resource path of the request.
    """
    canonical = "{}\n{}\n{}\nx-ms-date:{}\n{}".format(
        method, content_length, content_type, date_rfc1123, resource
    )
    signer = hmac.new(base64.b64decode(shared_key_b64), digestmod=hashlib.sha256)
    signer.update(canonical.encode("utf-8"))
    signature = base64.b64encode(signer.digest()).decode("utf-8")
    return "SharedKey " + f"{workspace_id}:{signature}"
| 288 | |
| 289 | |
| 290 | |
| 291 | |
class Sink:
    """Base interface for record sinks.

    Subclasses implement :meth:`send`; :meth:`close` is a no-op unless
    overridden. Every sink is also a context manager, so a sink that owns a
    file handle or HTTP client can be released deterministically::

        with SomeSink(...) as sink:
            sink.send(records)
    """

    def send(self, records: Iterable[dict]) -> int:
        """Emit *records* and return how many were emitted."""
        raise NotImplementedError

    def close(self) -> None:
        """Release any resources held by the sink (nothing by default)."""
        pass

    def __enter__(self) -> "Sink":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Returning None lets an exception from the ``with`` body propagate.
        self.close()
| 300 | |
| 301 | |
class FileSink(Sink):
    """Write records to a file as JSON lines.

    Each record becomes one compact JSON document followed by a newline,
    and the file is flushed at the end of every :meth:`send` call, which
    suits forwarders that tail the file.
    """

    def __init__(self, path: str, *, mode: str = "a"):
        """Open *path* as UTF-8 text for appending (``"a"``) or truncating (``"w"``).

        Raises:
            ValueError: if *mode* is anything other than ``"a"`` or ``"w"``.
        """
        if mode != "a" and mode != "w":
            raise ValueError("FileSink mode must be 'a' or 'w'")
        self.path = path
        self._fh = open(path, mode, encoding="utf-8")

    def send(self, records: Iterable[dict]) -> int:
        """Write every record as one line, flush, and return the line count."""
        out = self._fh
        written = 0
        for written, rec in enumerate(records, start=1):
            out.write(json.dumps(rec, separators=(",", ":")) + "\n")
        out.flush()
        return written

    def close(self) -> None:
        """Close the file; errors raised while closing are ignored."""
        try:
            self._fh.close()
        except Exception:
            pass
| 324 | |
| 325 | |
class StdoutSink(Sink):
    """Write records to standard output as JSON lines.

    Handy for piping into a forwarder. The stream is whatever ``sys.stdout``
    refers to at construction time.
    """

    def __init__(self):
        import sys
        self._out = sys.stdout

    def send(self, records: Iterable[dict]) -> int:
        """Print every record as one compact JSON line, flush, return the count."""
        stream = self._out
        emitted = 0
        for emitted, rec in enumerate(records, start=1):
            stream.write(json.dumps(rec, separators=(",", ":")) + "\n")
        stream.flush()
        return emitted
| 340 | |
| 341 | |
class HTTPJSONSink(Sink):
    """POST records to a generic HTTP endpoint as JSON.

    Two body shapes are supported:

    * ``ndjson=False`` (default): the whole batch is sent as a single JSON
      array. This suits collectors that take an array of objects, such as
      the Sentinel Data Collector API or an in-house endpoint.
    * ``ndjson=True``: each record is sent as one compact JSON document per
      line. This is the stacked-object shape Splunk HEC
      (``Authorization: Splunk <token>``) expects for batches, and the
      newline-delimited shape Elastic ``_bulk`` expects when callers
      pre-format the action/document pairs.

    This sink does not retry; callers retry.
    """

    def __init__(
        self,
        url: str,
        *,
        headers: Optional[Mapping[str, str]] = None,
        timeout: float = 10.0,
        verify: bool = True,
        ndjson: bool = False,
    ):
        """Create the HTTP client.

        Args:
            url: Endpoint every batch is posted to.
            headers: Extra request headers (typically ``Authorization``).
            timeout: Request timeout in seconds, passed to the client.
            verify: TLS verification setting, passed to the client.
            ndjson: Send newline-delimited JSON instead of a JSON array.
        """
        import httpx
        self._client = httpx.Client(timeout=timeout, verify=verify)
        self.url = url
        self._headers = dict(headers or {})
        self._ndjson = ndjson

    def send(self, records: Iterable[dict]) -> int:
        """POST all *records* in one request and return how many were sent.

        An empty batch sends nothing and returns 0. A non-success HTTP
        status raises via ``raise_for_status()``.
        """
        batch = list(records)
        if not batch:
            return 0
        if self._ndjson:
            body = "".join(
                json.dumps(rec, separators=(",", ":")) + "\n" for rec in batch
            )
            headers = dict(self._headers)
            # Only supply a content type when the caller did not choose one
            # (header names are case-insensitive).
            if not any(name.lower() == "content-type" for name in headers):
                headers["Content-Type"] = "application/json"
            resp = self._client.post(
                self.url, content=body.encode("utf-8"), headers=headers
            )
        else:
            resp = self._client.post(self.url, json=batch, headers=self._headers)
        resp.raise_for_status()
        return len(batch)

    def close(self) -> None:
        """Close the HTTP client; errors raised while closing are ignored."""
        try:
            self._client.close()
        except Exception:
            pass
| 376 | |
| 377 | |
| 378 | |
| 379 | |
def iter_registry_events(
    db_path: str,
    *,
    since_unix: Optional[int] = None,
    limit: Optional[int] = None,
    registry_id: str,
) -> Iterator[OversightEvent]:
    """Yield :class:`OversightEvent` records from a registry SQLite database.

    The database is opened read-only (``mode=ro`` plus
    ``PRAGMA query_only=ON``) so an operator can run this against a live
    registry. Rows are yielded in ``timestamp, id`` ascending order, and
    the connection is closed when the generator finishes or is closed.

    Args:
        db_path: Filesystem path of the registry database.
        since_unix: When given, only rows with ``timestamp >= since_unix``.
        limit: When given, the maximum number of rows to read.
        registry_id: The registry's own public identifier (typically
            ``IDENTITY['ed25519_pub']``); stamped onto every event.
    """
    from pathlib import Path

    # Build the URI with proper percent-encoding: interpolating the raw
    # path breaks on '#', '?', '%' and spaces, which URI parsing treats
    # as delimiters or escapes.
    uri = Path(db_path).absolute().as_uri() + "?mode=ro"
    con = sqlite3.connect(uri, uri=True)
    try:
        con.row_factory = sqlite3.Row
        con.execute("PRAGMA query_only=ON")
        sql = "SELECT * FROM events"
        params: list = []
        if since_unix is not None:
            sql += " WHERE timestamp >= ?"
            params.append(int(since_unix))
        sql += " ORDER BY timestamp ASC, id ASC"
        if limit is not None:
            sql += " LIMIT ?"
            params.append(int(limit))
        for row in con.execute(sql, params):
            yield from_registry_row(row, registry_id=registry_id)
    finally:
        con.close()
| 411 | |
| 412 | |
def export_events(
    *,
    events: Iterable[OversightEvent],
    fmt: str,
    sink: Sink,
    splunk_kwargs: Optional[dict] = None,
) -> int:
    """Format *events* as *fmt* and hand them to *sink*.

    Events are formatted lazily while the sink consumes them.
    *splunk_kwargs* is forwarded to the Splunk formatter only and ignored
    for the other formats.

    Returns:
        Whatever ``sink.send`` returns, i.e. the emitted count.

    Raises:
        ValueError: if *fmt* is not a key of ``FORMATTERS``.
    """
    if fmt not in FORMATTERS:
        raise ValueError(f"unknown SIEM format: {fmt!r}")
    options = splunk_kwargs or {}

    def _render(evt):
        if fmt == "splunk":
            return to_splunk_hec(evt, **options)
        return FORMATTERS[fmt](evt)

    def _stream():
        for evt in events:
            yield _render(evt)

    return sink.send(_stream())