Zion Boggan
repos/Oversight/oversight_core/l3_policy.py
zionboggan.com ↗
212 lines · python
History for this file →
1
"""
2
L3 semantic-watermark safety policy.
3
 
4
L3 is powerful because it changes visible prose. That also makes it unsafe for
5
classes where exact wording is the evidence: contracts, filings, code, logs,
6
structured data, and technical specifications. This module decides when L3 is
7
allowed and applies it only to conservative prose regions.
8
"""
9
 
10
from __future__ import annotations
11
 
12
from dataclasses import dataclass, asdict
13
from pathlib import Path
14
import re
15
from typing import Optional
16
 
17
from . import semantic
18
 
19
 
20
# File suffixes for which exact wording is the evidence, grouped by the kind
# of content they usually carry. classify_document() consults this set first
# and then narrows the match down to a concrete document class.
RISKY_EXTENSIONS = (
    # Program source and query languages.
    {
        ".c", ".cc", ".cpp", ".cs", ".css", ".go", ".h", ".hpp", ".java",
        ".js", ".jsx", ".kt", ".lua", ".php", ".py", ".rb", ".rs", ".sh",
        ".sql", ".swift", ".ts", ".tsx",
    }
    # Structured data and configuration formats.
    | {
        ".json", ".jsonl", ".yaml", ".yml", ".toml", ".xml", ".csv", ".tsv",
        ".ini", ".conf", ".cfg", ".lock", ".env",
    }
    # Log files.
    | {".log"}
)

# Suffixes that mark a file as a legal instrument.
LEGAL_EXTENSIONS = {
    ".contract",
    ".filing",
    ".nda",
    ".msa",
    ".sow",
}

# A content type starting with any of these is treated as structured data.
STRUCTURED_MIME_PREFIXES = (
    "application/json",
    "application/xml",
    "application/x-yaml",
    "text/csv",
    "text/tab-separated-values",
)

# A content type containing any of these substrings is treated as code-like.
SOURCE_MIME_HINTS = (
    "source",
    "script",
    "sql",
    "json",
    "yaml",
    "xml",
)

# RFC 2119 requirement keywords (upper-case, as the RFC writes them).
RFC2119 = {
    "MUST", "MUST NOT",
    "REQUIRED",
    "SHALL", "SHALL NOT",
    "SHOULD", "SHOULD NOT",
    "RECOMMENDED", "NOT RECOMMENDED",
    "MAY",
    "OPTIONAL",
}
41
 
42
 
43
@dataclass
class L3Decision:
    """Result of the L3 policy check for a single document.

    Fields are positional, in the order declared below.
    """

    # Whether L3 should run at all.
    enabled: bool
    # Mode to run in; decide_l3() produces "off", "boilerplate" or "full".
    mode: str
    # Label produced by the document classifier (or declared by the caller).
    document_class: str
    # NOTE(review): presumably signals that the caller must acknowledge the
    # decision before L3 runs -- confirm against the consumers of this flag.
    requires_ack: bool
    # Human-readable explanation of why this decision was reached.
    reason: str
    # Free-form messages attached to the decision.
    warnings: list[str]

    def to_dict(self) -> dict:
        """Return the decision as a plain dict keyed by field name."""
        return asdict(self)
54
 
55
 
56
def classify_document(
    *,
    filename: str = "",
    content_type: str = "",
    text: str = "",
    declared_class: str = "auto",
) -> tuple[str, list[str]]:
    """Classify a document for L3 safety decisions.

    Signals are checked in a fixed order and the first match wins:

    1. ``declared_class`` -- any value other than ``""`` or ``"auto"`` is
       returned verbatim.
    2. The filename suffix (legal suffixes first, then risky ones).
    3. The MIME type in ``content_type``.
    4. Content heuristics over the first 8192 characters of ``text``.

    Returns:
        ``(document_class, reasons)`` where ``reasons`` is a one-element list
        describing the signal that decided the class.
    """
    if declared_class and declared_class != "auto":
        return declared_class, [f"declared document class: {declared_class}"]

    path = Path(filename)
    suffix = path.suffix.lower()
    if not suffix and path.name.startswith("."):
        # pathlib reports an empty suffix for bare dotfiles such as ".env",
        # so the whole name is used as the suffix to keep them detectable.
        suffix = path.name.lower()
    ctype = (content_type or "").lower()
    sample = text[:8192]

    if suffix in LEGAL_EXTENSIONS:
        return "legal", [f"legal-sensitive extension {suffix}"]
    if suffix in RISKY_EXTENSIONS:
        if suffix in {".sql"}:
            return "sql", [f"SQL extension {suffix}"]
        if suffix == ".log":
            return "log", [f"log extension {suffix}"]
        if suffix in {
            ".json", ".jsonl", ".yaml", ".yml", ".toml", ".xml", ".csv",
            ".tsv", ".ini", ".conf", ".cfg", ".lock", ".env",
        }:
            return "structured_data", [f"structured-data extension {suffix}"]
        return "source_code", [f"source-code extension {suffix}"]

    if any(ctype.startswith(p) for p in STRUCTURED_MIME_PREFIXES):
        return "structured_data", [f"structured MIME type {content_type}"]
    # NOTE(review): this is a substring test, so the "xml" hint also matches
    # office types such as "application/vnd.openxmlformats-officedocument...".
    # Left as is because it errs on the conservative side -- confirm intent.
    if any(h in ctype for h in SOURCE_MIME_HINTS):
        return "source_code", [f"code-like MIME type {content_type}"]

    # Counts distinct RFC 2119 keywords present (case-sensitive), not
    # occurrences; "MUST NOT" therefore also counts as a hit for "MUST".
    upper_hits = sum(
        1 for kw in RFC2119 if re.search(rf"\b{re.escape(kw)}\b", sample)
    )
    if upper_hits >= 3:
        return "technical_spec", ["multiple RFC 2119 requirement keywords"]
    if re.search(r"\b(SEC|FDA|FINRA|10-K|10-Q|8-K|S-1|regulation|compliance filing)\b", sample, re.I):
        return "regulatory", ["regulatory/filing language detected"]
    # "indemnif" is a stem: it needs \w* before the closing \b, otherwise
    # "indemnify" / "indemnification" can never match.
    if re.search(r"\b(agreement|whereas|hereby|indemnif\w*|governing law|jurisdiction|party|parties)\b", sample, re.I):
        return "legal", ["contract/legal language detected"]
    # NOTE(review): \s in "^\s{4,}\S" also matches newlines, so a run of blank
    # lines can satisfy the "indented block" test. Conservative; left as is.
    if re.search(r"```|^\s{4,}\S|SELECT\s+.+\s+FROM|CREATE\s+TABLE", sample, re.I | re.M):
        return "technical_spec", ["code block or specification-like syntax detected"]

    return "prose", ["no high-risk L3 signals detected"]
100
 
101
 
102
def decide_l3(
    *,
    filename: str = "",
    content_type: str = "",
    text: str = "",
    declared_class: str = "auto",
    requested_mode: str = "auto",
) -> L3Decision:
    """Decide whether L3 runs for a document, and in which mode.

    The document is classified first. An explicit ``requested_mode`` of
    ``"off"``, ``"boilerplate"`` or ``"full"`` is then honoured as given;
    any other value falls back to the automatic policy, which disables L3
    for wording-sensitive classes and enables full L3 otherwise.

    The classifier's reasons are carried in the decision's ``warnings``
    field; an explicit ``"full"`` request on a wording-sensitive class
    appends one extra warning to them.
    """
    doc_class, reasons = classify_document(
        filename=filename,
        content_type=content_type,
        text=text,
        declared_class=declared_class,
    )
    wording_sensitive = doc_class in {
        "legal",
        "regulatory",
        "technical_spec",
        "source_code",
        "sql",
        "log",
        "structured_data",
    }

    if requested_mode == "off":
        return L3Decision(
            enabled=False,
            mode="off",
            document_class=doc_class,
            requires_ack=False,
            reason="L3 disabled by user",
            warnings=reasons,
        )

    if requested_mode == "boilerplate":
        return L3Decision(
            enabled=True,
            mode="boilerplate",
            document_class=doc_class,
            requires_ack=True,
            reason="boilerplate-only L3 requested",
            warnings=reasons,
        )

    if requested_mode == "full":
        # Build a fresh list so the classifier's own list is never mutated.
        notes = list(reasons)
        if wording_sensitive:
            notes.append(
                "L3 full mode was explicitly requested for a wording-sensitive document class."
            )
        return L3Decision(
            enabled=True,
            mode="full",
            document_class=doc_class,
            requires_ack=True,
            reason="full L3 explicitly requested",
            warnings=notes,
        )

    # Automatic policy: anything not matched above lands here.
    if wording_sensitive:
        return L3Decision(
            enabled=False,
            mode="off",
            document_class=doc_class,
            requires_ack=False,
            reason="L3 defaults off for wording-sensitive document classes",
            warnings=reasons,
        )
    return L3Decision(
        enabled=True,
        mode="full",
        document_class=doc_class,
        requires_ack=True,
        reason="L3 auto-enabled for prose",
        warnings=reasons,
    )
145
 
146
 
147
def apply_l3_safe(text: str, mark_id: bytes, mode: str = "full") -> str:
    """Apply L3 only to conservative prose regions.

    ``text`` is processed line by line with line endings preserved. A line is
    passed through unchanged when it is a ``` fence marker, lies inside a
    fenced block, is flagged by ``_line_is_protected``, or -- in
    ``"boilerplate"`` mode -- is not a boilerplate line. Every other line has
    L3 applied to its unquoted segments. ``mode == "off"`` returns ``text``
    untouched.
    """
    if mode == "off":
        return text

    source_lines = text.splitlines(keepends=True)
    line_count = len(source_lines)
    boilerplate_only = mode == "boilerplate"
    inside_fence = False
    rewritten: list[str] = []

    for position, raw in enumerate(source_lines):
        if raw.strip().startswith("```"):
            # The fence marker itself is copied verbatim and flips the state.
            inside_fence = not inside_fence
            rewritten.append(raw)
        elif inside_fence or _line_is_protected(raw):
            rewritten.append(raw)
        elif boilerplate_only and not _is_boilerplate_line(raw, position, line_count):
            rewritten.append(raw)
        else:
            rewritten.append(_apply_l3_to_unquoted_segments(raw, mark_id))

    return "".join(rewritten)
171
 
172
 
173
def _line_is_protected(line: str) -> bool:
174
    stripped = line.strip()
175
    if not stripped:
176
        return False
177
    if line.startswith(("    ", "\t", ">>> ", "... ")):
178
        return True
179
    if re.match(r"^\s*(SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP)\b", line, re.I):
180
        return True
181
    if re.search(r"`[^`]+`", line):
182
        return True
183
    if re.search(r"\b(?:MUST|SHOULD|MAY|SHALL|REQUIRED|OPTIONAL)(?:\s+NOT)?\b", line):
184
        return True
185
    if re.search(r"\b\d+(?:\.\d+)?\s*(?:%|percent|kg|g|mg|lb|oz|m|cm|mm|km|ft|in|ms|s|sec|min|h|hr|USD|EUR|GBP|MB|GB|TB)\b", line, re.I):
186
        return True
187
    if re.search(r"\b[A-Z][A-Z0-9_-]{2,}\b", line):
188
        return True
189
    return False
190
 
191
 
192
def _is_boilerplate_line(line: str, idx: int, total: int) -> bool:
193
    if idx < 6 or idx >= max(0, total - 6):
194
        return True
195
    return bool(re.search(r"\b(confidential|proprietary|notice|copyright|footer|header|cover page)\b", line, re.I))
196
 
197
 
198
def _apply_l3_to_unquoted_segments(line: str, mark_id: bytes) -> str:
199
    parts = re.split(r"((?:\"[^\"]*\")|(?:'[^']*')|(?:โ€œ[^โ€]*โ€))", line)
200
    for i in range(0, len(parts), 2):
201
        segment = parts[i]
202
        if not segment.strip():
203
            continue
204
        segment = (
205
            semantic.embed_synonyms_v2(segment, mark_id, min_instances=1)
206
            if semantic.SYNONYMS_V2_AVAILABLE
207
            else semantic.embed_synonyms(segment, mark_id, min_instances=1)
208
        )
209
        segment = semantic.embed_spelling(segment, mark_id)
210
        segment = semantic.embed_contractions(segment, mark_id)
211
        parts[i] = segment
212
    return "".join(parts)