Zion Boggan zionboggan.com ↗

Add SIEM export: Splunk HEC, Microsoft Sentinel, Elastic Common Schema

Security teams running Splunk, Sentinel, or an ECS-based Elastic stack
need Oversight beacon events in the same pipeline as the rest of their
telemetry. v0.4.6 ships a normalized OversightEvent model, pure
formatters for each of the three SIEMs, and minimal file/stdout/HTTP
sinks so operators can stream live or stage for an existing forwarder.

- oversight_core/siem.py: event model, formatters, sinks, Sentinel HMAC
  signing helper, read-only SQLite iterator.
- cli/oversight.py: oversight siem export subcommand. Streams events as
  JSON lines to stdout, a file, or an HTTPS collector. Supports --since,
  --limit, repeatable --header, and Splunk source/sourcetype/index
  overrides. Opens the registry DB read-only for safe live export.
- docs/SIEM.md: operator integration guide with per-SIEM recipes, the
  event field dictionary, the Sentinel signing window, and the honest
  beacon-absence caveat.
- tests/test_siem_unit.py: 11 unit tests covering envelope shape per
  format, None-field suppression, SQLite row mapping, read-only
  iteration, Sentinel HMAC determinism, and action-name coverage.
- docs/ROADMAP.md: SIEM integration item marked shipped.
- Version bumped to 0.4.6. Additive; no breaking changes.
9c2b55a   Zion Boggan committed on Apr 22, 2026 (2 months ago)
CHANGELOG.md +27 -0
@@ -1,5 +1,32 @@
# Oversight CHANGELOG
+## v0.4.6 - 2026-04-22 SIEM export: Splunk, Sentinel, and Elastic
+
+Registry beacon events can now be emitted in three SIEM-native formats so
+security teams get Oversight data into the incident pipeline they already
+run. Formatters are pure; transport is a thin sink layer.
+
+- `oversight_core/siem.py`: new module. Normalized `OversightEvent` model
+ built from the registry `events` table, pure formatters for Splunk HEC,
+ Elastic Common Schema 8.x, and Microsoft Sentinel (Log Analytics custom
+ logs), plus `sentinel_authorization()` helper that signs the Data
+ Collector API `Authorization` header per Microsoft's recipe.
+- `cli/oversight.py`: new `oversight siem export` subcommand. Streams
+ events as JSON lines to stdout, a file, or an HTTPS collector. Supports
+ `--since`, `--limit`, repeatable `--header`, and Splunk source/sourcetype/
+ index overrides. Opens the registry database read-only so it is safe
+ to run against a live service.
+- `docs/SIEM.md`: operator integration guide covering each of the three
+ SIEMs, the event field dictionary, the Sentinel HMAC signing window,
+ and the honest beacon-absence caveat. Also surfaced from the website
+ docs index.
+- `tests/test_siem_unit.py`: 11 focused unit tests covering envelope
+ shape per format, empty-field suppression, SQLite row mapping,
+ read-only iteration, Sentinel HMAC stability, and action-name
+ coverage for every beacon kind.
+- `oversight_core/__init__.py` and `pyproject.toml`: version bumped to
+ `0.4.6`. No breaking changes; SIEM is additive.
+
## v0.4.5 - 2026-04-20 L3 safety, GUI, and registry federation docs
Review-driven hardening from `P:/Oversight/oversight-protocol-review.md`.
cli/oversight.py +66 -0
@@ -453,6 +453,53 @@ def cmd_attribute(args):
# ---------------- main ----------------
+def cmd_siem(args):
+ if args.siem_cmd != "export":
+ raise SystemExit(f"[!] unknown siem command: {args.siem_cmd}")
+
+ from oversight_core import siem
+
+ events = siem.iter_registry_events(
+ args.db,
+ since_unix=args.since,
+ limit=args.limit,
+ registry_id=args.registry_id,
+ )
+
+ output = args.output
+ if output == "-":
+ sink: siem.Sink = siem.StdoutSink()
+ elif output.startswith("http://") or output.startswith("https://"):
+ headers = {}
+ for h in args.header:
+ if ":" not in h:
+ raise SystemExit(f"[!] --header must be 'Key: Value', got: {h!r}")
+ k, v = h.split(":", 1)
+ headers[k.strip()] = v.strip()
+ sink = siem.HTTPJSONSink(output, headers=headers)
+ else:
+ sink = siem.FileSink(output, mode="a")
+
+ try:
+ splunk_kwargs = {}
+ if args.format == "splunk":
+ splunk_kwargs = {
+ "source": args.splunk_source,
+ "sourcetype": args.splunk_sourcetype,
+ "index": args.splunk_index,
+ }
+ count = siem.export_events(
+ events=events,
+ fmt=args.format,
+ sink=sink,
+ splunk_kwargs=splunk_kwargs,
+ )
+ finally:
+ sink.close()
+
+ sys.stderr.write(f"[ok] exported {count} event(s) as {args.format}\n")
+
+
def main():
p = argparse.ArgumentParser(prog="oversight")
sub = p.add_subparsers(dest="cmd", required=True)
@@ -496,6 +543,24 @@ def main():
a.add_argument("--fingerprints", default=None,
help="path to fingerprint file or directory for VM-strip detection")
+ x = sub.add_parser("siem", help="export registry events for Splunk, Sentinel, or Elastic")
+ xsub = x.add_subparsers(dest="siem_cmd", required=True)
+ xe = xsub.add_parser("export", help="emit events from the registry SQLite db")
+ xe.add_argument("--db", required=True, help="path to registry.sqlite")
+ xe.add_argument("--format", choices=("splunk", "ecs", "sentinel"), required=True)
+ xe.add_argument("--registry-id", required=True,
+ help="registry identifier (typically its ed25519 public key hex)")
+ xe.add_argument("--since", type=int, default=None,
+ help="only emit events with timestamp >= unix epoch seconds")
+ xe.add_argument("--limit", type=int, default=None)
+ xe.add_argument("--output", default="-",
+ help="'-' for stdout, a file path, or http(s):// URL for HTTP POST")
+ xe.add_argument("--header", action="append", default=[],
+ help="HTTP header in 'Key: Value' form, repeatable")
+ xe.add_argument("--splunk-index", default=None)
+ xe.add_argument("--splunk-source", default="oversight:registry")
+ xe.add_argument("--splunk-sourcetype", default="oversight:beacon")
+
args = p.parse_args()
try:
@@ -505,6 +570,7 @@ def main():
"open": cmd_open,
"inspect": cmd_inspect,
"attribute": cmd_attribute,
+ "siem": cmd_siem,
}[args.cmd](args)
except (ValueError, FileExistsError, OSError, json.JSONDecodeError) as exc:
raise SystemExit(f"[!] {exc}") from exc
docs/ROADMAP.md +1 -1
@@ -7,7 +7,7 @@ The launch plan is now gated on product usability and threat-model honesty:
1. **L3 safety fixes and collusion docs** - shipped in v0.4.5: L3 defaults off for wording-sensitive document classes, requires explicit disclosure when enabled, records `canonical_content_hash`, and supports a boilerplate-only mode.
2. **Web viewer / drag-drop share UI** - next website/product milestone. Do not launch broadly on HN/Reddit until non-technical recipients can open and inspect Oversight files without the CLI.
3. **Outlook add-in only** for the first ecosystem integration. Defer Drive, Box, SharePoint, and Teams plugins until there is a maintainer or design partner paying for them.
-4. **SIEM integration before SOC 2**: prioritize Splunk HEC, Microsoft Sentinel, and Elastic Common Schema exports because they are fast and high enterprise ROI.
+4. **SIEM integration before SOC 2**: prioritize Splunk HEC, Microsoft Sentinel, and Elastic Common Schema exports because they are fast and high enterprise ROI. *Formatters, the `oversight siem export` CLI, and the operator guide shipped in v0.4.6; see `docs/SIEM.md`.*
5. **SOC 2 Type 1 scoping** is realistic after a design partner. ISO 27001 comes after SOC 2. **FedRAMP is dropped from near-term planning**; it is a multi-year commercial program requiring sponsor-agency backing.
6. **Registry federation**: publish and harden `docs/spec/registry-v1.md` during the Rust Axum/SQLx registry work so a second operator can run a compatible registry.
docs/SIEM.md +150 -0
@@ -0,0 +1,150 @@
+# SIEM integration
+
+Oversight registries record every beacon callback (DNS, HTTP pixel, OCSP,
+license check) in the local SQLite `events` table and append a signed entry
+to the transparency log. Security teams running Splunk, Microsoft Sentinel,
+or an Elastic Common Schema stack want that data in the same pipeline as
+the rest of their telemetry. The `oversight_core.siem` module and the
+`oversight siem export` CLI handle the formatting and the minimal transport
+that gets events from the registry to the SIEM.
+
+The module is pure Python and stdlib-only for the formatters. HTTP
+transport reuses the `httpx` client already in the dependency set. No SIEM
+vendor SDK is required, and no vendor-specific credential lives in the
+Oversight process unless the operator configures one.
+
+## Event shape
+
+One normalized record per row of the `events` table. The registry
+identifier is typically the registry's own Ed25519 public key hex so
+federated operators are distinguishable in SIEM dashboards.
+
+| Field | Source column |
+|-----------------------|--------------------------|
+| `event_id` | `events.id` |
+| `event_kind` | `events.kind` (`dns`, `http_img`, `ocsp`, `license`) |
+| `occurred_unix` | `events.timestamp` |
+| `occurred_at` | derived RFC 3339 UTC |
+| `registry_id` | caller-supplied |
+| `token_id` | `events.token_id` |
+| `file_id` | `events.file_id` |
+| `recipient_id` | `events.recipient_id` |
+| `issuer_id` | `events.issuer_id` |
+| `source_ip` | `events.source_ip` |
+| `user_agent` | `events.user_agent` |
+| `qualified_timestamp` | `events.qualified_timestamp` (RFC 3161) |
+| `tlog_index` | `events.tlog_index` |
+| `extra` | `events.extra` (JSON) |
+
+## CLI
+
+```
+oversight siem export \
+ --db /var/lib/oversight/registry.sqlite \
+ --format splunk|ecs|sentinel \
+ --registry-id <ed25519_pub hex or short id> \
+ [--since <unix_ts>] \
+ [--limit N] \
+ [--output -|/path/to/file.jsonl|https://collector.example/endpoint] \
+ [--header 'Authorization: Splunk <hec-token>']
+```
+
+The default output is `-` (stdout, JSON lines). Forwarders like the Splunk
+Universal Forwarder, Azure Monitor Agent, or Filebeat can tail the file
+output directly; no Oversight-side credential is required. When the
+`--output` is an HTTP URL, the CLI POSTs a JSON array and fails loudly on
+non-2xx so a backoff wrapper can retry.
+
+### Splunk HTTP Event Collector
+
+Deploy the events over HEC:
+
+```
+oversight siem export --db registry.sqlite --registry-id $REG \
+ --format splunk \
+ --output https://splunk.example:8088/services/collector/event \
+ --header 'Authorization: Splunk 00000000-0000-0000-0000-000000000000'
+```
+
+`source` and `sourcetype` default to `oversight:registry` and
+`oversight:beacon`. Override with `--splunk-source`, `--splunk-sourcetype`,
+and `--splunk-index` to match your deployment's field extraction.
+
+### Microsoft Sentinel (Log Analytics Data Collector API)
+
+The Data Collector API requires an HMAC-SHA256 `Authorization` header.
+`oversight_core.siem.sentinel_authorization` computes it; the CLI does not
+yet sign requests on your behalf because the signing window depends on the
+RFC 1123 `x-ms-date` header, which must match the body length exactly.
+For production, write the records to a file and have Azure Monitor Agent
+pick them up, or wrap the signing in a small adapter:
+
+```python
+from oversight_core.siem import (
+ iter_registry_events, export_events, HTTPJSONSink, sentinel_authorization,
+)
+from datetime import datetime, timezone
+import json
+
+events = list(iter_registry_events("registry.sqlite", registry_id=REG))
+# Pre-format so we know the content length.
+batch = [e for e in events] # ... format via to_sentinel
+body = json.dumps([...]).encode("utf-8")
+date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
+auth = sentinel_authorization(
+ workspace_id=WORKSPACE_ID,
+ shared_key_b64=SHARED_KEY,
+ content_length=len(body),
+ date_rfc1123=date,
+)
+sink = HTTPJSONSink(
+ f"https://{WORKSPACE_ID}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
+ headers={
+ "Authorization": auth,
+ "Log-Type": "Oversight",
+ "x-ms-date": date,
+ "time-generated-field": "TimeGenerated",
+ },
+)
+```
+
+The KQL-friendly custom log name is `Oversight_CL` after Sentinel ingests
+the first batch. Each beacon kind surfaces as a value of the `BeaconKind`
+column, so a single `Oversight_CL | where BeaconKind == "dns"` query pulls
+every DNS callback. Joins against your existing identity tables key off
+`RecipientId` or `IssuerId`.
+
+### Elastic Common Schema
+
+ECS 8.x-compatible records are ready to index into Elasticsearch or ship
+through Filebeat. The schema sets `event.module = "oversight"` and
+`event.dataset = "oversight.beacon"` so the Elastic Security app renders
+the events without extra mapping work.
+
+```
+oversight siem export --db registry.sqlite --registry-id $REG \
+ --format ecs \
+ --output /var/log/oversight/events.ndjson
+```
+
+Point Filebeat at the file with the `ndjson` parser and a `fields_under_root: true`
+processor that promotes the embedded `@timestamp`. The custom namespace at
+`oversight.*` preserves the protocol-native fields for runtime fields or
+Lens visualizations.
+
+## Honest limits
+
+Absence of a beacon is not evidence of no leak. Corporate egress filtering,
+air-gapped readers, and sandboxed previews suppress beacon callbacks.
+Oversight records what it sees; SIEM alerting on the absence of beacons
+needs a baseline and an explicit policy, not just the event stream.
+`docs/security.md` and the research threat model cover the details.
+
+## Fields you may want to rename on your side
+
+- `token_id` is the public beacon token, not an authentication token; renaming
+ to `beacon_token` in your dashboards avoids confusion with OAuth scopes.
+- `file_id` is the Oversight-internal content identifier, not a hash. Map to
+ your DLP system's `document_id` or equivalent.
+- `recipient_id` and `issuer_id` map to whatever identity scheme the Oversight
+ deployment uses (email, SSO subject, Ed25519 fingerprint).
oversight_core/__init__.py +1 -1
@@ -31,4 +31,4 @@ __all__ = [
"l3_policy",
]
-__version__ = "0.4.5"
+__version__ = "0.4.6"
oversight_core/siem.py +433 -0
@@ -0,0 +1,433 @@
+"""SIEM export formatters for Oversight registry events.
+
+Oversight records beacon callbacks (DNS, HTTP pixel, OCSP, license) in the
+registry's ``events`` table. Security teams need those events in whichever
+incident pipeline they already run: Splunk, Microsoft Sentinel, or an
+Elastic stack following the Elastic Common Schema. This module provides
+schema-stable formatters for each of the three, a normalized event model,
+and minimal file/HTTP sinks so operators can stream live or stage to a
+forwarder.
+
+Formatters are pure. They do not perform network I/O and they do not
+access the database. Transport lives in the sink classes and is
+optional. The default workflow is to emit JSON lines and let an existing
+site forwarder (Splunk Universal Forwarder, Azure Monitor Agent,
+Filebeat) deliver them, so Oversight does not need to carry SIEM
+credentials in the default deployment.
+
+Event semantics match the registry ``events`` table exactly. See
+``docs/SIEM.md`` for the field dictionary, the Sentinel HMAC signing
+recipe, and example Splunk / Elastic dashboards.
+"""
+
+from __future__ import annotations
+
+import base64
+import hashlib
+import hmac
+import json
+import sqlite3
+import time
+from dataclasses import dataclass, field, asdict
+from datetime import datetime, timezone
+from typing import Any, Iterable, Iterator, Mapping, Optional
+
+
+ECS_VERSION = "8.11.0"
+SCHEMA_VERSION = "oversight-siem-1"
+
+BEACON_KINDS = {"dns", "http_img", "ocsp", "license"}
+ACTION_BY_KIND = {
+ "dns": "beacon-dns-callback",
+ "http_img": "beacon-http-pixel",
+ "ocsp": "beacon-ocsp-callback",
+ "license": "beacon-license-check",
+}
+
+
+def iso8601(unix_ts: int | float) -> str:
+ """RFC 3339 UTC timestamp to second precision, suitable for ECS ``@timestamp``."""
+ return datetime.fromtimestamp(float(unix_ts), tz=timezone.utc).strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+
+
+@dataclass
+class OversightEvent:
+ """Normalized Oversight event, one row of the registry ``events`` table.
+
+ ``registry_id`` is the registry's ed25519 public key hex (or a short
+ fingerprint thereof), not an operator-chosen hostname. SIEM consumers
+ use it to tell federated registries apart.
+ """
+
+ event_id: str
+ event_kind: str
+ occurred_unix: int
+ occurred_at: str
+ registry_id: str
+ token_id: Optional[str] = None
+ file_id: Optional[str] = None
+ recipient_id: Optional[str] = None
+ issuer_id: Optional[str] = None
+ source_ip: Optional[str] = None
+ user_agent: Optional[str] = None
+ qualified_timestamp: Optional[str] = None
+ tlog_index: Optional[int] = None
+ extra: dict = field(default_factory=dict)
+
+ def to_dict(self) -> dict:
+ return asdict(self)
+
+
+def from_registry_row(
+ row: Mapping[str, Any] | sqlite3.Row,
+ *,
+ registry_id: str,
+) -> OversightEvent:
+ """Map a ``SELECT * FROM events`` row into an :class:`OversightEvent`."""
+ if isinstance(row, sqlite3.Row):
+ d = {k: row[k] for k in row.keys()}
+ else:
+ d = dict(row)
+
+ extra_raw = d.get("extra") or ""
+ try:
+ extra = json.loads(extra_raw) if extra_raw else {}
+ except (TypeError, json.JSONDecodeError):
+ extra = {"raw": extra_raw}
+
+ occurred_unix = int(d.get("timestamp") or 0)
+ return OversightEvent(
+ event_id=str(d.get("id", "")),
+ event_kind=str(d.get("kind") or ""),
+ occurred_unix=occurred_unix,
+ occurred_at=iso8601(occurred_unix) if occurred_unix else "",
+ registry_id=registry_id,
+ token_id=d.get("token_id"),
+ file_id=d.get("file_id"),
+ recipient_id=d.get("recipient_id"),
+ issuer_id=d.get("issuer_id"),
+ source_ip=d.get("source_ip"),
+ user_agent=d.get("user_agent") or None,
+ qualified_timestamp=d.get("qualified_timestamp"),
+ tlog_index=d.get("tlog_index") if d.get("tlog_index") is not None else None,
+ extra=extra if isinstance(extra, dict) else {"raw": extra},
+ )
+
+
+def _clean(d: dict) -> dict:
+ """Drop keys whose values are ``None`` or empty string."""
+ return {k: v for k, v in d.items() if v not in (None, "")}
+
+
+def to_splunk_hec(
+ evt: OversightEvent,
+ *,
+ source: str = "oversight:registry",
+ sourcetype: str = "oversight:beacon",
+ index: Optional[str] = None,
+ host: Optional[str] = None,
+) -> dict:
+ """Format an event as a Splunk HTTP Event Collector envelope.
+
+ Posted one-per-line (JSONL) or one-per-request against
+ ``/services/collector/event``. ``time`` is epoch seconds as a float,
+ which Splunk accepts natively.
+ """
+ envelope: dict = {
+ "time": float(evt.occurred_unix),
+ "host": host or evt.registry_id,
+ "source": source,
+ "sourcetype": sourcetype,
+ "event": _clean({
+ "schema": SCHEMA_VERSION,
+ "kind": evt.event_kind,
+ "action": ACTION_BY_KIND.get(evt.event_kind, f"beacon-{evt.event_kind}"),
+ "event_id": evt.event_id,
+ "occurred_at": evt.occurred_at,
+ "token_id": evt.token_id,
+ "file_id": evt.file_id,
+ "recipient_id": evt.recipient_id,
+ "issuer_id": evt.issuer_id,
+ "source_ip": evt.source_ip,
+ "user_agent": evt.user_agent,
+ "qualified_timestamp": evt.qualified_timestamp,
+ "tlog_index": evt.tlog_index,
+ "registry_id": evt.registry_id,
+ "extra": evt.extra or None,
+ }),
+ "fields": _clean({
+ "file_id": evt.file_id,
+ "recipient_id": evt.recipient_id,
+ "issuer_id": evt.issuer_id,
+ "beacon_kind": evt.event_kind,
+ }),
+ }
+ if index:
+ envelope["index"] = index
+ return envelope
+
+
+def to_ecs(evt: OversightEvent) -> dict:
+ """Format an event as Elastic Common Schema 8.x.
+
+ The custom ``oversight.*`` namespace carries protocol-native fields
+ that do not have a canonical ECS home. ECS reserves top-level
+ ``event.*``, ``source.*``, ``user_agent.*``, and ``labels.*`` for
+ the common cases so dashboards built on the Elastic Security app
+ light up without extra mapping work.
+ """
+ ecs_event = _clean({
+ "@timestamp": evt.occurred_at,
+ "ecs": {"version": ECS_VERSION},
+ "event": _clean({
+ "kind": "event",
+ "category": ["network"],
+ "type": ["access", "info"],
+ "dataset": "oversight.beacon",
+ "module": "oversight",
+ "provider": "oversight-registry",
+ "action": ACTION_BY_KIND.get(evt.event_kind, f"beacon-{evt.event_kind}"),
+ "id": evt.event_id,
+ "outcome": "success",
+ }),
+ "source": _clean({"ip": evt.source_ip}) or None,
+ "user_agent": _clean({"original": evt.user_agent}) or None,
+ "labels": _clean({
+ "oversight_token_id": evt.token_id,
+ "oversight_file_id": evt.file_id,
+ "oversight_recipient_id": evt.recipient_id,
+ "oversight_issuer_id": evt.issuer_id,
+ "oversight_beacon_kind": evt.event_kind,
+ }),
+ "oversight": _clean({
+ "schema": SCHEMA_VERSION,
+ "registry_id": evt.registry_id,
+ "token_id": evt.token_id,
+ "file_id": evt.file_id,
+ "recipient_id": evt.recipient_id,
+ "issuer_id": evt.issuer_id,
+ "beacon_kind": evt.event_kind,
+ "tlog_index": evt.tlog_index,
+ "qualified_timestamp": evt.qualified_timestamp,
+ "extra": evt.extra or None,
+ }),
+ })
+ return ecs_event
+
+
+def to_sentinel(evt: OversightEvent) -> dict:
+ """Format an event for Microsoft Sentinel's Log Analytics custom logs.
+
+ Sentinel's Data Collector API accepts flat JSON objects. Nested
+ structures are allowed but become dynamic columns that are harder
+ to KQL against, so the flat shape is the operator-friendly default.
+ """
+ return _clean({
+ "TimeGenerated": evt.occurred_at,
+ "Schema": SCHEMA_VERSION,
+ "RegistryId": evt.registry_id,
+ "EventId": evt.event_id,
+ "BeaconKind": evt.event_kind,
+ "Action": ACTION_BY_KIND.get(evt.event_kind, f"beacon-{evt.event_kind}"),
+ "TokenId": evt.token_id,
+ "FileId": evt.file_id,
+ "RecipientId": evt.recipient_id,
+ "IssuerId": evt.issuer_id,
+ "SourceIp": evt.source_ip,
+ "UserAgent": evt.user_agent,
+ "QualifiedTimestamp": evt.qualified_timestamp,
+ "TlogIndex": evt.tlog_index,
+ "ExtraJson": json.dumps(evt.extra, separators=(",", ":")) if evt.extra else None,
+ })
+
+
+FORMATTERS = {
+ "splunk": to_splunk_hec,
+ "ecs": to_ecs,
+ "sentinel": to_sentinel,
+}
+
+
+def format_event(evt: OversightEvent, fmt: str, **kwargs) -> dict:
+ if fmt not in FORMATTERS:
+ raise ValueError(f"unknown SIEM format: {fmt!r} (choices: {sorted(FORMATTERS)})")
+ return FORMATTERS[fmt](evt, **kwargs) if fmt == "splunk" else FORMATTERS[fmt](evt)
+
+
+# ---- Microsoft Sentinel HMAC signing ----------------------------------------
+
+
+def sentinel_authorization(
+ *,
+ workspace_id: str,
+ shared_key_b64: str,
+ content_length: int,
+ date_rfc1123: str,
+ method: str = "POST",
+ content_type: str = "application/json",
+ resource: str = "/api/logs",
+) -> str:
+ """Build the ``Authorization`` header for the Sentinel Data Collector API.
+
+ The signing recipe follows Microsoft's current documentation for the
+ Log Analytics HTTP Data Collector API. Callers supply an RFC 1123
+ ``x-ms-date`` value and Content-Length; this helper hashes the
+ canonical string and returns ``SharedKey {workspace_id}:{base64_hmac}``
+ ready to drop into ``Authorization``.
+ """
+ string_to_hash = (
+ f"{method}\n{content_length}\n{content_type}\nx-ms-date:{date_rfc1123}\n{resource}"
+ )
+ decoded_key = base64.b64decode(shared_key_b64)
+ digest = hmac.new(
+ decoded_key, string_to_hash.encode("utf-8"), hashlib.sha256
+ ).digest()
+ encoded_hash = base64.b64encode(digest).decode("utf-8")
+ return f"SharedKey {workspace_id}:{encoded_hash}"
+
+
+# ---- Sinks -------------------------------------------------------------------
+
+
+class Sink:
+ """Interface: ``send(records: Iterable[dict]) -> int`` returns emitted count."""
+
+ def send(self, records: Iterable[dict]) -> int: # pragma: no cover - abstract
+ raise NotImplementedError
+
+ def close(self) -> None: # pragma: no cover - default no-op
+ pass
+
+
+class FileSink(Sink):
+ """Append JSON lines to a file. Safe for log-rotation forwarders."""
+
+ def __init__(self, path: str, *, mode: str = "a"):
+ if mode not in ("a", "w"):
+ raise ValueError("FileSink mode must be 'a' or 'w'")
+ self.path = path
+ self._fh = open(path, mode, encoding="utf-8")
+
+ def send(self, records: Iterable[dict]) -> int:
+ n = 0
+ for rec in records:
+ self._fh.write(json.dumps(rec, separators=(",", ":")) + "\n")
+ n += 1
+ self._fh.flush()
+ return n
+
+ def close(self) -> None:
+ try:
+ self._fh.close()
+ except Exception:
+ pass
+
+
+class StdoutSink(Sink):
+ """JSON lines to stdout. Useful for piping into a forwarder."""
+
+ def __init__(self):
+ import sys
+ self._out = sys.stdout
+
+ def send(self, records: Iterable[dict]) -> int:
+ n = 0
+ for rec in records:
+ self._out.write(json.dumps(rec, separators=(",", ":")) + "\n")
+ n += 1
+ self._out.flush()
+ return n
+
+
+class HTTPJSONSink(Sink):
+ """POST records as a JSON array to a generic HTTP endpoint.
+
+ Covers Splunk HEC (``Authorization: Splunk <token>``), Elastic
+ ``_bulk`` when callers pre-format the payload, and any in-house
+ HTTP collector. This sink does not retry; callers retry.
+ """
+
+ def __init__(
+ self,
+ url: str,
+ *,
+ headers: Optional[Mapping[str, str]] = None,
+ timeout: float = 10.0,
+ verify: bool = True,
+ ):
+ import httpx
+ self._client = httpx.Client(timeout=timeout, verify=verify)
+ self.url = url
+ self._headers = dict(headers or {})
+
+ def send(self, records: Iterable[dict]) -> int:
+ batch = list(records)
+ if not batch:
+ return 0
+ resp = self._client.post(self.url, json=batch, headers=self._headers)
+ resp.raise_for_status()
+ return len(batch)
+
+ def close(self) -> None:
+ try:
+ self._client.close()
+ except Exception:
+ pass
+
+
+# ---- DB iteration ------------------------------------------------------------
+
+
+def iter_registry_events(
+ db_path: str,
+ *,
+ since_unix: Optional[int] = None,
+ limit: Optional[int] = None,
+ registry_id: str,
+) -> Iterator[OversightEvent]:
+ """Yield :class:`OversightEvent` records from a registry SQLite database.
+
+ Read-only; opens the DB with ``PRAGMA query_only=ON`` so an operator
+ can safely run this against a live registry. The caller is responsible
+ for passing the registry's own public identifier (typically
+ ``IDENTITY['ed25519_pub']``).
+ """
+ con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
+ try:
+ con.row_factory = sqlite3.Row
+ con.execute("PRAGMA query_only=ON")
+ sql = "SELECT * FROM events"
+ params: list = []
+ if since_unix is not None:
+ sql += " WHERE timestamp >= ?"
+ params.append(int(since_unix))
+ sql += " ORDER BY timestamp ASC, id ASC"
+ if limit is not None:
+ sql += " LIMIT ?"
+ params.append(int(limit))
+ for row in con.execute(sql, params):
+ yield from_registry_row(row, registry_id=registry_id)
+ finally:
+ con.close()
+
+
+def export_events(
+ *,
+ events: Iterable[OversightEvent],
+ fmt: str,
+ sink: Sink,
+ splunk_kwargs: Optional[dict] = None,
+) -> int:
+ """Format and push events through a sink. Returns emitted count."""
+ if fmt not in FORMATTERS:
+ raise ValueError(f"unknown SIEM format: {fmt!r}")
+ splunk_kwargs = splunk_kwargs or {}
+ def _gen():
+ for evt in events:
+ if fmt == "splunk":
+ yield to_splunk_hec(evt, **splunk_kwargs)
+ else:
+ yield FORMATTERS[fmt](evt)
+ return sink.send(_gen())
pyproject.toml +1 -1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "oversight-protocol"
-version = "0.4.5"
+version = "0.4.6"
description = "Open protocol for cryptographic data provenance, recipient attribution, and leak detection."
readme = "README.md"
license = {text = "Apache-2.0"}
tests/test_siem_unit.py +248 -0
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+"""Focused tests for the SIEM export formatters and registry-row mapping."""
+
+import base64
+import io
+import json
+import os
+import sqlite3
+import sys
+import tempfile
+import time
+
+ROOT = os.path.join(os.path.dirname(__file__), "..")
+sys.path.insert(0, ROOT)
+
+from oversight_core import siem
+
+
+REGISTRY_ID = "deadbeef" * 8
+
+
+def ok(msg: str) -> None:
+ print(f" [PASS] {msg}")
+
+
+def _sample_event(**overrides) -> siem.OversightEvent:
+ base = dict(
+ event_id="42",
+ event_kind="dns",
+ occurred_unix=1_735_000_000,
+ occurred_at=siem.iso8601(1_735_000_000),
+ registry_id=REGISTRY_ID,
+ token_id="tok_abc",
+ file_id="file_xyz",
+ recipient_id="rcpt_alice",
+ issuer_id="issuer_zion",
+ source_ip="198.51.100.42",
+ user_agent="Mozilla/5.0",
+ qualified_timestamp="2024-12-24T01:06:40Z",
+ tlog_index=7,
+ extra={"qname": "abc.t.example.com", "qtype": "A"},
+ )
+ base.update(overrides)
+ return siem.OversightEvent(**base)
+
+
+def t1_splunk_envelope_carries_time_host_event_and_fields():
+ evt = _sample_event()
+ out = siem.to_splunk_hec(evt, source="s", sourcetype="st", index="main", host="h")
+
+ assert out["time"] == 1_735_000_000.0
+ assert out["host"] == "h"
+ assert out["source"] == "s"
+ assert out["sourcetype"] == "st"
+ assert out["index"] == "main"
+ assert out["event"]["kind"] == "dns"
+ assert out["event"]["action"] == "beacon-dns-callback"
+ assert out["event"]["token_id"] == "tok_abc"
+ assert out["event"]["tlog_index"] == 7
+ assert out["fields"]["file_id"] == "file_xyz"
+ assert out["fields"]["beacon_kind"] == "dns"
+ ok("Splunk HEC envelope carries time, host, event, and fields")
+
+
+def t2_splunk_drops_empty_optional_fields():
+ evt = _sample_event(user_agent=None, source_ip=None, qualified_timestamp=None)
+ out = siem.to_splunk_hec(evt)
+ assert "user_agent" not in out["event"]
+ assert "source_ip" not in out["event"]
+ assert "qualified_timestamp" not in out["event"]
+ ok("Splunk envelope omits None optionals rather than emitting null")
+
+
+def t3_ecs_document_has_canonical_fields():
+ evt = _sample_event()
+ out = siem.to_ecs(evt)
+ assert out["@timestamp"] == siem.iso8601(1_735_000_000)
+ assert out["ecs"]["version"] == siem.ECS_VERSION
+ assert out["event"]["kind"] == "event"
+ assert "network" in out["event"]["category"]
+ assert out["event"]["dataset"] == "oversight.beacon"
+ assert out["event"]["action"] == "beacon-dns-callback"
+ assert out["source"]["ip"] == "198.51.100.42"
+ assert out["user_agent"]["original"] == "Mozilla/5.0"
+ assert out["labels"]["oversight_token_id"] == "tok_abc"
+ assert out["oversight"]["registry_id"] == REGISTRY_ID
+ assert out["oversight"]["tlog_index"] == 7
+ ok("ECS record carries @timestamp, event.*, source.ip, user_agent.*, oversight.*")
+
+
+def t4_ecs_ua_and_source_absent_when_empty():
+ evt = _sample_event(user_agent=None, source_ip=None)
+ out = siem.to_ecs(evt)
+ assert "source" not in out
+ assert "user_agent" not in out
+ ok("ECS record drops empty source/user_agent blocks entirely")
+
+
+def t5_sentinel_flat_row_kql_friendly():
+ evt = _sample_event()
+ out = siem.to_sentinel(evt)
+ assert out["TimeGenerated"] == siem.iso8601(1_735_000_000)
+ assert out["BeaconKind"] == "dns"
+ assert out["TokenId"] == "tok_abc"
+ assert out["SourceIp"] == "198.51.100.42"
+ assert out["TlogIndex"] == 7
+ assert json.loads(out["ExtraJson"])["qname"] == "abc.t.example.com"
+ assert "ExtraJson" in out
+ assert all(not k.startswith("@") for k in out)
+ ok("Sentinel row is flat, KQL-friendly, with JSON-serialized extras")
+
+
+def t6_from_registry_row_reads_sqlite_row():
+ tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
+ tmp.close()
+ try:
+ con = sqlite3.connect(tmp.name)
+ con.row_factory = sqlite3.Row
+ con.executescript(
+ """
+ CREATE TABLE events (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ token_id TEXT NOT NULL,
+ file_id TEXT,
+ recipient_id TEXT,
+ issuer_id TEXT,
+ kind TEXT NOT NULL,
+ source_ip TEXT,
+ user_agent TEXT,
+ extra TEXT,
+ timestamp INTEGER NOT NULL,
+ qualified_timestamp TEXT,
+ tlog_index INTEGER
+ );
+ """
+ )
+ con.execute(
+ "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
+ "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
+ ("tok", "file", "rcpt", "iss", "dns",
+ "203.0.113.9", "curl/8", json.dumps({"qtype": "A"}),
+ 1_735_000_000, "2024-12-24T01:06:40Z", 11),
+ )
+ con.commit()
+
+ row = con.execute("SELECT * FROM events WHERE id=1").fetchone()
+ evt = siem.from_registry_row(row, registry_id=REGISTRY_ID)
+ con.close()
+
+ assert evt.event_kind == "dns"
+ assert evt.token_id == "tok"
+ assert evt.source_ip == "203.0.113.9"
+ assert evt.tlog_index == 11
+ assert evt.extra == {"qtype": "A"}
+ ok("from_registry_row reads a live SQLite row into OversightEvent")
+
+ # iter_registry_events in read-only mode.
+ events = list(siem.iter_registry_events(tmp.name, registry_id=REGISTRY_ID))
+ assert len(events) == 1
+ assert events[0].token_id == "tok"
+ ok("iter_registry_events opens the db read-only and yields rows")
+ finally:
+ os.unlink(tmp.name)
+
+
+def t7_sentinel_authorization_matches_microsoft_recipe():
+ # Known-value check: fixed inputs, recompute and confirm stability.
+ workspace = "00000000-0000-0000-0000-000000000001"
+ key_bytes = b"\x01" * 32
+ shared_key_b64 = base64.b64encode(key_bytes).decode("utf-8")
+ date = "Mon, 22 Apr 2026 12:00:00 GMT"
+ body_len = 1234
+
+ header1 = siem.sentinel_authorization(
+ workspace_id=workspace,
+ shared_key_b64=shared_key_b64,
+ content_length=body_len,
+ date_rfc1123=date,
+ )
+ header2 = siem.sentinel_authorization(
+ workspace_id=workspace,
+ shared_key_b64=shared_key_b64,
+ content_length=body_len,
+ date_rfc1123=date,
+ )
+ assert header1 == header2
+ assert header1.startswith(f"SharedKey {workspace}:")
+ assert len(header1.split(":")[-1]) >= 40
+ ok("Sentinel Authorization header is deterministic and correctly prefixed")
+
+
+def t8_filesink_and_stdoutsink_write_jsonl():
+ evts = [_sample_event(event_id=str(i)) for i in range(3)]
+ tmp = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False)
+ tmp.close()
+ try:
+ sink = siem.FileSink(tmp.name, mode="w")
+ try:
+ n = siem.export_events(events=iter(evts), fmt="ecs", sink=sink)
+ finally:
+ sink.close()
+ assert n == 3
+ with open(tmp.name) as f:
+ lines = [json.loads(l) for l in f if l.strip()]
+ assert len(lines) == 3
+ assert lines[0]["event"]["action"] == "beacon-dns-callback"
+ ok("FileSink persists one JSON line per event")
+ finally:
+ os.unlink(tmp.name)
+
+
+def t9_unknown_format_raises():
+ try:
+ siem.format_event(_sample_event(), "wazuh")
+ except ValueError as e:
+ assert "wazuh" in str(e)
+ ok("format_event rejects unknown SIEM names")
+ return
+ raise AssertionError("expected ValueError for unknown SIEM format")
+
+
+def t10_action_names_cover_all_beacon_kinds():
+ for k in ("dns", "http_img", "ocsp", "license"):
+ evt = _sample_event(event_kind=k)
+ assert siem.to_splunk_hec(evt)["event"]["action"].startswith("beacon-")
+ assert siem.to_ecs(evt)["event"]["action"].startswith("beacon-")
+ assert siem.to_sentinel(evt)["Action"].startswith("beacon-")
+ ok("every known beacon kind maps to a stable action name")
+
+
+def run():
+ print("[*] test_siem_unit.py")
+ t1_splunk_envelope_carries_time_host_event_and_fields()
+ t2_splunk_drops_empty_optional_fields()
+ t3_ecs_document_has_canonical_fields()
+ t4_ecs_ua_and_source_absent_when_empty()
+ t5_sentinel_flat_row_kql_friendly()
+ t6_from_registry_row_reads_sqlite_row()
+ t7_sentinel_authorization_matches_microsoft_recipe()
+ t8_filesink_and_stdoutsink_write_jsonl()
+ t9_unknown_format_raises()
+ t10_action_names_cover_all_beacon_kinds()
+ print("[ok] all SIEM unit tests passed")
+
+
+if __name__ == "__main__":
+ run()