coven/sync/
conflict.rs

1/// Production conflict handler for changeset application.
2///
3/// Uses row-level Last-Writer-Wins (LWW) based on the `_updated_at` column,
4/// which contains HLC timestamps that sort lexicographically = causally.
5///
6/// The `_updated_at` column index is looked up dynamically from the schema
7/// (via `TableSchema`) so adding columns to the end of a table is safe.
8///
9/// For `release_files`, the `encryption_nonce` column is device-specific.
10/// When incoming wins a DATA conflict on that table, the row ID is recorded
11/// so the caller can restore the local value afterward.
12use std::collections::HashMap;
13use std::ffi::{c_char, c_int, CStr, CString};
14use std::ptr;
15
16use libsqlite3_sys as ffi;
17use tracing::warn;
18
19use super::session_ext::{ConflictAction, ConflictContext, ConflictType};
20
21/// Column indices for a synced table, looked up from `PRAGMA table_info`.
22pub struct TableColumns {
23    /// Index of the `_updated_at` column.
24    pub updated_at: usize,
25}
26
27/// Schema info for all synced tables: maps table name to column indices.
28pub struct TableSchema {
29    tables: HashMap<String, TableColumns>,
30}
31
32impl TableSchema {
33    /// Build schema info by querying `PRAGMA table_info` for each synced table.
34    ///
35    /// # Safety
36    /// `db` must be a valid, open sqlite3 connection pointer.
37    pub unsafe fn from_db(db: *mut ffi::sqlite3, synced_tables: &[&str]) -> Self {
38        let mut tables = HashMap::new();
39
40        for &table in synced_tables {
41            let sql = format!("PRAGMA table_info({table})");
42            let c_sql = CString::new(sql).unwrap();
43            let mut stmt: *mut ffi::sqlite3_stmt = ptr::null_mut();
44            let rc = ffi::sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut());
45            assert_eq!(
46                rc,
47                ffi::SQLITE_OK as c_int,
48                "PRAGMA table_info failed for {table}"
49            );
50
51            let mut updated_at = None;
52
53            while ffi::sqlite3_step(stmt) == ffi::SQLITE_ROW as c_int {
54                let col_index = ffi::sqlite3_column_int(stmt, 0) as usize;
55                let name_ptr = ffi::sqlite3_column_text(stmt, 1);
56                if name_ptr.is_null() {
57                    continue;
58                }
59                let name = CStr::from_ptr(name_ptr as *const c_char)
60                    .to_str()
61                    .expect("SQLite column names are always UTF-8");
62
63                if name == "_updated_at" {
64                    updated_at = Some(col_index);
65                }
66            }
67
68            ffi::sqlite3_finalize(stmt);
69
70            let updated_at = updated_at.unwrap_or_else(|| {
71                panic!("synced table {table} has no _updated_at column");
72            });
73
74            tables.insert(table.to_string(), TableColumns { updated_at });
75        }
76
77        TableSchema { tables }
78    }
79
80    /// Look up column info for a table. Panics if the table was not in the
81    /// synced tables list passed to `from_db`.
82    pub fn get(&self, table: &str) -> &TableColumns {
83        self.tables.get(table).unwrap_or_else(|| {
84            panic!("unknown synced table in conflict handler: {table}");
85        })
86    }
87}
88
89/// Tracks state across conflict handler invocations within a single apply.
90#[derive(Default)]
91pub struct ConflictTracker {
92    /// True if any FK constraint violations were reported.
93    pub had_constraint_conflict: bool,
94    /// Row IDs in `release_files` where incoming won a DATA conflict and
95    /// device-specific columns need to be restored afterward.
96    pub release_file_restore_ids: Vec<String>,
97}
98
99impl ConflictTracker {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105/// The production conflict handler for `apply_changeset_with_context`.
106///
107/// Rules:
108/// - **DATA** (same row, both sides edited): compare `_updated_at`. Newer wins.
109///   For `release_files`, records the row ID so device-specific columns can
110///   be restored by the caller.
111/// - **NOTFOUND** (row deleted locally, incoming UPDATE): OMIT (delete wins).
112/// - **CONFLICT** (row exists, incoming INSERT): compare `_updated_at`. Newer wins.
113/// - **CONSTRAINT** (FK violation): OMIT and track for retry.
114/// - **FOREIGN_KEY**: OMIT (deferred FK check failure, handled by retry).
115pub fn lww_conflict_handler(
116    conflict_type: ConflictType,
117    ctx: &ConflictContext,
118    schema: &TableSchema,
119    tracker: &mut ConflictTracker,
120) -> ConflictAction {
121    match conflict_type {
122        ConflictType::Data => {
123            let table = ctx.table_name();
124            let cols = schema.get(table);
125
126            let incoming = ctx.new_value(cols.updated_at);
127            let local = ctx.conflict_value(cols.updated_at);
128
129            match (incoming.as_deref(), local.as_deref()) {
130                (Some(inc), Some(loc)) if inc > loc => {
131                    // Incoming wins. For release_files, record the row ID
132                    // so device-specific columns can be restored after apply.
133                    if table == "release_files" {
134                        if let Some(row_id) = ctx.conflict_value(0) {
135                            tracker.release_file_restore_ids.push(row_id);
136                        }
137                    }
138
139                    ConflictAction::Replace
140                }
141                (Some(_), Some(_)) => ConflictAction::Omit,
142                _ => {
143                    warn!(
144                        table,
145                        "DATA conflict without _updated_at values, keeping local"
146                    );
147                    ConflictAction::Omit
148                }
149            }
150        }
151
152        ConflictType::NotFound => {
153            // Row was deleted locally, incoming changeset has an UPDATE.
154            // Delete wins.
155            ConflictAction::Omit
156        }
157
158        ConflictType::Conflict => {
159            // Row already exists locally but incoming changeset has an INSERT
160            // (duplicate PK). Compare _updated_at to decide which version wins.
161            let table = ctx.table_name();
162            let cols = schema.get(table);
163
164            let incoming = ctx.new_value(cols.updated_at);
165            let local = ctx.conflict_value(cols.updated_at);
166
167            match (incoming.as_deref(), local.as_deref()) {
168                (Some(inc), Some(loc)) if inc > loc => ConflictAction::Replace,
169                (Some(_), Some(_)) => ConflictAction::Omit,
170                _ => {
171                    warn!(table, "CONFLICT without _updated_at values, keeping local");
172                    ConflictAction::Omit
173                }
174            }
175        }
176
177        ConflictType::Constraint => {
178            tracker.had_constraint_conflict = true;
179            ConflictAction::Omit
180        }
181
182        ConflictType::ForeignKey => {
183            tracker.had_constraint_conflict = true;
184            ConflictAction::Omit
185        }
186    }
187}