coven/sync/
cycle.rs

1//! Sync cycle orchestration.
2//!
3//! Contains the logic for running a single sync cycle (push local changes,
4//! pull remote changes, manage snapshots) and for initializing sync
5//! infrastructure.
6
7use std::path::PathBuf;
8
9use tracing::{info, warn};
10
11use crate::blob::{BlobPlan, BlobUploadObserver};
12use crate::changeset::RowChange;
13use crate::clock::ClockRef;
14use crate::config::Config;
15use crate::db::{RawDbHandle, SyncBookkeeping};
16use crate::encryption::EncryptionService;
17use crate::keys::{KeyService, UserKeypair};
18use crate::library_dir::LibraryDir;
19use crate::storage::cloud::CloudHome;
20
21use super::encrypted_storage::EncryptedSyncStorage;
22use super::hlc::{Hlc, Timestamp};
23use super::service::SyncService;
24use super::session::SyncSession;
25use super::storage::SyncStorage;
26
/// Result of a single successful sync cycle.
///
/// Built at the end of `run_single_sync_cycle` and handed back to the host
/// inside `SyncCycleOutcome::Ok`.
pub struct SyncCycleResult {
    /// Number of remote changesets that were applied during the pull phase.
    pub changesets_applied: u64,
    /// Number of other devices seen in the sync storage (derived from the
    /// remote heads, excluding this device).
    pub other_device_count: usize,
    /// RFC 3339 timestamp of when this cycle completed.
    pub sync_time: String,
    /// Asset downloads failed — cursor not advanced for those changesets.
    pub asset_downloads_failed: bool,
    /// Row changes from applied changesets, for the host to map to domain events.
    pub row_changes: Vec<RowChange>,
}
40
41/// Path for staging outgoing changeset bytes that survived a push failure.
42pub fn staging_path(library_dir: &LibraryDir) -> PathBuf {
43    library_dir.join("sync_staging.bin")
44}
45
46/// Stage outgoing changeset bytes to disk before pushing.
47pub fn stage_changeset(library_dir: &LibraryDir, packed: &[u8]) {
48    if let Err(e) = std::fs::write(staging_path(library_dir), packed) {
49        warn!("Failed to stage outgoing changeset: {e}");
50    }
51}
52
53/// Clear the staged changeset after a successful push.
54pub fn clear_staged_changeset(library_dir: &LibraryDir) {
55    let _ = std::fs::remove_file(staging_path(library_dir));
56}
57
58/// Read a previously staged changeset (if any) for retry.
59pub fn read_staged_changeset(library_dir: &LibraryDir) -> Option<Vec<u8>> {
60    let path = staging_path(library_dir);
61    if path.exists() {
62        match std::fs::read(&path) {
63            Ok(data) if !data.is_empty() => Some(data),
64            Ok(_) => {
65                clear_staged_changeset(library_dir);
66                None
67            }
68            Err(e) => {
69                warn!("Failed to read staged changeset: {e}");
70                clear_staged_changeset(library_dir);
71                None
72            }
73        }
74    } else {
75        None
76    }
77}
78
79/// Push a changeset to the sync storage and update the device head.
80pub async fn push_changeset(
81    storage: &dyn SyncStorage,
82    device_id: &str,
83    seq: u64,
84    packed: Vec<u8>,
85    snapshot_seq: Option<u64>,
86    timestamp: &str,
87) -> Result<(), super::storage::StorageError> {
88    storage.put_changeset(device_id, seq, packed).await?;
89    storage
90        .put_head(device_id, seq, snapshot_seq, timestamp)
91        .await?;
92    Ok(())
93}
94
/// Outcome of a sync cycle attempt.
///
/// Every variant except `ErrNoSession` carries a `SyncSession` for the
/// caller to use on the next cycle.
pub enum SyncCycleOutcome {
    /// Cycle completed successfully. Contains the result and a new session.
    Ok(SyncCycleResult, SyncSession),
    /// Cycle failed (the `String` is the error message) but we recovered a
    /// session for next time.
    ErrWithSession(String, SyncSession),
    /// Cycle failed (the `String` is the error message) and we couldn't
    /// recover a session either.
    ErrNoSession(String),
}
104
105/// Run a single sync cycle: grab changeset, push, pull, restart session.
106///
107/// This manages all the state (local_seq, cursors, staging, snapshots) by
108/// loading/persisting from the database each cycle, rather than keeping
109/// mutable state across calls.
110///
111/// Always tries to return a usable session, even on error.
112///
113/// # Safety
114///
115/// `raw_db` must be a valid sqlite3 write connection pointer that outlives
116/// this call. `session` is consumed and a new one is started on `raw_db`.
117pub async unsafe fn run_single_sync_cycle(
118    storage: &dyn SyncStorage,
119    device_id: &str,
120    hlc: &Hlc,
121    clock: &dyn crate::clock::Clock,
122    raw_db: *mut libsqlite3_sys::sqlite3,
123    session: SyncSession,
124    encryption: &std::sync::RwLock<EncryptionService>,
125    user_keypair: &UserKeypair,
126    db: &dyn SyncBookkeeping,
127    library_dir: &LibraryDir,
128    cloud_home: Option<&dyn CloudHome>,
129    blob_plan: &dyn BlobPlan,
130    observer: Option<&dyn BlobUploadObserver>,
131) -> SyncCycleOutcome {
132    let sync_service = SyncService::new(device_id.to_string());
133
134    // Helper macro to recover a session on error
135    macro_rules! recover_session_on_err {
136        ($err:expr) => {
137            match unsafe { SyncSession::start(raw_db) } {
138                Ok(s) => return SyncCycleOutcome::ErrWithSession($err, s),
139                Err(se) => {
140                    return SyncCycleOutcome::ErrNoSession(format!(
141                        "{} (also failed to restart session: {se})",
142                        $err
143                    ))
144                }
145            }
146        };
147    }
148
149    // Load persisted sync state — DB errors abort the cycle (a transient
150    // SQLite error must not make us treat the device as brand-new at seq 0).
151    // None (key not set yet) legitimately defaults to 0 / None.
152    let mut local_seq = match db.get_sync_state("local_seq").await {
153        Ok(Some(v)) => match v.parse::<u64>() {
154            Ok(n) => n,
155            Err(e) => {
156                recover_session_on_err!(format!("Corrupt local_seq value: {e}"));
157            }
158        },
159        Ok(None) => 0,
160        Err(e) => {
161            recover_session_on_err!(format!("Failed to read local_seq: {e}"));
162        }
163    };
164
165    let snapshot_seq: Option<u64> = match db.get_sync_state("snapshot_seq").await {
166        Ok(Some(v)) => match v.parse::<u64>() {
167            Ok(n) => Some(n),
168            Err(e) => {
169                recover_session_on_err!(format!("Corrupt snapshot_seq value: {e}"));
170            }
171        },
172        Ok(None) => None,
173        Err(e) => {
174            recover_session_on_err!(format!("Failed to read snapshot_seq: {e}"));
175        }
176    };
177
178    let last_snapshot_time: Option<chrono::DateTime<chrono::Utc>> =
179        match db.get_sync_state("last_snapshot_time").await {
180            Ok(Some(v)) => match chrono::DateTime::parse_from_rfc3339(&v) {
181                Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
182                Err(e) => {
183                    recover_session_on_err!(format!("Corrupt last_snapshot_time value: {e}"));
184                }
185            },
186            Ok(None) => None,
187            Err(e) => {
188                recover_session_on_err!(format!("Failed to read last_snapshot_time: {e}"));
189            }
190        };
191
192    let staged_seq: Option<u64> = match db.get_sync_state("staged_seq").await {
193        Ok(Some(v)) if v.is_empty() => None,
194        Ok(Some(v)) => match v.parse::<u64>() {
195            Ok(n) => Some(n),
196            Err(e) => {
197                recover_session_on_err!(format!("Corrupt staged_seq value: {e}"));
198            }
199        },
200        Ok(None) => None,
201        Err(e) => {
202            recover_session_on_err!(format!("Failed to read staged_seq: {e}"));
203        }
204    };
205
206    // Retry any staged changeset from a previous failed push
207    if let Some(seq) = staged_seq {
208        if let Some(staged_data) = read_staged_changeset(library_dir) {
209            let timestamp = hlc.now().to_string();
210
211            info!(seq, "Retrying staged changeset push");
212
213            match push_changeset(
214                storage,
215                device_id,
216                seq,
217                staged_data,
218                snapshot_seq,
219                &timestamp,
220            )
221            .await
222            {
223                Ok(()) => {
224                    info!(seq, "Staged changeset push succeeded");
225                    clear_staged_changeset(library_dir);
226                    local_seq = seq;
227
228                    if let Err(e) = db.set_sync_state("local_seq", &seq.to_string()).await {
229                        recover_session_on_err!(format!(
230                            "Failed to persist local_seq after staged push: {e}"
231                        ));
232                    }
233                    if let Err(e) = db.set_sync_state("staged_seq", "").await {
234                        recover_session_on_err!(format!(
235                            "Failed to clear staged_seq after staged push: {e}"
236                        ));
237                    }
238                }
239                Err(e) => {
240                    recover_session_on_err!(format!("Staged changeset push failed: {e}"));
241                }
242            }
243        } else if let Err(e) = db.set_sync_state("staged_seq", "").await {
244            recover_session_on_err!(format!("Failed to clear stale staged_seq: {e}"));
245        }
246    }
247
248    // Process outbox uploads (files must be in cloud before changeset references them)
249    if let Some(ch) = cloud_home {
250        match super::outbox::process_uploads(db, ch, encryption, library_dir.as_ref(), observer)
251            .await
252        {
253            Ok(n) if n > 0 => info!(count = n, "Processed outbox uploads"),
254            Err(e) => warn!("Outbox upload processing error: {e}"),
255            _ => {}
256        }
257    }
258
259    // Check whether we should gate changeset push
260    let has_pending_uploads = match db.has_pending_cloud_uploads().await {
261        Ok(v) => v,
262        Err(e) => {
263            recover_session_on_err!(format!("Failed to check pending cloud uploads: {e}"));
264        }
265    };
266
267    // Load current cursors from DB
268    let cursors = match db.get_all_sync_cursors().await {
269        Ok(c) => c,
270        Err(e) => {
271            recover_session_on_err!(format!("Failed to load sync cursors: {e}"));
272        }
273    };
274
275    let timestamp = hlc.now().to_string();
276
277    // Run the core sync cycle
278    let sync_result = unsafe {
279        sync_service
280            .sync(
281                raw_db,
282                session,
283                local_seq,
284                &cursors,
285                storage,
286                &timestamp,
287                "background sync",
288                user_keypair,
289                library_dir,
290                blob_plan,
291            )
292            .await
293    };
294
295    let sync_result = match sync_result {
296        Ok(r) => r,
297        Err(e) => {
298            recover_session_on_err!(format!("Sync cycle error: {e}"));
299        }
300    };
301
302    // Handle outgoing changeset (push)
303    // Skip push if there are still pending cloud uploads — remote devices
304    // should not learn about releases whose audio files aren't in cloud yet.
305    if has_pending_uploads {
306        if sync_result.outgoing.is_some() {
307            info!("Deferring changeset push: pending cloud uploads remain");
308        }
309    } else if let Some(outgoing) = &sync_result.outgoing {
310        let seq = outgoing.seq;
311
312        // Stage before pushing so bytes survive a push failure
313        stage_changeset(library_dir, &outgoing.packed);
314
315        if let Err(e) = db.set_sync_state("staged_seq", &seq.to_string()).await {
316            recover_session_on_err!(format!("Failed to persist staged_seq before push: {e}"));
317        }
318
319        match push_changeset(
320            storage,
321            device_id,
322            seq,
323            outgoing.packed.clone(),
324            snapshot_seq,
325            &timestamp,
326        )
327        .await
328        {
329            Ok(()) => {
330                clear_staged_changeset(library_dir);
331                local_seq = seq;
332
333                if let Err(e) = db.set_sync_state("local_seq", &seq.to_string()).await {
334                    recover_session_on_err!(format!("Failed to persist local_seq after push: {e}"));
335                }
336                if let Err(e) = db.set_sync_state("staged_seq", "").await {
337                    recover_session_on_err!(format!("Failed to clear staged_seq after push: {e}"));
338                }
339
340                info!(seq, "Pushed changeset");
341            }
342            Err(e) => {
343                warn!(seq, "Push failed, changeset staged for retry: {e}");
344            }
345        }
346    }
347
348    // Persist updated cursors
349    for (cursor_device_id, cursor_seq) in &sync_result.updated_cursors {
350        if let Err(e) = db.set_sync_cursor(cursor_device_id, *cursor_seq).await {
351            warn!(
352                device_id = cursor_device_id,
353                seq = cursor_seq,
354                "Failed to persist sync cursor: {e}"
355            );
356        }
357    }
358
359    // Update HLC with max remote timestamp
360    let max_remote_ts = sync_result
361        .pull
362        .remote_heads
363        .iter()
364        .filter(|h| h.device_id != device_id)
365        .filter_map(|h| h.last_sync.as_deref())
366        .filter_map(
367            |ts_str| match chrono::DateTime::parse_from_rfc3339(ts_str) {
368                Ok(dt) => Some(dt.timestamp_millis().max(0) as u64),
369                Err(e) => {
370                    warn!(
371                        timestamp = ts_str,
372                        "Failed to parse peer HLC timestamp: {e}"
373                    );
374                    None
375                }
376            },
377        )
378        .max();
379
380    if let Some(remote_millis) = max_remote_ts {
381        let remote_ts = Timestamp::new(remote_millis, 0, "remote".to_string());
382        hlc.update(&remote_ts);
383    }
384
385    // Process outbox deletes (safe to delete after all devices have synced past the deletion)
386    if let Some(ch) = cloud_home {
387        let device_head_seqs: Vec<u64> = sync_result
388            .pull
389            .remote_heads
390            .iter()
391            .map(|h| h.seq)
392            .collect();
393
394        match super::outbox::process_deletes(db, ch, &device_head_seqs).await {
395            Ok(n) if n > 0 => info!(count = n, "Processed outbox deletes"),
396            Err(e) => warn!("Outbox delete processing error: {e}"),
397            _ => {}
398        }
399    }
400
401    // Start a new sync session
402    let new_session = match unsafe { SyncSession::start(raw_db) } {
403        Ok(s) => s,
404        Err(e) => {
405            return SyncCycleOutcome::ErrNoSession(format!("Failed to restart sync session: {e}"));
406        }
407    };
408
409    // Check snapshot policy
410    let hours_since = last_snapshot_time.map(|t| {
411        let elapsed = clock.now().signed_duration_since(t);
412        elapsed.num_hours().max(0) as u64
413    });
414
415    // Initial sync: library has data but the session produced no changeset
416    // (data was inserted before the sync session started — e.g., user connected
417    // a cloud provider to an existing library). Push a snapshot so the existing
418    // data reaches the cloud.
419    let is_initial_sync =
420        local_seq == 0 && snapshot_seq.is_none() && sync_result.outgoing.is_none();
421
422    if is_initial_sync
423        || super::snapshot::should_create_snapshot(local_seq, snapshot_seq, hours_since)
424    {
425        if is_initial_sync {
426            info!("Initial sync: pushing snapshot of existing library data");
427        } else {
428            info!("Snapshot policy triggered, creating snapshot");
429        }
430
431        let temp_dir = std::env::temp_dir();
432        let snapshot_result = {
433            let enc = encryption.read().unwrap();
434            unsafe { super::snapshot::create_snapshot(raw_db, &temp_dir, &enc) }
435        };
436
437        match snapshot_result {
438            Ok(encrypted) => {
439                match super::snapshot::push_snapshot(
440                    storage, encrypted, device_id, local_seq, clock,
441                )
442                .await
443                {
444                    Ok(()) => {
445                        if let Err(e) = db
446                            .set_sync_state("snapshot_seq", &local_seq.to_string())
447                            .await
448                        {
449                            recover_session_on_err!(format!("Failed to persist snapshot_seq: {e}"));
450                        }
451                        if let Err(e) = db
452                            .set_sync_state("last_snapshot_time", &clock.now().to_rfc3339())
453                            .await
454                        {
455                            recover_session_on_err!(format!(
456                                "Failed to persist last_snapshot_time: {e}"
457                            ));
458                        }
459
460                        info!(local_seq, "Snapshot created and pushed");
461                    }
462                    Err(e) => {
463                        warn!("Failed to push snapshot: {e}");
464                    }
465                }
466            }
467            Err(e) => {
468                warn!("Failed to create snapshot: {e}");
469            }
470        }
471    }
472
473    // Build status from remote heads
474    let now = clock.now().to_rfc3339();
475    let core_status =
476        super::status::build_sync_status(&sync_result.pull.remote_heads, device_id, Some(&now));
477
478    let other_device_count = core_status.other_devices.len();
479
480    SyncCycleOutcome::Ok(
481        SyncCycleResult {
482            changesets_applied: sync_result.pull.changesets_applied,
483            other_device_count,
484            sync_time: now,
485            asset_downloads_failed: sync_result.pull.asset_downloads_failed,
486            row_changes: sync_result.pull.row_changes,
487        },
488        new_session,
489    )
490}
491
492/// Initialize sync infrastructure from config and credentials.
493///
494/// Initialize sync: create storage, extract raw sqlite3 handle, start session.
495///
496/// Returns None if any component isn't available (missing config, credentials, etc.).
497pub async fn init_sync(
498    config: &Config,
499    key_service: &KeyService,
500    raw_db_handle: &dyn RawDbHandle,
501    clock: ClockRef,
502    encryption: &EncryptionService,
503) -> Option<SyncComponents> {
504    let storage = match crate::storage::cloud::setup::create_sync_storage(
505        config,
506        key_service,
507        &Some(encryption.clone()),
508        clock.clone(),
509    )
510    .await
511    {
512        Ok(s) => s,
513        Err(e) => {
514            warn!("Failed to create sync storage: {e}");
515            return None;
516        }
517    };
518    let encryption_lock = storage.shared_encryption();
519
520    let user_keypair = match key_service.get_or_create_user_keypair() {
521        Ok(kp) => kp,
522        Err(e) => {
523            warn!("Failed to get/create user keypair for sync: {e}");
524            return None;
525        }
526    };
527
528    // Bootstrap auth keys if none exist yet
529    let cloud_home = storage.cloud_home();
530
531    let existing_keys = match cloud_home.list("auth/keys/").await {
532        Ok(keys) => keys,
533        Err(e) => {
534            warn!("Failed to list auth keys: {e}");
535            return None;
536        }
537    };
538
539    if existing_keys.is_empty() {
540        // Check if membership entries exist (shared library upgrade path)
541        let membership_entries = match storage.list_membership_entries().await {
542            Ok(entries) => entries,
543            Err(e) => {
544                warn!("Failed to list membership entries: {e}");
545                return None;
546            }
547        };
548
549        if membership_entries.is_empty() {
550            // Solo library — just write our own key
551            let user_pk = hex::encode(user_keypair.public_key);
552
553            if let Err(e) = cloud_home
554                .write(&format!("auth/keys/{user_pk}"), vec![])
555                .await
556            {
557                warn!("Failed to write auth key: {e}");
558                return None;
559            }
560        } else {
561            // Shared library — download chain and write keys for all current members
562            match super::membership_ops::download_chain(&storage, &membership_entries).await {
563                Ok(chain) => {
564                    if let Err(e) =
565                        super::membership_ops::sync_authorized_keys(cloud_home, &chain).await
566                    {
567                        warn!("Failed to bootstrap auth keys from membership chain: {e}");
568                        return None;
569                    }
570                }
571                Err(e) => {
572                    warn!("Failed to download membership chain for auth key bootstrap: {e}");
573                    return None;
574                }
575            }
576        }
577    }
578
579    // Acquire the raw sqlite handle and start the session AFTER all awaits.
580    // The free `*mut sqlite3` is !Send; moving it next to `SyncComponents`
581    // (which is unsafe impl Send) keeps the future's await points Send-clean.
582    //
583    // The auth-key bootstrap above persists to cloud-home before raw_db is
584    // acquired. If raw_write_handle fails, the bootstrapped key stays put;
585    // the next init_sync call finds existing_keys non-empty and skips the
586    // bootstrap branch entirely, so partial-success is idempotent.
587    let raw_db = match raw_db_handle.raw_write_handle().await {
588        Ok(ptr) => ptr,
589        Err(e) => {
590            warn!("Failed to extract raw write handle for sync: {e}");
591            return None;
592        }
593    };
594
595    let session = match unsafe { SyncSession::start(raw_db) } {
596        Ok(s) => s,
597        Err(e) => {
598            warn!("Failed to start initial sync session: {e}");
599            return None;
600        }
601    };
602
603    let hlc = Hlc::new(config.device_id.clone());
604
605    info!("Sync initialized (device: {})", config.device_id);
606
607    Some(SyncComponents {
608        storage: std::sync::Arc::new(storage),
609        hlc: std::sync::Arc::new(hlc),
610        device_id: config.device_id.clone(),
611        encryption: encryption_lock,
612        raw_db,
613        session,
614        user_keypair,
615    })
616}
617
/// Components needed to run sync cycles, as produced by `init_sync`.
///
/// The caller is responsible for wrapping these in the appropriate
/// thread-safe containers (Arc, Mutex, etc.) for their runtime model.
pub struct SyncComponents {
    /// Encrypted sync storage backend.
    pub storage: std::sync::Arc<EncryptedSyncStorage>,
    /// Hybrid logical clock, created from this device's id.
    pub hlc: std::sync::Arc<Hlc>,
    /// This device's id, copied from the config.
    pub device_id: String,
    /// Encryption service handle obtained from the storage
    /// (`shared_encryption`).
    pub encryption: std::sync::Arc<std::sync::RwLock<EncryptionService>>,
    /// Raw sqlite3 write connection that `session` was started on.
    pub raw_db: *mut libsqlite3_sys::sqlite3,
    /// The initial sync session, started on `raw_db`.
    pub session: SyncSession,
    /// The user's keypair, obtained or created through the key service.
    pub user_keypair: UserKeypair,
}

// SAFETY: The raw sqlite3 pointer is only used for session extension operations
// which are serialized through the sync loop. The pointer itself is stable
// (heap-allocated write connection inside Arc<DatabaseInner>).
unsafe impl Send for SyncComponents {}