Zion Boggan
repos/Oversight/oversight_core/ecc.py
zionboggan.com ↗
179 lines · python
History for this file →
1
"""
2
oversight_core.ecc
3
==================
4
 
5
Error-correcting codes for watermark bit protection.
6
 
7
Implements a simple BCH-like repetition + majority-vote coding scheme that
8
wraps L3 synonym bits. The goal: tolerate up to ~30% bit errors from
9
paraphrasing while still recovering the mark_id payload.
10
 
11
Scheme: (n=63, k=16, t=11) conceptual BCH replaced by a practical
12
repetition-code + interleaved majority-vote design that needs no GF(2^m)
13
arithmetic or external libraries.
14
 
15
Encoding:
16
  1. Take 16-bit payload (2 bytes of mark_id)
17
  2. Repeat each bit R times (R=3 in this example, i.e. triple-modular redundancy; `encode`/`decode` default to R=7, the mark_id helpers to R=3)
18
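  3. Interleave the repeated bits so adjacent errors don't cluster on one payload bit (design goal; `encode` in this module currently emits each bit's repetitions consecutively, without interleaving)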
19
  4. Output 48 coded bits (16 * 3)
20
 
21
Decoding:
22
  1. De-interleave
23
  2. Majority vote on each group of R bits
24
  3. Recover 16-bit payload
25
 
26
For a 64-bit mark_id, we encode 4 blocks of 16 bits = 448 coded bits total (R=7).
27
With ~150 synonym classes per page, one page provides 150 coded bits (partial),
28
three pages provide 450 (full coverage).
29
 
30
Error tolerance: with R=7, corrects up to 3 errors per group.
31
Effective tolerance: ~40% random bit error rate.
32
With R=5, corrects 2 errors per group. Tolerance: ~35%.
33
 
34
This is simpler than real BCH but achieves the goal without GF arithmetic.
35
"""
36
 
37
from __future__ import annotations
38
 
39
import hashlib
40
from typing import Optional
41
 
42
 
43
def encode(payload: bytes, repetitions: int = 7) -> list[int]:
    """
    Expand payload bytes into a repetition-coded bit list.

    Bits are taken from each byte most-significant first, and every bit is
    emitted `repetitions` times back to back (no interleaving). A majority
    vote over each run at decode time can then absorb up to
    floor(repetitions / 2) flipped bits per run.

    Args:
        payload: raw bytes to protect (typically an 8-byte mark_id)
        repetitions: how many copies of each bit to emit; an odd value
            avoids ties in the majority vote (default 7)

    Returns:
        list of 0/1 ints of length len(payload) * 8 * repetitions
    """
    return [
        (octet >> shift) & 1
        for octet in payload
        for shift in range(7, -1, -1)  # MSB first
        for _ in range(repetitions)
    ]
63
 
64
 
65
def decode(
    coded_bits: list[int],
    payload_len: int = 8,
    repetitions: int = 7,
) -> tuple[bytes, float, int]:
    """
    Decode ECC-protected bits back to payload via majority vote.

    The input is read as consecutive groups of `repetitions` bits, one group
    per payload bit (MSB of the first byte first). Bits beyond the expected
    codeword length are ignored. Bits that are missing because the input is
    shorter than the codeword are treated as erasures: they cast no vote and
    never count as agreement, so truncated input cannot inflate the
    confidence score or bias the vote toward 0.

    Args:
        coded_bits: received bits (may contain errors, may be truncated)
        payload_len: expected payload length in bytes
        repetitions: repetition factor used during encoding

    Returns:
        (recovered_payload, confidence, errors_corrected)

    recovered_payload always has `payload_len` bytes; a payload bit whose
        group received no bits, or whose vote is tied, decodes to 0
    confidence = fraction of all payload-bit groups that received at least
        one bit and whose received bits all agree
    errors_corrected = number of groups where at least one bit disagreed
    """
    n_payload_bits = payload_len * 8
    expected_coded = n_payload_bits * repetitions

    # Drop trailing surplus only; short input is deliberately NOT padded,
    # so slicing below yields short or empty groups for the missing part.
    received = coded_bits[:expected_coded]

    recovered_bits = []
    errors = 0
    unanimous = 0
    for g in range(n_payload_bits):
        group = received[g * repetitions : (g + 1) * repetitions]
        ones = sum(group)
        zeros = len(group) - ones
        # Strict majority of ones -> 1; ties and empty groups fall back to 0.
        recovered_bits.append(1 if ones > zeros else 0)
        if not group:
            # Fully erased group: no evidence either way.
            continue
        if ones != 0 and zeros != 0:
            errors += 1
        else:
            unanimous += 1

    # n_payload_bits is a multiple of 8, so every slice holds a whole byte.
    out = bytearray()
    for start in range(0, len(recovered_bits), 8):
        byte = 0
        for bit in recovered_bits[start : start + 8]:
            byte = (byte << 1) | bit
        out.append(byte)

    confidence = unanimous / n_payload_bits if n_payload_bits else 0.0
    return bytes(out), confidence, errors
119
 
120
 
121
def mark_id_to_ecc_bits(mark_id: bytes, repetitions: int = 3) -> list[int]:
    """
    Convenience wrapper: turn a mark_id into ECC-protected bits.

    Delegates to `encode`, passing `repetitions` through explicitly
    (this helper defaults to 3 repetitions).
    """
    coded_bits = encode(mark_id, repetitions)
    return coded_bits
124
 
125
 
126
def ecc_bits_to_mark_id(
    bits: list[int],
    mark_len: int = 8,
    repetitions: int = 3,
) -> tuple[bytes, float, int]:
    """
    Convenience wrapper: recover a mark_id and error stats from ECC bits.

    Delegates to `decode` with `mark_len` as the payload length in bytes and
    returns its (payload, confidence, errors_corrected) tuple unchanged.
    """
    result = decode(bits, mark_len, repetitions)
    return result
133
 
134
 
135
def verify_with_ecc(
    observed_variant_indices: list[int],
    candidate_mark_id: bytes,
    class_size: int = 3,
    repetitions: int = 3,
) -> tuple[bool, float, bytes]:
    """
    Verify a candidate mark_id against observed synonym choices using ECC.

    Steps:
    1. Derive the candidate's expected variant sequence
    2. Compare expected vs observed (modulo `class_size`) per instance
    3. Encode the candidate mark_id into ECC bits and flip every coded bit
       whose instance disagreed, yielding the "received" codeword
    4. Decode the received codeword via ECC majority vote
    5. Check that the decoded payload equals the candidate

    If there are fewer observations than coded bits, the ECC path is skipped
    and the raw agreement ratio is thresholded instead.

    Args:
        observed_variant_indices: variant index observed at each instance
        candidate_mark_id: mark_id being tested
        class_size: modulus applied to observed and expected indices
        repetitions: ECC repetition factor

    Returns:
        (match, confidence, decoded_mark_id)

        ECC path: match requires decoded == candidate and confidence >= 0.5;
        confidence is the unanimous-group fraction reported by `decode`.
        Fallback path: match is agreement ratio >= 0.70, confidence is that
        ratio, and the candidate itself is returned as decoded_mark_id.
        No observations: (False, 0.0, b"").
    """
    # Function-scope import kept from the original; presumably avoids a
    # circular import with .semantic (confirm before hoisting).
    from .semantic import _mark_id_to_variant_sequence

    n_instances = len(observed_variant_indices)
    if n_instances == 0:
        return False, 0.0, b""

    expected_variants = _mark_id_to_variant_sequence(
        candidate_mark_id, n_instances, class_size
    )

    # 1 where the observed choice equals the candidate's expected choice.
    agreements = [
        1 if obs % class_size == exp % class_size else 0
        for obs, exp in zip(observed_variant_indices, expected_variants)
    ]
    match_ratio = sum(agreements) / len(agreements) if agreements else 0.0

    needed_bits = len(candidate_mark_id) * 8 * repetitions
    if len(agreements) >= needed_bits:
        # An agreement means the coded bit arrived intact, a disagreement
        # means it was flipped. Decoding the agreement flags directly would
        # yield all-ones for a perfect observation, never the candidate.
        codeword = encode(candidate_mark_id, repetitions)
        received_bits = [
            bit if agrees else bit ^ 1
            for bit, agrees in zip(codeword, agreements)
        ]
        decoded, confidence, _ = decode(
            received_bits, len(candidate_mark_id), repetitions
        )
        match = (decoded == candidate_mark_id) and confidence >= 0.5
        return match, confidence, decoded

    # Not enough observations to cover the codeword: threshold raw agreement.
    return match_ratio >= 0.70, match_ratio, candidate_mark_id