1use std::path::PathBuf;
8
9use tracing::{info, warn};
10
11use crate::blob::{BlobPlan, BlobUploadObserver};
12use crate::changeset::RowChange;
13use crate::clock::ClockRef;
14use crate::config::Config;
15use crate::db::{RawDbHandle, SyncBookkeeping};
16use crate::encryption::EncryptionService;
17use crate::keys::{KeyService, UserKeypair};
18use crate::library_dir::LibraryDir;
19use crate::storage::cloud::CloudHome;
20
21use super::encrypted_storage::EncryptedSyncStorage;
22use super::hlc::{Hlc, Timestamp};
23use super::service::SyncService;
24use super::session::SyncSession;
25use super::storage::SyncStorage;
26
/// Summary of one successful run of `run_single_sync_cycle`.
pub struct SyncCycleResult {
    /// Copied from `pull.changesets_applied` of the sync service's result.
    pub changesets_applied: u64,
    /// Number of entries in `other_devices` of the status built from the
    /// remote heads at the end of the cycle.
    pub other_device_count: usize,
    /// RFC 3339 rendering of `clock.now()` taken at the end of the cycle.
    pub sync_time: String,
    /// Copied from `pull.asset_downloads_failed` of the sync service's result.
    pub asset_downloads_failed: bool,
    /// Row changes reported by the pull phase (`pull.row_changes`).
    pub row_changes: Vec<RowChange>,
}
40
41pub fn staging_path(library_dir: &LibraryDir) -> PathBuf {
43 library_dir.join("sync_staging.bin")
44}
45
46pub fn stage_changeset(library_dir: &LibraryDir, packed: &[u8]) {
48 if let Err(e) = std::fs::write(staging_path(library_dir), packed) {
49 warn!("Failed to stage outgoing changeset: {e}");
50 }
51}
52
53pub fn clear_staged_changeset(library_dir: &LibraryDir) {
55 let _ = std::fs::remove_file(staging_path(library_dir));
56}
57
58pub fn read_staged_changeset(library_dir: &LibraryDir) -> Option<Vec<u8>> {
60 let path = staging_path(library_dir);
61 if path.exists() {
62 match std::fs::read(&path) {
63 Ok(data) if !data.is_empty() => Some(data),
64 Ok(_) => {
65 clear_staged_changeset(library_dir);
66 None
67 }
68 Err(e) => {
69 warn!("Failed to read staged changeset: {e}");
70 clear_staged_changeset(library_dir);
71 None
72 }
73 }
74 } else {
75 None
76 }
77}
78
79pub async fn push_changeset(
81 storage: &dyn SyncStorage,
82 device_id: &str,
83 seq: u64,
84 packed: Vec<u8>,
85 snapshot_seq: Option<u64>,
86 timestamp: &str,
87) -> Result<(), super::storage::StorageError> {
88 storage.put_changeset(device_id, seq, packed).await?;
89 storage
90 .put_head(device_id, seq, snapshot_seq, timestamp)
91 .await?;
92 Ok(())
93}
94
/// Result of `run_single_sync_cycle`, including which sync session (if any)
/// the caller gets back.
pub enum SyncCycleOutcome {
    /// The cycle completed; carries its summary and the session started at
    /// the end of the cycle.
    Ok(SyncCycleResult, SyncSession),
    /// The cycle failed with the given message, but a session started via
    /// `SyncSession::start` is available.
    ErrWithSession(String, SyncSession),
    /// The cycle failed with the given message and no session could be
    /// started.
    ErrNoSession(String),
}
104
105pub async unsafe fn run_single_sync_cycle(
118 storage: &dyn SyncStorage,
119 device_id: &str,
120 hlc: &Hlc,
121 clock: &dyn crate::clock::Clock,
122 raw_db: *mut libsqlite3_sys::sqlite3,
123 session: SyncSession,
124 encryption: &std::sync::RwLock<EncryptionService>,
125 user_keypair: &UserKeypair,
126 db: &dyn SyncBookkeeping,
127 library_dir: &LibraryDir,
128 cloud_home: Option<&dyn CloudHome>,
129 blob_plan: &dyn BlobPlan,
130 observer: Option<&dyn BlobUploadObserver>,
131) -> SyncCycleOutcome {
132 let sync_service = SyncService::new(device_id.to_string());
133
134 macro_rules! recover_session_on_err {
136 ($err:expr) => {
137 match unsafe { SyncSession::start(raw_db) } {
138 Ok(s) => return SyncCycleOutcome::ErrWithSession($err, s),
139 Err(se) => {
140 return SyncCycleOutcome::ErrNoSession(format!(
141 "{} (also failed to restart session: {se})",
142 $err
143 ))
144 }
145 }
146 };
147 }
148
149 let mut local_seq = match db.get_sync_state("local_seq").await {
153 Ok(Some(v)) => match v.parse::<u64>() {
154 Ok(n) => n,
155 Err(e) => {
156 recover_session_on_err!(format!("Corrupt local_seq value: {e}"));
157 }
158 },
159 Ok(None) => 0,
160 Err(e) => {
161 recover_session_on_err!(format!("Failed to read local_seq: {e}"));
162 }
163 };
164
165 let snapshot_seq: Option<u64> = match db.get_sync_state("snapshot_seq").await {
166 Ok(Some(v)) => match v.parse::<u64>() {
167 Ok(n) => Some(n),
168 Err(e) => {
169 recover_session_on_err!(format!("Corrupt snapshot_seq value: {e}"));
170 }
171 },
172 Ok(None) => None,
173 Err(e) => {
174 recover_session_on_err!(format!("Failed to read snapshot_seq: {e}"));
175 }
176 };
177
178 let last_snapshot_time: Option<chrono::DateTime<chrono::Utc>> =
179 match db.get_sync_state("last_snapshot_time").await {
180 Ok(Some(v)) => match chrono::DateTime::parse_from_rfc3339(&v) {
181 Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
182 Err(e) => {
183 recover_session_on_err!(format!("Corrupt last_snapshot_time value: {e}"));
184 }
185 },
186 Ok(None) => None,
187 Err(e) => {
188 recover_session_on_err!(format!("Failed to read last_snapshot_time: {e}"));
189 }
190 };
191
192 let staged_seq: Option<u64> = match db.get_sync_state("staged_seq").await {
193 Ok(Some(v)) if v.is_empty() => None,
194 Ok(Some(v)) => match v.parse::<u64>() {
195 Ok(n) => Some(n),
196 Err(e) => {
197 recover_session_on_err!(format!("Corrupt staged_seq value: {e}"));
198 }
199 },
200 Ok(None) => None,
201 Err(e) => {
202 recover_session_on_err!(format!("Failed to read staged_seq: {e}"));
203 }
204 };
205
206 if let Some(seq) = staged_seq {
208 if let Some(staged_data) = read_staged_changeset(library_dir) {
209 let timestamp = hlc.now().to_string();
210
211 info!(seq, "Retrying staged changeset push");
212
213 match push_changeset(
214 storage,
215 device_id,
216 seq,
217 staged_data,
218 snapshot_seq,
219 ×tamp,
220 )
221 .await
222 {
223 Ok(()) => {
224 info!(seq, "Staged changeset push succeeded");
225 clear_staged_changeset(library_dir);
226 local_seq = seq;
227
228 if let Err(e) = db.set_sync_state("local_seq", &seq.to_string()).await {
229 recover_session_on_err!(format!(
230 "Failed to persist local_seq after staged push: {e}"
231 ));
232 }
233 if let Err(e) = db.set_sync_state("staged_seq", "").await {
234 recover_session_on_err!(format!(
235 "Failed to clear staged_seq after staged push: {e}"
236 ));
237 }
238 }
239 Err(e) => {
240 recover_session_on_err!(format!("Staged changeset push failed: {e}"));
241 }
242 }
243 } else if let Err(e) = db.set_sync_state("staged_seq", "").await {
244 recover_session_on_err!(format!("Failed to clear stale staged_seq: {e}"));
245 }
246 }
247
248 if let Some(ch) = cloud_home {
250 match super::outbox::process_uploads(db, ch, encryption, library_dir.as_ref(), observer)
251 .await
252 {
253 Ok(n) if n > 0 => info!(count = n, "Processed outbox uploads"),
254 Err(e) => warn!("Outbox upload processing error: {e}"),
255 _ => {}
256 }
257 }
258
259 let has_pending_uploads = match db.has_pending_cloud_uploads().await {
261 Ok(v) => v,
262 Err(e) => {
263 recover_session_on_err!(format!("Failed to check pending cloud uploads: {e}"));
264 }
265 };
266
267 let cursors = match db.get_all_sync_cursors().await {
269 Ok(c) => c,
270 Err(e) => {
271 recover_session_on_err!(format!("Failed to load sync cursors: {e}"));
272 }
273 };
274
275 let timestamp = hlc.now().to_string();
276
277 let sync_result = unsafe {
279 sync_service
280 .sync(
281 raw_db,
282 session,
283 local_seq,
284 &cursors,
285 storage,
286 ×tamp,
287 "background sync",
288 user_keypair,
289 library_dir,
290 blob_plan,
291 )
292 .await
293 };
294
295 let sync_result = match sync_result {
296 Ok(r) => r,
297 Err(e) => {
298 recover_session_on_err!(format!("Sync cycle error: {e}"));
299 }
300 };
301
302 if has_pending_uploads {
306 if sync_result.outgoing.is_some() {
307 info!("Deferring changeset push: pending cloud uploads remain");
308 }
309 } else if let Some(outgoing) = &sync_result.outgoing {
310 let seq = outgoing.seq;
311
312 stage_changeset(library_dir, &outgoing.packed);
314
315 if let Err(e) = db.set_sync_state("staged_seq", &seq.to_string()).await {
316 recover_session_on_err!(format!("Failed to persist staged_seq before push: {e}"));
317 }
318
319 match push_changeset(
320 storage,
321 device_id,
322 seq,
323 outgoing.packed.clone(),
324 snapshot_seq,
325 ×tamp,
326 )
327 .await
328 {
329 Ok(()) => {
330 clear_staged_changeset(library_dir);
331 local_seq = seq;
332
333 if let Err(e) = db.set_sync_state("local_seq", &seq.to_string()).await {
334 recover_session_on_err!(format!("Failed to persist local_seq after push: {e}"));
335 }
336 if let Err(e) = db.set_sync_state("staged_seq", "").await {
337 recover_session_on_err!(format!("Failed to clear staged_seq after push: {e}"));
338 }
339
340 info!(seq, "Pushed changeset");
341 }
342 Err(e) => {
343 warn!(seq, "Push failed, changeset staged for retry: {e}");
344 }
345 }
346 }
347
348 for (cursor_device_id, cursor_seq) in &sync_result.updated_cursors {
350 if let Err(e) = db.set_sync_cursor(cursor_device_id, *cursor_seq).await {
351 warn!(
352 device_id = cursor_device_id,
353 seq = cursor_seq,
354 "Failed to persist sync cursor: {e}"
355 );
356 }
357 }
358
359 let max_remote_ts = sync_result
361 .pull
362 .remote_heads
363 .iter()
364 .filter(|h| h.device_id != device_id)
365 .filter_map(|h| h.last_sync.as_deref())
366 .filter_map(
367 |ts_str| match chrono::DateTime::parse_from_rfc3339(ts_str) {
368 Ok(dt) => Some(dt.timestamp_millis().max(0) as u64),
369 Err(e) => {
370 warn!(
371 timestamp = ts_str,
372 "Failed to parse peer HLC timestamp: {e}"
373 );
374 None
375 }
376 },
377 )
378 .max();
379
380 if let Some(remote_millis) = max_remote_ts {
381 let remote_ts = Timestamp::new(remote_millis, 0, "remote".to_string());
382 hlc.update(&remote_ts);
383 }
384
385 if let Some(ch) = cloud_home {
387 let device_head_seqs: Vec<u64> = sync_result
388 .pull
389 .remote_heads
390 .iter()
391 .map(|h| h.seq)
392 .collect();
393
394 match super::outbox::process_deletes(db, ch, &device_head_seqs).await {
395 Ok(n) if n > 0 => info!(count = n, "Processed outbox deletes"),
396 Err(e) => warn!("Outbox delete processing error: {e}"),
397 _ => {}
398 }
399 }
400
401 let new_session = match unsafe { SyncSession::start(raw_db) } {
403 Ok(s) => s,
404 Err(e) => {
405 return SyncCycleOutcome::ErrNoSession(format!("Failed to restart sync session: {e}"));
406 }
407 };
408
409 let hours_since = last_snapshot_time.map(|t| {
411 let elapsed = clock.now().signed_duration_since(t);
412 elapsed.num_hours().max(0) as u64
413 });
414
415 let is_initial_sync =
420 local_seq == 0 && snapshot_seq.is_none() && sync_result.outgoing.is_none();
421
422 if is_initial_sync
423 || super::snapshot::should_create_snapshot(local_seq, snapshot_seq, hours_since)
424 {
425 if is_initial_sync {
426 info!("Initial sync: pushing snapshot of existing library data");
427 } else {
428 info!("Snapshot policy triggered, creating snapshot");
429 }
430
431 let temp_dir = std::env::temp_dir();
432 let snapshot_result = {
433 let enc = encryption.read().unwrap();
434 unsafe { super::snapshot::create_snapshot(raw_db, &temp_dir, &enc) }
435 };
436
437 match snapshot_result {
438 Ok(encrypted) => {
439 match super::snapshot::push_snapshot(
440 storage, encrypted, device_id, local_seq, clock,
441 )
442 .await
443 {
444 Ok(()) => {
445 if let Err(e) = db
446 .set_sync_state("snapshot_seq", &local_seq.to_string())
447 .await
448 {
449 recover_session_on_err!(format!("Failed to persist snapshot_seq: {e}"));
450 }
451 if let Err(e) = db
452 .set_sync_state("last_snapshot_time", &clock.now().to_rfc3339())
453 .await
454 {
455 recover_session_on_err!(format!(
456 "Failed to persist last_snapshot_time: {e}"
457 ));
458 }
459
460 info!(local_seq, "Snapshot created and pushed");
461 }
462 Err(e) => {
463 warn!("Failed to push snapshot: {e}");
464 }
465 }
466 }
467 Err(e) => {
468 warn!("Failed to create snapshot: {e}");
469 }
470 }
471 }
472
473 let now = clock.now().to_rfc3339();
475 let core_status =
476 super::status::build_sync_status(&sync_result.pull.remote_heads, device_id, Some(&now));
477
478 let other_device_count = core_status.other_devices.len();
479
480 SyncCycleOutcome::Ok(
481 SyncCycleResult {
482 changesets_applied: sync_result.pull.changesets_applied,
483 other_device_count,
484 sync_time: now,
485 asset_downloads_failed: sync_result.pull.asset_downloads_failed,
486 row_changes: sync_result.pull.row_changes,
487 },
488 new_session,
489 )
490}
491
492pub async fn init_sync(
498 config: &Config,
499 key_service: &KeyService,
500 raw_db_handle: &dyn RawDbHandle,
501 clock: ClockRef,
502 encryption: &EncryptionService,
503) -> Option<SyncComponents> {
504 let storage = match crate::storage::cloud::setup::create_sync_storage(
505 config,
506 key_service,
507 &Some(encryption.clone()),
508 clock.clone(),
509 )
510 .await
511 {
512 Ok(s) => s,
513 Err(e) => {
514 warn!("Failed to create sync storage: {e}");
515 return None;
516 }
517 };
518 let encryption_lock = storage.shared_encryption();
519
520 let user_keypair = match key_service.get_or_create_user_keypair() {
521 Ok(kp) => kp,
522 Err(e) => {
523 warn!("Failed to get/create user keypair for sync: {e}");
524 return None;
525 }
526 };
527
528 let cloud_home = storage.cloud_home();
530
531 let existing_keys = match cloud_home.list("auth/keys/").await {
532 Ok(keys) => keys,
533 Err(e) => {
534 warn!("Failed to list auth keys: {e}");
535 return None;
536 }
537 };
538
539 if existing_keys.is_empty() {
540 let membership_entries = match storage.list_membership_entries().await {
542 Ok(entries) => entries,
543 Err(e) => {
544 warn!("Failed to list membership entries: {e}");
545 return None;
546 }
547 };
548
549 if membership_entries.is_empty() {
550 let user_pk = hex::encode(user_keypair.public_key);
552
553 if let Err(e) = cloud_home
554 .write(&format!("auth/keys/{user_pk}"), vec![])
555 .await
556 {
557 warn!("Failed to write auth key: {e}");
558 return None;
559 }
560 } else {
561 match super::membership_ops::download_chain(&storage, &membership_entries).await {
563 Ok(chain) => {
564 if let Err(e) =
565 super::membership_ops::sync_authorized_keys(cloud_home, &chain).await
566 {
567 warn!("Failed to bootstrap auth keys from membership chain: {e}");
568 return None;
569 }
570 }
571 Err(e) => {
572 warn!("Failed to download membership chain for auth key bootstrap: {e}");
573 return None;
574 }
575 }
576 }
577 }
578
579 let raw_db = match raw_db_handle.raw_write_handle().await {
588 Ok(ptr) => ptr,
589 Err(e) => {
590 warn!("Failed to extract raw write handle for sync: {e}");
591 return None;
592 }
593 };
594
595 let session = match unsafe { SyncSession::start(raw_db) } {
596 Ok(s) => s,
597 Err(e) => {
598 warn!("Failed to start initial sync session: {e}");
599 return None;
600 }
601 };
602
603 let hlc = Hlc::new(config.device_id.clone());
604
605 info!("Sync initialized (device: {})", config.device_id);
606
607 Some(SyncComponents {
608 storage: std::sync::Arc::new(storage),
609 hlc: std::sync::Arc::new(hlc),
610 device_id: config.device_id.clone(),
611 encryption: encryption_lock,
612 raw_db,
613 session,
614 user_keypair,
615 })
616}
617
/// Everything `init_sync` sets up for running sync cycles.
pub struct SyncComponents {
    /// Sync storage created by `create_sync_storage`.
    pub storage: std::sync::Arc<EncryptedSyncStorage>,
    /// Hybrid logical clock created for this device's id.
    pub hlc: std::sync::Arc<Hlc>,
    /// Copy of `config.device_id`.
    pub device_id: String,
    /// Encryption service handle obtained from `storage.shared_encryption()`.
    pub encryption: std::sync::Arc<std::sync::RwLock<EncryptionService>>,
    /// Raw SQLite handle returned by `RawDbHandle::raw_write_handle`.
    pub raw_db: *mut libsqlite3_sys::sqlite3,
    /// The initial session started on `raw_db`.
    pub session: SyncSession,
    /// Keypair returned by `KeyService::get_or_create_user_keypair`.
    pub user_keypair: UserKeypair,
}

// SAFETY: NOTE(review) — this impl asserts that moving the struct, including
// the raw `raw_db` pointer, to another thread is sound. Nothing visible here
// enforces that; presumably the handle is only ever used from one task at a
// time. Confirm against how the caller uses it.
unsafe impl Send for SyncComponents {}