| | @@ -10,6 +10,19 @@ use std::str::FromStr; |
| | use crate::error::{RegistryError, Result}; |
| | use crate::models::*; |
| | |
/// Tables that must exist in a migration source database, in the order they
/// are counted and copied.
const MIGRATED_TABLES: &[&str] = &[
    "manifests",
    "beacons",
    "watermarks",
    "events",
    "corpus",
];
| + | |
| + | #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] |
| + | pub struct MigrationReport { |
| + | pub source: String, |
| + | pub dry_run: bool, |
| + | pub manifests: i64, |
| + | pub beacons: i64, |
| + | pub watermarks: i64, |
| + | pub events: i64, |
| + | pub corpus: i64, |
| + | } |
| + | |
| | /// Create a SQLite connection pool with WAL mode and sensible defaults. |
| | pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> { |
| | // Ensure parent directory exists. |
| | @@ -132,6 +145,169 @@ pub async fn run_migrations(pool: &SqlitePool) -> Result<()> { |
| | Ok(()) |
| | } |
| | |
| + | /// Copy rows from the Python reference registry SQLite database into the Rust |
| + | /// registry schema. The schemas are intentionally identical for these tables. |
| + | pub async fn migrate_from_sqlite( |
| + | pool: &SqlitePool, |
| + | source_path: &Path, |
| + | dry_run: bool, |
| + | ) -> Result<MigrationReport> { |
| + | if !source_path.is_file() { |
| + | return Err(RegistryError::BadRequest(format!( |
| + | "migration source is not a file: {}", |
| + | source_path.display() |
| + | ))); |
| + | } |
| + | |
| + | let source = source_path |
| + | .canonicalize() |
| + | .map_err(|e| RegistryError::BadRequest(format!("cannot resolve migration source: {e}")))?; |
| + | let source_uri = source.to_string_lossy().to_string(); |
| + | |
| + | let mut conn = pool.acquire().await?; |
| + | sqlx::query("ATTACH DATABASE ? AS source_registry") |
| + | .bind(&source_uri) |
| + | .execute(&mut *conn) |
| + | .await?; |
| + | |
| + | let result = async { |
| + | validate_source_schema(&mut conn).await?; |
| + | let report = MigrationReport { |
| + | source: source_uri, |
| + | dry_run, |
| + | manifests: source_count(&mut conn, "manifests").await?, |
| + | beacons: source_count(&mut conn, "beacons").await?, |
| + | watermarks: source_count(&mut conn, "watermarks").await?, |
| + | events: source_count(&mut conn, "events").await?, |
| + | corpus: source_count(&mut conn, "corpus").await?, |
| + | }; |
| + | |
| + | if dry_run { |
| + | return Ok(report); |
| + | } |
| + | |
| + | sqlx::query("BEGIN IMMEDIATE").execute(&mut *conn).await?; |
| + | let copy_result = copy_attached_source(&mut conn).await; |
| + | match copy_result { |
| + | Ok(()) => { |
| + | sqlx::query("COMMIT").execute(&mut *conn).await?; |
| + | Ok(report) |
| + | } |
| + | Err(err) => { |
| + | let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await; |
| + | Err(err) |
| + | } |
| + | } |
| + | } |
| + | .await; |
| + | |
| + | let detach_result = sqlx::query("DETACH DATABASE source_registry") |
| + | .execute(&mut *conn) |
| + | .await; |
| + | if let Err(err) = detach_result { |
| + | return Err(RegistryError::Database(err)); |
| + | } |
| + | result |
| + | } |
| + | |
| + | async fn validate_source_schema(conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>) -> Result<()> { |
| + | for table in MIGRATED_TABLES { |
| + | let exists: Option<(String,)> = sqlx::query_as( |
| + | "SELECT name FROM source_registry.sqlite_master WHERE type = 'table' AND name = ?", |
| + | ) |
| + | .bind(table) |
| + | .fetch_optional(&mut **conn) |
| + | .await?; |
| + | if exists.is_none() { |
| + | return Err(RegistryError::BadRequest(format!( |
| + | "migration source missing required table: {table}" |
| + | ))); |
| + | } |
| + | } |
| + | Ok(()) |
| + | } |
| + | |
| + | async fn source_count( |
| + | conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>, |
| + | table: &str, |
| + | ) -> Result<i64> { |
| + | let sql = match table { |
| + | "manifests" => "SELECT COUNT(*) FROM source_registry.manifests", |
| + | "beacons" => "SELECT COUNT(*) FROM source_registry.beacons", |
| + | "watermarks" => "SELECT COUNT(*) FROM source_registry.watermarks", |
| + | "events" => "SELECT COUNT(*) FROM source_registry.events", |
| + | "corpus" => "SELECT COUNT(*) FROM source_registry.corpus", |
| + | _ => { |
| + | return Err(RegistryError::Internal( |
| + | "unsupported migration table".into(), |
| + | )) |
| + | } |
| + | }; |
| + | let (count,): (i64,) = sqlx::query_as(sql).fetch_one(&mut **conn).await?; |
| + | Ok(count) |
| + | } |
| + | |
| + | async fn copy_attached_source(conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>) -> Result<()> { |
| + | sqlx::query( |
| + | r#" |
| + | INSERT OR REPLACE INTO manifests |
| + | (file_id, recipient_id, issuer_id, issuer_ed25519_pub, manifest_json, registered_at) |
| + | SELECT file_id, recipient_id, issuer_id, issuer_ed25519_pub, manifest_json, registered_at |
| + | FROM source_registry.manifests |
| + | "#, |
| + | ) |
| + | .execute(&mut **conn) |
| + | .await?; |
| + | |
| + | sqlx::query( |
| + | r#" |
| + | INSERT OR REPLACE INTO beacons |
| + | (token_id, file_id, recipient_id, issuer_id, kind, registered_at) |
| + | SELECT token_id, file_id, recipient_id, issuer_id, kind, registered_at |
| + | FROM source_registry.beacons |
| + | "#, |
| + | ) |
| + | .execute(&mut **conn) |
| + | .await?; |
| + | |
| + | sqlx::query( |
| + | r#" |
| + | INSERT OR REPLACE INTO watermarks |
| + | (mark_id, layer, file_id, recipient_id, issuer_id, registered_at) |
| + | SELECT mark_id, layer, file_id, recipient_id, issuer_id, registered_at |
| + | FROM source_registry.watermarks |
| + | "#, |
| + | ) |
| + | .execute(&mut **conn) |
| + | .await?; |
| + | |
| + | sqlx::query( |
| + | r#" |
| + | INSERT OR REPLACE INTO events |
| + | (id, token_id, file_id, recipient_id, issuer_id, kind, source_ip, |
| + | user_agent, extra, timestamp, qualified_timestamp, tlog_index) |
| + | SELECT id, token_id, file_id, recipient_id, issuer_id, kind, source_ip, |
| + | user_agent, extra, timestamp, qualified_timestamp, tlog_index |
| + | FROM source_registry.events |
| + | "#, |
| + | ) |
| + | .execute(&mut **conn) |
| + | .await?; |
| + | |
| + | sqlx::query( |
| + | r#" |
| + | INSERT OR REPLACE INTO corpus |
| + | (file_id, hash_kind, hash_value, metadata, registered_at) |
| + | SELECT file_id, hash_kind, hash_value, metadata, registered_at |
| + | FROM source_registry.corpus |
| + | "#, |
| + | ) |
| + | .execute(&mut **conn) |
| + | .await?; |
| + | |
| + | Ok(()) |
| + | } |
| + | |
| | // ---- Manifest queries --------------------------------------------------- |
| | |
| | /// Look up the issuer pubkey for an existing file_id. Returns None if not found. |
| | @@ -418,3 +594,175 @@ pub async fn get_semantic_candidates( |
| | }; |
| | Ok(rows) |
| | } |
| + | |
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// A seeded source database on disk plus an empty, migrated destination pool.
    struct Fixture {
        source_dir: PathBuf,
        dest_dir: PathBuf,
        source_path: PathBuf,
        dest_pool: SqlitePool,
    }

    impl Fixture {
        /// Create both scratch directories, seed and close the source database,
        /// then open the destination and run its schema migrations.
        async fn create(source_label: &str, dest_label: &str) -> Self {
            let source_dir = temp_dir(source_label);
            let dest_dir = temp_dir(dest_label);
            std::fs::create_dir_all(&source_dir).unwrap();
            std::fs::create_dir_all(&dest_dir).unwrap();
            let source_path = source_dir.join("registry.sqlite");
            let dest_path = dest_dir.join("registry.sqlite");

            let source_pool = create_pool(&source_path).await.unwrap();
            run_migrations(&source_pool).await.unwrap();
            seed_source(&source_pool).await;
            source_pool.close().await;

            let dest_pool = create_pool(&dest_path).await.unwrap();
            run_migrations(&dest_pool).await.unwrap();

            Self {
                source_dir,
                dest_dir,
                source_path,
                dest_pool,
            }
        }

        /// Close the destination pool and remove both scratch directories,
        /// ignoring removal errors.
        async fn teardown(self) {
            self.dest_pool.close().await;
            let _ = std::fs::remove_dir_all(self.source_dir);
            let _ = std::fs::remove_dir_all(self.dest_dir);
        }
    }

    /// Build a scratch-directory path under the system temp dir, made unique by
    /// the label and the current time in nanoseconds.
    fn temp_dir(label: &str) -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let mut dir = std::env::temp_dir();
        dir.push(format!("oversight-registry-migrate-{label}-{nanos}"));
        dir
    }

    /// Insert exactly one row into each migrated table of `pool`.
    async fn seed_source(pool: &SqlitePool) {
        let issuer_pub = "ab".repeat(32);
        upsert_manifest(
            pool,
            "file-1",
            "recipient-1",
            "issuer-1",
            &issuer_pub,
            r#"{"file_id":"file-1"}"#,
            10,
        )
        .await
        .unwrap();

        upsert_beacon(
            pool,
            "token-1",
            "file-1",
            "recipient-1",
            "issuer-1",
            "dns",
            10,
        )
        .await
        .unwrap();

        upsert_watermark(
            pool,
            "mark-1",
            "L1_zero_width",
            "file-1",
            "recipient-1",
            "issuer-1",
            10,
        )
        .await
        .unwrap();

        insert_event(
            pool,
            "token-1",
            Some("file-1"),
            Some("recipient-1"),
            Some("issuer-1"),
            "dns",
            Some("198.51.100.10"),
            Some("agent"),
            Some(r#"{"qtype":"A"}"#),
            11,
            Some("2026-05-17T00:00:00Z"),
            Some(7),
        )
        .await
        .unwrap();

        // No helper is used for corpus rows here, so insert one directly.
        sqlx::query(
            "INSERT INTO corpus (file_id, hash_kind, hash_value, metadata, registered_at) VALUES (?, ?, ?, ?, ?)",
        )
        .bind("file-1")
        .bind("perceptual")
        .bind("phash-1")
        .bind(r#"{"source":"fixture"}"#)
        .bind(12_i64)
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn migrate_from_sqlite_copies_python_registry_tables() {
        let fixture = Fixture::create("source", "dest").await;
        let dest_pool = &fixture.dest_pool;

        let report = migrate_from_sqlite(dest_pool, &fixture.source_path, false)
            .await
            .unwrap();

        // The report carries the per-table source row counts.
        assert_eq!(report.manifests, 1);
        assert_eq!(report.beacons, 1);
        assert_eq!(report.watermarks, 1);
        assert_eq!(report.events, 1);
        assert_eq!(report.corpus, 1);

        // Every seeded row is reachable through the destination's query API.
        assert!(get_manifest(dest_pool, "file-1").await.unwrap().is_some());
        assert!(get_beacon(dest_pool, "token-1").await.unwrap().is_some());
        let watermark = get_watermark(dest_pool, "mark-1", None).await.unwrap();
        assert!(watermark.is_some());
        let perceptual_match = lookup_by_perceptual_hash(dest_pool, "phash-1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(perceptual_match.0, "file-1");

        // The event keeps its source id and its `extra` payload.
        let (event_id, event_extra): (i64, Option<String>) =
            sqlx::query_as("SELECT id, extra FROM events WHERE token_id = ?")
                .bind("token-1")
                .fetch_one(dest_pool)
                .await
                .unwrap();
        assert_eq!(event_id, 1);
        assert_eq!(event_extra.as_deref(), Some(r#"{"qtype":"A"}"#));

        let (metadata,): (Option<String>,) =
            sqlx::query_as("SELECT metadata FROM corpus WHERE hash_value = ?")
                .bind("phash-1")
                .fetch_one(dest_pool)
                .await
                .unwrap();
        assert_eq!(metadata.as_deref(), Some(r#"{"source":"fixture"}"#));

        fixture.teardown().await;
    }

    #[tokio::test]
    async fn migrate_from_sqlite_dry_run_only_counts_rows() {
        let fixture = Fixture::create("dry-source", "dry-dest").await;

        let report = migrate_from_sqlite(&fixture.dest_pool, &fixture.source_path, true)
            .await
            .unwrap();

        assert!(report.dry_run);
        assert_eq!(report.events, 1);
        // A dry run must leave the destination untouched.
        let manifest = get_manifest(&fixture.dest_pool, "file-1").await.unwrap();
        assert!(manifest.is_none());

        fixture.teardown().await;
    }
}