1use std::collections::HashMap;
15
16use tracing::{info, warn};
17
18use crate::blob::BlobPlan;
19use crate::keys::UserKeypair;
20use crate::library_dir::LibraryDir;
21
22use super::envelope::{self, sign_envelope, ChangesetEnvelope};
23use super::pull::{self, PullResult};
24use super::push::{OutgoingChangeset, SCHEMA_VERSION};
25use super::session::SyncSession;
26use super::storage::SyncStorage;
27
/// Entry point for running sync cycles on behalf of a single device.
///
/// The only state carried is the device identifier, which is stamped into
/// every outgoing changeset envelope and handed to the pull step.
#[derive(Debug, Clone)]
pub struct SyncService {
    /// Identifier of the local device.
    pub device_id: String,
}
32
33pub struct SyncResult {
35 pub outgoing: Option<OutgoingChangeset>,
38 pub pull: PullResult,
40 pub updated_cursors: HashMap<String, u64>,
42}
43
44impl SyncService {
45 pub fn new(device_id: String) -> Self {
46 SyncService { device_id }
47 }
48
49 pub async unsafe fn sync(
68 &self,
69 db: *mut libsqlite3_sys::sqlite3,
70 session: SyncSession,
71 local_seq: u64,
72 cursors: &HashMap<String, u64>,
73 storage: &dyn SyncStorage,
74 timestamp: &str,
75 message: &str,
76 keypair: &UserKeypair,
77 library_dir: &LibraryDir,
78 blob_plan: &dyn BlobPlan,
79 ) -> Result<SyncResult, SyncCycleError> {
80 let _ = library_dir;
81
82 let outgoing_cs = session.changeset().map_err(SyncCycleError::Session)?;
84
85 drop(session);
87
88 if let Some(ref cs) = outgoing_cs {
91 let changes =
92 crate::changeset::walk(cs.as_bytes()).map_err(SyncCycleError::AssetScan)?;
93 for blob in blob_plan.blobs_to_push(&changes) {
94 if !blob.local_path.exists() {
95 warn!(id = %blob.id, "blob file not found locally, skipping upload");
96 continue;
97 }
98 let bytes = std::fs::read(&blob.local_path)
99 .map_err(|e| SyncCycleError::AssetUpload(e.to_string()))?;
100 storage
101 .put_blob(&blob.namespace, &blob.id, blob.scope.clone(), bytes)
102 .await
103 .map_err(|e| SyncCycleError::AssetUpload(e.to_string()))?;
104 info!(id = %blob.id, namespace = %blob.namespace, "uploaded blob");
105 }
106 }
107
108 let outgoing = outgoing_cs.map(|cs| {
109 let next_seq = local_seq + 1;
110 let mut env = ChangesetEnvelope {
111 device_id: self.device_id.clone(),
112 seq: next_seq,
113 schema_version: SCHEMA_VERSION,
114 message: message.to_string(),
115 timestamp: timestamp.to_string(),
116 changeset_size: cs.len(),
117 author_pubkey: None,
118 signature: None,
119 };
120 sign_envelope(&mut env, keypair, cs.as_bytes());
121 let packed = envelope::pack(&env, cs.as_bytes());
122 OutgoingChangeset {
123 packed,
124 seq: next_seq,
125 }
126 });
127
128 let (updated_cursors, pull_result) = pull::pull_changes(
130 db,
131 storage,
132 &self.device_id,
133 cursors,
134 library_dir,
135 blob_plan,
136 )
137 .await
138 .map_err(SyncCycleError::Pull)?;
139
140 if pull_result.changesets_applied > 0 {
141 info!(
142 applied = pull_result.changesets_applied,
143 devices = pull_result.devices_pulled,
144 "pull complete"
145 );
146 }
147
148 Ok(SyncResult {
151 outgoing,
152 pull: pull_result,
153 updated_cursors,
154 })
155 }
156}
157
158#[derive(Debug)]
159pub enum SyncCycleError {
160 Session(super::session::SyncError),
161 Pull(pull::PullError),
162 AssetScan(String),
163 AssetUpload(String),
164}
165
166impl std::fmt::Display for SyncCycleError {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 match self {
169 SyncCycleError::Session(e) => write!(f, "session error: {e}"),
170 SyncCycleError::Pull(e) => write!(f, "pull error: {e}"),
171 SyncCycleError::AssetScan(e) => write!(f, "asset scan error: {e}"),
172 SyncCycleError::AssetUpload(e) => write!(f, "asset upload error: {e}"),
173 }
174 }
175}
176
177impl std::error::Error for SyncCycleError {}