| 1 | """ |
| 2 | oversight_core.jcs |
| 3 | ================== |
| 4 | |
| 5 | JSON Canonicalization Scheme (RFC 8785) for Oversight. |
| 6 | |
| 7 | Byte-exact match with the Rust reference's ``serde_jcs::to_vec``. Every |
| 8 | canonical-bytes computation that gets hashed or signed in Oversight flows |
| 9 | through ``jcs_dumps``: manifest signing, transparency-log leaf payloads, |
| 10 | DSSE statement payloads, evidence bundles, and registry sidecar comparison. |
| 11 | |
| 12 | Vendored rather than pip-installed. Rationale: the canonicalization function |
| 13 | sits on the signing path of a cryptographic protocol, so every line must be |
| 14 | auditable in-tree, and the Oversight manifest schema carries no floats so we |
| 15 | implement only the RFC 8785 subset we need and reject floats explicitly rather |
| 16 | than silently producing a non-canonical float form. |
| 17 | """ |
| 18 | |
| 19 | from __future__ import annotations |
| 20 | |
| 21 | from typing import Any |
| 22 | |
| 23 | _SHORT_ESCAPES = { |
| 24 | 0x08: "\\b", |
| 25 | 0x09: "\\t", |
| 26 | 0x0A: "\\n", |
| 27 | 0x0C: "\\f", |
| 28 | 0x0D: "\\r", |
| 29 | } |
| 30 | |
| 31 | |
def jcs_dumps(obj: Any) -> bytes:
    """Return the RFC 8785 canonical JSON encoding of ``obj`` as UTF-8 bytes.

    Supported inputs are None, bool, int, str, list, tuple and dict, nested
    arbitrarily. A float, or a value of any other type, raises TypeError:
    emitting a float in a non-canonical form would silently break signature
    agreement with the Rust ``serde_jcs`` reference, so it is rejected
    instead.
    """
    fragments: list[str] = []
    _serialize(obj, fragments)
    canonical_text = "".join(fragments)
    return canonical_text.encode("utf-8")
| 43 | |
| 44 | |
| 45 | def _serialize(obj: Any, parts: list[str]) -> None: |
| 46 | if obj is None: |
| 47 | parts.append("null") |
| 48 | elif obj is True: |
| 49 | parts.append("true") |
| 50 | elif obj is False: |
| 51 | parts.append("false") |
| 52 | elif isinstance(obj, int): |
| 53 | parts.append(str(obj)) |
| 54 | elif isinstance(obj, float): |
| 55 | raise TypeError( |
| 56 | "JCS: floats are unsupported; Oversight manifests store every " |
| 57 | "numeric value as int or string" |
| 58 | ) |
| 59 | elif isinstance(obj, str): |
| 60 | _serialize_str(obj, parts) |
| 61 | elif isinstance(obj, (list, tuple)): |
| 62 | parts.append("[") |
| 63 | for i, item in enumerate(obj): |
| 64 | if i: |
| 65 | parts.append(",") |
| 66 | _serialize(item, parts) |
| 67 | parts.append("]") |
| 68 | elif isinstance(obj, dict): |
| 69 | parts.append("{") |
| 70 | items = sorted(obj.items(), key=lambda kv: kv[0].encode("utf-16-be")) |
| 71 | for i, (k, v) in enumerate(items): |
| 72 | if not isinstance(k, str): |
| 73 | raise TypeError( |
| 74 | f"JCS: dict keys must be str, got {type(k).__name__}" |
| 75 | ) |
| 76 | if i: |
| 77 | parts.append(",") |
| 78 | _serialize_str(k, parts) |
| 79 | parts.append(":") |
| 80 | _serialize(v, parts) |
| 81 | parts.append("}") |
| 82 | else: |
| 83 | raise TypeError(f"JCS: unsupported type {type(obj).__name__}") |
| 84 | |
| 85 | |
| 86 | def _serialize_str(s: str, parts: list[str]) -> None: |
| 87 | parts.append('"') |
| 88 | for ch in s: |
| 89 | cp = ord(ch) |
| 90 | if cp == 0x22: |
| 91 | parts.append('\\"') |
| 92 | elif cp == 0x5C: |
| 93 | parts.append("\\\\") |
| 94 | elif cp < 0x20: |
| 95 | parts.append(_SHORT_ESCAPES.get(cp, f"\\u{cp:04x}")) |
| 96 | else: |
| 97 | parts.append(ch) |
| 98 | parts.append('"') |