1use std::collections::HashMap;
14
15use tracing::{debug, info, warn};
16
17use super::apply::apply_changeset_lww;
18use super::envelope::{self, verify_changeset_signature};
19use super::membership::MembershipChain;
20use super::push::SCHEMA_VERSION;
21use super::session_ext::Changeset;
22use super::storage::{DeviceHead, SyncStorage};
23use crate::blob::BlobPlan;
24use crate::changeset::RowChange;
25use crate::library_dir::LibraryDir;
26
/// Cursor value used for a device that has no entry in the cursor map yet.
const INITIAL_CURSOR: u64 = 0;
34
35fn cursor_for_device(cursors: &HashMap<String, u64>, device_id: &str) -> u64 {
40 match cursors.get(device_id) {
41 Some(seq) => *seq,
42 None => {
43 debug!(%device_id, "no cursor for device, starting from initial");
44 INITIAL_CURSOR
45 }
46 }
47}
48
49#[derive(Debug)]
51pub struct PullResult {
52 pub changesets_applied: u64,
54 pub devices_pulled: u64,
56 pub asset_downloads_failed: bool,
58 pub skipped_schema: u64,
60 pub remote_heads: Vec<DeviceHead>,
63 pub row_changes: Vec<RowChange>,
66}
67
68struct DeferredChangeset {
70 device_id: String,
71 seq: u64,
72 changeset: Changeset,
73}
74
75pub async unsafe fn pull_changes(
88 db: *mut libsqlite3_sys::sqlite3,
89 storage: &dyn SyncStorage,
90 our_device_id: &str,
91 cursors: &HashMap<String, u64>,
92 library_dir: &LibraryDir,
93 blob_plan: &dyn BlobPlan,
94) -> Result<(HashMap<String, u64>, PullResult), PullError> {
95 let _ = library_dir;
96 if let Some(min_version) = storage
99 .get_min_schema_version()
100 .await
101 .map_err(PullError::Storage)?
102 {
103 if SCHEMA_VERSION < min_version {
104 return Err(PullError::SchemaVersionTooOld {
105 local_version: SCHEMA_VERSION,
106 min_version,
107 });
108 }
109 }
110
111 let heads = storage.list_heads().await.map_err(PullError::Storage)?;
112
113 let membership_chain: Option<MembershipChain> = match storage.list_membership_entries().await {
117 Ok(entries) if !entries.is_empty() => {
118 match super::membership_ops::download_chain(storage, &entries).await {
119 Ok(chain) => Some(chain),
120 Err(e) => {
121 warn!("failed to load membership chain for validation: {e}");
122 None
123 }
124 }
125 }
126 Ok(_) => None,
127 Err(e) => {
128 warn!("failed to list membership entries for validation: {e}");
129 None
130 }
131 };
132
133 let mut updated_cursors = cursors.clone();
134 let mut result = PullResult {
135 changesets_applied: 0,
136 devices_pulled: 0,
137 asset_downloads_failed: false,
138 skipped_schema: 0,
139 remote_heads: heads.clone(),
140 row_changes: Vec::new(),
141 };
142 let mut deferred: Vec<DeferredChangeset> = Vec::new();
143
144 for head in &heads {
145 if head.device_id == our_device_id {
147 continue;
148 }
149
150 let local_seq = cursor_for_device(cursors, &head.device_id);
151 if head.seq <= local_seq {
152 continue;
153 }
154
155 info!(
156 device_id = %head.device_id,
157 local_seq,
158 remote_seq = head.seq,
159 "pulling changesets"
160 );
161
162 let mut pulled_any = false;
163
164 for seq in (local_seq + 1)..=head.seq {
165 let envelope_bytes = match storage.get_changeset(&head.device_id, seq).await {
168 Ok(data) => data,
169 Err(e) => {
170 warn!(
171 device_id = %head.device_id,
172 seq,
173 error = %e,
174 "failed to fetch changeset, stopping pull for this device"
175 );
176 break;
177 }
178 };
179
180 let (env, changeset_bytes) =
181 envelope::unpack(&envelope_bytes).map_err(PullError::InvalidEnvelope)?;
182
183 if env.changeset_size != changeset_bytes.len() {
186 warn!(
187 device_id = %head.device_id,
188 seq,
189 expected = env.changeset_size,
190 actual = changeset_bytes.len(),
191 "changeset_size mismatch in envelope"
192 );
193 }
194
195 if env.schema_version > SCHEMA_VERSION {
197 warn!(
198 device_id = %head.device_id,
199 seq,
200 remote_version = env.schema_version,
201 local_version = SCHEMA_VERSION,
202 "skipping changeset with newer schema version"
203 );
204 result.skipped_schema += 1;
205 updated_cursors.insert(head.device_id.clone(), seq);
208 continue;
209 }
210
211 if !verify_changeset_signature(&env, &changeset_bytes) {
213 warn!(
214 device_id = %head.device_id,
215 seq,
216 "changeset has invalid signature, skipping"
217 );
218 updated_cursors.insert(head.device_id.clone(), seq);
219 continue;
220 }
221
222 if let Some(chain) = membership_chain.as_ref() {
228 let authorized = env
229 .author_pubkey
230 .as_ref()
231 .is_some_and(|pk| chain.is_member_at(pk, &env.timestamp));
232 if !authorized {
233 warn!(
234 device_id = %head.device_id,
235 seq,
236 author = ?env.author_pubkey,
237 "changeset not signed by a current member, skipping"
238 );
239 updated_cursors.insert(head.device_id.clone(), seq);
240 continue;
241 }
242 }
243
244 if changeset_bytes.is_empty() {
245 updated_cursors.insert(head.device_id.clone(), seq);
246 continue;
247 }
248
249 let cs = Changeset::from_bytes(&changeset_bytes);
250 let apply_result = apply_changeset_lww(db, &cs).map_err(PullError::Apply)?;
251
252 let changes = match crate::changeset::walk(&changeset_bytes) {
255 Ok(c) => c,
256 Err(e) => {
257 warn!("Failed to walk changeset: {e}");
258 Vec::new()
259 }
260 };
261
262 let blobs_ok = download_changeset_blobs(&changes, blob_plan, storage).await;
265
266 if apply_result.had_fk_violations {
267 deferred.push(DeferredChangeset {
268 device_id: head.device_id.clone(),
269 seq,
270 changeset: Changeset::from_bytes(&changeset_bytes),
271 });
272 }
273
274 result.changesets_applied += 1;
275 result.row_changes.extend(changes);
276
277 pulled_any = true;
278 if blobs_ok {
279 updated_cursors.insert(head.device_id.clone(), seq);
280 } else {
281 warn!(
282 "Blob download failed for {}/{}, cursor not advanced",
283 head.device_id, seq
284 );
285 result.asset_downloads_failed = true;
286 }
287 }
288
289 if pulled_any {
290 result.devices_pulled += 1;
291 }
292 }
293
294 if !deferred.is_empty() {
297 info!(
298 count = deferred.len(),
299 "retrying changesets with FK violations"
300 );
301
302 for d in &deferred {
303 let retry_result = apply_changeset_lww(db, &d.changeset).map_err(PullError::Apply)?;
304
305 if retry_result.had_fk_violations {
306 warn!(
307 device_id = %d.device_id,
308 seq = d.seq,
309 "changeset still has FK violations after retry, skipping"
310 );
311 }
312 }
313 }
314
315 Ok((updated_cursors, result))
316}
317
318async fn download_changeset_blobs(
323 changes: &[RowChange],
324 blob_plan: &dyn BlobPlan,
325 storage: &dyn SyncStorage,
326) -> bool {
327 let mut all_ok = true;
328 for blob in blob_plan.blobs_to_pull(changes) {
329 if blob.local_path.exists() {
330 continue;
331 }
332
333 match storage
334 .get_blob(&blob.namespace, &blob.id, blob.scope.clone())
335 .await
336 {
337 Ok(bytes) => {
338 if let Some(parent) = blob.local_path.parent() {
339 if let Err(e) = std::fs::create_dir_all(parent) {
340 warn!(id = %blob.id, error = %e, "failed to create blob directory");
341 all_ok = false;
342 continue;
343 }
344 }
345
346 if let Err(e) = std::fs::write(&blob.local_path, bytes) {
347 warn!(id = %blob.id, error = %e, "failed to write blob");
348 all_ok = false;
349 }
350 }
351 Err(e) => {
352 warn!(id = %blob.id, namespace = %blob.namespace, error = %e, "failed to download blob");
353 all_ok = false;
354 }
355 }
356 }
357 all_ok
358}
359
360#[derive(Debug)]
361pub enum PullError {
362 Storage(super::storage::StorageError),
363 InvalidEnvelope(super::envelope::UnpackError),
364 Apply(super::session::SyncError),
365 SchemaVersionTooOld {
368 local_version: u32,
369 min_version: u32,
370 },
371}
372
373impl std::fmt::Display for PullError {
374 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375 match self {
376 PullError::Storage(e) => write!(f, "storage error: {e}"),
377 PullError::InvalidEnvelope(e) => write!(f, "invalid changeset envelope: {e}"),
378 PullError::Apply(e) => write!(f, "changeset apply failed: {e}"),
379 PullError::SchemaVersionTooOld {
380 local_version,
381 min_version,
382 } => write!(
383 f,
384 "local schema version {local_version} is below the storage minimum {min_version}, upgrade required"
385 ),
386 }
387 }
388}
389
390impl std::error::Error for PullError {}