coven/storage/cloud/
google_drive.rs

1//! Google Drive `CloudHome` implementation.
2//!
3//! Uses the Google Drive REST API v3 with OAuth 2.0 tokens.
4//! Files are stored flat in a single folder -- path separators are encoded as `__`.
5
6use async_trait::async_trait;
7
8use super::oauth_session::OAuthSession;
9use super::{CloudHome, CloudHomeError, CloudHomeJoinInfo};
10use crate::clock::ClockRef;
11use crate::keys::KeyService;
12use crate::oauth::{OAuthConfig, OAuthTokens};
13
14const DRIVE_API: &str = "https://www.googleapis.com/drive/v3";
15const UPLOAD_API: &str = "https://www.googleapis.com/upload/drive/v3";
16
17/// Google Drive cloud home backend.
18pub struct GoogleDriveCloudHome {
19    client: reqwest::Client,
20    folder_id: String,
21    session: OAuthSession,
22}
23
24impl GoogleDriveCloudHome {
25    pub fn new(
26        folder_id: String,
27        tokens: OAuthTokens,
28        key_service: KeyService,
29        clock: ClockRef,
30    ) -> Self {
31        Self {
32            client: reqwest::Client::new(),
33            folder_id,
34            session: OAuthSession::new(
35                tokens,
36                key_service,
37                clock,
38                Self::oauth_config(),
39                "Google Drive",
40            ),
41        }
42    }
43
44    pub fn oauth_config() -> OAuthConfig {
45        let creds = crate::oauth::oauth_client_creds("google_drive");
46        OAuthConfig {
47            client_id: creds.client_id,
48            client_secret: creds.client_secret,
49            auth_url: "https://accounts.google.com/o/oauth2/v2/auth".to_string(),
50            token_url: "https://oauth2.googleapis.com/token".to_string(),
51            scopes: vec!["https://www.googleapis.com/auth/drive.file".to_string()],
52            redirect_port: 19284,
53            extra_auth_params: vec![("access_type".to_string(), "offline".to_string())],
54        }
55    }
56
57    /// Encode a CloudHome key to a flat Google Drive filename.
58    /// `changes/dev1/42.enc` -> `changes__dev1__42.enc`
59    fn encode_key(key: &str) -> String {
60        key.replace('/', "__")
61    }
62
63    /// Decode a flat filename back to a CloudHome key.
64    /// `changes__dev1__42.enc` -> `changes/dev1/42.enc`
65    fn decode_key(filename: &str) -> String {
66        filename.replace("__", "/")
67    }
68
69    /// Encode a prefix for Google Drive query matching.
70    /// `changes/dev1/` -> `changes__dev1__`
71    fn encode_prefix(prefix: &str) -> String {
72        prefix.replace('/', "__")
73    }
74
75    /// Make an API call with automatic token refresh on 401.
76    async fn api_call(
77        &self,
78        build_request: impl Fn(&str) -> reqwest::RequestBuilder,
79    ) -> Result<reqwest::Response, CloudHomeError> {
80        self.session.api_call(build_request).await
81    }
82
83    /// Find a file's Google Drive ID by name within our folder.
84    async fn find_file_id(&self, encoded_name: &str) -> Result<Option<String>, CloudHomeError> {
85        let query = format!(
86            "'{}' in parents and name = '{}' and trashed = false",
87            self.folder_id, encoded_name
88        );
89
90        let resp = self
91            .api_call(|token| {
92                self.client
93                    .get(format!("{}/files", DRIVE_API))
94                    .bearer_auth(token)
95                    .query(&[
96                        ("q", query.as_str()),
97                        ("fields", "files(id)"),
98                        ("pageSize", "1"),
99                    ])
100            })
101            .await?;
102
103        let status = resp.status();
104        let body = resp
105            .text()
106            .await
107            .map_err(|e| CloudHomeError::Storage(format!("read body: {e}")))?;
108
109        if !status.is_success() {
110            return Err(CloudHomeError::Storage(format!(
111                "list files (HTTP {status}): {body}"
112            )));
113        }
114
115        let json: serde_json::Value = serde_json::from_str(&body)
116            .map_err(|e| CloudHomeError::Storage(format!("parse list response: {e}")))?;
117
118        if let Some(files) = json["files"].as_array() {
119            if let Some(first) = files.first() {
120                if let Some(id) = first["id"].as_str() {
121                    return Ok(Some(id.to_string()));
122                }
123            }
124        }
125
126        Ok(None)
127    }
128}
129
130#[async_trait]
131impl CloudHome for GoogleDriveCloudHome {
132    async fn write(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
133        let encoded = Self::encode_key(key);
134
135        // Check if file already exists (update vs create)
136        if let Some(file_id) = self.find_file_id(&encoded).await? {
137            // Update existing file
138            let resp = self
139                .api_call(|token| {
140                    self.client
141                        .patch(format!("{}/files/{}?uploadType=media", UPLOAD_API, file_id))
142                        .bearer_auth(token)
143                        .header("Content-Type", "application/octet-stream")
144                        .body(data.clone())
145                })
146                .await?;
147
148            let status = resp.status();
149            if !status.is_success() {
150                let body = resp
151                    .text()
152                    .await
153                    .unwrap_or_else(|e| format!("<body read failed: {e}>"));
154                return Err(CloudHomeError::Storage(format!(
155                    "update {key} (HTTP {status}): {body}"
156                )));
157            }
158        } else {
159            // Create new file (multipart: metadata + content)
160            let metadata = serde_json::json!({
161                "name": encoded,
162                "parents": [self.folder_id],
163            });
164
165            let boundary = "bae_multipart_boundary";
166            let mut body = Vec::new();
167
168            // Part 1: metadata
169            body.extend_from_slice(format!("--{boundary}\r\n").as_bytes());
170            body.extend_from_slice(b"Content-Type: application/json; charset=UTF-8\r\n\r\n");
171            body.extend_from_slice(metadata.to_string().as_bytes());
172            body.extend_from_slice(b"\r\n");
173
174            // Part 2: file content
175            body.extend_from_slice(format!("--{boundary}\r\n").as_bytes());
176            body.extend_from_slice(b"Content-Type: application/octet-stream\r\n\r\n");
177            body.extend_from_slice(&data);
178            body.extend_from_slice(b"\r\n");
179
180            // End boundary
181            body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
182
183            let resp = self
184                .api_call(|token| {
185                    self.client
186                        .post(format!("{}/files?uploadType=multipart", UPLOAD_API))
187                        .bearer_auth(token)
188                        .header(
189                            "Content-Type",
190                            format!("multipart/related; boundary={boundary}"),
191                        )
192                        .body(body.clone())
193                })
194                .await?;
195
196            let status = resp.status();
197            if !status.is_success() {
198                let body = resp
199                    .text()
200                    .await
201                    .unwrap_or_else(|e| format!("<body read failed: {e}>"));
202                return Err(CloudHomeError::Storage(format!(
203                    "create {key} (HTTP {status}): {body}"
204                )));
205            }
206        }
207
208        Ok(())
209    }
210
211    async fn read(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
212        let encoded = Self::encode_key(key);
213        let file_id = self
214            .find_file_id(&encoded)
215            .await?
216            .ok_or_else(|| CloudHomeError::NotFound(key.to_string()))?;
217
218        let resp = self
219            .api_call(|token| {
220                self.client
221                    .get(format!("{}/files/{}", DRIVE_API, file_id))
222                    .bearer_auth(token)
223                    .query(&[("alt", "media")])
224            })
225            .await?;
226
227        let status = resp.status();
228        if status == reqwest::StatusCode::NOT_FOUND {
229            return Err(CloudHomeError::NotFound(key.to_string()));
230        }
231        if !status.is_success() {
232            let body = resp
233                .text()
234                .await
235                .unwrap_or_else(|e| format!("<body read failed: {e}>"));
236            return Err(CloudHomeError::Storage(format!(
237                "read {key} (HTTP {status}): {body}"
238            )));
239        }
240
241        let bytes = resp
242            .bytes()
243            .await
244            .map_err(|e| CloudHomeError::Storage(format!("read body for {key}: {e}")))?;
245
246        Ok(bytes.to_vec())
247    }
248
249    async fn read_range(&self, key: &str, start: u64, end: u64) -> Result<Vec<u8>, CloudHomeError> {
250        let encoded = Self::encode_key(key);
251        let file_id = self
252            .find_file_id(&encoded)
253            .await?
254            .ok_or_else(|| CloudHomeError::NotFound(key.to_string()))?;
255
256        let range = format!("bytes={}-{}", start, end.saturating_sub(1));
257
258        let resp = self
259            .api_call(|token| {
260                self.client
261                    .get(format!("{}/files/{}", DRIVE_API, file_id))
262                    .bearer_auth(token)
263                    .query(&[("alt", "media")])
264                    .header("Range", &range)
265            })
266            .await?;
267
268        let status = resp.status();
269        if status == reqwest::StatusCode::NOT_FOUND {
270            return Err(CloudHomeError::NotFound(key.to_string()));
271        }
272        if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
273            let body = resp
274                .text()
275                .await
276                .unwrap_or_else(|e| format!("<body read failed: {e}>"));
277            return Err(CloudHomeError::Storage(format!(
278                "read range {key} (HTTP {status}): {body}"
279            )));
280        }
281
282        let bytes = resp
283            .bytes()
284            .await
285            .map_err(|e| CloudHomeError::Storage(format!("read range body for {key}: {e}")))?;
286
287        Ok(bytes.to_vec())
288    }
289
290    async fn list(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
291        let encoded_prefix = Self::encode_prefix(prefix);
292
293        // Google Drive query: files in our folder whose name starts with the encoded prefix
294        let query = format!(
295            "'{}' in parents and name contains '{}' and trashed = false",
296            self.folder_id, encoded_prefix
297        );
298
299        let mut all_keys = Vec::new();
300        let mut page_token: Option<String> = None;
301
302        loop {
303            let query_ref = query.clone();
304            let page_ref = page_token.clone();
305
306            let resp = self
307                .api_call(|token| {
308                    let mut req = self
309                        .client
310                        .get(format!("{}/files", DRIVE_API))
311                        .bearer_auth(token)
312                        .query(&[
313                            ("q", query_ref.as_str()),
314                            ("fields", "nextPageToken,files(name)"),
315                            ("pageSize", "1000"),
316                        ]);
317                    if let Some(ref pt) = page_ref {
318                        req = req.query(&[("pageToken", pt.as_str())]);
319                    }
320                    req
321                })
322                .await?;
323
324            let status = resp.status();
325            let body = resp
326                .text()
327                .await
328                .map_err(|e| CloudHomeError::Storage(format!("read body: {e}")))?;
329
330            if !status.is_success() {
331                return Err(CloudHomeError::Storage(format!(
332                    "list {prefix} (HTTP {status}): {body}"
333                )));
334            }
335
336            let json: serde_json::Value = serde_json::from_str(&body)
337                .map_err(|e| CloudHomeError::Storage(format!("parse list: {e}")))?;
338
339            if let Some(files) = json["files"].as_array() {
340                for file in files {
341                    if let Some(name) = file["name"].as_str() {
342                        let decoded = Self::decode_key(name);
343                        // The `contains` query may match mid-string, so filter to actual prefix
344                        if decoded.starts_with(prefix) {
345                            all_keys.push(decoded);
346                        }
347                    }
348                }
349            }
350
351            if let Some(next) = json["nextPageToken"].as_str() {
352                page_token = Some(next.to_string());
353            } else {
354                break;
355            }
356        }
357
358        Ok(all_keys)
359    }
360
361    async fn delete(&self, key: &str) -> Result<(), CloudHomeError> {
362        let encoded = Self::encode_key(key);
363
364        if let Some(file_id) = self.find_file_id(&encoded).await? {
365            let resp = self
366                .api_call(|token| {
367                    self.client
368                        .delete(format!("{}/files/{}", DRIVE_API, file_id))
369                        .bearer_auth(token)
370                })
371                .await?;
372
373            let status = resp.status();
374            // 204 No Content is success, 404 is OK (already deleted)
375            if !status.is_success() && status != reqwest::StatusCode::NOT_FOUND {
376                let body = resp
377                    .text()
378                    .await
379                    .unwrap_or_else(|e| format!("<body read failed: {e}>"));
380                return Err(CloudHomeError::Storage(format!(
381                    "delete {key} (HTTP {status}): {body}"
382                )));
383            }
384        }
385
386        Ok(())
387    }
388
389    async fn exists(&self, key: &str) -> Result<bool, CloudHomeError> {
390        let encoded = Self::encode_key(key);
391        Ok(self.find_file_id(&encoded).await?.is_some())
392    }
393
394    async fn grant_access(&self, member_id: &str) -> Result<CloudHomeJoinInfo, CloudHomeError> {
395        // Share the folder with the member's Google account
396        let permission = serde_json::json!({
397            "type": "user",
398            "role": "writer",
399            "emailAddress": member_id,
400        });
401
402        let resp = self
403            .api_call(|token| {
404                self.client
405                    .post(format!(
406                        "{}/files/{}/permissions",
407                        DRIVE_API, self.folder_id
408                    ))
409                    .bearer_auth(token)
410                    .json(&permission)
411            })
412            .await?;
413
414        let status = resp.status();
415        if !status.is_success() {
416            let body = resp
417                .text()
418                .await
419                .unwrap_or_else(|e| format!("<body read failed: {e}>"));
420            return Err(CloudHomeError::Storage(format!(
421                "grant access to {member_id} (HTTP {status}): {body}"
422            )));
423        }
424
425        Ok(CloudHomeJoinInfo::GoogleDrive {
426            folder_id: self.folder_id.clone(),
427        })
428    }
429
430    async fn revoke_access(&self, member_id: &str) -> Result<(), CloudHomeError> {
431        // First, find the permission ID for this member
432        let resp = self
433            .api_call(|token| {
434                self.client
435                    .get(format!(
436                        "{}/files/{}/permissions",
437                        DRIVE_API, self.folder_id
438                    ))
439                    .bearer_auth(token)
440                    .query(&[("fields", "permissions(id,emailAddress)")])
441            })
442            .await?;
443
444        let status = resp.status();
445        let body = resp
446            .text()
447            .await
448            .map_err(|e| CloudHomeError::Storage(format!("read body: {e}")))?;
449
450        if !status.is_success() {
451            return Err(CloudHomeError::Storage(format!(
452                "list permissions (HTTP {status}): {body}"
453            )));
454        }
455
456        let json: serde_json::Value = serde_json::from_str(&body)
457            .map_err(|e| CloudHomeError::Storage(format!("parse permissions: {e}")))?;
458
459        let permission_id = json["permissions"]
460            .as_array()
461            .and_then(|perms| {
462                perms.iter().find_map(|p| {
463                    if p["emailAddress"].as_str() == Some(member_id) {
464                        p["id"].as_str().map(|s| s.to_string())
465                    } else {
466                        None
467                    }
468                })
469            })
470            .ok_or_else(|| {
471                CloudHomeError::Storage(format!("no permission found for {member_id}"))
472            })?;
473
474        // Delete the permission
475        let resp = self
476            .api_call(|token| {
477                self.client
478                    .delete(format!(
479                        "{}/files/{}/permissions/{}",
480                        DRIVE_API, self.folder_id, permission_id
481                    ))
482                    .bearer_auth(token)
483            })
484            .await?;
485
486        let status = resp.status();
487        if !status.is_success() && status != reqwest::StatusCode::NOT_FOUND {
488            let body = resp
489                .text()
490                .await
491                .unwrap_or_else(|e| format!("<body read failed: {e}>"));
492            return Err(CloudHomeError::Storage(format!(
493                "revoke access for {member_id} (HTTP {status}): {body}"
494            )));
495        }
496
497        Ok(())
498    }
499}
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504
505    #[test]
506    fn encode_key_replaces_slashes() {
507        assert_eq!(
508            GoogleDriveCloudHome::encode_key("changes/dev1/42.enc"),
509            "changes__dev1__42.enc"
510        );
511    }
512
513    #[test]
514    fn decode_key_restores_slashes() {
515        assert_eq!(
516            GoogleDriveCloudHome::decode_key("changes__dev1__42.enc"),
517            "changes/dev1/42.enc"
518        );
519    }
520
521    #[test]
522    fn encode_decode_roundtrip() {
523        let keys = [
524            "snapshot.db.enc",
525            "changes/device-abc/1.enc",
526            "heads/device-abc.json.enc",
527            "images/cover.jpg",
528        ];
529        for key in keys {
530            let encoded = GoogleDriveCloudHome::encode_key(key);
531            let decoded = GoogleDriveCloudHome::decode_key(&encoded);
532            assert_eq!(decoded, key);
533        }
534    }
535
536    #[test]
537    fn encode_prefix_for_query() {
538        assert_eq!(
539            GoogleDriveCloudHome::encode_prefix("changes/dev1/"),
540            "changes__dev1__"
541        );
542    }
543}