| 1 | """ |
| 2 | oversight_core.formats.docx - Office DOCX adapter. |
| 3 | |
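| 4 | Embeds mark_id as an "oversight:<hex>" tag (plus optional issuer/fid fields) in: |
| 5 | 1. Core properties keywords field (docProps/core.xml) - semi-visible in Word UI |
| 6 | 2. Custom XML part - not visible in normal Word UI, harder to notice (not written by embed() in this file) |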
| 7 | |
| 8 | For strong cross-format survival, apply L1/L2/L3 text watermarking to the |
| 9 | body text itself before packaging as DOCX. The XML marks below are a |
| 10 | secondary layer that's easy to strip but fast to read. |
| 11 | |
| 12 | Uses python-docx. XLSX and PPTX work similarly (shared Office OOXML format) |
| 13 | but need their respective libraries (openpyxl, python-pptx). |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import io |
| 19 | from typing import Optional |
| 20 | |
| 21 | from docx import Document |
| 22 | from docx.oxml.ns import qn |
| 23 | from docx.oxml import OxmlElement |
| 24 | |
| 25 | |
def embed(
    docx_bytes: bytes,
    mark_id: bytes,
    issuer_id: Optional[str] = None,
    file_id: Optional[str] = None,
) -> bytes:
    """
    Embed mark_id in the DOCX core properties "keywords" field.

    The tag written has the form
    ``oversight:<hex>[;issuer:<issuer_id>][;fid:<file_id>]`` and is appended
    after any keywords the document already carries.

    Args:
        docx_bytes: Contents of the source .docx file.
        mark_id: Raw mark identifier; stored as its hex encoding.
        issuer_id: Optional issuer identifier, appended as ``issuer:<id>``.
        file_id: Optional file identifier, appended as ``fid:<id>``.

    Returns:
        The re-serialized DOCX bytes. If the keywords already contain
        "oversight:", the keywords are left untouched (first mark wins) and
        the document is simply re-saved.

    Note:
        issuer_id and file_id are written verbatim. A ";" inside either value
        would be split into separate fields by extract().
    """
    doc = Document(io.BytesIO(docx_bytes))

    existing = doc.core_properties.keywords or ""

    tag = f"oversight:{mark_id.hex()}"
    if issuer_id:
        tag += f";issuer:{issuer_id}"
    if file_id:
        tag += f";fid:{file_id}"

    if "oversight:" not in existing:
        # Join with ";" rather than a bare space: extract() splits the
        # keywords on ";" and expects the tag to start its own segment, so a
        # space separator made the mark unrecoverable whenever the document
        # already had keywords.
        # NOTE(review): python-docx is understood to reject core property
        # values longer than 255 characters - confirm for long keyword lists.
        doc.core_properties.keywords = f"{existing}; {tag}" if existing else tag

    buf = io.BytesIO()
    doc.save(buf)
    return buf.getvalue()
| 53 | |
| 54 | |
def extract(docx_bytes: bytes) -> dict:
    """
    Extract OVERSIGHT marks from the DOCX core properties "keywords" field.

    Args:
        docx_bytes: Contents of the .docx file to inspect.

    Returns:
        A dict with keys "mark_id", "issuer_id" and "file_id". "mark_id" is
        the hex string that followed "oversight:" (not decoded back to
        bytes). Any field that is not present is None. If a field occurs
        more than once, the last occurrence wins.
    """
    doc = Document(io.BytesIO(docx_bytes))
    keywords = doc.core_properties.keywords or ""

    out = {"mark_id": None, "issuer_id": None, "file_id": None}
    for part in keywords.split(";"):
        part = part.strip()
        # The mark may share a segment with pre-existing keywords (for
        # example "report oversight:ab12"), so look for the marker anywhere
        # in the segment instead of only at its start. This mirrors the
        # substring check embed() uses to detect an existing mark.
        _, marker, rest = part.partition("oversight:")
        if marker:
            tokens = rest.split()
            # A bare "oversight:" with no id carries no mark; skip it rather
            # than raising IndexError.
            if tokens:
                out["mark_id"] = tokens[0]
        elif part.startswith("issuer:"):
            out["issuer_id"] = part[len("issuer:"):].strip()
        elif part.startswith("fid:"):
            out["file_id"] = part[len("fid:"):].strip()
    return out
| 72 | |
| 73 | |
def extract_text_for_watermark_recovery(docx_bytes: bytes) -> str:
    """
    Return the document's paragraph text for L1/L2/L3 watermark recovery.

    The text of each entry in ``Document.paragraphs`` is joined with a
    newline, in the order python-docx yields them.

    NOTE(review): Document.paragraphs is understood to cover only top-level
    body paragraphs (not table cells, headers or footers) - confirm whether
    that text matters for recovery.
    """
    stream = io.BytesIO(docx_bytes)
    document = Document(stream)
    lines = [paragraph.text for paragraph in document.paragraphs]
    return "\n".join(lines)