1use std::ffi::{c_char, c_int, c_void, CStr, CString};
6use std::ptr;
7
8use libsqlite3_sys as ffi;
9
10pub struct Changeset {
12 buf: *mut c_void,
13 len: c_int,
14}
15
16impl Changeset {
17 pub fn from_bytes(bytes: &[u8]) -> Self {
20 if bytes.is_empty() {
21 return Changeset {
22 buf: ptr::null_mut(),
23 len: 0,
24 };
25 }
26 unsafe {
27 let buf = ffi::sqlite3_malloc(bytes.len() as c_int);
28 assert!(!buf.is_null(), "sqlite3_malloc failed");
29 ptr::copy_nonoverlapping(bytes.as_ptr(), buf as *mut u8, bytes.len());
30 Changeset {
31 buf,
32 len: bytes.len() as c_int,
33 }
34 }
35 }
36
37 pub fn as_bytes(&self) -> &[u8] {
38 if self.buf.is_null() || self.len == 0 {
39 return &[];
40 }
41 unsafe { std::slice::from_raw_parts(self.buf as *const u8, self.len as usize) }
42 }
43
44 pub fn len(&self) -> usize {
45 self.len as usize
46 }
47
48 pub fn is_empty(&self) -> bool {
49 self.len == 0
50 }
51}
52
53impl Drop for Changeset {
54 fn drop(&mut self) {
55 if !self.buf.is_null() {
56 unsafe { ffi::sqlite3_free(self.buf) };
57 }
58 }
59}
60
61unsafe impl Send for Changeset {}
64
65#[repr(i32)]
67pub enum ConflictAction {
68 Omit = ffi::SQLITE_CHANGESET_OMIT,
69 Replace = ffi::SQLITE_CHANGESET_REPLACE,
70 Abort = ffi::SQLITE_CHANGESET_ABORT,
71}
72
73#[repr(i32)]
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub enum ConflictType {
77 Data = ffi::SQLITE_CHANGESET_DATA,
78 NotFound = ffi::SQLITE_CHANGESET_NOTFOUND,
79 Conflict = ffi::SQLITE_CHANGESET_CONFLICT,
80 Constraint = ffi::SQLITE_CHANGESET_CONSTRAINT,
81 ForeignKey = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
82}
83
84impl ConflictType {
85 fn from_raw(val: c_int) -> Self {
86 match val {
87 ffi::SQLITE_CHANGESET_DATA => ConflictType::Data,
88 ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::NotFound,
89 ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::Conflict,
90 ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::Constraint,
91 ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::ForeignKey,
92 other => panic!("unknown conflict type: {other}"),
93 }
94 }
95}
96
97pub struct ConflictContext {
102 iter: *mut ffi::sqlite3_changeset_iter,
103}
104
105impl ConflictContext {
106 pub fn table_name(&self) -> &str {
108 unsafe {
109 let mut table: *const c_char = ptr::null();
110 let mut ncol: c_int = 0;
111 let mut op: c_int = 0;
112 let mut indirect: c_int = 0;
113 ffi::sqlite3changeset_op(self.iter, &mut table, &mut ncol, &mut op, &mut indirect);
114 CStr::from_ptr(table)
115 .to_str()
116 .expect("SQLite table names are always UTF-8")
117 }
118 }
119
120 pub fn column_count(&self) -> usize {
122 unsafe {
123 let mut table: *const c_char = ptr::null();
124 let mut ncol: c_int = 0;
125 let mut op: c_int = 0;
126 let mut indirect: c_int = 0;
127 ffi::sqlite3changeset_op(self.iter, &mut table, &mut ncol, &mut op, &mut indirect);
128 ncol as usize
129 }
130 }
131
132 pub fn new_value(&self, col: usize) -> Option<String> {
136 unsafe {
137 let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
138 let rc = ffi::sqlite3changeset_new(self.iter, col as c_int, &mut val);
139 if rc != ffi::SQLITE_OK as c_int || val.is_null() {
140 return None;
141 }
142 value_to_string(val)
143 }
144 }
145
146 pub fn conflict_value(&self, col: usize) -> Option<String> {
149 unsafe {
150 let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
151 let rc = ffi::sqlite3changeset_conflict(self.iter, col as c_int, &mut val);
152 if rc != ffi::SQLITE_OK as c_int || val.is_null() {
153 return None;
154 }
155 value_to_string(val)
156 }
157 }
158
159 pub fn old_value(&self, col: usize) -> Option<String> {
162 unsafe {
163 let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
164 let rc = ffi::sqlite3changeset_old(self.iter, col as c_int, &mut val);
165 if rc != ffi::SQLITE_OK as c_int || val.is_null() {
166 return None;
167 }
168 value_to_string(val)
169 }
170 }
171}
172
173pub(crate) unsafe fn value_to_string(val: *mut ffi::sqlite3_value) -> Option<String> {
175 let vtype = ffi::sqlite3_value_type(val);
176 if vtype == ffi::SQLITE_NULL as c_int {
177 return None;
178 }
179 let text = ffi::sqlite3_value_text(val);
180 if text.is_null() {
181 return None;
182 }
183 Some(
184 CStr::from_ptr(text as *const c_char)
185 .to_string_lossy()
186 .into_owned(),
187 )
188}
189
190pub struct Session {
195 raw: *mut ffi::sqlite3_session,
196}
197
198unsafe impl Send for Session {}
203
204impl Session {
205 pub unsafe fn new(db: *mut ffi::sqlite3) -> Result<Self, i32> {
211 let db_name = CString::new("main").unwrap();
212 let mut raw: *mut ffi::sqlite3_session = ptr::null_mut();
213 let rc = ffi::sqlite3session_create(db, db_name.as_ptr(), &mut raw);
214 if rc != ffi::SQLITE_OK as c_int {
215 return Err(rc);
216 }
217 Ok(Session { raw })
218 }
219
220 pub fn attach(&self, table: Option<&str>) -> Result<(), i32> {
222 let c_table: Option<CString> = table.map(|t| CString::new(t).unwrap());
223 let ptr: *const c_char = c_table.as_ref().map(|c| c.as_ptr()).unwrap_or(ptr::null());
224 let rc = unsafe { ffi::sqlite3session_attach(self.raw, ptr) };
225 if rc != ffi::SQLITE_OK as c_int {
226 return Err(rc);
227 }
228 Ok(())
229 }
230
231 pub fn changeset(&self) -> Result<Changeset, i32> {
233 let mut len: c_int = 0;
234 let mut buf: *mut c_void = ptr::null_mut();
235 let rc = unsafe { ffi::sqlite3session_changeset(self.raw, &mut len, &mut buf) };
236 if rc != ffi::SQLITE_OK as c_int {
237 return Err(rc);
238 }
239 Ok(Changeset { buf, len })
240 }
241}
242
243impl Drop for Session {
244 fn drop(&mut self) {
245 unsafe { ffi::sqlite3session_delete(self.raw) };
246 }
247}
248
249pub unsafe fn apply_changeset<F>(
257 db: *mut ffi::sqlite3,
258 changeset: &Changeset,
259 mut conflict_handler: F,
260) -> Result<(), i32>
261where
262 F: FnMut(ConflictType) -> ConflictAction,
263{
264 apply_changeset_with_context(db, changeset, |ct, _ctx| conflict_handler(ct))
265}
266
267pub unsafe fn apply_changeset_with_context<F>(
275 db: *mut ffi::sqlite3,
276 changeset: &Changeset,
277 mut conflict_handler: F,
278) -> Result<(), i32>
279where
280 F: FnMut(ConflictType, &ConflictContext) -> ConflictAction,
281{
282 unsafe extern "C" fn filter_cb(_ctx: *mut c_void, _table: *const c_char) -> c_int {
283 1
285 }
286
287 unsafe extern "C" fn conflict_cb<F>(
288 ctx: *mut c_void,
289 conflict_type: c_int,
290 iter: *mut ffi::sqlite3_changeset_iter,
291 ) -> c_int
292 where
293 F: FnMut(ConflictType, &ConflictContext) -> ConflictAction,
294 {
295 let handler = &mut *(ctx as *mut F);
296 let ct = ConflictType::from_raw(conflict_type);
297 let context = ConflictContext { iter };
298 handler(ct, &context) as c_int
299 }
300
301 let rc = ffi::sqlite3changeset_apply(
302 db,
303 changeset.len,
304 changeset.buf,
305 Some(filter_cb),
306 Some(conflict_cb::<F>),
307 &mut conflict_handler as *mut F as *mut c_void,
308 );
309
310 if rc != ffi::SQLITE_OK as c_int {
311 return Err(rc);
312 }
313 Ok(())
314}
315
316#[cfg(test)]
317mod tests {
318 use super::*;
319 use crate::sync::test_helpers::*;
320
321 #[test]
322 fn test_basic_changeset_capture() {
323 unsafe {
324 let db = open_memory_db();
325 exec(db, "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)");
326
327 let session = Session::new(db).expect("session create");
328 session.attach(Some("items")).expect("attach");
329
330 exec(db, "INSERT INTO items VALUES (1, 'hello')");
331
332 let cs = session.changeset().expect("changeset");
333 assert!(!cs.is_empty(), "changeset should not be empty");
334
335 drop(session);
336 ffi::sqlite3_close(db);
337 }
338 }
339
340 #[test]
341 fn test_changeset_application() {
342 unsafe {
343 let db1 = open_memory_db();
345 exec(
346 db1,
347 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)",
348 );
349
350 let session = Session::new(db1).expect("session create");
351 session.attach(Some("items")).expect("attach");
352
353 exec(db1, "INSERT INTO items VALUES (1, 'alpha')");
354 exec(db1, "INSERT INTO items VALUES (2, 'beta')");
355
356 let cs = session.changeset().expect("changeset");
357 drop(session);
358
359 let db2 = open_memory_db();
361 exec(
362 db2,
363 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)",
364 );
365
366 apply_changeset(db2, &cs, |_conflict_type| ConflictAction::Abort)
367 .expect("apply changeset");
368
369 let count = query_int(db2, "SELECT COUNT(*) FROM items");
370 assert_eq!(count, 2, "DB2 should have 2 rows");
371
372 let name = query_text(db2, "SELECT name FROM items WHERE id = 1");
373 assert_eq!(name, "alpha");
374
375 let name = query_text(db2, "SELECT name FROM items WHERE id = 2");
376 assert_eq!(name, "beta");
377
378 ffi::sqlite3_close(db1);
379 ffi::sqlite3_close(db2);
380 }
381 }
382
383 #[test]
384 fn test_conflict_handler() {
385 unsafe {
386 let db1 = open_memory_db();
388 exec(
389 db1,
390 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
391 );
392 exec(
393 db1,
394 "INSERT INTO items VALUES (1, 'original', '2026-01-01T00:00:00Z')",
395 );
396
397 let session = Session::new(db1).expect("session create");
398 session.attach(Some("items")).expect("attach");
399
400 exec(
402 db1,
403 "UPDATE items SET name = 'from_db1', updated_at = '2026-01-03T00:00:00Z' WHERE id = 1",
404 );
405
406 let cs = session.changeset().expect("changeset");
407 assert!(!cs.is_empty());
408 drop(session);
409
410 let db2 = open_memory_db();
412 exec(
413 db2,
414 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
415 );
416 exec(
417 db2,
418 "INSERT INTO items VALUES (1, 'from_db2', '2026-01-02T00:00:00Z')",
419 );
420
421 let mut conflict_called = false;
425 let mut conflict_type_seen = None;
426
427 apply_changeset(db2, &cs, |ct| {
428 conflict_called = true;
429 conflict_type_seen = Some(ct);
430 ConflictAction::Replace
432 })
433 .expect("apply changeset");
434
435 assert!(conflict_called, "conflict handler should have been called");
436 assert_eq!(
437 conflict_type_seen,
438 Some(ConflictType::Data),
439 "should be a DATA conflict"
440 );
441
442 let name = query_text(db2, "SELECT name FROM items WHERE id = 1");
444 assert_eq!(name, "from_db1", "incoming changeset should win");
445
446 let updated = query_text(db2, "SELECT updated_at FROM items WHERE id = 1");
447 assert_eq!(updated, "2026-01-03T00:00:00Z");
448
449 ffi::sqlite3_close(db1);
450 ffi::sqlite3_close(db2);
451 }
452 }
453
454 #[test]
455 fn test_conflict_handler_omit() {
456 unsafe {
457 let db1 = open_memory_db();
459 exec(
460 db1,
461 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
462 );
463 exec(
464 db1,
465 "INSERT INTO items VALUES (1, 'original', '2026-01-01T00:00:00Z')",
466 );
467
468 let session = Session::new(db1).expect("session create");
469 session.attach(Some("items")).expect("attach");
470 exec(
471 db1,
472 "UPDATE items SET name = 'from_db1', updated_at = '2026-01-02T00:00:00Z' WHERE id = 1",
473 );
474
475 let cs = session.changeset().expect("changeset");
476 drop(session);
477
478 let db2 = open_memory_db();
479 exec(
480 db2,
481 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
482 );
483 exec(
484 db2,
485 "INSERT INTO items VALUES (1, 'from_db2', '2026-01-05T00:00:00Z')",
486 );
487
488 apply_changeset(db2, &cs, |_ct| {
489 ConflictAction::Omit
491 })
492 .expect("apply changeset");
493
494 let name = query_text(db2, "SELECT name FROM items WHERE id = 1");
496 assert_eq!(name, "from_db2", "local data should be preserved with OMIT");
497
498 ffi::sqlite3_close(db1);
499 ffi::sqlite3_close(db2);
500 }
501 }
502
503 #[test]
504 fn test_attach_all_tables() {
505 unsafe {
506 let db = open_memory_db();
507 exec(db, "CREATE TABLE t1 (id INTEGER PRIMARY KEY, val TEXT)");
508 exec(db, "CREATE TABLE t2 (id INTEGER PRIMARY KEY, val TEXT)");
509
510 let session = Session::new(db).expect("session create");
511 session.attach(None).expect("attach all");
513
514 exec(db, "INSERT INTO t1 VALUES (1, 'a')");
515 exec(db, "INSERT INTO t2 VALUES (1, 'b')");
516
517 let cs = session.changeset().expect("changeset");
518 assert!(
519 !cs.is_empty(),
520 "changeset should capture changes from both tables"
521 );
522
523 let db2 = open_memory_db();
525 exec(db2, "CREATE TABLE t1 (id INTEGER PRIMARY KEY, val TEXT)");
526 exec(db2, "CREATE TABLE t2 (id INTEGER PRIMARY KEY, val TEXT)");
527
528 apply_changeset(db2, &cs, |_| ConflictAction::Abort).expect("apply");
529
530 let v1 = query_text(db2, "SELECT val FROM t1 WHERE id = 1");
531 assert_eq!(v1, "a");
532 let v2 = query_text(db2, "SELECT val FROM t2 WHERE id = 1");
533 assert_eq!(v2, "b");
534
535 drop(session);
536 ffi::sqlite3_close(db);
537 ffi::sqlite3_close(db2);
538 }
539 }
540
541 #[test]
542 fn test_conflict_context_reads_values() {
543 unsafe {
544 let db1 = open_memory_db();
545 exec(
546 db1,
547 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, _updated_at TEXT)",
548 );
549 exec(
550 db1,
551 "INSERT INTO items VALUES (1, 'orig', '2026-01-01T00:00:00Z')",
552 );
553
554 let session = Session::new(db1).expect("session create");
555 session.attach(Some("items")).expect("attach");
556 exec(
557 db1,
558 "UPDATE items SET name = 'updated', _updated_at = '2026-01-03T00:00:00Z' WHERE id = 1",
559 );
560 let cs = session.changeset().expect("changeset");
561 drop(session);
562
563 let db2 = open_memory_db();
564 exec(
565 db2,
566 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, _updated_at TEXT)",
567 );
568 exec(
569 db2,
570 "INSERT INTO items VALUES (1, 'local', '2026-01-02T00:00:00Z')",
571 );
572
573 let mut seen_table = String::new();
574 let mut seen_new_ts = String::new();
575 let mut seen_conflict_ts = String::new();
576
577 apply_changeset_with_context(db2, &cs, |_ct, ctx| {
578 seen_table = ctx.table_name().to_string();
579 seen_new_ts = ctx.new_value(2).unwrap_or_default();
581 seen_conflict_ts = ctx.conflict_value(2).unwrap_or_default();
582 ConflictAction::Replace
583 })
584 .expect("apply");
585
586 assert_eq!(seen_table, "items");
587 assert_eq!(seen_new_ts, "2026-01-03T00:00:00Z");
588 assert_eq!(seen_conflict_ts, "2026-01-02T00:00:00Z");
589
590 ffi::sqlite3_close(db1);
591 ffi::sqlite3_close(db2);
592 }
593 }
594}