LCOV - code coverage report
Current view: top level - object_storage/src - app.rs (source / functions) Coverage Total Hit
Test: 5e392a02abbad1ab595f4dba672e219a49f7f539.info Lines: 88.1 % 268 236
Test Date: 2025-04-11 22:43:24 Functions: 82.3 % 62 51

            Line data    Source code
       1              : use anyhow::anyhow;
       2              : use axum::body::{Body, Bytes};
       3              : use axum::response::{IntoResponse, Response};
       4              : use axum::{Router, http::StatusCode};
       5              : use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
       6              : use remote_storage::TimeoutOrCancel;
       7              : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
       8              : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::{error, info};
      11              : use utils::backoff::retry;
      12              : 
      13           34 : pub fn app(state: Arc<Storage>) -> Router<()> {
      14              :     use axum::routing::{delete as _delete, get as _get};
      15           34 :     let delete_prefix = _delete(delete_prefix);
      16           34 :     Router::new()
      17           34 :         .route(
      18           34 :             "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
      19           34 :             _get(get).put(set).delete(delete),
      20           34 :         )
      21           34 :         .route(
      22           34 :             "/{tenant_id}/{timeline_id}/{endpoint_id}",
      23           34 :             delete_prefix.clone(),
      24           34 :         )
      25           34 :         .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
      26           34 :         .route("/{tenant_id}", delete_prefix)
      27           34 :         .route("/metrics", _get(metrics))
      28           34 :         .route("/status", _get(async || StatusCode::OK.into_response()))
      29           34 :         .with_state(state)
      30           34 : }
      31              : 
      32              : type Result = anyhow::Result<Response, Response>;
      33              : type State = axum::extract::State<Arc<Storage>>;
      34              : 
      35              : const CONTENT_TYPE: &str = "content-type";
      36              : const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
      37              : const WARN_THRESHOLD: u32 = 3;
      38              : const MAX_RETRIES: u32 = 10;
      39              : 
      40            1 : async fn metrics() -> Result {
      41            1 :     prometheus::TextEncoder::new()
      42            1 :         .encode_to_string(&prometheus::gather())
      43            1 :         .map(|s| s.into_response())
      44            1 :         .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
      45            1 : }
      46              : 
      47           20 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
      48           20 :     info!(%path, "downloading");
      49           20 :     let download_err = |e| {
      50           14 :         if let DownloadError::NotFound = e {
      51           14 :             info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
      52           14 :             return not_found(&path);
      53            0 :         }
      54            0 :         internal_error(e, &path, "downloading")
      55           14 :     };
      56           20 :     let cancel = state.cancel.clone();
      57           20 :     let opts = &DownloadOpts::default();
      58              : 
      59           20 :     let stream = retry(
      60           20 :         async || state.storage.download(&path, opts, &cancel).await,
      61           20 :         DownloadError::is_permanent,
      62           20 :         WARN_THRESHOLD,
      63           20 :         MAX_RETRIES,
      64           20 :         "downloading",
      65           20 :         &cancel,
      66           20 :     )
      67           20 :     .await
      68           20 :     .unwrap_or(Err(DownloadError::Cancelled))
      69           20 :     .map_err(download_err)?
      70              :     .download_stream;
      71              : 
      72            6 :     Response::builder()
      73            6 :         .status(StatusCode::OK)
      74            6 :         .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
      75            6 :         .body(Body::from_stream(stream))
      76            6 :         .map_err(|e| internal_error(e, path, "reading response"))
      77           20 : }
      78              : 
      79              : // Best solution for files is multipart upload, but remote_storage doesn't support it,
      80              : // so we can either read Bytes in memory and push at once or forward BodyDataStream to
      81              : // remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
      82              : // guaranteed size() which may produce issues while uploading to s3.
      83              : // So, currently we're going with an in-memory copy plus a boundary to prevent uploading
      84              : // very large files.
      85           13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
      86           13 :     info!(%path, "uploading");
      87           13 :     let request_len = bytes.len();
      88           13 :     let max_len = state.max_upload_file_limit;
      89           13 :     if request_len > max_len {
      90            0 :         return Err(bad_request(
      91            0 :             anyhow!("File size {request_len} exceeds max {max_len}"),
      92            0 :             "uploading",
      93            0 :         ));
      94           13 :     }
      95           13 : 
      96           13 :     let cancel = state.cancel.clone();
      97           13 :     let fun = async || {
      98            0 :         let stream = bytes_to_stream(bytes.clone());
      99            0 :         state
     100            0 :             .storage
     101            0 :             .upload(stream, request_len, &path, None, &cancel)
     102            0 :             .await
     103           13 :     };
     104           13 :     retry(
     105           13 :         fun,
     106           13 :         TimeoutOrCancel::caused_by_cancel,
     107           13 :         WARN_THRESHOLD,
     108           13 :         MAX_RETRIES,
     109           13 :         "uploading",
     110           13 :         &cancel,
     111           13 :     )
     112           13 :     .await
     113           13 :     .unwrap_or(Err(anyhow!("uploading cancelled")))
     114           13 :     .map_err(|e| internal_error(e, path, "reading response"))?;
     115           13 :     Ok(ok())
     116           13 : }
     117              : 
     118            2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
     119            2 :     info!(%path, "deleting");
     120            2 :     let cancel = state.cancel.clone();
     121            2 :     retry(
     122            2 :         async || state.storage.delete(&path, &cancel).await,
     123            2 :         TimeoutOrCancel::caused_by_cancel,
     124            2 :         WARN_THRESHOLD,
     125            2 :         MAX_RETRIES,
     126            2 :         "deleting",
     127            2 :         &cancel,
     128            2 :     )
     129            2 :     .await
     130            2 :     .unwrap_or(Err(anyhow!("deleting cancelled")))
     131            2 :     .map_err(|e| internal_error(e, path, "deleting"))?;
     132            2 :     Ok(ok())
     133            2 : }
     134              : 
     135            6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
     136            6 :     info!(%path, "deleting prefix");
     137            6 :     let cancel = state.cancel.clone();
     138            6 :     retry(
     139            6 :         async || state.storage.delete_prefix(&path, &cancel).await,
     140            6 :         TimeoutOrCancel::caused_by_cancel,
     141            6 :         WARN_THRESHOLD,
     142            6 :         MAX_RETRIES,
     143            6 :         "deleting prefix",
     144            6 :         &cancel,
     145            6 :     )
     146            6 :     .await
     147            6 :     .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
     148            6 :     .map_err(|e| internal_error(e, path, "deleting prefix"))?;
     149            6 :     Ok(ok())
     150            6 : }
     151              : 
     152           34 : pub async fn check_storage_permissions(
     153           34 :     client: &GenericRemoteStorage,
     154           34 :     cancel: CancellationToken,
     155           34 : ) -> anyhow::Result<()> {
     156           34 :     info!("storage permissions check");
     157              : 
     158              :     // as_nanos() as multiple instances proxying same bucket may be started at once
     159           34 :     let now = SystemTime::now()
     160           34 :         .duration_since(UNIX_EPOCH)?
     161           34 :         .as_nanos()
     162           34 :         .to_string();
     163              : 
     164           34 :     let path = RemotePath::from_string(&format!("write_access_{now}"))?;
     165           34 :     info!(%path, "uploading");
     166              : 
     167           34 :     let body = now.to_string();
     168           34 :     let stream = bytes_to_stream(Bytes::from(body.clone()));
     169           34 :     client
     170           34 :         .upload(stream, body.len(), &path, None, &cancel)
     171           34 :         .await?;
     172              : 
     173              :     use tokio::io::AsyncReadExt;
     174           34 :     info!(%path, "downloading");
     175           34 :     let download_opts = DownloadOpts {
     176           34 :         kind: remote_storage::DownloadKind::Small,
     177           34 :         ..Default::default()
     178           34 :     };
     179           34 :     let mut body_read_buf = Vec::new();
     180           34 :     let stream = client
     181           34 :         .download(&path, &download_opts, &cancel)
     182           34 :         .await?
     183              :         .download_stream;
     184           34 :     tokio_util::io::StreamReader::new(stream)
     185           34 :         .read_to_end(&mut body_read_buf)
     186           34 :         .await?;
     187           34 :     let body_read = String::from_utf8(body_read_buf)?;
     188           34 :     if body != body_read {
     189            0 :         error!(%body, %body_read, "File contents do not match");
     190            0 :         anyhow::bail!("Read back file doesn't match original")
     191           34 :     }
     192           34 : 
     193           34 :     info!(%path, "removing");
     194           34 :     client.delete(&path, &cancel).await
     195           34 : }
     196              : 
     197           47 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
     198           47 :     futures::stream::once(futures::future::ready(Ok(bytes)))
     199           47 : }
     200              : 
     201              : #[cfg(test)]
     202              : mod tests {
     203              :     use super::*;
     204              :     use axum::{body::Body, extract::Request, response::Response};
     205              :     use http_body_util::BodyExt;
     206              :     use itertools::iproduct;
     207              :     use std::env::var;
     208              :     use std::sync::Arc;
     209              :     use std::time::Duration;
     210              :     use test_log::test as testlog;
     211              :     use tower::{Service, util::ServiceExt};
     212              :     use utils::id::{TenantId, TimelineId};
     213              : 
     214              :     // see libs/remote_storage/tests/test_real_s3.rs
     215              :     const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
     216              :     const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
     217              :     const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
     218              : 
     219           34 :     async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
     220           34 :         let cancel = CancellationToken::new();
     221           34 :         let (dir, storage) = if var(REAL_S3_ENV).is_err() {
     222              :             // tests execute in parallel and we need a new directory for each of them
     223           34 :             let dir = camino_tempfile::tempdir().unwrap();
     224           34 :             let fs =
     225           34 :                 remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
     226           34 :             (Some(dir), GenericRemoteStorage::LocalFs(fs))
     227              :         } else {
     228              :             // test_real_s3::create_s3_client is hard to reference, reimplementing here
     229            0 :             let millis = SystemTime::now()
     230            0 :                 .duration_since(UNIX_EPOCH)
     231            0 :                 .unwrap()
     232            0 :                 .as_millis();
     233              :             use rand::Rng;
     234            0 :             let random = rand::thread_rng().r#gen::<u32>();
     235            0 : 
     236            0 :             let s3_config = remote_storage::S3Config {
     237            0 :                 bucket_name: var(REAL_S3_BUCKET).unwrap(),
     238            0 :                 bucket_region: var(REAL_S3_REGION).unwrap(),
     239            0 :                 prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
     240            0 :                 endpoint: None,
     241            0 :                 concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
     242            0 :                 max_keys_per_list_response: None,
     243            0 :                 upload_storage_class: None,
     244            0 :             };
     245            0 :             let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
     246            0 :                 .await
     247            0 :                 .unwrap();
     248            0 :             (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
     249              :         };
     250              : 
     251           34 :         let proxy = Storage {
     252           34 :             auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
     253           34 :             storage,
     254           34 :             cancel: cancel.clone(),
     255           34 :             max_upload_file_limit: usize::MAX,
     256           34 :         };
     257           34 :         check_storage_permissions(&proxy.storage, cancel)
     258           34 :             .await
     259           34 :             .unwrap();
     260           34 :         (proxy, dir)
     261           34 :     }
     262              : 
     263              :     // see libs/utils/src/auth.rs
     264              :     const TEST_PUB_KEY_ED25519: &[u8] = b"
     265              : -----BEGIN PUBLIC KEY-----
     266              : MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
     267              : -----END PUBLIC KEY-----
     268              : ";
     269              : 
     270              :     const TEST_PRIV_KEY_ED25519: &[u8] = br#"
     271              : -----BEGIN PRIVATE KEY-----
     272              : MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
     273              : -----END PRIVATE KEY-----
     274              : "#;
     275              : 
     276           30 :     async fn request(req: Request<Body>) -> Response<Body> {
     277           30 :         let (proxy, _) = proxy().await;
     278           30 :         app(Arc::new(proxy))
     279           30 :             .into_service()
     280           30 :             .oneshot(req)
     281           30 :             .await
     282           30 :             .unwrap()
     283           30 :     }
     284              : 
     285            2 :     #[testlog(tokio::test)]
     286              :     async fn status() {
     287              :         let res = Request::builder()
     288              :             .uri("/status")
     289              :             .body(Body::empty())
     290              :             .map(request)
     291              :             .unwrap()
     292              :             .await;
     293              :         assert_eq!(res.status(), StatusCode::OK);
     294              :     }
     295              : 
     296            3 :     fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
     297            3 :         iproduct!(
     298            3 :             vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
     299            3 :             vec!["GET", "PUT", "DELETE"]
     300            3 :         )
     301            3 :     }
     302              : 
     303            2 :     #[testlog(tokio::test)]
     304              :     async fn no_token() {
     305              :         for (uri, method) in routes() {
     306              :             info!(%uri, %method);
     307              :             let res = Request::builder()
     308              :                 .uri(uri)
     309              :                 .method(method)
     310              :                 .body(Body::empty())
     311              :                 .map(request)
     312              :                 .unwrap()
     313              :                 .await;
     314              :             assert!(matches!(
     315              :                 res.status(),
     316              :                 StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
     317              :             ));
     318              :         }
     319              :     }
     320              : 
     321            2 :     #[testlog(tokio::test)]
     322              :     async fn invalid_token() {
     323              :         for (uri, method) in routes() {
     324              :             info!(%uri, %method);
     325              :             let status = Request::builder()
     326              :                 .uri(uri)
     327              :                 .header("Authorization", "Bearer 123")
     328              :                 .method(method)
     329              :                 .body(Body::empty())
     330              :                 .map(request)
     331              :                 .unwrap()
     332              :                 .await;
     333              :             assert!(matches!(
     334              :                 status.status(),
     335              :                 StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
     336              :             ));
     337              :         }
     338              :     }
     339              : 
     340              :     const TENANT_ID: TenantId =
     341              :         TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
     342              :     const TIMELINE_ID: TimelineId =
     343              :         TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
     344              :     const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
     345            9 :     fn token() -> String {
     346            9 :         let claims = object_storage::Claims {
     347            9 :             tenant_id: TENANT_ID,
     348            9 :             timeline_id: TIMELINE_ID,
     349            9 :             endpoint_id: ENDPOINT_ID.into(),
     350            9 :             exp: u64::MAX,
     351            9 :         };
     352            9 :         let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
     353            9 :         let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
     354            9 :         jsonwebtoken::encode(&header, &claims, &key).unwrap()
     355            9 :     }
     356              : 
     357            2 :     #[testlog(tokio::test)]
     358              :     async fn unauthorized() {
     359              :         let (proxy, _) = proxy().await;
     360              :         let mut app = app(Arc::new(proxy)).into_service();
     361              :         let token = token();
     362              :         let args = itertools::iproduct!(
     363              :             vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
     364              :             vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
     365              :             vec![ENDPOINT_ID, "ep-ololo"]
     366              :         )
     367              :         .skip(1);
     368              : 
     369              :         for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
     370              :             info!(%uri, %method, %tenant, %timeline, %endpoint);
     371              :             let request = Request::builder()
     372              :                 .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
     373              :                 .method(method)
     374              :                 .header("Authorization", format!("Bearer {}", token))
     375              :                 .body(Body::empty())
     376              :                 .unwrap();
     377              :             let status = ServiceExt::ready(&mut app)
     378              :                 .await
     379              :                 .unwrap()
     380              :                 .call(request)
     381              :                 .await
     382              :                 .unwrap()
     383              :                 .status();
     384              :             assert_eq!(status, StatusCode::UNAUTHORIZED);
     385              :         }
     386              :     }
     387              : 
     388            2 :     #[testlog(tokio::test)]
     389              :     async fn method_not_allowed() {
     390              :         let token = token();
     391              :         let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
     392              :         for (key, method) in iter {
     393              :             let status = Request::builder()
     394              :                 .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
     395              :                 .method(method)
     396              :                 .header("Authorization", format!("Bearer {token}"))
     397              :                 .body(Body::empty())
     398              :                 .map(request)
     399              :                 .unwrap()
     400              :                 .await
     401              :                 .status();
     402              :             assert!(matches!(
     403              :                 status,
     404              :                 StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
     405              :             ));
     406              :         }
     407              :     }
     408              : 
     409            3 :     async fn requests_chain(
     410            3 :         chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
     411            3 :         token: impl Fn(&str) -> String,
     412            3 :     ) {
     413            3 :         let (proxy, _) = proxy().await;
     414            3 :         let mut app = app(Arc::new(proxy)).into_service();
     415           44 :         for (uri, method, body, expected_status, compare_body) in chain {
     416           41 :             info!(%uri, %method, %body, %expected_status);
     417           41 :             let bearer = format!("Bearer {}", token(&uri));
     418           41 :             let request = Request::builder()
     419           41 :                 .uri(uri)
     420           41 :                 .method(method)
     421           41 :                 .header("Authorization", &bearer)
     422           41 :                 .body(Body::from(body))
     423           41 :                 .unwrap();
     424           41 :             let response = ServiceExt::ready(&mut app)
     425           41 :                 .await
     426           41 :                 .unwrap()
     427           41 :                 .call(request)
     428           41 :                 .await
     429           41 :                 .unwrap();
     430           41 :             assert_eq!(response.status(), expected_status);
     431           41 :             if !compare_body {
     432           40 :                 continue;
     433            1 :             }
     434            1 :             let read_body = response.into_body().collect().await.unwrap().to_bytes();
     435            1 :             assert_eq!(body, read_body);
     436              :         }
     437            3 :     }
     438              : 
     439            2 :     #[testlog(tokio::test)]
     440              :     async fn metrics() {
     441              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
     442              :         let req = vec![
     443              :             (uri.clone(), "PUT", "body", StatusCode::OK, false),
     444              :             (uri.clone(), "DELETE", "", StatusCode::OK, false),
     445              :         ];
     446            2 :         requests_chain(req.into_iter(), |_| token()).await;
     447              : 
     448              :         let res = Request::builder()
     449              :             .uri("/metrics")
     450              :             .body(Body::empty())
     451              :             .map(request)
     452              :             .unwrap()
     453              :             .await;
     454              :         assert_eq!(res.status(), StatusCode::OK);
     455              :         let body = res.into_body().collect().await.unwrap().to_bytes();
     456              :         let body = String::from_utf8_lossy(&body);
     457              :         tracing::debug!(%body);
     458              :         // Storage metrics are not gathered for LocalFs
     459              :         if var(REAL_S3_ENV).is_ok() {
     460              :             assert!(body.contains("remote_storage_s3_deleted_objects_total"));
     461              :         }
     462              :         assert!(body.contains("process_threads"));
     463              :     }
     464              : 
     465            2 :     #[testlog(tokio::test)]
     466              :     async fn insert_retrieve_remove() {
     467              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
     468              :         let chain = vec![
     469              :             (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
     470              :             (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
     471              :             (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
     472              :             (uri.clone(), "DELETE", "", StatusCode::OK, false),
     473              :             (uri, "GET", "", StatusCode::NOT_FOUND, false),
     474              :         ];
     475            5 :         requests_chain(chain.into_iter(), |_| token()).await;
     476              :     }
     477              : 
     478           34 :     fn delete_prefix_token(uri: &str) -> String {
     479              :         use serde::Serialize;
     480           34 :         let parts = uri.split("/").collect::<Vec<&str>>();
     481              :         #[derive(Serialize)]
     482              :         struct PrefixClaims {
     483              :             tenant_id: TenantId,
     484              :             timeline_id: Option<TimelineId>,
     485              :             endpoint_id: Option<object_storage::EndpointId>,
     486              :             exp: u64,
     487              :         }
     488           34 :         let claims = PrefixClaims {
     489           34 :             tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
     490           34 :             timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
     491           34 :             endpoint_id: parts.get(3).map(ToString::to_string),
     492           34 :             exp: u64::MAX,
     493           34 :         };
     494           34 :         let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
     495           34 :         let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
     496           34 :         jsonwebtoken::encode(&header, &claims, &key).unwrap()
     497           34 :     }
     498              : 
     499              :     // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
     500            2 :     #[testlog(tokio::test)]
     501              :     async fn delete_prefix() {
     502              :         let tenant_id =
     503              :             TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
     504              :         let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     505              :         let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     506              :         let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     507           33 :         let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
     508              :         // Why extra slash in string literals? Axum is weird with URIs:
     509              :         // /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
     510              :         //  as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
     511              :         // The cost of removing trailing slash is suprisingly hard:
     512              :         // * Add tower dependency with NormalizePath layer
     513              :         // * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
     514              :         // * Rewrite make_service() -> into_make_service()
     515              :         // * Rewrite oneshot() (not available for NormalizePath)
     516              :         // I didn't manage to get it working correctly
     517              :         let chain = vec![
     518              :             // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
     519              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     520              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
     521              :             (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
     522              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     523              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     524              :             (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
     525              :             // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
     526              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     527              :             (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
     528              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     529              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     530              :             (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
     531              :             // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
     532              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     533              :             (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
     534              :             (f(t2, ""), "DELETE", "", StatusCode::OK, false),
     535              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     536              :             (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
     537              :             // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
     538              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     539              :             (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
     540              :             (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
     541              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     542              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     543              :             (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
     544              :             (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
     545              :             // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
     546              :             (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
     547              :             (f(t2, ""), "DELETE", "", StatusCode::OK, false),
     548              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     549              :             (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     550              :             (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
     551              :             (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
     552              :             // delete prefix 1 -> empty
     553              :             (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
     554              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     555              :             (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     556              :             (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
     557              :             (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     558              :         ];
     559              :         requests_chain(chain.into_iter(), delete_prefix_token).await;
     560              :     }
     561              : }
        

Generated by: LCOV version 2.1-beta