Zion Boggan
repos/Oversight/oversight_core/manifest.py
zionboggan.com ↗
198 lines · python
History for this file →
"""
oversight_core.manifest
=======================

The manifest is the signed metadata that binds a sealed file to its recipient,
its watermarks, its beacons, and its policy. It's the artifact a registry stores
and a verifier checks.

Wire format (v1): canonical JSON (sorted keys, no whitespace), UTF-8, Ed25519-signed.
Post-quantum: ML-DSA signature slot reserved in the envelope.
"""
12
 
13
from __future__ import annotations
14
 
15
import json
16
import time
17
import uuid
18
from dataclasses import dataclass, field, asdict, fields
19
from typing import Optional
20
 
21
from .crypto import sign_manifest, verify_manifest, SUITE_CLASSIC_V1
22
from .jcs import jcs_dumps
23
 
24
 
25
@dataclass
class Recipient:
    """Identity and public keys of the party a manifest is issued to.

    Only ``recipient_id`` and ``x25519_pub`` are required; the other two keys
    default to ``None`` when the recipient has not supplied them.
    """

    # Identifier of the recipient (format is not constrained by this class).
    recipient_id: str
    # X25519 public key as a string — presumably hex-encoded; confirm with callers.
    x25519_pub: str
    # Optional Ed25519 public key; None when absent.
    ed25519_pub: Optional[str] = None
    # Optional P-256 public key; None when absent.
    p256_pub: Optional[str] = None
31
 
32
 
33
@dataclass
class WatermarkRef:
    """Reference to one watermark embedded in a sealed file."""

    # Name of the watermarking layer that carries the mark.
    layer: str
    # Identifier of the mark within that layer.
    mark_id: str
37
 
38
 
39
@dataclass
class Manifest:
    """Signed metadata binding a sealed file to its recipient, marks and policy.

    The manifest is serialized as canonical JSON (see ``canonical_bytes``) and
    signed with Ed25519. Field order is part of the positional constructor
    interface, so new fields should only be appended with defaults.
    """

    # Unique identifier of the sealed file; ``new`` fills it with a UUID4 string.
    file_id: str
    # Issue time; ``new`` fills it with ``int(time.time())`` (Unix seconds).
    issued_at: int
    version: str = "OVERSIGHT-v1"
    suite: str = SUITE_CLASSIC_V1

    original_filename: str = ""
    content_hash: str = ""
    # ``new`` initialises this to the same value as ``content_hash``.
    canonical_content_hash: str = ""
    content_type: str = "application/octet-stream"
    size_bytes: int = 0

    issuer_id: str = ""
    # Hex-encoded Ed25519 public key; ``verify`` decodes it with bytes.fromhex.
    issuer_ed25519_pub: str = ""

    recipient: Optional[Recipient] = None

    watermarks: list[WatermarkRef] = field(default_factory=list)
    beacons: list[dict] = field(default_factory=list)

    policy: dict = field(default_factory=dict)
    l3_policy: dict = field(default_factory=dict)

    # Hex-encoded Ed25519 signature over ``canonical_bytes()``; "" until signed.
    signature_ed25519: str = ""
    # Reserved slot for an ML-DSA signature; nothing in this class writes or
    # checks it.
    signature_ml_dsa: str = ""

    @classmethod
    def new(
        cls,
        original_filename: str,
        content_hash: str,
        size_bytes: int,
        issuer_id: str,
        issuer_ed25519_pub_hex: str,
        recipient: Recipient,
        registry_url: str,
        content_type: str = "application/octet-stream",
        not_after: Optional[int] = None,
        max_opens: Optional[int] = None,
        jurisdiction: str = "GLOBAL",
    ) -> "Manifest":
        """Build an unsigned manifest with a fresh ``file_id`` and timestamp.

        ``registry_url`` and ``jurisdiction`` are always stored in ``policy``.
        ``not_after`` and ``max_opens`` are stored only when truthy, so both
        ``None`` and ``0`` mean "not set" and the key is omitted.

        Returns:
            A new ``Manifest`` whose signature fields are still empty; call
            ``sign`` before publishing it.
        """
        policy = {
            "registry_url": registry_url,
            "jurisdiction": jurisdiction,
        }
        # Truthiness (not ``is not None``) is deliberate here: 0 is treated as
        # "unset" and left out of the policy.
        if not_after:
            policy["not_after"] = not_after
        if max_opens:
            policy["max_opens"] = max_opens

        return cls(
            file_id=str(uuid.uuid4()),
            issued_at=int(time.time()),
            original_filename=original_filename,
            content_hash=content_hash,
            canonical_content_hash=content_hash,
            content_type=content_type,
            size_bytes=size_bytes,
            issuer_id=issuer_id,
            issuer_ed25519_pub=issuer_ed25519_pub_hex,
            recipient=recipient,
            policy=policy,
        )

    def to_dict(self, include_signatures: bool = True) -> dict:
        """Return the manifest as plain dicts/lists (nested dataclasses included).

        When ``include_signatures`` is false, both signature fields are kept
        as keys but blanked to the empty-string sentinel.
        """
        d = asdict(self)
        if not include_signatures:
            d["signature_ed25519"] = ""
            d["signature_ml_dsa"] = ""
        return d

    @staticmethod
    def _strip_none(obj):
        """Recursively drop None values from dicts.

        Canonical JSON for Oversight: omit null-valued fields rather than
        emit `"field": null`. Matches the Rust reference's `serde(skip_serializing_if)`
        and the broader industry convention (Sigstore et al.).

        Lists are traversed, but ``None`` items inside a list are kept.
        """
        if isinstance(obj, dict):
            return {k: Manifest._strip_none(v) for k, v in obj.items() if v is not None}
        if isinstance(obj, list):
            return [Manifest._strip_none(x) for x in obj]
        return obj

    @staticmethod
    def _reject_unknown(label: str, payload: dict, schema) -> None:
        """Raise ValueError if ``payload`` has a key that is not a field of ``schema``.

        Only the alphabetically first unknown key is reported, so the message
        is deterministic.
        """
        allowed = {f.name for f in fields(schema)}
        unknown = sorted(set(payload) - allowed)
        if unknown:
            raise ValueError(f"Unknown {label} field: {unknown[0]}")

    def canonical_bytes(self) -> bytes:
        """Canonical serialization excluding signatures (what we actually sign).

        Rules:
          - Exclude the two signature fields (replace with empty string sentinel).
          - Drop None-valued fields recursively.
          - RFC 8785 JCS: keys sorted by UTF-16 code unit, no whitespace,
            non-ASCII output as raw UTF-8. Byte-exact match with the Rust
            reference's ``serde_jcs::to_vec``.
        """
        d = self.to_dict(include_signatures=False)
        d = self._strip_none(d)
        return jcs_dumps(d)

    def to_json(self) -> bytes:
        """Serialize the full manifest, signatures included, via ``jcs_dumps``.

        None-valued fields are dropped, as in ``canonical_bytes``.
        """
        d = self._strip_none(self.to_dict())
        return jcs_dumps(d)

    @classmethod
    def from_json(cls, data: bytes) -> "Manifest":
        """Parse a manifest from UTF-8 JSON bytes, rejecting unexpected shapes.

        This checks structure only: it does not verify the signature and does
        not validate the value types of individual fields.

        Raises:
            ValueError: if the bytes are not valid UTF-8 JSON, the top level
                is not an object, any object carries an unknown field,
                ``recipient``/``watermarks`` have the wrong shape, or required
                fields are missing.
        """
        try:
            d = json.loads(data.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise ValueError("Malformed manifest JSON") from exc
        if not isinstance(d, dict):
            raise ValueError("Malformed manifest: expected JSON object")

        # Nested structures are rebuilt by hand below, so take them out before
        # the remaining keys are passed to the constructor.
        rec = d.pop("recipient", None)
        wms = d.pop("watermarks", [])
        cls._reject_unknown("manifest", d, cls)

        try:
            m = cls(**d)
            # ``is not None`` rather than truthiness: a falsy non-object such
            # as [] or "" must be rejected, not silently read as "no recipient".
            if rec is not None:
                if not isinstance(rec, dict):
                    raise ValueError("Malformed manifest recipient")
                cls._reject_unknown("recipient", rec, Recipient)
                m.recipient = Recipient(**rec)
            if not isinstance(wms, list):
                raise ValueError("Malformed manifest watermarks")
            watermarks = []
            for w in wms:
                if not isinstance(w, dict):
                    raise ValueError("Malformed manifest watermark")
                cls._reject_unknown("watermark", w, WatermarkRef)
                watermarks.append(WatermarkRef(**w))
            m.watermarks = watermarks
        except TypeError as exc:
            # Missing required constructor arguments surface as TypeError.
            raise ValueError("Malformed manifest fields") from exc
        return m

    def sign(self, issuer_ed25519_priv: bytes) -> None:
        """Sign ``canonical_bytes()`` and store the hex signature on the manifest."""
        sig = sign_manifest(self.canonical_bytes(), issuer_ed25519_priv)
        self.signature_ed25519 = sig.hex()

    def verify(self) -> bool:
        """Check the Ed25519 signature against the issuer key in the manifest.

        Returns ``False`` when the signature or issuer key is missing or is
        not valid hex; otherwise returns the result of ``verify_manifest``.

        NOTE: the key used is ``issuer_ed25519_pub`` carried by the manifest
        itself, so a ``True`` result shows the manifest is self-consistent.
        Callers must establish separately that the issuer key is trusted.
        """
        if not self.signature_ed25519 or not self.issuer_ed25519_pub:
            return False
        try:
            signature = bytes.fromhex(self.signature_ed25519)
            issuer_pub = bytes.fromhex(self.issuer_ed25519_pub)
        except (ValueError, TypeError):
            # Values parsed from untrusted JSON may be non-hex or non-string;
            # treat that as a failed verification instead of raising.
            return False
        return verify_manifest(self.canonical_bytes(), signature, issuer_pub)