| | @@ -44,6 +44,7 @@ pub struct RegistryIntegrityReport { |
| | pub negative_event_tlog_indexes: i64, |
| | pub events_without_tlog_index: i64, |
| | pub event_tlog_indexes_out_of_range: i64, |
| + | pub event_tlog_leaf_mismatches: i64, |
| | pub tlog_size: Option<usize>, |
| | pub malformed_manifest_json: i64, |
| | pub invalid_manifest_signatures: i64, |
| | @@ -232,8 +233,9 @@ pub async fn migrate_from_sqlite( |
| | |
| | pub async fn validate_registry_integrity( |
| | pool: &SqlitePool, |
| - | tlog_size: Option<usize>, |
| + | tlog: Option<&oversight_tlog::TransparencyLog>, |
| | ) -> Result<RegistryIntegrityReport> { |
| + | let tlog_size = tlog.map(|log| log.size()); |
| | let counts = registry_counts(pool).await?; |
| | let orphan_beacons = count_query( |
| | pool, |
| | @@ -315,6 +317,37 @@ pub async fn validate_registry_integrity( |
| | .filter(|metadata| serde_json::from_str::<serde_json::Value>(metadata).is_err()) |
| | .count() as i64; |
| | |
| + | let event_rows: Vec<EventRow> = sqlx::query_as( |
| + | "SELECT id, token_id, file_id, recipient_id, issuer_id, kind, source_ip, user_agent, extra, timestamp, qualified_timestamp, tlog_index FROM events", |
| + | ) |
| + | .fetch_all(pool) |
| + | .await?; |
| + | let mut event_tlog_leaf_mismatches = 0; |
| + | if let Some(log) = tlog { |
| + | for event in &event_rows { |
| + | let Some(idx) = event.tlog_index else { |
| + | continue; |
| + | }; |
| + | if idx < 0 || idx as usize >= log.size() { |
| + | continue; |
| + | } |
| + | let Some(record) = log |
| + | .leaf_record(idx as usize) |
| + | .map_err(|e| RegistryError::Internal(format!("tlog leaf read failed: {e}")))? |
| + | else { |
| + | event_tlog_leaf_mismatches += 1; |
| + | continue; |
| + | }; |
| + | let Ok(leaf) = serde_json::from_str::<serde_json::Value>(&record.leaf_data) else { |
| + | event_tlog_leaf_mismatches += 1; |
| + | continue; |
| + | }; |
| + | if !event_matches_tlog_leaf(event, &leaf) { |
| + | event_tlog_leaf_mismatches += 1; |
| + | } |
| + | } |
| + | } |
| + | |
| | let mut malformed_manifest_json = 0; |
| | let mut invalid_manifest_signatures = 0; |
| | let mut mismatched_manifest_file_ids = 0; |
| | @@ -352,6 +385,7 @@ pub async fn validate_registry_integrity( |
| | && negative_event_tlog_indexes == 0 |
| | && events_without_tlog_index == 0 |
| | && event_tlog_indexes_out_of_range == 0 |
| + | && event_tlog_leaf_mismatches == 0 |
| | && malformed_manifest_json == 0 |
| | && invalid_manifest_signatures == 0 |
| | && mismatched_manifest_file_ids == 0; |
| | @@ -372,6 +406,7 @@ pub async fn validate_registry_integrity( |
| | negative_event_tlog_indexes, |
| | events_without_tlog_index, |
| | event_tlog_indexes_out_of_range, |
| + | event_tlog_leaf_mismatches, |
| | tlog_size, |
| | malformed_manifest_json, |
| | invalid_manifest_signatures, |
| | @@ -379,6 +414,43 @@ pub async fn validate_registry_integrity( |
| | }) |
| | } |
| | |
| + | fn event_matches_tlog_leaf(event: &EventRow, leaf: &serde_json::Value) -> bool { |
| + | let user_agent_matches = |
| + | event.kind == "dns" || json_opt_str(leaf, "user_agent", event.user_agent.as_deref()); |
| + | leaf.get("event").and_then(|v| v.as_str()) == Some("beacon") |
| + | && leaf.get("kind").and_then(|v| v.as_str()) == Some(event.kind.as_str()) |
| + | && leaf.get("token_id").and_then(|v| v.as_str()) == Some(event.token_id.as_str()) |
| + | && json_opt_str(leaf, "file_id", event.file_id.as_deref()) |
| + | && json_opt_str(leaf, "recipient_id", event.recipient_id.as_deref()) |
| + | && json_opt_str(leaf, "source_ip", event.source_ip.as_deref()) |
| + | && user_agent_matches |
| + | && json_opt_str(leaf, "timestamp", event.qualified_timestamp.as_deref()) |
| + | && dns_extra_matches_tlog_leaf(event, leaf) |
| + | } |
| + | |
| + | fn dns_extra_matches_tlog_leaf(event: &EventRow, leaf: &serde_json::Value) -> bool { |
| + | if event.kind != "dns" { |
| + | return true; |
| + | } |
| + | let extra = event |
| + | .extra |
| + | .as_deref() |
| + | .and_then(|raw| serde_json::from_str::<serde_json::Value>(raw).ok()) |
| + | .unwrap_or_else(|| serde_json::json!({})); |
| + | json_opt_str(leaf, "qname", extra.get("qname").and_then(|v| v.as_str())) |
| + | && json_opt_str(leaf, "qtype", extra.get("qtype").and_then(|v| v.as_str())) |
| + | } |
| + | |
| + | fn json_opt_str(value: &serde_json::Value, key: &str, expected: Option<&str>) -> bool { |
| + | match expected { |
| + | Some(s) => value.get(key).and_then(|v| v.as_str()) == Some(s), |
| + | None => match value.get(key) { |
| + | Some(v) => v.is_null(), |
| + | None => true, |
| + | }, |
| + | } |
| + | } |
| + | |
| | async fn registry_counts(pool: &SqlitePool) -> Result<RegistryCounts> { |
| | Ok(RegistryCounts { |
| | manifests: count_query(pool, "SELECT COUNT(*) FROM manifests").await?, |
| | @@ -978,12 +1050,43 @@ mod tests { |
| | assert_eq!(report.negative_event_tlog_indexes, 0); |
| | assert_eq!(report.events_without_tlog_index, 0); |
| | assert_eq!(report.event_tlog_indexes_out_of_range, 0); |
| + | assert_eq!(report.event_tlog_leaf_mismatches, 0); |
| | assert_eq!(report.tlog_size, None); |
| | |
| | pool.close().await; |
| | let _ = std::fs::remove_dir_all(dir); |
| | } |
| | |
/// A `dns` row must match its leaf even though the leaf has no `user_agent` member and
/// the row stores an empty-string user agent.
#[test]
fn event_leaf_matching_accepts_dns_without_user_agent() {
    // Leaf as recorded in the log: note that there is no `user_agent` key.
    let logged = serde_json::json!({
        "event": "beacon",
        "kind": "dns",
        "token_id": "token-1",
        "file_id": "file-1",
        "recipient_id": "recipient-1",
        "source_ip": "198.51.100.10",
        "qname": "b.example",
        "qtype": "A",
        "timestamp": "2026-05-24T00:00:00Z",
    });

    // Database row for the same event; `qname` / `qtype` live in the `extra` JSON.
    let row = EventRow {
        id: 1,
        token_id: "token-1".into(),
        file_id: Some("file-1".into()),
        recipient_id: Some("recipient-1".into()),
        issuer_id: Some("issuer-1".into()),
        kind: "dns".into(),
        source_ip: Some("198.51.100.10".into()),
        user_agent: Some(String::new()),
        extra: Some(r#"{"qname":"b.example","qtype":"A"}"#.into()),
        timestamp: 1,
        qualified_timestamp: Some("2026-05-24T00:00:00Z".into()),
        tlog_index: Some(0),
    };

    assert!(event_matches_tlog_leaf(&row, &logged));
}
| + | |
| | #[tokio::test] |
| | async fn validate_registry_integrity_reports_bad_rows() { |
| | let dir = temp_dir("validate-bad"); |
| | @@ -992,6 +1095,13 @@ mod tests { |
| | let pool = create_pool(&db_path).await.unwrap(); |
| | run_migrations(&pool).await.unwrap(); |
| | seed_source(&pool).await; |
| + | let tlog = oversight_tlog::TransparencyLog::open(dir.join("tlog")).unwrap(); |
| + | tlog.append_event(&serde_json::json!({ |
| + | "event": "beacon", |
| + | "kind": "dns", |
| + | "token_id": "different-token", |
| + | })) |
| + | .unwrap(); |
| | |
| | sqlx::query( |
| | "INSERT INTO manifests (file_id, recipient_id, issuer_id, issuer_ed25519_pub, manifest_json, registered_at) VALUES (?, ?, ?, ?, ?, ?)", |
| | @@ -1059,6 +1169,22 @@ mod tests { |
| | ) |
| | .await |
| | .unwrap(); |
| + | insert_event( |
| + | &pool, |
| + | "token-mismatch", |
| + | Some("file-1"), |
| + | Some("recipient-1"), |
| + | Some("issuer-1"), |
| + | "dns", |
| + | Some("127.0.0.1"), |
| + | Some("agent"), |
| + | Some(r#"{"qtype":"A"}"#), |
| + | 24, |
| + | Some("2026-05-24T00:00:00Z"), |
| + | Some(0), |
| + | ) |
| + | .await |
| + | .unwrap(); |
| | sqlx::query( |
| | "INSERT INTO corpus (file_id, hash_kind, hash_value, metadata, registered_at) VALUES (?, ?, ?, ?, ?)", |
| | ) |
| | @@ -1071,7 +1197,9 @@ mod tests { |
| | .await |
| | .unwrap(); |
| | |
| - | let report = validate_registry_integrity(&pool, Some(1)).await.unwrap(); |
| + | let report = validate_registry_integrity(&pool, Some(&tlog)) |
| + | .await |
| + | .unwrap(); |
| | assert!(!report.ok); |
| | assert_eq!(report.orphan_beacons, 1); |
| | assert_eq!(report.orphan_watermarks, 1); |
| | @@ -1084,6 +1212,7 @@ mod tests { |
| | assert_eq!(report.negative_event_tlog_indexes, 1); |
| | assert_eq!(report.events_without_tlog_index, 1); |
| | assert_eq!(report.event_tlog_indexes_out_of_range, 2); |
| + | assert_eq!(report.event_tlog_leaf_mismatches, 1); |
| | assert_eq!(report.tlog_size, Some(1)); |
| | |
| | pool.close().await; |