coven/sync/
hlc.rs

/// Hybrid Logical Clock (HLC) for causal ordering of writes across devices.
///
/// Used as the `_updated_at` column value on all synced tables. Provides
/// monotonically increasing timestamps that tolerate clock skew between devices.
///
/// Format: `{millis:013}-{counter:04}-{device_id}`
/// Lexicographic string comparison gives correct causal ordering as long as
/// `millis` fits in 13 digits and `counter` fits in 4 digits.
8use std::sync::Mutex;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Maximum tolerated lead of a remote timestamp over the local wall clock:
/// 24 hours, expressed in milliseconds.
///
/// A remote timestamp that is further ahead than this is still merged, but
/// its physical component is replaced by the local wall time so that it
/// cannot drag the local clock into the future.
const MAX_CLOCK_DRIFT_MS: u64 = 24 * 60 * 60 * 1000;
15
/// A parsed HLC timestamp.
///
/// The derived ordering compares `millis` first, then `counter`, then
/// `device_id`, which is the same field order used by the string format
/// `{millis:013}-{counter:04}-{device_id}`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Timestamp {
    /// Physical component: milliseconds since the Unix epoch.
    pub millis: u64,
    /// Logical component: distinguishes timestamps that share the same `millis`.
    pub counter: u16,
    /// Identifier of the device that produced the timestamp; final tie-breaker.
    pub device_id: String,
}
23
24impl Timestamp {
25    pub fn new(millis: u64, counter: u16, device_id: String) -> Self {
26        Self {
27            millis,
28            counter,
29            device_id,
30        }
31    }
32
33    /// Parse from the string format.
34    pub fn parse(s: &str) -> Option<Self> {
35        let mut parts = s.splitn(3, '-');
36        let millis = parts.next()?.parse::<u64>().ok()?;
37        let counter = parts.next()?.parse::<u16>().ok()?;
38        let device_id = parts.next()?;
39        if device_id.is_empty() {
40            return None;
41        }
42        Some(Self {
43            millis,
44            counter,
45            device_id: device_id.to_string(),
46        })
47    }
48}
49
50impl std::fmt::Display for Timestamp {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(
53            f,
54            "{:013}-{:04}-{}",
55            self.millis, self.counter, self.device_id
56        )
57    }
58}
59
/// Mutable part of the clock: the physical and logical components of the most
/// recently issued timestamp.
#[derive(Debug)]
struct HlcState {
    /// Physical component (milliseconds) of the last issued timestamp.
    millis: u64,
    /// Logical counter of the last issued timestamp.
    counter: u16,
}
64
65/// Hybrid Logical Clock.
66///
67/// Thread-safe via interior `Mutex`. Create one per application lifetime,
68/// pass by reference to write methods.
69pub struct Hlc {
70    device_id: String,
71    state: Mutex<HlcState>,
72    /// Injected wall clock for testing. Returns milliseconds since epoch.
73    wall_clock: Box<dyn Fn() -> u64 + Send + Sync>,
74}
75
76impl Hlc {
77    /// Create a new HLC with the given device ID.
78    pub fn new(device_id: String) -> Self {
79        Self {
80            device_id,
81            state: Mutex::new(HlcState {
82                millis: 0,
83                counter: 0,
84            }),
85            wall_clock: Box::new(wall_clock_ms),
86        }
87    }
88
89    /// Generate a new timestamp. Guaranteed to be greater than any previous
90    /// timestamp returned by this clock.
91    pub fn now(&self) -> Timestamp {
92        let wall = (self.wall_clock)();
93        let mut state = self.state.lock().unwrap();
94
95        if wall > state.millis {
96            state.millis = wall;
97            state.counter = 0;
98        } else {
99            state.counter += 1;
100        }
101
102        Timestamp::new(state.millis, state.counter, self.device_id.clone())
103    }
104
105    /// Merge with a remote timestamp. Advances the local clock to maintain
106    /// the "happened after" relationship. Returns the new local timestamp.
107    ///
108    /// Implements a clock skew guard: if the remote's wall time is more than
109    /// 24 hours ahead of local wall time, we accept the remote but don't
110    /// advance our physical clock past local wall time.
111    pub fn update(&self, remote: &Timestamp) -> Timestamp {
112        let wall = (self.wall_clock)();
113        let mut state = self.state.lock().unwrap();
114
115        let remote_millis = if remote.millis > wall + MAX_CLOCK_DRIFT_MS {
116            // Remote clock is unreasonably far ahead. Don't let it pull us
117            // into the future -- use our wall clock instead.
118            wall
119        } else {
120            remote.millis
121        };
122
123        if wall > state.millis && wall > remote_millis {
124            // Wall clock is ahead of both local and remote: reset counter.
125            state.millis = wall;
126            state.counter = 0;
127        } else if remote_millis > state.millis {
128            // Remote is ahead of local: adopt remote's time, increment counter.
129            state.millis = remote_millis;
130            state.counter = remote.counter + 1;
131        } else if state.millis > remote_millis {
132            // Local is ahead: keep local time, increment counter.
133            state.counter += 1;
134        } else {
135            // Same millis: take the higher counter + 1.
136            state.counter = state.counter.max(remote.counter) + 1;
137        }
138
139        Timestamp::new(state.millis, state.counter, self.device_id.clone())
140    }
141
142    #[cfg(test)]
143    fn with_wall_clock(device_id: String, clock: impl Fn() -> u64 + Send + Sync + 'static) -> Self {
144        Self {
145            device_id,
146            state: Mutex::new(HlcState {
147                millis: 0,
148                counter: 0,
149            }),
150            wall_clock: Box::new(clock),
151        }
152    }
153}
154
/// Current system wall-clock time in whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn wall_clock_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    // Stay in u64 and saturate instead of narrowing the u128 from
    // `as_millis()` with a truncating `as` cast.
    elapsed
        .as_secs()
        .saturating_mul(1000)
        .saturating_add(u64::from(elapsed.subsec_millis()))
}
161
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Wall clock that always reports `ms`.
    fn fixed_clock(ms: u64) -> impl Fn() -> u64 + Send + Sync + 'static {
        move || ms
    }

    /// Wall clock backed by a shared atomic. The returned handle lets a test
    /// move time forward by storing a new value.
    fn advancing_clock(start: u64) -> (Arc<AtomicU64>, impl Fn() -> u64 + Send + Sync + 'static) {
        let time = Arc::new(AtomicU64::new(start));
        let reader = Arc::clone(&time);
        (time, move || reader.load(Ordering::SeqCst))
    }

    #[test]
    fn basic_monotonicity() {
        // Uses the real system clock on purpose.
        let hlc = Hlc::new("dev-1".into());
        let t1 = hlc.now();
        let t2 = hlc.now();
        let t3 = hlc.now();

        assert!(t2 > t1, "t2={t2} should be > t1={t1}");
        assert!(t3 > t2, "t3={t3} should be > t2={t2}");
    }

    #[test]
    fn counter_increments_when_clock_stalls() {
        let hlc = Hlc::with_wall_clock("dev-1".into(), fixed_clock(1000));

        let t1 = hlc.now();
        assert_eq!(t1.millis, 1000);
        assert_eq!(t1.counter, 0);

        let t2 = hlc.now();
        assert_eq!(t2.millis, 1000);
        assert_eq!(t2.counter, 1);

        let t3 = hlc.now();
        assert_eq!(t3.millis, 1000);
        assert_eq!(t3.counter, 2);

        assert!(t3 > t2);
        assert!(t2 > t1);
    }

    #[test]
    fn wall_clock_advance_resets_counter() {
        let (time, clock) = advancing_clock(1000);
        let hlc = Hlc::with_wall_clock("dev-1".into(), clock);

        let t1 = hlc.now();
        assert_eq!(t1.millis, 1000);
        assert_eq!(t1.counter, 0);

        // Clock has not moved -- counter increments.
        let t2 = hlc.now();
        assert_eq!(t2.counter, 1);

        // Clock moves forward -- counter resets.
        time.store(2000, Ordering::SeqCst);
        let t3 = hlc.now();
        assert_eq!(t3.millis, 2000);
        assert_eq!(t3.counter, 0);

        assert!(t3 > t2);
    }

    #[test]
    fn merge_with_remote_ahead() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(1000));

        // Local wall clock is at 1000. Remote is at 5000.
        let remote = Timestamp::new(5000, 3, "dev-remote".into());
        let t = hlc.update(&remote);

        assert_eq!(t.millis, 5000);
        assert_eq!(t.counter, 4); // remote counter + 1
        assert_eq!(t.device_id, "dev-local");
        assert!(t > remote, "t={t} should be > remote={remote}");

        // A subsequent now() must be strictly greater than the merged timestamp.
        let t2 = hlc.now();
        assert!(t2 > t, "t2={t2} should be > t={t}");
    }

    #[test]
    fn merge_with_remote_behind() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(5000));

        // Prime the local clock: millis=5000, counter=0.
        hlc.now();

        // Remote is behind.
        let remote = Timestamp::new(1000, 10, "dev-remote".into());
        let t = hlc.update(&remote);

        // Local millis stays at 5000 (ahead); the local counter increments.
        assert_eq!(t.millis, 5000);
        assert_eq!(t.counter, 1); // was 0, now incremented
        assert_eq!(t.device_id, "dev-local");
    }

    #[test]
    fn merge_with_same_millis() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(3000));

        // Prime: millis=3000, counter=0.
        hlc.now();

        // Remote also at 3000 but with counter=5.
        let remote = Timestamp::new(3000, 5, "dev-remote".into());
        let t = hlc.update(&remote);

        assert_eq!(t.millis, 3000);
        // max(local_counter=0, remote_counter=5) + 1 = 6
        assert_eq!(t.counter, 6);
    }

    #[test]
    fn clock_skew_guard_rejects_far_future() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(1000));

        // Remote claims a time 1 ms past the 24h guard.
        let far_future = 1000 + MAX_CLOCK_DRIFT_MS + 1;
        let remote = Timestamp::new(far_future, 0, "dev-remote".into());
        let t = hlc.update(&remote);

        // Must NOT adopt the far-future millis; the wall clock is used instead.
        assert_eq!(t.millis, 1000);
    }

    #[test]
    fn clock_skew_guard_accepts_near_future() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(1000));

        // Remote is 1 hour ahead -- within the 24h guard.
        let near_future = 1000 + 60 * 60 * 1000;
        let remote = Timestamp::new(near_future, 0, "dev-remote".into());
        let t = hlc.update(&remote);

        // Should adopt the near-future millis.
        assert_eq!(t.millis, near_future);
    }

    #[test]
    fn string_roundtrip() {
        let ts = Timestamp::new(1707580800000, 42, "dev-abc123".into());
        let s = ts.to_string();
        let parsed = Timestamp::parse(&s).expect("parse should succeed");

        assert_eq!(parsed, ts);
        assert_eq!(s, "1707580800000-0042-dev-abc123");
    }

    #[test]
    fn string_format_is_zero_padded() {
        let ts = Timestamp::new(1000, 0, "d".into());
        assert_eq!(ts.to_string(), "0000000001000-0000-d");

        let ts2 = Timestamp::new(9999999999999, 9999, "d".into());
        assert_eq!(ts2.to_string(), "9999999999999-9999-d");
    }

    #[test]
    fn lexicographic_ordering_matches_causal_ordering() {
        // Listed in ascending causal order.
        let timestamps = [
            Timestamp::new(1000, 0, "dev-a".into()),
            Timestamp::new(1000, 1, "dev-a".into()),
            Timestamp::new(1000, 1, "dev-b".into()),
            Timestamp::new(2000, 0, "dev-a".into()),
            Timestamp::new(2000, 0, "dev-b".into()),
        ];

        let strings: Vec<String> = timestamps.iter().map(|t| t.to_string()).collect();

        // Every adjacent pair of strings must be strictly ascending.
        for pair in strings.windows(2) {
            assert!(pair[1] > pair[0], "Expected {:?} > {:?}", pair[1], pair[0]);
        }
    }

    #[test]
    fn device_id_breaks_ties() {
        let ts_a = Timestamp::new(5000, 3, "aaa".into());
        let ts_b = Timestamp::new(5000, 3, "bbb".into());

        // Derived ordering: same millis, same counter, device_id decides.
        assert!(ts_b > ts_a);

        // String comparison should agree.
        assert!(ts_b.to_string() > ts_a.to_string());
    }

    #[test]
    fn parse_rejects_invalid_input() {
        assert!(Timestamp::parse("").is_none());
        assert!(Timestamp::parse("not-a-timestamp").is_none());
        assert!(Timestamp::parse("1000-0000").is_none()); // missing device_id
        assert!(Timestamp::parse("1000-0000-").is_none()); // empty device_id
        assert!(Timestamp::parse("abc-0000-dev").is_none()); // non-numeric millis
        assert!(Timestamp::parse("1000-xyz-dev").is_none()); // non-numeric counter
    }

    #[test]
    fn parse_handles_device_id_with_dashes() {
        // Device IDs may contain dashes (this one is UUID-shaped). Splitting
        // on the first two dashes only must leave the remainder intact as
        // the device_id.
        let ts = Timestamp::new(1000, 0, "550e8400-e29b-41d4-a716-446655440000".into());
        let s = ts.to_string();
        let parsed = Timestamp::parse(&s).expect("parse should handle UUID device_id");
        assert_eq!(parsed.device_id, "550e8400-e29b-41d4-a716-446655440000");
        assert_eq!(parsed, ts);
    }
}
381}