Zion Boggan zionboggan.com ↗

Fix review findings for policy and Rekor verification

Co-authored-by: Codex (GPT-5.4) <noreply@openai.com>
b5533f7   Zion Boggan committed on Apr 20, 2026 (2 months ago)
oversight_core/container.py +7 -8
@@ -181,10 +181,10 @@ def open_sealed(
2. Verify manifest signature (Ed25519).
3. If trusted_issuer_pubs provided, verify issuer is in set.
4. Policy check (not_after, not_before, jurisdiction).
- 5. Atomically check-and-bump max_opens BEFORE any decryption.
- 6. Unwrap DEK (multi-recipient: try each slot).
- 7. AEAD decrypt with AAD = content_hash (binds ciphertext to manifest).
- 8. Post-decrypt SHA-256 check.
+ 5. Unwrap DEK (multi-recipient: try each slot).
+ 6. AEAD decrypt with AAD = content_hash (binds ciphertext to manifest).
+ 7. Post-decrypt SHA-256 check.
+ 8. Atomically check-and-bump max_opens after successful decryption.
"""
from .policy import check_policy, record_open
@@ -207,10 +207,6 @@ def open_sealed(
# Cheap, read-only policy checks (may raise PolicyViolation)
check_policy(sf.manifest, policy_ctx)
- # Atomically check-and-bump the open counter BEFORE any crypto work.
- # If max_opens is exceeded this raises PolicyViolation and we never decrypt.
- record_open(sf.manifest, policy_ctx)
-
# Recover DEK. For multi-recipient files, wrapped_dek contains a 'slots'
# list; we try each slot in turn. A "wrong key" exception is expected when
# trying non-matching slots; we only bail if NO slot decrypts.
@@ -238,6 +234,9 @@ def open_sealed(
if crypto.content_hash(plaintext) != sf.manifest.content_hash:
raise ValueError("Plaintext hash does not match manifest")
+ # Count only successful opens by an authenticated recipient.
+ record_open(sf.manifest, policy_ctx)
+
return plaintext, sf.manifest
oversight_core/policy.py +41 -7
@@ -40,10 +40,11 @@ class PolicyViolation(Exception):
@dataclass
class PolicyContext:
"""State the opener needs to enforce policy. Typically constructed from env/config."""
+
jurisdiction: str = "GLOBAL"
state_dir: Optional[Path] = None # for LOCAL_ONLY open-counter persistence
registry_url: Optional[str] = None # for REGISTRY mode
- mode: str = "LOCAL_ONLY" # LOCAL_ONLY | REGISTRY | HYBRID
+ mode: str = "LOCAL_ONLY" # LOCAL_ONLY | REGISTRY | HYBRID
def __post_init__(self):
if self.state_dir:
@@ -54,7 +55,7 @@ class PolicyContext:
def _local_counter_path(ctx: PolicyContext, file_id: str) -> Path:
if ctx.state_dir is None:
raise ValueError("PolicyContext.state_dir is required for LOCAL_ONLY mode")
- # file_id is a UUID string - defense against path traversal:
+ # file_id is a UUID string; defense against path traversal.
if "/" in file_id or "\\" in file_id or ".." in file_id:
raise ValueError(f"invalid file_id for counter filename: {file_id!r}")
return ctx.state_dir / f"{file_id}.opens.json"
@@ -70,6 +71,36 @@ def _local_read_count(ctx: PolicyContext, file_id: str) -> int:
return 0
+def _lock_file(lock_file) -> None:
+ if os.name == "nt":
+ import msvcrt
+
+ lock_file.seek(0, os.SEEK_END)
+ if lock_file.tell() == 0:
+ lock_file.write("\0")
+ lock_file.flush()
+ lock_file.seek(0)
+ msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
+ return
+
+ import fcntl
+
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
+
+
+def _unlock_file(lock_file) -> None:
+ if os.name == "nt":
+ import msvcrt
+
+ lock_file.seek(0)
+ msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
+ return
+
+ import fcntl
+
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+
+
def _local_check_and_bump(ctx: PolicyContext, file_id: str, max_opens: int) -> int:
"""
Atomically: check count < max_opens AND bump. Uses an OS file lock
@@ -78,14 +109,13 @@ def _local_check_and_bump(ctx: PolicyContext, file_id: str, max_opens: int) -> i
Raises PolicyViolation if max_opens reached.
Returns the new count.
"""
- import fcntl # POSIX only; Windows would need msvcrt.locking.
import tempfile
p = _local_counter_path(ctx, file_id)
lock_path = p.with_suffix(".lock")
# Open/create lock file, acquire exclusive lock for the critical section.
with open(lock_path, "a+") as lf:
- fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
+ _lock_file(lf)
try:
cur = _local_read_count(ctx, file_id)
if cur >= max_opens:
@@ -114,7 +144,7 @@ def _local_check_and_bump(ctx: PolicyContext, file_id: str, max_opens: int) -> i
raise
return new_count
finally:
- fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
+ _unlock_file(lf)
def check_policy(manifest: Manifest, ctx: Optional[PolicyContext] = None) -> None:
@@ -166,5 +196,9 @@ def record_open(manifest: Manifest, ctx: Optional[PolicyContext]) -> int:
return 0
if ctx.mode == "LOCAL_ONLY":
return _local_check_and_bump(ctx, manifest.file_id, int(mx))
- # REGISTRY/HYBRID - caller should POST to registry /policy/open
- return _local_check_and_bump(ctx, manifest.file_id, int(mx))
+ if ctx.mode in {"REGISTRY", "HYBRID"}:
+ raise PolicyViolation(
+ f"{ctx.mode} max_opens enforcement is not implemented; refusing "
+ "to fall back to LOCAL_ONLY state"
+ )
+ raise ValueError(f"unknown policy mode: {ctx.mode!r}")
oversight_core/rekor.py +10 -1
@@ -399,12 +399,14 @@ def verify_inclusion_offline(
bundle_rekor_field: dict,
envelope: DSSEEnvelope,
issuer_ed25519_pub: bytes,
+ expected_content_hash_sha256_hex: str,
) -> tuple[bool, str]:
"""Verify a bundled Rekor entry without contacting the log.
Checks (in order):
1. The DSSE envelope verifies under ``issuer_ed25519_pub``.
- 2. The envelope payload's subject digest matches the bundle's claim.
+ 2. The envelope payload's subject digest matches the bundle manifest's
+ expected plaintext SHA-256.
3. The bundled ``transparency_log_entry`` has the structural fields the
tile-backed log returns (logIndex + signed checkpoint or proof).
@@ -414,6 +416,13 @@ def verify_inclusion_offline(
"""
if not verify_dsse(envelope, issuer_ed25519_pub):
return False, "dsse signature did not verify under issuer pubkey"
+ statement = envelope_payload_statement(envelope)
+ try:
+ subject_digest = statement["subject"][0]["digest"]["sha256"]
+ except (KeyError, IndexError, TypeError):
+ return False, "dsse payload missing subject digest"
+ if subject_digest != expected_content_hash_sha256_hex:
+ return False, "dsse subject digest does not match expected content hash"
tle = bundle_rekor_field.get("transparency_log_entry") or {}
if not isinstance(tle, dict) or not tle:
return False, "bundle missing transparency_log_entry payload"
tests/test_policy_unit.py +109 -0
@@ -0,0 +1,109 @@
+"""
+test_policy_unit
+================
+
+Focused policy/container checks around successful-open counting.
+"""
+from __future__ import annotations
+
+import sys
+import tempfile
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(ROOT))
+
+from oversight_core import (
+ ClassicIdentity,
+ Manifest,
+ Recipient,
+ content_hash,
+ open_sealed,
+ seal,
+)
+from oversight_core.policy import PolicyContext, PolicyViolation, record_open
+
+
+def ok(msg):
+ print(f" [PASS] {msg}")
+
+
+def t1_wrong_recipient_does_not_consume_open_count():
+ issuer = ClassicIdentity.generate()
+ alice = ClassicIdentity.generate()
+ bob = ClassicIdentity.generate()
+ plaintext = b"hello policy"
+ recipient = Recipient(
+ recipient_id="alice",
+ x25519_pub=alice.x25519_pub.hex(),
+ ed25519_pub=alice.ed25519_pub.hex(),
+ )
+ manifest = Manifest.new(
+ "test.txt",
+ content_hash(plaintext),
+ len(plaintext),
+ "issuer",
+ issuer.ed25519_pub.hex(),
+ recipient,
+ "http://localhost:8765",
+ "text/plain",
+ )
+ manifest.policy["max_opens"] = 1
+ blob = seal(plaintext, manifest, issuer.ed25519_priv, alice.x25519_pub)
+
+ with tempfile.TemporaryDirectory() as td:
+ ctx = PolicyContext(state_dir=Path(td), mode="LOCAL_ONLY")
+ try:
+ open_sealed(blob, bob.x25519_priv, policy_ctx=ctx)
+ except Exception:
+ pass
+ else:
+ raise AssertionError("wrong recipient unexpectedly decrypted file")
+
+ recovered, _ = open_sealed(blob, alice.x25519_priv, policy_ctx=ctx)
+ assert recovered == plaintext
+ ok("wrong recipient attempts do not consume max_opens")
+
+
+def t2_registry_modes_fail_closed():
+ issuer = ClassicIdentity.generate()
+ alice = ClassicIdentity.generate()
+ plaintext = b"hello policy"
+ recipient = Recipient(
+ recipient_id="alice",
+ x25519_pub=alice.x25519_pub.hex(),
+ ed25519_pub=alice.ed25519_pub.hex(),
+ )
+ manifest = Manifest.new(
+ "test.txt",
+ content_hash(plaintext),
+ len(plaintext),
+ "issuer",
+ issuer.ed25519_pub.hex(),
+ recipient,
+ "http://localhost:8765",
+ "text/plain",
+ )
+ manifest.policy["max_opens"] = 1
+ for mode in ("REGISTRY", "HYBRID"):
+ try:
+ record_open(manifest, PolicyContext(mode=mode, registry_url="https://registry.test"))
+ except PolicyViolation as exc:
+ assert "refusing to fall back" in str(exc)
+ else:
+ raise AssertionError(f"{mode} should fail closed until implemented")
+ ok("REGISTRY/HYBRID refuse insecure LOCAL_ONLY fallback")
+
+
+def main():
+ print("=" * 60)
+ print(" oversight_core.policy - focused unit tests")
+ print("=" * 60)
+ t1_wrong_recipient_does_not_consume_open_count()
+ t2_registry_modes_fail_closed()
+ print()
+ print(" ALL TESTS PASSED - 2/2")
+
+
+if __name__ == "__main__":
+ main()
tests/test_rekor_unit.py +82 -38
@@ -12,6 +12,7 @@ Covers (no network):
5. build_statement produces the expected in-toto v1 shape.
6. Envelope JSON serialization is canonical (sorted keys, no whitespace).
7. verify_inclusion_offline returns False when transparency_log_entry is empty.
+ 8. verify_inclusion_offline rejects mismatched subject digests.
Running this requires no external services; e2e Rekor tests live in
test_rekor_e2e.py (added in v0.5 Session B).
@@ -65,9 +66,12 @@ def t2_sign_verify_roundtrip():
def t3_tamper_payload_rejected():
priv, pub = _new_keypair()
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519=pub.hex(),
- recipient_id="r", recipient_pubkey_sha256="00" * 32,
- suite="s", registered_at="t",
+ file_id="x",
+ issuer_pubkey_ed25519=pub.hex(),
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
)
env = R.sign_dsse(R.build_statement("a", "b", pred), priv)
tampered = R.DSSEEnvelope(
@@ -83,9 +87,12 @@ def t4_wrong_key_rejected():
priv, _ = _new_keypair()
_, other_pub = _new_keypair()
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519="zz",
- recipient_id="r", recipient_pubkey_sha256="00" * 32,
- suite="s", registered_at="t",
+ file_id="x",
+ issuer_pubkey_ed25519="zz",
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
)
env = R.sign_dsse(R.build_statement("a", "b", pred), priv)
assert not R.verify_dsse(env, other_pub), "wrong-key sig verified!"
@@ -94,9 +101,12 @@ def t4_wrong_key_rejected():
def t5_statement_shape():
pred = R.OversightRegistrationPredicate(
- file_id="fid", issuer_pubkey_ed25519="pp",
- recipient_id="rid", recipient_pubkey_sha256="rxhash",
- suite="OSGT-CLASSIC-v1", registered_at="2026-04-19T00:00:00Z",
+ file_id="fid",
+ issuer_pubkey_ed25519="pp",
+ recipient_id="rid",
+ recipient_pubkey_sha256="rxhash",
+ suite="OSGT-CLASSIC-v1",
+ registered_at="2026-04-19T00:00:00Z",
)
s = R.build_statement("mark1234", "deadbeef" * 8, pred)
assert s["_type"] == R.STATEMENT_TYPE
@@ -108,33 +118,50 @@ def t5_statement_shape():
def t6_canonical_envelope_json():
- priv, pub = _new_keypair()
+ priv, _ = _new_keypair()
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519="pp",
- recipient_id="r", recipient_pubkey_sha256="00" * 32,
- suite="s", registered_at="t",
+ file_id="x",
+ issuer_pubkey_ed25519="pp",
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
)
env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv)
raw = env.to_json()
- # canonical: keys sorted alphabetically (payload, payloadType, signatures)
- parsed_keys_in_order = [k for k in json.loads(raw).keys()]
- # round-trip must produce identical bytes
again = R.DSSEEnvelope.from_json(raw).to_json()
assert raw == again, "envelope JSON not canonical (round-trip differs)"
- # no whitespace
assert " " not in raw and "\n" not in raw, "envelope JSON has whitespace"
print(" [PASS] 6. envelope JSON is canonical and round-trip stable")
+def t7_offline_verify_rejects_empty_tle():
+ priv, pub = _new_keypair()
+ pred = R.OversightRegistrationPredicate(
+ file_id="x",
+ issuer_pubkey_ed25519="pp",
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
+ )
+ env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv)
+ ok, reason = R.verify_inclusion_offline({}, env, pub, "b" * 32)
+ assert not ok and "transparency_log_entry" in reason, reason
+ print(f" [PASS] 7. offline verify rejects empty bundle ({reason})")
+
+
def t8_recipient_pubkey_never_appears_raw():
"""Privacy: raw X25519 recipient key must never end up in the on-log payload."""
priv, _ = _new_keypair()
- raw_pub_hex = "11" * 32 # pretend recipient X25519 pub
+ raw_pub_hex = "11" * 32
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519="pp",
+ file_id="x",
+ issuer_pubkey_ed25519="pp",
recipient_id="r",
recipient_pubkey_sha256=R.hash_recipient_pubkey(raw_pub_hex),
- suite="s", registered_at="t",
+ suite="s",
+ registered_at="t",
)
stmt = R.build_statement("a", "b" * 32, pred)
env = R.sign_dsse(stmt, priv)
@@ -147,9 +174,12 @@ def t8_recipient_pubkey_never_appears_raw():
def t9_predicate_carries_version_int():
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519="pp",
- recipient_id="r", recipient_pubkey_sha256="00" * 32,
- suite="s", registered_at="t",
+ file_id="x",
+ issuer_pubkey_ed25519="pp",
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
)
d = pred.to_dict()
assert d.get("predicate_version") == 1, d
@@ -158,19 +188,24 @@ def t9_predicate_carries_version_int():
def t10_bundle_has_5year_replay_fields():
"""Bundle must carry log_pubkey, checkpoint, schema URI, schema int."""
- priv, pub = _new_keypair()
+ priv, _ = _new_keypair()
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519="pp",
- recipient_id="r", recipient_pubkey_sha256="00" * 32,
- suite="s", registered_at="t",
+ file_id="x",
+ issuer_pubkey_ed25519="pp",
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
)
env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv)
upload = R.RekorUploadResult(
log_url="https://log2025-1.rekor.sigstore.dev",
- log_index=42, log_id="abc", integrated_time=1776600000,
+ log_index=42,
+ log_id="abc",
+ integrated_time=1776600000,
transparency_log_entry={"logEntry": "..."},
log_pubkey_pem="-----BEGIN PUBLIC KEY-----\nFAKE\n-----END PUBLIC KEY-----",
- checkpoint="rekor.sigstore.dev\n42\nABC=\n- rekor sig...",
+ checkpoint="rekor.sigstore.dev\n42\nABC=\n-- rekor sig...",
)
bundle = R.build_bundle(
manifest_dict={"file_id": "x"},
@@ -190,22 +225,30 @@ def t10_bundle_has_5year_replay_fields():
print(" [PASS] 10. bundle carries log_pubkey + checkpoint + schema URI + schema=2")
-def t7_offline_verify_rejects_empty_tle():
+def t11_offline_verify_rejects_digest_mismatch():
priv, pub = _new_keypair()
pred = R.OversightRegistrationPredicate(
- file_id="x", issuer_pubkey_ed25519="pp",
- recipient_id="r", recipient_pubkey_sha256="00" * 32,
- suite="s", registered_at="t",
+ file_id="x",
+ issuer_pubkey_ed25519="pp",
+ recipient_id="r",
+ recipient_pubkey_sha256="00" * 32,
+ suite="s",
+ registered_at="t",
)
env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv)
- ok, reason = R.verify_inclusion_offline({}, env, pub)
- assert not ok and "transparency_log_entry" in reason, reason
- print(f" [PASS] 7. offline verify rejects empty bundle ({reason})")
+ ok, reason = R.verify_inclusion_offline(
+ {"transparency_log_entry": {"logEntry": {"kindVersion": {"kind": "dsse"}}}},
+ env,
+ pub,
+ "c" * 32,
+ )
+ assert not ok and "subject digest" in reason, reason
+ print(f" [PASS] 11. offline verify rejects mismatched digest ({reason})")
def main():
print("=" * 60)
- print(" oversight_core.rekor - unit tests (offline, no network)")
+ print(" oversight_core.rekor - unit tests (offline, no network)")
print("=" * 60)
t1_pae_byte_exact()
t2_sign_verify_roundtrip()
@@ -217,8 +260,9 @@ def main():
t8_recipient_pubkey_never_appears_raw()
t9_predicate_carries_version_int()
t10_bundle_has_5year_replay_fields()
+ t11_offline_verify_rejects_digest_mismatch()
print()
- print(" ALL TESTS PASSED - 10/10")
+ print(" ALL TESTS PASSED - 11/11")
print()