coven/sync/
encrypted_storage.rs

1//! `SyncStorage` implementation backed by any `CloudHome`.
2//!
3//! Handles the cloud home path layout (where keys, heads, images, etc. live)
4//! and encryption/decryption. The underlying `CloudHome` only deals in raw
5//! bytes and flat keys.
6
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::sync::{Arc, RwLock};
10
11use super::storage::{DeviceHead, StorageError, SyncStorage};
12use crate::encryption::EncryptionService;
13use crate::storage::cloud::CloudHome;
14
15/// Serialized form of a device head stored in `heads/{device_id}.json.enc`.
16#[derive(Serialize, Deserialize)]
17struct HeadJson {
18    seq: u64,
19    #[serde(skip_serializing_if = "Option::is_none")]
20    snapshot_seq: Option<u64>,
21    /// RFC 3339 timestamp of when this head was last written.
22    #[serde(skip_serializing_if = "Option::is_none")]
23    last_sync: Option<String>,
24}
25
26/// Serialized form of `min_schema_version.json.enc`.
27#[derive(Serialize, Deserialize)]
28struct MinSchemaVersionJson {
29    min_schema_version: u32,
30}
31
32/// `SyncStorage` that delegates raw I/O to a `CloudHome` and handles
33/// the path layout and encryption layer.
34pub struct EncryptedSyncStorage {
35    home: Box<dyn CloudHome>,
36    encryption: Arc<RwLock<EncryptionService>>,
37}
38
39impl EncryptedSyncStorage {
40    pub fn new(home: Box<dyn CloudHome>, encryption: EncryptionService) -> Self {
41        EncryptedSyncStorage {
42            home,
43            encryption: Arc::new(RwLock::new(encryption)),
44        }
45    }
46
47    /// Return a shared reference to the encryption lock for external use
48    /// (e.g., SyncHandle can share the same instance for snapshot creation).
49    pub fn shared_encryption(&self) -> Arc<RwLock<EncryptionService>> {
50        self.encryption.clone()
51    }
52
53    /// Borrow the underlying CloudHome for direct access (e.g., grant_access/revoke_access).
54    pub fn cloud_home(&self) -> &dyn CloudHome {
55        &*self.home
56    }
57
58    /// Convenience: read-lock the encryption service.
59    fn enc(&self) -> std::sync::RwLockReadGuard<'_, EncryptionService> {
60        self.encryption.read().unwrap()
61    }
62
63    /// Blob key: `{namespace}/{ab}/{cd}/{id}`.
64    pub fn blob_key(namespace: &str, id: &str) -> String {
65        crate::library_dir::LibraryDir::hashed_path(namespace, id)
66    }
67}
68
69#[async_trait]
70impl SyncStorage for EncryptedSyncStorage {
71    async fn list_heads(&self) -> Result<Vec<DeviceHead>, StorageError> {
72        let keys = self.home.list("heads/").await?;
73        let mut heads = Vec::new();
74
75        for key in &keys {
76            // key = "heads/{device_id}.json.enc"
77            let device_id = key
78                .strip_prefix("heads/")
79                .and_then(|s| s.strip_suffix(".json.enc"))
80                .ok_or_else(|| StorageError::S3(format!("unexpected head key format: {key}")))?;
81
82            let encrypted = self.home.read(key).await?;
83            let decrypted = self
84                .enc()
85                .decrypt(&encrypted)
86                .map_err(|e| StorageError::Decryption(format!("head {device_id}: {e}")))?;
87
88            let head_json: HeadJson = serde_json::from_slice(&decrypted)
89                .map_err(|e| StorageError::S3(format!("parse head {device_id}: {e}")))?;
90
91            heads.push(DeviceHead {
92                device_id: device_id.to_string(),
93                seq: head_json.seq,
94                snapshot_seq: head_json.snapshot_seq,
95                last_sync: head_json.last_sync,
96            });
97        }
98
99        Ok(heads)
100    }
101
102    async fn get_changeset(&self, device_id: &str, seq: u64) -> Result<Vec<u8>, StorageError> {
103        let key = format!("changes/{device_id}/{seq}.enc");
104        let encrypted = self.home.read(&key).await?;
105        self.enc()
106            .decrypt(&encrypted)
107            .map_err(|e| StorageError::Decryption(format!("changeset {device_id}/{seq}: {e}")))
108    }
109
110    async fn put_changeset(
111        &self,
112        device_id: &str,
113        seq: u64,
114        data: Vec<u8>,
115    ) -> Result<(), StorageError> {
116        let key = format!("changes/{device_id}/{seq}.enc");
117        let encrypted = self.enc().encrypt(&data);
118        self.home.write(&key, encrypted).await?;
119        Ok(())
120    }
121
122    async fn put_head(
123        &self,
124        device_id: &str,
125        seq: u64,
126        snapshot_seq: Option<u64>,
127        timestamp: &str,
128    ) -> Result<(), StorageError> {
129        let head = HeadJson {
130            seq,
131            snapshot_seq,
132            last_sync: Some(timestamp.to_string()),
133        };
134        let json = serde_json::to_vec(&head)
135            .map_err(|e| StorageError::S3(format!("serialize head: {e}")))?;
136        let encrypted = self.enc().encrypt(&json);
137        let key = format!("heads/{device_id}.json.enc");
138        self.home.write(&key, encrypted).await?;
139        Ok(())
140    }
141
142    async fn put_blob(
143        &self,
144        namespace: &str,
145        id: &str,
146        scope: crate::blob::BlobScope,
147        data: Vec<u8>,
148    ) -> Result<(), StorageError> {
149        let key = Self::blob_key(namespace, id);
150        let enc = match scope {
151            crate::blob::BlobScope::Master => self.enc().clone(),
152            crate::blob::BlobScope::Derived(s) => self.enc().derive_scoped(&s),
153        };
154        let encrypted = enc.encrypt(&data);
155        self.home.write(&key, encrypted).await?;
156        Ok(())
157    }
158
159    async fn get_blob(
160        &self,
161        namespace: &str,
162        id: &str,
163        scope: crate::blob::BlobScope,
164    ) -> Result<Vec<u8>, StorageError> {
165        let key = Self::blob_key(namespace, id);
166        let encrypted = self.home.read(&key).await?;
167        let enc = match scope {
168            crate::blob::BlobScope::Master => self.enc().clone(),
169            crate::blob::BlobScope::Derived(s) => self.enc().derive_scoped(&s),
170        };
171        enc.decrypt(&encrypted)
172            .map_err(|e| StorageError::Decryption(format!("blob {namespace}/{id}: {e}")))
173    }
174
175    async fn put_snapshot(&self, data: Vec<u8>) -> Result<(), StorageError> {
176        self.home.write("snapshot.db.enc", data).await?;
177        Ok(())
178    }
179
180    async fn get_snapshot(&self) -> Result<Vec<u8>, StorageError> {
181        self.home
182            .read("snapshot.db.enc")
183            .await
184            .map_err(StorageError::from)
185    }
186
187    async fn delete_changeset(&self, device_id: &str, seq: u64) -> Result<(), StorageError> {
188        let key = format!("changes/{device_id}/{seq}.enc");
189        self.home.delete(&key).await?;
190        Ok(())
191    }
192
193    async fn list_changesets(&self, device_id: &str) -> Result<Vec<u64>, StorageError> {
194        let prefix = format!("changes/{device_id}/");
195        let keys = self.home.list(&prefix).await?;
196
197        let mut seqs: Vec<u64> = keys
198            .iter()
199            .filter_map(|k| {
200                k.strip_prefix(&prefix)
201                    .and_then(|s| s.strip_suffix(".enc"))
202                    .and_then(|s| s.parse().ok())
203            })
204            .collect();
205        seqs.sort();
206        Ok(seqs)
207    }
208
209    async fn get_min_schema_version(&self) -> Result<Option<u32>, StorageError> {
210        let key = "min_schema_version.json.enc";
211        let encrypted = match self.home.read(key).await {
212            Ok(data) => data,
213            Err(crate::storage::cloud::CloudHomeError::NotFound(_)) => return Ok(None),
214            Err(e) => return Err(StorageError::from(e)),
215        };
216
217        let decrypted = self
218            .enc()
219            .decrypt(&encrypted)
220            .map_err(|e| StorageError::Decryption(format!("min_schema_version: {e}")))?;
221
222        let parsed: MinSchemaVersionJson = serde_json::from_slice(&decrypted)
223            .map_err(|e| StorageError::S3(format!("parse min_schema_version: {e}")))?;
224
225        Ok(Some(parsed.min_schema_version))
226    }
227
228    async fn set_min_schema_version(&self, version: u32) -> Result<(), StorageError> {
229        let payload = MinSchemaVersionJson {
230            min_schema_version: version,
231        };
232        let json = serde_json::to_vec(&payload)
233            .map_err(|e| StorageError::S3(format!("serialize min_schema_version: {e}")))?;
234        let encrypted = self.enc().encrypt(&json);
235        self.home
236            .write("min_schema_version.json.enc", encrypted)
237            .await?;
238        Ok(())
239    }
240
241    async fn put_membership_entry(
242        &self,
243        author_pubkey: &str,
244        seq: u64,
245        data: Vec<u8>,
246    ) -> Result<(), StorageError> {
247        let key = format!("membership/{author_pubkey}/{seq}.enc");
248        let encrypted = self.enc().encrypt(&data);
249        self.home.write(&key, encrypted).await?;
250        Ok(())
251    }
252
253    async fn get_membership_entry(
254        &self,
255        author_pubkey: &str,
256        seq: u64,
257    ) -> Result<Vec<u8>, StorageError> {
258        let key = format!("membership/{author_pubkey}/{seq}.enc");
259        let encrypted = self.home.read(&key).await?;
260        self.enc()
261            .decrypt(&encrypted)
262            .map_err(|e| StorageError::Decryption(format!("membership {author_pubkey}/{seq}: {e}")))
263    }
264
265    async fn list_membership_entries(&self) -> Result<Vec<(String, u64)>, StorageError> {
266        let keys = self.home.list("membership/").await?;
267        let mut entries = Vec::new();
268
269        for key in &keys {
270            // key = "membership/{author_pubkey}/{seq}.enc"
271            let rest = match key.strip_prefix("membership/") {
272                Some(r) => r,
273                None => continue,
274            };
275            let rest = match rest.strip_suffix(".enc") {
276                Some(r) => r,
277                None => continue,
278            };
279
280            // Split into author_pubkey and seq. The pubkey is hex (no slashes),
281            // so the last '/' separates pubkey from seq.
282            if let Some(slash_pos) = rest.rfind('/') {
283                let author = &rest[..slash_pos];
284                if let Ok(seq) = rest[slash_pos + 1..].parse::<u64>() {
285                    entries.push((author.to_string(), seq));
286                }
287            }
288        }
289
290        Ok(entries)
291    }
292
293    async fn put_wrapped_key(&self, user_pubkey: &str, data: Vec<u8>) -> Result<(), StorageError> {
294        let key = format!("keys/{user_pubkey}.enc");
295        // Wrapped keys are already encrypted (sealed box), store as-is.
296        self.home.write(&key, data).await?;
297        Ok(())
298    }
299
300    async fn get_wrapped_key(&self, user_pubkey: &str) -> Result<Vec<u8>, StorageError> {
301        let key = format!("keys/{user_pubkey}.enc");
302        // Wrapped keys are already encrypted (sealed box), return as-is.
303        self.home.read(&key).await.map_err(StorageError::from)
304    }
305
306    async fn delete_wrapped_key(&self, user_pubkey: &str) -> Result<(), StorageError> {
307        let key = format!("keys/{user_pubkey}.enc");
308        self.home.delete(&key).await?;
309        Ok(())
310    }
311
312    async fn put_snapshot_meta(&self, data: Vec<u8>) -> Result<(), StorageError> {
313        let encrypted = self.enc().encrypt(&data);
314        self.home.write("snapshot_meta.json.enc", encrypted).await?;
315        Ok(())
316    }
317
318    async fn get_snapshot_meta(&self) -> Result<Vec<u8>, StorageError> {
319        let encrypted = self.home.read("snapshot_meta.json.enc").await?;
320        self.enc()
321            .decrypt(&encrypted)
322            .map_err(|e| StorageError::Decryption(format!("snapshot_meta: {e}")))
323    }
324}