coven/
changeset.rs

1//! Changeset walking: the single primitive for inspecting SQLite changesets.
2//!
3//! coven uses it internally (to find blobs a changeset references); the host
4//! uses it to map row-changes to its own domain events. It is the crate's only
5//! changeset iterator.
6
7use std::ffi::{c_char, c_int, c_void, CStr};
8use std::ptr;
9
10use libsqlite3_sys as ffi;
11
12use crate::sync::session_ext::value_to_string;
13
14/// The operation type for a changeset entry.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ChangeOp {
17    Insert,
18    Update,
19    Delete,
20}
21
22/// One row change extracted from a changeset.
23///
24/// `columns` holds the row's column values in schema order. The value chosen
25/// per column follows the changeset's old/new semantics:
26/// - `Insert`: the new value.
27/// - `Delete`: the old value.
28/// - `Update`: the old value if present, else the new value (so unchanged
29///   columns — primary keys, foreign keys — are still available).
30///
31/// `None` means SQL NULL or a column absent from the changeset.
32#[derive(Debug, Clone)]
33pub struct RowChange {
34    pub table: String,
35    pub op: ChangeOp,
36    pub columns: Vec<Option<String>>,
37}
38
39impl RowChange {
40    /// The primary key (column 0).
41    pub fn pk(&self) -> Option<&str> {
42        self.col(0)
43    }
44
45    /// A column value by index.
46    pub fn col(&self, i: usize) -> Option<&str> {
47        self.columns.get(i).and_then(|c| c.as_deref())
48    }
49}
50
51/// Walk a changeset and return every row change with its column values.
52///
53/// Returns an empty vec for an empty changeset.
54pub fn walk(changeset_bytes: &[u8]) -> Result<Vec<RowChange>, String> {
55    if changeset_bytes.is_empty() {
56        return Ok(Vec::new());
57    }
58
59    let mut changes = Vec::new();
60
61    unsafe {
62        let mut iter: *mut ffi::sqlite3_changeset_iter = ptr::null_mut();
63        let rc = ffi::sqlite3changeset_start(
64            &mut iter,
65            changeset_bytes.len() as c_int,
66            changeset_bytes.as_ptr() as *mut c_void,
67        );
68        if rc != ffi::SQLITE_OK as c_int {
69            return Err(format!("sqlite3changeset_start failed (rc={rc})"));
70        }
71
72        loop {
73            let step = ffi::sqlite3changeset_next(iter);
74            if step == ffi::SQLITE_DONE as c_int {
75                break;
76            }
77            if step != ffi::SQLITE_ROW as c_int {
78                ffi::sqlite3changeset_finalize(iter);
79                return Err(format!("sqlite3changeset_next failed (rc={step})"));
80            }
81
82            let mut table: *const c_char = ptr::null();
83            let mut ncol: c_int = 0;
84            let mut op: c_int = 0;
85            let mut indirect: c_int = 0;
86            ffi::sqlite3changeset_op(iter, &mut table, &mut ncol, &mut op, &mut indirect);
87
88            let table_name = CStr::from_ptr(table)
89                .to_str()
90                .expect("SQLite table names are always UTF-8")
91                .to_string();
92
93            let change_op = match op {
94                ffi::SQLITE_INSERT => ChangeOp::Insert,
95                ffi::SQLITE_UPDATE => ChangeOp::Update,
96                ffi::SQLITE_DELETE => ChangeOp::Delete,
97                _ => continue,
98            };
99
100            let columns = (0..ncol).map(|c| extract_col(iter, c, change_op)).collect();
101
102            changes.push(RowChange {
103                table: table_name,
104                op: change_op,
105                columns,
106            });
107        }
108
109        let rc = ffi::sqlite3changeset_finalize(iter);
110        if rc != ffi::SQLITE_OK as c_int {
111            return Err(format!("sqlite3changeset_finalize failed (rc={rc})"));
112        }
113    }
114
115    Ok(changes)
116}
117
118/// Extract a column value following the op's old/new semantics.
119unsafe fn extract_col(
120    iter: *mut ffi::sqlite3_changeset_iter,
121    col: c_int,
122    op: ChangeOp,
123) -> Option<String> {
124    match op {
125        ChangeOp::Insert => extract_new_value(iter, col),
126        ChangeOp::Delete => extract_old_value(iter, col),
127        ChangeOp::Update => extract_old_value(iter, col).or_else(|| extract_new_value(iter, col)),
128    }
129}
130
131unsafe fn extract_new_value(iter: *mut ffi::sqlite3_changeset_iter, col: c_int) -> Option<String> {
132    let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
133    let rc = ffi::sqlite3changeset_new(iter, col, &mut val);
134    if rc != ffi::SQLITE_OK as c_int || val.is_null() {
135        return None;
136    }
137    value_to_string(val)
138}
139
140unsafe fn extract_old_value(iter: *mut ffi::sqlite3_changeset_iter, col: c_int) -> Option<String> {
141    let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
142    let rc = ffi::sqlite3changeset_old(iter, col, &mut val);
143    if rc != ffi::SQLITE_OK as c_int || val.is_null() {
144        return None;
145    }
146    value_to_string(val)
147}