coven/sync/
outbox.rs

1//! Cloud outbox: async upload/delete of encrypted blobs.
2//!
3//! The host enqueues upload entries (after writing a blob locally) and delete
4//! entries (tagged with the current sync seq). The sync cycle processes the
5//! outbox: uploads before push, deletes after pull.
6
7use std::path::Path;
8
9use tracing::warn;
10
11use crate::blob::BlobUploadObserver;
12use crate::db::SyncBookkeeping;
13use crate::encryption::EncryptionService;
14use crate::storage::cloud::CloudHome;
15
16/// Process pending uploads: read local file, encrypt, write to cloud.
17///
18/// Returns the number of successful uploads. Stops at the first failure so we
19/// don't push out-of-order. After each successful upload, `observer` (if any)
20/// is notified so the host can run its own bookkeeping.
21pub async fn process_uploads(
22    db: &dyn SyncBookkeeping,
23    cloud_home: &dyn CloudHome,
24    encryption: &std::sync::RwLock<EncryptionService>,
25    library_dir: &Path,
26    observer: Option<&dyn BlobUploadObserver>,
27) -> Result<usize, String> {
28    let uploads = db
29        .get_pending_cloud_uploads()
30        .await
31        .map_err(|e| format!("Failed to get pending uploads: {e}"))?;
32
33    let mut count = 0;
34    for entry in uploads {
35        let file_path = match &entry.source_path {
36            Some(p) => std::path::PathBuf::from(p),
37            None => library_dir.join(crate::storage::local::storage_path(&entry.file_id)),
38        };
39
40        let data = match tokio::fs::read(&file_path).await {
41            Ok(d) => d,
42            Err(e) => {
43                warn!(
44                    "Upload failed: cannot read local file {}: {e}",
45                    file_path.display()
46                );
47                break;
48            }
49        };
50
51        let encrypted = {
52            let enc = encryption.read().unwrap();
53            enc.encrypt(&data)
54        };
55
56        match cloud_home.write(&entry.cloud_key, encrypted).await {
57            Ok(()) => {
58                if let Err(e) = db.remove_cloud_outbox_entry(entry.id).await {
59                    warn!("Failed to remove outbox entry {}: {e}", entry.id);
60                }
61                count += 1;
62
63                if let Some(obs) = observer {
64                    obs.on_blob_uploaded(&entry.file_id).await;
65                }
66            }
67            Err(e) => {
68                warn!("Upload failed for {}: {e}", entry.cloud_key);
69                break;
70            }
71        }
72    }
73
74    Ok(count)
75}
76
77/// Process pending deletes: remove cloud files whose deletion has been synced.
78///
79/// A delete is only safe when all known device heads have advanced past the
80/// entry's `min_seq`. Returns the number of successful deletes.
81pub async fn process_deletes(
82    db: &dyn SyncBookkeeping,
83    cloud_home: &dyn CloudHome,
84    device_head_seqs: &[u64],
85) -> Result<usize, String> {
86    let deletes = db
87        .get_pending_cloud_deletes()
88        .await
89        .map_err(|e| format!("Failed to get pending deletes: {e}"))?;
90
91    if deletes.is_empty() {
92        return Ok(0);
93    }
94
95    let min_head = device_head_seqs.iter().copied().min();
96
97    let mut count = 0;
98    for entry in deletes {
99        if let Some(min_seq) = entry.min_seq {
100            if let Some(head) = min_head {
101                if head <= min_seq {
102                    continue;
103                }
104            }
105        }
106
107        match cloud_home.delete(&entry.cloud_key).await {
108            Ok(()) => {
109                if let Err(e) = db.remove_cloud_outbox_entry(entry.id).await {
110                    warn!("Failed to remove outbox entry {}: {e}", entry.id);
111                }
112                count += 1;
113            }
114            Err(e) => {
115                warn!("Delete failed for {}: {e}", entry.cloud_key);
116                // Continue trying other deletes — unlike uploads, order doesn't matter
117            }
118        }
119    }
120
121    Ok(count)
122}