1use async_trait::async_trait;
7use aws_config::{BehaviorVersion, Region};
8use aws_credential_types::Credentials;
9use aws_sdk_s3::Client;
10
11use super::{CloudHome, CloudHomeError, CloudHomeJoinInfo};
12
13pub struct S3CloudHome {
15 client: Client,
16 bucket: String,
17 region: String,
18 endpoint: Option<String>,
19 access_key: String,
20 secret_key: String,
21 key_prefix: Option<String>,
22}
23
24impl S3CloudHome {
25 pub async fn new(
26 bucket: String,
27 region: String,
28 endpoint: Option<String>,
29 access_key: String,
30 secret_key: String,
31 key_prefix: Option<String>,
32 ) -> Result<Self, CloudHomeError> {
33 let credentials = Credentials::new(&access_key, &secret_key, None, None, "bae-cloud-home");
34
35 let http_client = aws_smithy_http_client::Builder::new()
38 .tls_provider(aws_smithy_http_client::tls::Provider::Rustls(
39 aws_smithy_http_client::tls::rustls_provider::CryptoMode::Ring,
40 ))
41 .build_https();
42
43 let mut builder = aws_config::defaults(BehaviorVersion::latest())
44 .region(Region::new(region.clone()))
45 .credentials_provider(credentials)
46 .http_client(http_client);
47
48 if let Some(ref ep) = endpoint {
49 builder = builder.endpoint_url(ep.trim_end_matches('/'));
50 }
51
52 let aws_config = builder.load().await;
53 let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
54 .force_path_style(true)
55 .build();
56 let client = Client::from_conf(s3_config);
57
58 Ok(S3CloudHome {
59 client,
60 bucket,
61 region,
62 endpoint,
63 access_key,
64 secret_key,
65 key_prefix,
66 })
67 }
68
69 fn full_key(&self, key: &str) -> String {
71 apply_prefix(self.key_prefix.as_deref(), key)
72 }
73}
74
/// Joins an optional key prefix and a key with exactly one `/` between them.
///
/// Trailing slashes on `prefix` are ignored, so `"libs/abc"` and `"libs/abc/"`
/// produce the same result. A prefix that is empty once its trailing slashes
/// are removed (`""`, `"/"`) is treated the same as `None`; otherwise the
/// result would start with a stray `/` and name a different object than the
/// un-prefixed key.
fn apply_prefix(prefix: Option<&str>, key: &str) -> String {
    match prefix.map(|p| p.trim_end_matches('/')) {
        Some(p) if !p.is_empty() => format!("{p}/{key}"),
        _ => key.to_string(),
    }
}
82
83#[async_trait]
84impl CloudHome for S3CloudHome {
85 async fn probe(&self) -> Result<(), CloudHomeError> {
87 use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
88 use aws_sdk_s3::operation::head_bucket::HeadBucketError;
89
90 match self.client.head_bucket().bucket(&self.bucket).send().await {
91 Ok(_) => Ok(()),
92 Err(SdkError::ServiceError(svc)) => {
93 let status = svc.raw().status().as_u16();
94 let code: Option<String> = svc.err().code().map(str::to_string);
95 let bucket = self.bucket.clone();
96 match svc.into_err() {
97 HeadBucketError::NotFound(_) => Err(CloudHomeError::Storage(format!(
98 "bucket {bucket:?} does not exist"
99 ))),
100 other => {
101 let is_auth = status == 403
102 || matches!(
103 code.as_deref(),
104 Some("SignatureDoesNotMatch") | Some("InvalidAccessKeyId")
105 );
106 if is_auth {
107 Err(CloudHomeError::Storage(format!(
108 "S3 credentials rejected (status {status}, code {code:?})"
109 )))
110 } else {
111 Err(CloudHomeError::Storage(format!(
112 "S3 probe failed (status {status}, code {code:?}): {other}"
113 )))
114 }
115 }
116 }
117 }
118 Err(e) => Err(CloudHomeError::Storage(format!("S3 probe failed: {e}"))),
119 }
120 }
121
122 async fn write(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
123 let full = self.full_key(key);
124 self.client
125 .put_object()
126 .bucket(&self.bucket)
127 .key(&full)
128 .body(data.into())
129 .send()
130 .await
131 .map_err(|e| CloudHomeError::Storage(format!("put {key}: {e}")))?;
132 Ok(())
133 }
134
135 async fn read(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
136 let full = self.full_key(key);
137 let resp = self
138 .client
139 .get_object()
140 .bucket(&self.bucket)
141 .key(&full)
142 .send()
143 .await
144 .map_err(|e| {
145 let msg = format!("{e}");
146 if msg.contains("NoSuchKey") || msg.contains("not found") || msg.contains("404") {
147 CloudHomeError::NotFound(key.to_string())
148 } else {
149 CloudHomeError::Storage(format!("get {key}: {e}"))
150 }
151 })?;
152
153 let bytes = resp
154 .body
155 .collect()
156 .await
157 .map_err(|e| CloudHomeError::Storage(format!("read body for {key}: {e}")))?
158 .into_bytes()
159 .to_vec();
160
161 Ok(bytes)
162 }
163
164 async fn read_range(&self, key: &str, start: u64, end: u64) -> Result<Vec<u8>, CloudHomeError> {
165 let full = self.full_key(key);
166 let range = format!("bytes={start}-{}", end.saturating_sub(1));
167 let resp = self
168 .client
169 .get_object()
170 .bucket(&self.bucket)
171 .key(&full)
172 .range(range)
173 .send()
174 .await
175 .map_err(|e| {
176 let msg = format!("{e}");
177 if msg.contains("NoSuchKey") || msg.contains("not found") || msg.contains("404") {
178 CloudHomeError::NotFound(key.to_string())
179 } else {
180 CloudHomeError::Storage(format!("get range {key}: {e}"))
181 }
182 })?;
183
184 let bytes = resp
185 .body
186 .collect()
187 .await
188 .map_err(|e| CloudHomeError::Storage(format!("read range body for {key}: {e}")))?
189 .into_bytes()
190 .to_vec();
191
192 Ok(bytes)
193 }
194
195 async fn list(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
196 let full_prefix = self.full_key(prefix);
197 let strip_prefix = self
198 .key_prefix
199 .as_ref()
200 .map(|p| format!("{}/", p.trim_end_matches('/')));
201
202 let mut keys = Vec::new();
203 let mut continuation_token: Option<String> = None;
204
205 loop {
206 let mut req = self
207 .client
208 .list_objects_v2()
209 .bucket(&self.bucket)
210 .prefix(&full_prefix);
211
212 if let Some(token) = continuation_token.take() {
213 req = req.continuation_token(token);
214 }
215
216 let resp = req
217 .send()
218 .await
219 .map_err(|e| CloudHomeError::Storage(format!("list {prefix}: {e}")))?;
220
221 for obj in resp.contents() {
222 if let Some(key) = obj.key() {
223 let stripped = match &strip_prefix {
224 Some(p) => key.strip_prefix(p.as_str()).unwrap_or(key),
225 None => key,
226 };
227 keys.push(stripped.to_string());
228 }
229 }
230
231 if resp.is_truncated() == Some(true) {
232 continuation_token = resp.next_continuation_token().map(|s| s.to_string());
233 } else {
234 break;
235 }
236 }
237
238 Ok(keys)
239 }
240
241 async fn delete(&self, key: &str) -> Result<(), CloudHomeError> {
242 let full = self.full_key(key);
243 self.client
244 .delete_object()
245 .bucket(&self.bucket)
246 .key(&full)
247 .send()
248 .await
249 .map_err(|e| CloudHomeError::Storage(format!("delete {key}: {e}")))?;
250 Ok(())
251 }
252
253 async fn exists(&self, key: &str) -> Result<bool, CloudHomeError> {
254 let full = self.full_key(key);
255 match self
256 .client
257 .head_object()
258 .bucket(&self.bucket)
259 .key(&full)
260 .send()
261 .await
262 {
263 Ok(_) => Ok(true),
264 Err(e) => {
265 let msg = format!("{e}");
266 if msg.contains("NotFound")
267 || msg.contains("not found")
268 || msg.contains("404")
269 || msg.contains("NoSuchKey")
270 {
271 Ok(false)
272 } else {
273 Err(CloudHomeError::Storage(format!("head {key}: {e}")))
274 }
275 }
276 }
277 }
278
279 async fn grant_access(&self, _member_id: &str) -> Result<CloudHomeJoinInfo, CloudHomeError> {
280 Ok(CloudHomeJoinInfo::S3 {
283 bucket: self.bucket.clone(),
284 region: self.region.clone(),
285 endpoint: self.endpoint.clone(),
286 access_key: self.access_key.clone(),
287 secret_key: self.secret_key.clone(),
288 key_prefix: self.key_prefix.clone(),
289 })
290 }
291
292 async fn revoke_access(&self, _member_id: &str) -> Result<(), CloudHomeError> {
293 Ok(())
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Pure unit tests for key prefixing -------------------------------

    #[test]
    fn full_key_prepends_prefix() {
        assert_eq!(
            apply_prefix(Some("libs/abc"), "heads/dev1.json"),
            "libs/abc/heads/dev1.json"
        );
    }

    #[test]
    fn full_key_no_prefix() {
        assert_eq!(apply_prefix(None, "heads/dev1.json"), "heads/dev1.json");
    }

    #[test]
    fn full_key_strips_trailing_slash() {
        assert_eq!(
            apply_prefix(Some("libs/abc/"), "heads/dev1.json"),
            "libs/abc/heads/dev1.json"
        );
    }

    // ---- Helpers for the `#[ignore]`d tests that need a live endpoint ----

    /// Reads the environment variable `name`, falling back to `default` when
    /// it is unset. A value that is set but not valid Unicode aborts the test
    /// with a panic instead of silently using the default.
    fn test_env(name: &str, default: &str) -> String {
        use std::env::VarError;

        std::env::var(name).unwrap_or_else(|err| match err {
            VarError::NotPresent => default.to_string(),
            VarError::NotUnicode(raw) => {
                panic!("test env var {name} is non-utf8: {raw:?}");
            }
        })
    }

    /// Endpoint and credentials used by the endpoint-backed tests.
    struct TestCreds {
        endpoint: String,
        access_key: String,
        secret_key: String,
    }

    /// Resolves [`TestCreds`] from `BAE_TEST_S3_URL`, `BAE_TEST_S3_KEY` and
    /// `BAE_TEST_S3_SECRET`, each with a built-in default.
    fn test_creds() -> TestCreds {
        let endpoint = test_env("BAE_TEST_S3_URL", "http://localhost:19000");
        let access_key = test_env("BAE_TEST_S3_KEY", "baetest");
        let secret_key = test_env("BAE_TEST_S3_SECRET", "baetestpass");
        TestCreds {
            endpoint,
            access_key,
            secret_key,
        }
    }

    /// Builds an un-prefixed `S3CloudHome` for `bucket` in `us-east-1` using
    /// the endpoint and access key from `creds` and the given `secret_key`.
    /// Panics with `failure` if construction fails.
    async fn connect(
        bucket: &str,
        creds: &TestCreds,
        secret_key: &str,
        failure: &str,
    ) -> S3CloudHome {
        S3CloudHome::new(
            bucket.to_string(),
            "us-east-1".to_string(),
            Some(creds.endpoint.clone()),
            creds.access_key.clone(),
            secret_key.to_string(),
            None,
        )
        .await
        .expect(failure)
    }

    /// Creates the bucket `home` points at, panicking if the request fails.
    async fn provision_test_bucket(home: &S3CloudHome) {
        let request = home.client.create_bucket().bucket(&home.bucket);
        request.send().await.expect("create test bucket");
    }

    // ---- Endpoint-backed probe tests (ignored by default) ----------------

    #[tokio::test]
    #[ignore]
    async fn probe_succeeds_against_existing_bucket() {
        let creds = test_creds();
        let bucket = format!("bae-probe-ok-{}", uuid::Uuid::new_v4());

        let home = connect(&bucket, &creds, &creds.secret_key, "construct S3CloudHome").await;
        provision_test_bucket(&home).await;

        home.probe().await.expect("probe should succeed");
    }

    #[tokio::test]
    #[ignore]
    async fn probe_fails_for_missing_bucket() {
        let creds = test_creds();
        let bucket = format!("bae-probe-missing-{}", uuid::Uuid::new_v4());

        // The bucket is deliberately never created.
        let home = connect(&bucket, &creds, &creds.secret_key, "construct S3CloudHome").await;

        let err = home
            .probe()
            .await
            .expect_err("probe should fail for a missing bucket");
        let msg = format!("{err}");
        let looks_missing = ["does not exist", "NoSuchBucket", "404"]
            .iter()
            .any(|needle| msg.contains(needle));
        assert!(looks_missing, "expected missing-bucket error, got: {msg}",);
    }

    #[tokio::test]
    #[ignore]
    async fn probe_fails_for_bad_secret_key() {
        let creds = test_creds();
        let bucket = format!("bae-probe-badkey-{}", uuid::Uuid::new_v4());

        // Create the bucket with valid credentials first so that the failure
        // below can only come from the wrong secret.
        let good = connect(
            &bucket,
            &creds,
            &creds.secret_key,
            "construct good S3CloudHome",
        )
        .await;
        provision_test_bucket(&good).await;

        let bad = connect(&bucket, &creds, "wrong-secret", "construct bad S3CloudHome").await;

        let err = bad
            .probe()
            .await
            .expect_err("probe should fail for bad credentials");
        let msg = format!("{err}");
        let looks_rejected = ["rejected", "403", "SignatureDoesNotMatch"]
            .iter()
            .any(|needle| msg.contains(needle));
        assert!(looks_rejected, "expected credentials error, got: {msg}",);
    }
}