| 1 | """ |
| 2 | L3 semantic-watermark safety policy. |
| 3 | |
| 4 | L3 is powerful because it changes visible prose. That also makes it unsafe for |
| 5 | classes where exact wording is the evidence: contracts, filings, code, logs, |
| 6 | structured data, and technical specifications. This module decides when L3 is |
| 7 | allowed and applies it only to conservative prose regions. |
| 8 | """ |
| 9 | |
| 10 | from __future__ import annotations |
| 11 | |
| 12 | from dataclasses import dataclass, asdict |
| 13 | from pathlib import Path |
| 14 | import re |
| 15 | from typing import Optional |
| 16 | |
| 17 | from . import semantic |
| 18 | |
| 19 | |
# File suffixes (lower-case, with the leading dot) whose content is treated
# as wording-sensitive when classifying a document by filename.
RISKY_EXTENSIONS = {
    # Programming, scripting, query, and stylesheet sources.
    ".c", ".h", ".cc", ".cpp", ".hpp", ".cs", ".java", ".kt", ".swift",
    ".go", ".rs", ".js", ".jsx", ".ts", ".tsx", ".css", ".php", ".py",
    ".rb", ".lua", ".sh", ".sql",
    # Structured data and serialization formats.
    ".json", ".jsonl", ".yaml", ".yml", ".toml", ".xml", ".csv", ".tsv",
    # Configuration, lock, and environment files.
    ".ini", ".conf", ".cfg", ".lock", ".env",
    # Log output.
    ".log",
}

# Suffixes that mark a file as a legal instrument.
LEGAL_EXTENSIONS = {
    ".contract",
    ".filing",
    ".nda",
    ".msa",
    ".sow",
}

# Content types that are matched as prefixes of the lower-cased MIME string.
STRUCTURED_MIME_PREFIXES = (
    "application/json",
    "application/xml",
    "application/x-yaml",
    "text/csv",
    "text/tab-separated-values",
)

# Substrings that, anywhere in the lower-cased MIME string, indicate
# code-like content.
SOURCE_MIME_HINTS = (
    "source",
    "script",
    "sql",
    "json",
    "yaml",
    "xml",
)

# Upper-case requirement keywords in the style of RFC 2119.
RFC2119 = {
    "MUST", "MUST NOT",
    "SHALL", "SHALL NOT",
    "SHOULD", "SHOULD NOT",
    "REQUIRED",
    "RECOMMENDED", "NOT RECOMMENDED",
    "MAY",
    "OPTIONAL",
}
| 41 | |
| 42 | |
@dataclass
class L3Decision:
    """Outcome of the L3 policy check for one document.

    decide_l3() constructs instances, so the field order below is part of
    the contract for any positional construction.
    """

    # True when L3 rewriting should run for the document.
    enabled: bool
    # Mode to run in; decide_l3() produces "off", "boilerplate", or "full".
    mode: str
    # Document class label returned by classify_document().
    document_class: str
    # decide_l3() sets this to True whenever L3 is enabled.
    # NOTE(review): presumably the caller must acknowledge the rewrite
    # before applying it -- confirm against the consumers of this flag.
    requires_ack: bool
    # Human-readable explanation of why this decision was reached.
    reason: str
    # Classification reasons, plus any explicit warning text.
    warnings: list[str]

    def to_dict(self) -> dict:
        """Return the decision as a plain dictionary keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
| 54 | |
| 55 | |
def classify_document(
    *,
    filename: str = "",
    content_type: str = "",
    text: str = "",
    declared_class: str = "auto",
) -> tuple[str, list[str]]:
    """Classify a document for L3 safety decisions.

    Signals are checked in priority order and the first hit wins:

    1. an explicit ``declared_class`` other than ``"auto"``;
    2. the filename extension (legal, SQL, log, structured data, source);
    3. the MIME content type;
    4. content heuristics over the first 8192 characters of ``text``
       (RFC 2119 keywords, regulatory terms, contract language, code syntax).

    Args:
        filename: Name or path of the document; only its suffix is used.
        content_type: MIME type, matched case-insensitively.
        text: Document body; only a leading sample is inspected.
        declared_class: Caller-supplied class, returned verbatim unless it is
            empty or ``"auto"``.

    Returns:
        A ``(document_class, reasons)`` pair, where ``reasons`` is a list of
        human-readable strings explaining the classification. The class is
        ``"prose"`` when no wording-sensitive signal is found.
    """
    if declared_class and declared_class != "auto":
        return declared_class, [f"declared document class: {declared_class}"]

    path = Path(filename or "")
    suffix = path.suffix.lower()
    if not suffix and path.name.startswith("."):
        # pathlib reports an empty suffix for dotfiles such as ".env", so
        # fall back to the whole name to let them hit the extension tables.
        suffix = path.name.lower()
    ctype = (content_type or "").lower()
    sample = (text or "")[:8192]

    if suffix in LEGAL_EXTENSIONS:
        return "legal", [f"legal-sensitive extension {suffix}"]
    if suffix in RISKY_EXTENSIONS:
        structured_suffixes = {
            ".json", ".jsonl", ".yaml", ".yml", ".toml", ".xml", ".csv",
            ".tsv", ".ini", ".conf", ".cfg", ".lock", ".env",
        }
        if suffix == ".sql":
            return "sql", [f"SQL extension {suffix}"]
        if suffix == ".log":
            return "log", [f"log extension {suffix}"]
        if suffix in structured_suffixes:
            return "structured_data", [f"structured-data extension {suffix}"]
        return "source_code", [f"source-code extension {suffix}"]

    if any(ctype.startswith(prefix) for prefix in STRUCTURED_MIME_PREFIXES):
        return "structured_data", [f"structured MIME type {content_type}"]
    if any(hint in ctype for hint in SOURCE_MIME_HINTS):
        return "source_code", [f"code-like MIME type {content_type}"]

    # Case-sensitive on purpose: only upper-case requirement keywords count.
    upper_hits = sum(
        1 for kw in RFC2119 if re.search(rf"\b{re.escape(kw)}\b", sample)
    )
    if upper_hits >= 3:
        return "technical_spec", ["multiple RFC 2119 requirement keywords"]
    if re.search(
        r"\b(SEC|FDA|FINRA|10-K|10-Q|8-K|S-1|regulation|compliance filing)\b",
        sample,
        re.I,
    ):
        return "regulatory", ["regulatory/filing language detected"]
    # "indemnif" is a stem, so it takes \w* to reach "indemnify",
    # "indemnification", etc.; a bare trailing \b would never match them.
    if re.search(
        r"\b(agreement|whereas|hereby|indemnif\w*|governing law|jurisdiction|party|parties)\b",
        sample,
        re.I,
    ):
        return "legal", ["contract/legal language detected"]
    # The indent test uses [ \t] rather than \s: under re.M, \s also consumes
    # newlines, which made blank lines count towards the four-character
    # indent and misclassified ordinary prose.
    if re.search(
        r"```|^[ \t]{4,}\S|SELECT\s+.+\s+FROM|CREATE\s+TABLE",
        sample,
        re.I | re.M,
    ):
        return "technical_spec", ["code block or specification-like syntax detected"]

    return "prose", ["no high-risk L3 signals detected"]
| 100 | |
| 101 | |
def decide_l3(
    *,
    filename: str = "",
    content_type: str = "",
    text: str = "",
    declared_class: str = "auto",
    requested_mode: str = "auto",
) -> L3Decision:
    """Decide whether L3 runs for a document, and in which mode.

    The document is classified first. An explicit ``requested_mode`` of
    ``"off"``, ``"boilerplate"``, or ``"full"`` is then honoured regardless
    of class (full mode adds a warning for wording-sensitive classes). Any
    other value means automatic: off for wording-sensitive classes, full
    otherwise.

    Returns:
        An L3Decision whose ``warnings`` field carries the classification
        reasons, plus the full-mode warning when one applies.
    """
    document_class, notes = classify_document(
        filename=filename,
        content_type=content_type,
        text=text,
        declared_class=declared_class,
    )
    wording_sensitive = document_class in {
        "legal",
        "regulatory",
        "technical_spec",
        "source_code",
        "sql",
        "log",
        "structured_data",
    }

    # Explicit user choices take precedence over the automatic policy.
    if requested_mode == "off":
        return L3Decision(
            enabled=False,
            mode="off",
            document_class=document_class,
            requires_ack=False,
            reason="L3 disabled by user",
            warnings=notes,
        )

    if requested_mode == "boilerplate":
        return L3Decision(
            enabled=True,
            mode="boilerplate",
            document_class=document_class,
            requires_ack=True,
            reason="boilerplate-only L3 requested",
            warnings=notes,
        )

    if requested_mode == "full":
        extra = (
            ["L3 full mode was explicitly requested for a wording-sensitive document class."]
            if wording_sensitive
            else []
        )
        return L3Decision(
            enabled=True,
            mode="full",
            document_class=document_class,
            requires_ack=True,
            reason="full L3 explicitly requested",
            warnings=notes + extra,
        )

    # Automatic policy: stay off unless the document looks like plain prose.
    if wording_sensitive:
        return L3Decision(
            enabled=False,
            mode="off",
            document_class=document_class,
            requires_ack=False,
            reason="L3 defaults off for wording-sensitive document classes",
            warnings=notes,
        )

    return L3Decision(
        enabled=True,
        mode="full",
        document_class=document_class,
        requires_ack=True,
        reason="L3 auto-enabled for prose",
        warnings=notes,
    )
| 145 | |
| 146 | |
def apply_l3_safe(text: str, mark_id: bytes, mode: str = "full") -> str:
    """Apply L3 only to conservative prose regions.

    The text is processed line by line with line endings preserved. Lines are
    passed through untouched when they are a ``` fence marker, sit inside a
    fenced block, or are judged protected by _line_is_protected(). In
    ``"boilerplate"`` mode, lines that _is_boilerplate_line() rejects are also
    left alone. Every remaining line is rewritten outside its quoted spans.

    Args:
        text: Document text to watermark.
        mark_id: Watermark identifier handed to the embedding helpers.
        mode: ``"off"`` returns ``text`` unchanged; ``"boilerplate"``
            restricts rewriting to boilerplate lines; any other value
            rewrites every eligible line.

    Returns:
        The text with eligible lines rewritten.
    """
    if mode == "off":
        return text

    source_lines = text.splitlines(keepends=True)
    line_count = len(source_lines)
    boilerplate_only = mode == "boilerplate"
    inside_fence = False
    rewritten: list[str] = []

    for position, raw in enumerate(source_lines):
        if raw.strip().startswith("```"):
            # A fence marker flips the fenced state and is itself kept as-is.
            inside_fence = not inside_fence
            rewritten.append(raw)
        elif inside_fence or _line_is_protected(raw):
            rewritten.append(raw)
        elif boilerplate_only and not _is_boilerplate_line(raw, position, line_count):
            rewritten.append(raw)
        else:
            rewritten.append(_apply_l3_to_unquoted_segments(raw, mark_id))

    return "".join(rewritten)
| 171 | |
| 172 | |
| 173 | def _line_is_protected(line: str) -> bool: |
| 174 | stripped = line.strip() |
| 175 | if not stripped: |
| 176 | return False |
| 177 | if line.startswith((" ", "\t", ">>> ", "... ")): |
| 178 | return True |
| 179 | if re.match(r"^\s*(SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP)\b", line, re.I): |
| 180 | return True |
| 181 | if re.search(r"`[^`]+`", line): |
| 182 | return True |
| 183 | if re.search(r"\b(?:MUST|SHOULD|MAY|SHALL|REQUIRED|OPTIONAL)(?:\s+NOT)?\b", line): |
| 184 | return True |
| 185 | if re.search(r"\b\d+(?:\.\d+)?\s*(?:%|percent|kg|g|mg|lb|oz|m|cm|mm|km|ft|in|ms|s|sec|min|h|hr|USD|EUR|GBP|MB|GB|TB)\b", line, re.I): |
| 186 | return True |
| 187 | if re.search(r"\b[A-Z][A-Z0-9_-]{2,}\b", line): |
| 188 | return True |
| 189 | return False |
| 190 | |
| 191 | |
| 192 | def _is_boilerplate_line(line: str, idx: int, total: int) -> bool: |
| 193 | if idx < 6 or idx >= max(0, total - 6): |
| 194 | return True |
| 195 | return bool(re.search(r"\b(confidential|proprietary|notice|copyright|footer|header|cover page)\b", line, re.I)) |
| 196 | |
| 197 | |
| 198 | def _apply_l3_to_unquoted_segments(line: str, mark_id: bytes) -> str: |
| 199 | parts = re.split(r"((?:\"[^\"]*\")|(?:'[^']*')|(?:โ[^โ]*โ))", line) |
| 200 | for i in range(0, len(parts), 2): |
| 201 | segment = parts[i] |
| 202 | if not segment.strip(): |
| 203 | continue |
| 204 | segment = ( |
| 205 | semantic.embed_synonyms_v2(segment, mark_id, min_instances=1) |
| 206 | if semantic.SYNONYMS_V2_AVAILABLE |
| 207 | else semantic.embed_synonyms(segment, mark_id, min_instances=1) |
| 208 | ) |
| 209 | segment = semantic.embed_spelling(segment, mark_id) |
| 210 | segment = semantic.embed_contractions(segment, mark_id) |
| 211 | parts[i] = segment |
| 212 | return "".join(parts) |