1use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use super::{CloudHome, CloudHomeError, CloudHomeJoinInfo};
15
16const CHUNK_SIZE: usize = 10 * 1024 * 1024; pub trait CloudKitOps: Send + Sync {
22 fn write_record(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError>;
23 fn read_record(&self, key: &str) -> Result<Vec<u8>, CloudHomeError>;
24 fn list_records(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError>;
25 fn delete_record(&self, key: &str) -> Result<(), CloudHomeError>;
26 fn record_exists(&self, key: &str) -> Result<bool, CloudHomeError>;
27 fn grant_access(&self, email: &str) -> Result<String, CloudHomeError>;
28 fn revoke_access(&self, user_record_id: &str) -> Result<(), CloudHomeError>;
29 fn accept_share(&self, share_url: &str) -> Result<(), CloudHomeError>;
30}
31
32pub struct CloudKitCloudHome {
34 ops: Arc<dyn CloudKitOps>,
35}
36
37impl CloudKitCloudHome {
38 pub fn new(ops: Arc<dyn CloudKitOps>) -> Self {
39 Self { ops }
40 }
41}
42
43fn strip_part_suffix(key: &str) -> &str {
45 if let Some(idx) = key.rfind(".part") {
46 let after = &key[idx + 5..];
47 if !after.is_empty() && after.chars().all(|c| c.is_ascii_digit()) {
48 return &key[..idx];
49 }
50 }
51 key
52}
53
54fn delete_all_variants(ops: &dyn CloudKitOps, key: &str) -> Result<(), CloudHomeError> {
56 match ops.delete_record(key) {
58 Ok(()) | Err(CloudHomeError::NotFound(_)) => {}
59 Err(e) => return Err(e),
60 }
61
62 let chunk_prefix = format!("{key}.part");
64 let chunks = ops.list_records(&chunk_prefix)?;
65 for chunk_key in chunks {
66 match ops.delete_record(&chunk_key) {
67 Ok(()) | Err(CloudHomeError::NotFound(_)) => {}
68 Err(e) => return Err(e),
69 }
70 }
71
72 Ok(())
73}
74
75#[async_trait]
76impl CloudHome for CloudKitCloudHome {
77 async fn write(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
78 let ops = self.ops.clone();
79 let key = key.to_string();
80 tokio::task::spawn_blocking(move || {
81 delete_all_variants(&*ops, &key)?;
83
84 if data.len() <= CHUNK_SIZE {
85 ops.write_record(&key, data)
86 } else {
87 for (i, chunk) in data.chunks(CHUNK_SIZE).enumerate() {
88 let chunk_key = format!("{key}.part{i}");
89 ops.write_record(&chunk_key, chunk.to_vec())?;
90 }
91 Ok(())
92 }
93 })
94 .await
95 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
96 }
97
98 async fn read(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
99 let ops = self.ops.clone();
100 let key = key.to_string();
101 tokio::task::spawn_blocking(move || {
102 if ops.record_exists(&key)? {
104 return ops.read_record(&key);
105 }
106
107 let chunk_prefix = format!("{key}.part");
109 let chunk_keys = ops.list_records(&chunk_prefix)?;
110 if chunk_keys.is_empty() {
111 return Err(CloudHomeError::NotFound(key));
112 }
113
114 let mut numbered: Vec<(usize, String)> = chunk_keys
117 .into_iter()
118 .map(|k| {
119 let n = k
120 .rsplit_once(".part")
121 .and_then(|(_, suffix)| suffix.parse::<usize>().ok())
122 .ok_or_else(|| {
123 CloudHomeError::Storage(format!("chunk key {k:?} missing .part suffix"))
124 })?;
125 Ok::<_, CloudHomeError>((n, k))
126 })
127 .collect::<Result<_, _>>()?;
128 numbered.sort_by_key(|(n, _)| *n);
129
130 let mut result = Vec::new();
131 for (_, chunk_key) in &numbered {
132 let chunk = ops.read_record(chunk_key)?;
133 result.extend_from_slice(&chunk);
134 }
135 Ok(result)
136 })
137 .await
138 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
139 }
140
141 async fn read_range(&self, key: &str, start: u64, end: u64) -> Result<Vec<u8>, CloudHomeError> {
142 if end <= start {
143 return Ok(Vec::new());
144 }
145
146 let ops = self.ops.clone();
147 let key = key.to_string();
148 tokio::task::spawn_blocking(move || {
149 let start = start as usize;
150 let end = end as usize;
151
152 if ops.record_exists(&key)? {
154 let data = ops.read_record(&key)?;
155 if end > data.len() {
156 return Err(CloudHomeError::Storage(format!(
157 "range {start}..{end} exceeds file size {}",
158 data.len()
159 )));
160 }
161 return Ok(data[start..end].to_vec());
162 }
163
164 let first_chunk = start / CHUNK_SIZE;
166 let last_chunk = (end - 1) / CHUNK_SIZE;
167
168 let mut result = Vec::with_capacity(end - start);
169 for i in first_chunk..=last_chunk {
170 let chunk_key = format!("{key}.part{i}");
171 let chunk = ops.read_record(&chunk_key)?;
172
173 let chunk_start = i * CHUNK_SIZE;
174 let slice_start = if i == first_chunk {
175 start - chunk_start
176 } else {
177 0
178 };
179 let slice_end = if i == last_chunk {
180 end - chunk_start
181 } else {
182 chunk.len()
183 };
184 result.extend_from_slice(&chunk[slice_start..slice_end]);
185 }
186 Ok(result)
187 })
188 .await
189 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
190 }
191
192 async fn list(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
193 let ops = self.ops.clone();
194 let prefix = prefix.to_string();
195 tokio::task::spawn_blocking(move || {
196 let raw_keys = ops.list_records(&prefix)?;
197
198 let mut base_keys: Vec<String> = raw_keys
200 .iter()
201 .map(|k| strip_part_suffix(k).to_string())
202 .collect();
203 base_keys.sort();
204 base_keys.dedup();
205 Ok(base_keys)
206 })
207 .await
208 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
209 }
210
211 async fn delete(&self, key: &str) -> Result<(), CloudHomeError> {
212 let ops = self.ops.clone();
213 let key = key.to_string();
214 tokio::task::spawn_blocking(move || delete_all_variants(&*ops, &key))
215 .await
216 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
217 }
218
219 async fn exists(&self, key: &str) -> Result<bool, CloudHomeError> {
220 let ops = self.ops.clone();
221 let key = key.to_string();
222 tokio::task::spawn_blocking(move || {
223 if ops.record_exists(&key)? {
224 return Ok(true);
225 }
226 let chunk_prefix = format!("{key}.part");
227 let chunks = ops.list_records(&chunk_prefix)?;
228 Ok(!chunks.is_empty())
229 })
230 .await
231 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
232 }
233
234 async fn grant_access(&self, member_id: &str) -> Result<CloudHomeJoinInfo, CloudHomeError> {
235 let ops = self.ops.clone();
236 let member_id = member_id.to_string();
237 tokio::task::spawn_blocking(move || {
238 let share_url = ops.grant_access(&member_id)?;
239 Ok(CloudHomeJoinInfo::CloudKit { share_url })
240 })
241 .await
242 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
243 }
244
245 async fn revoke_access(&self, member_id: &str) -> Result<(), CloudHomeError> {
246 let ops = self.ops.clone();
247 let member_id = member_id.to_string();
248 tokio::task::spawn_blocking(move || ops.revoke_access(&member_id))
249 .await
250 .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use std::collections::HashMap;
258 use std::sync::Mutex;
259
260 struct MockCloudKitOps {
261 store: Mutex<HashMap<String, Vec<u8>>>,
262 }
263
264 impl MockCloudKitOps {
265 fn new() -> Self {
266 Self {
267 store: Mutex::new(HashMap::new()),
268 }
269 }
270 }
271
272 impl CloudKitOps for MockCloudKitOps {
273 fn write_record(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
274 self.store.lock().unwrap().insert(key.to_string(), data);
275 Ok(())
276 }
277
278 fn read_record(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
279 self.store
280 .lock()
281 .unwrap()
282 .get(key)
283 .cloned()
284 .ok_or_else(|| CloudHomeError::NotFound(key.to_string()))
285 }
286
287 fn list_records(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
288 let store = self.store.lock().unwrap();
289 let mut keys: Vec<String> = store
290 .keys()
291 .filter(|k| k.starts_with(prefix))
292 .cloned()
293 .collect();
294 keys.sort();
295 Ok(keys)
296 }
297
298 fn delete_record(&self, key: &str) -> Result<(), CloudHomeError> {
299 self.store.lock().unwrap().remove(key);
300 Ok(())
301 }
302
303 fn record_exists(&self, key: &str) -> Result<bool, CloudHomeError> {
304 Ok(self.store.lock().unwrap().contains_key(key))
305 }
306
307 fn grant_access(&self, email: &str) -> Result<String, CloudHomeError> {
308 Ok(format!("https://www.icloud.com/share/{email}"))
309 }
310
311 fn revoke_access(&self, _user_record_id: &str) -> Result<(), CloudHomeError> {
312 Ok(())
313 }
314
315 fn accept_share(&self, _share_url: &str) -> Result<(), CloudHomeError> {
316 Ok(())
317 }
318 }
319
320 fn make_cloud_home() -> CloudKitCloudHome {
321 CloudKitCloudHome::new(Arc::new(MockCloudKitOps::new()))
322 }
323
324 #[tokio::test]
325 async fn test_small_file_roundtrip() {
326 let ch = make_cloud_home();
327 let data = b"hello world".to_vec();
328 ch.write("small.bin", data.clone()).await.unwrap();
329 let read = ch.read("small.bin").await.unwrap();
330 assert_eq!(read, data);
331 }
332
333 #[tokio::test]
334 async fn test_large_file_roundtrip() {
335 let ch = make_cloud_home();
336 let data: Vec<u8> = (0..25 * 1024 * 1024).map(|i| (i % 256) as u8).collect();
338 ch.write("large.bin", data.clone()).await.unwrap();
339 let read = ch.read("large.bin").await.unwrap();
340 assert_eq!(read.len(), data.len());
341 assert_eq!(read, data);
342 }
343
344 #[tokio::test]
345 async fn test_read_range_single() {
346 let ch = make_cloud_home();
347 ch.write("range.bin", b"0123456789".to_vec()).await.unwrap();
348 let slice = ch.read_range("range.bin", 3, 7).await.unwrap();
349 assert_eq!(slice, b"3456");
350 }
351
352 #[tokio::test]
353 async fn test_read_range_chunked() {
354 let ch = make_cloud_home();
355 let data: Vec<u8> = (0..15 * 1024 * 1024).map(|i| (i % 256) as u8).collect();
357 ch.write("big.bin", data.clone()).await.unwrap();
358
359 let boundary = CHUNK_SIZE;
361 let start = (boundary - 2) as u64;
362 let end = (boundary + 3) as u64;
363 let slice = ch.read_range("big.bin", start, end).await.unwrap();
364 assert_eq!(slice.len(), 5);
365 assert_eq!(slice, &data[start as usize..end as usize]);
366 }
367
368 #[tokio::test]
369 async fn test_list_deduplicates_chunks() {
370 let ch = make_cloud_home();
371 let data: Vec<u8> = vec![0u8; 25 * 1024 * 1024];
373 ch.write("files/album.flac", data).await.unwrap();
374
375 ch.write("files/cover.jpg", b"img".to_vec()).await.unwrap();
377
378 let keys = ch.list("files/").await.unwrap();
379 assert_eq!(keys.len(), 2);
380 assert!(keys.contains(&"files/album.flac".to_string()));
381 assert!(keys.contains(&"files/cover.jpg".to_string()));
382 }
383
384 #[tokio::test]
385 async fn test_delete_removes_all_chunks() {
386 let ch = make_cloud_home();
387 let data: Vec<u8> = vec![0u8; 25 * 1024 * 1024];
388 ch.write("to-delete.bin", data).await.unwrap();
389
390 assert!(ch.exists("to-delete.bin").await.unwrap());
391
392 ch.delete("to-delete.bin").await.unwrap();
393
394 assert!(!ch.exists("to-delete.bin").await.unwrap());
395
396 let ops = &ch.ops;
398 let keys = ops.list_records("to-delete.bin").unwrap();
399 assert!(keys.is_empty());
400 }
401
402 #[tokio::test]
403 async fn test_overwrite_chunked_with_single() {
404 let ch = make_cloud_home();
405 let large_data: Vec<u8> = vec![0u8; 25 * 1024 * 1024];
407 ch.write("file.bin", large_data).await.unwrap();
408
409 let small_data = b"small".to_vec();
411 ch.write("file.bin", small_data.clone()).await.unwrap();
412
413 let read = ch.read("file.bin").await.unwrap();
414 assert_eq!(read, small_data);
415
416 let chunks = ch.ops.list_records("file.bin.part").unwrap();
418 assert!(chunks.is_empty());
419 }
420
421 #[tokio::test]
422 async fn test_overwrite_single_with_chunked() {
423 let ch = make_cloud_home();
424 ch.write("file.bin", b"small".to_vec()).await.unwrap();
426
427 let large_data: Vec<u8> = vec![1u8; 25 * 1024 * 1024];
429 ch.write("file.bin", large_data.clone()).await.unwrap();
430
431 let read = ch.read("file.bin").await.unwrap();
432 assert_eq!(read, large_data);
433
434 assert!(!ch.ops.record_exists("file.bin").unwrap());
436 }
437
438 #[tokio::test]
439 async fn test_exists() {
440 let ch = make_cloud_home();
441
442 assert!(!ch.exists("nope.bin").await.unwrap());
443
444 ch.write("yep.bin", b"data".to_vec()).await.unwrap();
445 assert!(ch.exists("yep.bin").await.unwrap());
446
447 let data: Vec<u8> = vec![0u8; 15 * 1024 * 1024];
449 ch.write("chunked.bin", data).await.unwrap();
450 assert!(ch.exists("chunked.bin").await.unwrap());
451 }
452
453 #[tokio::test]
454 async fn test_read_range_empty_when_end_leq_start() {
455 let ch = make_cloud_home();
456 ch.write("range.bin", b"0123456789".to_vec()).await.unwrap();
457
458 let slice = ch.read_range("range.bin", 3, 3).await.unwrap();
460 assert!(slice.is_empty());
461
462 let slice = ch.read_range("range.bin", 5, 2).await.unwrap();
464 assert!(slice.is_empty());
465
466 let slice = ch.read_range("range.bin", 0, 0).await.unwrap();
468 assert!(slice.is_empty());
469 }
470
471 #[test]
472 fn test_strip_part_suffix() {
473 assert_eq!(strip_part_suffix("file.bin.part0"), "file.bin");
474 assert_eq!(strip_part_suffix("file.bin.part123"), "file.bin");
475 assert_eq!(strip_part_suffix("file.bin"), "file.bin");
476 assert_eq!(strip_part_suffix("file.partition"), "file.partition");
477 assert_eq!(strip_part_suffix("file.part"), "file.part"); }
479}