# Zion Boggan · zionboggan.com
# repos/JWT Differential Fuzzer/orchestrator/differ.py
# 133 lines · python
"""Schism differential JWT verifier.

Submits each corpus case to every running target. Flags any disagreement
in the `valid` field. Errors are bucketed (not literal-string compared) so
different wording across libs doesn't cause false positives.
"""
7
import argparse
8
import asyncio
9
import json
10
import sys
11
import time
12
from pathlib import Path
13
 
14
import aiohttp
15
 
16
# Verify endpoint of each target, keyed by the short name used in reports.
# Every target is addressed on its own localhost port under the same
# /verify path; dict order is the order targets are probed and reported in.
TARGETS = {
    name: f"http://localhost:{port}/verify"
    for name, port in (
        ("nodejwt", 7001),
        ("pyjwt", 7002),
        ("pyjose", 7003),
        ("panva", 7004),
        ("gojwt", 7005),
    )
}
23
 
24
async def submit_one(session, name, url, payload):
    """POST one verification request to a single target.

    Args:
        session: Client session exposing an aiohttp-style ``post`` that
            returns an async context manager.
        name: Target name; echoed back so results can be keyed by it.
        url: The target's verify endpoint.
        payload: JSON-serialisable request body.

    Returns:
        A ``(name, result)`` tuple where ``result`` is always a dict.
        Any failure to obtain a JSON object is reported as
        ``{"valid": None, "error": ..., "lib": name}`` rather than raised,
        so one broken target cannot abort the whole run.
    """
    try:
        async with session.post(url, json=payload, timeout=10) as r:
            body = await r.json()
    except Exception as e:
        # Deliberately broad: connection errors, timeouts and undecodable
        # bodies all mean "no verdict from this target".
        return name, {"valid": None, "error": f"transport: {e}", "lib": name}

    if not isinstance(body, dict):
        # Callers use result.get(...); a JSON list/null/string would crash
        # them, so normalise it to a no-verdict error result here.
        return name, {
            "valid": None,
            "error": f"malformed: expected JSON object, got {type(body).__name__}",
            "lib": name,
        }
    return name, body
30
 
31
async def submit_all(session, payload, target_filter=None):
    """Send one payload to the selected targets concurrently.

    Args:
        session: Client session handed through to ``submit_one``.
        payload: Request body sent unchanged to every selected target.
        target_filter: Optional container of target names. When it is
            falsy (``None`` or empty) every entry in ``TARGETS`` is used.

    Returns:
        Dict mapping target name to the result returned by ``submit_one``.
    """
    selected = TARGETS.items()
    if target_filter:
        selected = [
            (target, endpoint)
            for target, endpoint in TARGETS.items()
            if target in target_filter
        ]

    pending = [
        submit_one(session, target, endpoint, payload)
        for target, endpoint in selected
    ]
    outcomes = {}
    for target, result in await asyncio.gather(*pending):
        outcomes[target] = result
    return outcomes
39
 
40
def disagreement(results):
    """Return True when responding targets gave differing ``valid`` verdicts.

    Args:
        results: Dict mapping target name to that target's result dict.

    Returns:
        True if at least two distinct non-``None`` ``valid`` values are
        present, otherwise False. A ``None`` or missing verdict (for
        example a transport error) is ignored rather than counted as a
        vote.
    """
    distinct = []
    for result in results.values():
        verdict = result.get("valid") if isinstance(result, dict) else None
        if verdict is None:
            continue
        # Compare by equality in a list rather than a set so an unhashable
        # verdict (list/dict from a misbehaving target) cannot raise.
        if verdict not in distinct:
            distinct.append(verdict)
            if len(distinct) > 1:
                return True
    return False
44
 
45
async def health_check(session):
    """Probe every target and return the names of those that are up.

    Each target receives an empty verify request; it counts as healthy
    only if it answers with HTTP 200. Probes are issued concurrently so a
    down target costs at most one timeout for the whole check.

    Args:
        session: Client session exposing an aiohttp-style ``post``.

    Returns:
        List of healthy target names, in ``TARGETS`` order. Empty when
        nothing is reachable.
    """
    probe_body = {"token": "", "key": "", "algs": []}

    async def _probe(url):
        try:
            async with session.post(url, json=probe_body, timeout=3) as r:
                return r.status == 200
        except Exception:
            # Best effort by design: any failure just means "not healthy".
            return False

    names = list(TARGETS)
    statuses = await asyncio.gather(*(_probe(TARGETS[n]) for n in names))
    return [n for n, is_up in zip(names, statuses) if is_up]
56
 
57
async def run(corpus_path, output_path, only=None):
    """Run the corpus against every healthy target and report disagreements.

    Args:
        corpus_path: Path to a JSON file containing a list of case objects.
            Each case must have ``id``, ``token``, ``key`` and ``algs``;
            ``class``, ``severity`` and ``expected_unanimous`` are optional.
        output_path: JSONL file that receives one row per executed case.
            Missing parent directories are created. Falsy disables output.
        only: When set, only the case whose ``id`` equals this value runs.

    Returns:
        2 if no target is reachable, 1 if at least one case was flagged
        as a disagreement, otherwise 0.
    """
    cases = json.loads(Path(corpus_path).read_text(encoding="utf-8"))
    flagged = 0
    out_f = None

    try:
        if output_path:
            out_path = Path(output_path)
            # The default output lives under findings/, which may not exist yet.
            out_path.parent.mkdir(parents=True, exist_ok=True)
            out_f = out_path.open("w", encoding="utf-8")

        async with aiohttp.ClientSession() as session:
            healthy = await health_check(session)
            sys.stderr.write(f"[schism] {len(healthy)}/{len(TARGETS)} targets up: {healthy}\n")
            if not healthy:
                sys.stderr.write("[schism] no targets reachable. did you `docker compose up -d`?\n")
                return 2

            healthy_set = set(healthy)  # loop-invariant, build once
            for case in cases:
                if only and case["id"] != only:
                    continue
                payload = {
                    "token": case["token"],
                    "key":   case["key"],
                    "algs":  case["algs"],
                }
                results = await submit_all(session, payload, target_filter=healthy_set)
                disagree = disagreement(results)
                sev = case.get("severity", "?")
                expected = case.get("expected_unanimous", "reject")
                verdicts = {n: r.get("valid") for n, r in results.items()}
                row = {
                    "id": case["id"],
                    "class": case.get("class"),
                    "severity": sev,
                    "expected": expected,
                    "verdicts": verdicts,
                    "errors":   {n: r.get("error") for n, r in results.items()},
                    "claims":   {n: r.get("claims") for n, r in results.items()},
                    "disagree": disagree,
                    "ts": time.time(),
                }
                if out_f:
                    out_f.write(json.dumps(row) + "\n")

                if disagree:
                    flagged += 1
                    acceptors = [n for n, v in verdicts.items() if v is True]
                    rejectors = [n for n, v in verdicts.items() if v is False]

                    # BYPASS: something accepted a token that should be rejected.
                    # DOS: something rejected a token that should be accepted.
                    if expected == "reject" and acceptors:
                        tag = "BYPASS"
                    elif expected == "accept" and rejectors:
                        tag = "DOS"
                    else:
                        tag = "SPLIT"
                    # !s keeps non-string ids/severities from breaking the format spec.
                    sys.stderr.write(
                        f"[{tag:^6}] {case['id']!s:36s} sev={sev!s:12s} "
                        f"accept={acceptors}  reject={rejectors}\n"
                    )
                else:
                    # No disagreement means every non-None verdict is equal, so
                    # the first one is the unanimous answer. None entries
                    # (targets that gave no verdict) must not be picked.
                    responded = [v for v in verdicts.values() if v is not None]
                    v = responded[0] if responded else None
                    tag = "accept" if v is True else "reject" if v is False else "?"
                    match = " " if tag == expected else "!"
                    sys.stderr.write(
                        f"[ ok ]{match}{case['id']!s:36s} all-{tag:6s} expected={expected}\n"
                    )
    finally:
        # Also covers the early "no targets" return and any exception.
        if out_f is not None:
            out_f.close()

    sys.stderr.write(f"[schism] done. flagged {flagged}/{len(cases)} cases\n")
    return 0 if flagged == 0 else 1
120
 
121
def main():
    """Command-line entry point.

    Parses ``--corpus``, ``--out`` and ``--only``, defaults the output to a
    timestamped file under ``findings/`` when ``--out`` is not given, and
    exits the process with the status returned by ``run``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--corpus", default="corpus/seed.json")
    parser.add_argument(
        "--out",
        default=None,
        help="findings/<ts>.jsonl (default: auto)",
    )
    parser.add_argument("--only", default=None, help="run a single case by id")
    opts = parser.parse_args()

    out_path = opts.out
    if out_path is None:
        out_path = f"findings/{int(time.time())}.jsonl"

    exit_code = asyncio.run(run(opts.corpus, out_path, opts.only))
    sys.exit(exit_code)


if __name__ == "__main__":
    main()