| 1 | """ |
| 2 | oversight_core.ecc |
| 3 | ================== |
| 4 | |
| 5 | Error-correcting codes for watermark bit protection. |
| 6 | |
| 7 | Implements a simple BCH-like repetition + majority-vote coding scheme that |
| 8 | wraps L3 synonym bits. The goal: tolerate up to ~30% bit errors from |
| 9 | paraphrasing while still recovering the mark_id payload. |
| 10 | |
| 11 | Scheme: (n=63, k=16, t=11) conceptual BCH replaced by a practical |
| 12 | repetition-code + interleaved majority-vote design that needs no GF(2^m) |
| 13 | arithmetic or external libraries. |
| 14 | |
| 15 | Encoding: |
| 16 | 1. Take 16-bit payload (2 bytes of mark_id) |
| 17 | 2. Repeat each bit R times (R=3 by default for triple-modular redundancy) |
| 18 | 3. Interleave the repeated bits so adjacent errors don't cluster on one payload bit |
| 19 | 4. Output 48 coded bits (16 * 3) |
| 20 | |
| 21 | Decoding: |
| 22 | 1. De-interleave |
| 23 | 2. Majority vote on each group of R bits |
| 24 | 3. Recover 16-bit payload |
| 25 | |
| 26 | For a 64-bit mark_id, we encode 4 blocks of 16 bits = 448 coded bits total (R=7). |
| 27 | With ~150 synonym classes per page, one page provides 150 coded bits (partial), |
| 28 | three pages provide 450 (full coverage). |
| 29 | |
| 30 | Error tolerance: with R=7, corrects up to 3 errors per group. |
| 31 | Effective tolerance: ~40% random bit error rate. |
| 32 | With R=5, corrects 2 errors per group. Tolerance: ~35%. |
| 33 | |
| 34 | This is simpler than real BCH but achieves the goal without GF arithmetic. |
| 35 | """ |
| 36 | |
| 37 | from __future__ import annotations |
| 38 | |
| 39 | import hashlib |
| 40 | from typing import Optional |
| 41 | |
| 42 | |
| 43 | def encode(payload: bytes, repetitions: int = 7) -> list[int]: |
| 44 | """ |
| 45 | Encode payload bytes into ECC-protected bit sequence. |
| 46 | |
| 47 | Each payload bit is repeated `repetitions` times consecutively. |
| 48 | Majority vote at decode time corrects up to floor(R/2) errors per group. |
| 49 | |
| 50 | Args: |
| 51 | payload: raw bytes to protect (typically 8-byte mark_id) |
| 52 | repetitions: odd number of times each bit is repeated (default 7) |
| 53 | |
| 54 | Returns: |
| 55 | list of coded bits (len = len(payload) * 8 * repetitions) |
| 56 | """ |
| 57 | coded = [] |
| 58 | for byte in payload: |
| 59 | for i in range(8): |
| 60 | bit = (byte >> (7 - i)) & 1 |
| 61 | coded.extend([bit] * repetitions) |
| 62 | return coded |
| 63 | |
| 64 | |
| 65 | def decode( |
| 66 | coded_bits: list[int], |
| 67 | payload_len: int = 8, |
| 68 | repetitions: int = 7, |
| 69 | ) -> tuple[bytes, float, int]: |
| 70 | """ |
| 71 | Decode ECC-protected bits back to payload via majority vote. |
| 72 | |
| 73 | Args: |
| 74 | coded_bits: received bits (may contain errors) |
| 75 | payload_len: expected payload length in bytes |
| 76 | repetitions: repetition factor used during encoding |
| 77 | |
| 78 | Returns: |
| 79 | (recovered_payload, confidence, errors_corrected) |
| 80 | |
| 81 | confidence = fraction of groups where majority was unanimous |
| 82 | errors_corrected = number of groups where at least one bit disagreed |
| 83 | """ |
| 84 | n_payload_bits = payload_len * 8 |
| 85 | expected_coded = n_payload_bits * repetitions |
| 86 | |
| 87 | if len(coded_bits) < expected_coded: |
| 88 | coded_bits = coded_bits + [0] * (expected_coded - len(coded_bits)) |
| 89 | coded_bits = coded_bits[:expected_coded] |
| 90 | |
| 91 | recovered_bits = [] |
| 92 | errors = 0 |
| 93 | unanimous = 0 |
| 94 | for g in range(n_payload_bits): |
| 95 | group = coded_bits[g * repetitions : (g + 1) * repetitions] |
| 96 | ones = sum(group) |
| 97 | zeros = len(group) - ones |
| 98 | if ones > zeros: |
| 99 | recovered_bits.append(1) |
| 100 | else: |
| 101 | recovered_bits.append(0) |
| 102 | if ones != 0 and zeros != 0: |
| 103 | errors += 1 |
| 104 | else: |
| 105 | unanimous += 1 |
| 106 | |
| 107 | out = bytearray() |
| 108 | for i in range(0, len(recovered_bits), 8): |
| 109 | byte = 0 |
| 110 | for j in range(8): |
| 111 | if i + j < len(recovered_bits): |
| 112 | byte = (byte << 1) | recovered_bits[i + j] |
| 113 | else: |
| 114 | byte = byte << 1 |
| 115 | out.append(byte) |
| 116 | |
| 117 | confidence = unanimous / n_payload_bits if n_payload_bits else 0.0 |
| 118 | return bytes(out), confidence, errors |
| 119 | |
| 120 | |
| 121 | def mark_id_to_ecc_bits(mark_id: bytes, repetitions: int = 3) -> list[int]: |
| 122 | """Convenience: encode a mark_id into ECC-protected bits.""" |
| 123 | return encode(mark_id, repetitions) |
| 124 | |
| 125 | |
| 126 | def ecc_bits_to_mark_id( |
| 127 | bits: list[int], |
| 128 | mark_len: int = 8, |
| 129 | repetitions: int = 3, |
| 130 | ) -> tuple[bytes, float, int]: |
| 131 | """Convenience: decode ECC bits back to mark_id with error stats.""" |
| 132 | return decode(bits, mark_len, repetitions) |
| 133 | |
| 134 | |
| 135 | def verify_with_ecc( |
| 136 | observed_variant_indices: list[int], |
| 137 | candidate_mark_id: bytes, |
| 138 | class_size: int = 3, |
| 139 | repetitions: int = 3, |
| 140 | ) -> tuple[bool, float, bytes]: |
| 141 | """ |
| 142 | Verify a candidate mark_id against observed synonym choices using ECC. |
| 143 | |
| 144 | Instead of the old threshold-based matching, this: |
| 145 | 1. Encodes the candidate mark_id into ECC bits |
| 146 | 2. Maps the candidate's expected variant sequence |
| 147 | 3. Compares expected vs observed, producing received bits |
| 148 | 4. Decodes via ECC majority vote |
| 149 | 5. Checks if decoded payload matches candidate |
| 150 | |
| 151 | Returns: |
| 152 | (match, confidence, decoded_mark_id) |
| 153 | """ |
| 154 | from .semantic import _mark_id_to_variant_sequence |
| 155 | |
| 156 | n_instances = len(observed_variant_indices) |
| 157 | if n_instances == 0: |
| 158 | return False, 0.0, b"" |
| 159 | |
| 160 | expected_variants = _mark_id_to_variant_sequence( |
| 161 | candidate_mark_id, n_instances, class_size |
| 162 | ) |
| 163 | |
| 164 | received_bits = [] |
| 165 | for obs, exp in zip(observed_variant_indices, expected_variants): |
| 166 | obs_mod = obs % class_size |
| 167 | exp_mod = exp % class_size |
| 168 | received_bits.append(1 if obs_mod == exp_mod else 0) |
| 169 | |
| 170 | match_ratio = sum(received_bits) / len(received_bits) if received_bits else 0.0 |
| 171 | |
| 172 | if len(received_bits) >= len(candidate_mark_id) * 8 * repetitions: |
| 173 | decoded, confidence, errors = decode( |
| 174 | received_bits, len(candidate_mark_id), repetitions |
| 175 | ) |
| 176 | match = (decoded == candidate_mark_id) and confidence >= 0.5 |
| 177 | return match, confidence, decoded |
| 178 | |
| 179 | return match_ratio >= 0.70, match_ratio, candidate_mark_id |