LCOV - code coverage report
Current view: top level - endpoint_storage/src - app.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 85.9 % 241 207
Test Date: 2025-07-16 12:29:03 Functions: 79.2 % 48 38

            Line data    Source code
       1              : use anyhow::anyhow;
       2              : use axum::body::{Body, Bytes};
       3              : use axum::response::{IntoResponse, Response};
       4              : use axum::{Router, http::StatusCode};
       5              : use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
       6              : use remote_storage::TimeoutOrCancel;
       7              : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
       8              : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::{error, info};
      11              : use utils::backoff::retry;
      12              : 
      13           35 : pub fn app(state: Arc<Storage>) -> Router<()> {
      14              :     use axum::routing::{delete as _delete, get as _get};
      15           35 :     let delete_prefix = _delete(delete_prefix);
      16              :     // NB: On any changes do not forget to update the OpenAPI spec
      17              :     // in /endpoint_storage/src/openapi_spec.yml.
      18           35 :     Router::new()
      19           35 :         .route(
      20           35 :             "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
      21           35 :             _get(get).put(set).delete(delete),
      22              :         )
      23           35 :         .route(
      24           35 :             "/{tenant_id}/{timeline_id}/{endpoint_id}",
      25           35 :             delete_prefix.clone(),
      26              :         )
      27           35 :         .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
      28           35 :         .route("/{tenant_id}", delete_prefix)
      29           35 :         .route("/metrics", _get(metrics))
      30           35 :         .route("/status", _get(async || StatusCode::OK.into_response()))
      31           35 :         .with_state(state)
      32           35 : }
      33              : 
      34              : type Result = anyhow::Result<Response, Response>;
      35              : type State = axum::extract::State<Arc<Storage>>;
      36              : 
      37              : const CONTENT_TYPE: &str = "content-type";
      38              : const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
      39              : const WARN_THRESHOLD: u32 = 3;
      40              : const MAX_RETRIES: u32 = 10;
      41              : 
      42            1 : async fn metrics() -> Result {
      43            1 :     prometheus::TextEncoder::new()
      44            1 :         .encode_to_string(&prometheus::gather())
      45            1 :         .map(|s| s.into_response())
      46            1 :         .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
      47            1 : }
      48              : 
      49           21 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
      50           21 :     info!(%path, "downloading");
      51           21 :     let download_err = |err| {
      52           15 :         if let DownloadError::NotFound = err {
      53           15 :             info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
      54           15 :             return not_found(&path);
      55            0 :         }
      56            0 :         internal_error(err, &path, "downloading")
      57           15 :     };
      58           21 :     let cancel = state.cancel.clone();
      59           21 :     let opts = &DownloadOpts::default();
      60              : 
      61           21 :     let stream = retry(
      62            0 :         async || state.storage.download(&path, opts, &cancel).await,
      63              :         DownloadError::is_permanent,
      64              :         WARN_THRESHOLD,
      65              :         MAX_RETRIES,
      66           21 :         "downloading",
      67           21 :         &cancel,
      68              :     )
      69           21 :     .await
      70           21 :     .unwrap_or(Err(DownloadError::Cancelled))
      71           21 :     .map_err(download_err)?
      72              :     .download_stream;
      73              : 
      74            6 :     Response::builder()
      75            6 :         .status(StatusCode::OK)
      76            6 :         .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
      77            6 :         .body(Body::from_stream(stream))
      78            6 :         .map_err(|e| internal_error(e, path, "reading response"))
      79           21 : }
      80              : 
      81              : // Best solution for files is multipart upload, but remote_storage doesn't support it,
      82              : // so we can either read Bytes in memory and push at once or forward BodyDataStream to
      83              : // remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
      84              : // guaranteed size() which may produce issues while uploading to s3.
      85              : // So, currently we're going with an in-memory copy plus a boundary to prevent uploading
      86              : // very large files.
      87           13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
      88           13 :     info!(%path, "uploading");
      89           13 :     let request_len = bytes.len();
      90           13 :     let max_len = state.max_upload_file_limit;
      91           13 :     if request_len > max_len {
      92            0 :         return Err(bad_request(
      93            0 :             anyhow!("File size {request_len} exceeds max {max_len}"),
      94            0 :             "uploading",
      95            0 :         ));
      96           13 :     }
      97              : 
      98           13 :     let cancel = state.cancel.clone();
      99           13 :     let fun = async || {
     100            0 :         let stream = bytes_to_stream(bytes.clone());
     101            0 :         state
     102            0 :             .storage
     103            0 :             .upload(stream, request_len, &path, None, &cancel)
     104            0 :             .await
     105           13 :     };
     106           13 :     retry(
     107           13 :         fun,
     108           13 :         TimeoutOrCancel::caused_by_cancel,
     109           13 :         WARN_THRESHOLD,
     110           13 :         MAX_RETRIES,
     111           13 :         "uploading",
     112           13 :         &cancel,
     113           13 :     )
     114           13 :     .await
     115           13 :     .unwrap_or(Err(anyhow!("uploading cancelled")))
     116           13 :     .map_err(|e| internal_error(e, path, "reading response"))?;
     117           13 :     Ok(ok())
     118           13 : }
     119              : 
     120            2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
     121            2 :     info!(%path, "deleting");
     122            2 :     let cancel = state.cancel.clone();
     123            2 :     retry(
     124            0 :         async || state.storage.delete(&path, &cancel).await,
     125              :         TimeoutOrCancel::caused_by_cancel,
     126              :         WARN_THRESHOLD,
     127              :         MAX_RETRIES,
     128            2 :         "deleting",
     129            2 :         &cancel,
     130              :     )
     131            2 :     .await
     132            2 :     .unwrap_or(Err(anyhow!("deleting cancelled")))
     133            2 :     .map_err(|e| internal_error(e, path, "deleting"))?;
     134            2 :     Ok(ok())
     135            2 : }
     136              : 
     137            6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
     138            6 :     info!(%path, "deleting prefix");
     139            6 :     let cancel = state.cancel.clone();
     140            6 :     retry(
     141            0 :         async || state.storage.delete_prefix(&path, &cancel).await,
     142              :         TimeoutOrCancel::caused_by_cancel,
     143              :         WARN_THRESHOLD,
     144              :         MAX_RETRIES,
     145            6 :         "deleting prefix",
     146            6 :         &cancel,
     147              :     )
     148            6 :     .await
     149            6 :     .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
     150            6 :     .map_err(|e| internal_error(e, path, "deleting prefix"))?;
     151            6 :     Ok(ok())
     152            6 : }
     153              : 
     154           35 : pub async fn check_storage_permissions(
     155           35 :     client: &GenericRemoteStorage,
     156           35 :     cancel: CancellationToken,
     157           35 : ) -> anyhow::Result<()> {
     158           35 :     info!("storage permissions check");
     159              : 
     160              :     // as_nanos() as multiple instances proxying same bucket may be started at once
     161           35 :     let now = SystemTime::now()
     162           35 :         .duration_since(UNIX_EPOCH)?
     163           35 :         .as_nanos()
     164           35 :         .to_string();
     165              : 
     166           35 :     let path = RemotePath::from_string(&format!("write_access_{now}"))?;
     167           35 :     info!(%path, "uploading");
     168              : 
     169           35 :     let body = now.to_string();
     170           35 :     let stream = bytes_to_stream(Bytes::from(body.clone()));
     171           35 :     client
     172           35 :         .upload(stream, body.len(), &path, None, &cancel)
     173           35 :         .await?;
     174              : 
     175              :     use tokio::io::AsyncReadExt;
     176           35 :     info!(%path, "downloading");
     177           35 :     let download_opts = DownloadOpts {
     178           35 :         kind: remote_storage::DownloadKind::Small,
     179           35 :         ..Default::default()
     180           35 :     };
     181           35 :     let mut body_read_buf = Vec::new();
     182           35 :     let stream = client
     183           35 :         .download(&path, &download_opts, &cancel)
     184           35 :         .await?
     185              :         .download_stream;
     186           35 :     tokio_util::io::StreamReader::new(stream)
     187           35 :         .read_to_end(&mut body_read_buf)
     188           35 :         .await?;
     189           35 :     let body_read = String::from_utf8(body_read_buf)?;
     190           35 :     if body != body_read {
     191            0 :         error!(%body, %body_read, "File contents do not match");
     192            0 :         anyhow::bail!("Read back file doesn't match original")
     193           35 :     }
     194              : 
     195           35 :     info!(%path, "removing");
     196           35 :     client.delete(&path, &cancel).await
     197           35 : }
     198              : 
     199           48 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
     200           48 :     futures::stream::once(futures::future::ready(Ok(bytes)))
     201           48 : }
     202              : 
     203              : #[cfg(test)]
     204              : mod tests {
     205              :     use super::*;
     206              :     use axum::{body::Body, extract::Request, response::Response};
     207              :     use http_body_util::BodyExt;
     208              :     use itertools::iproduct;
     209              :     use std::env::var;
     210              :     use std::sync::Arc;
     211              :     use std::time::Duration;
     212              :     use test_log::test as testlog;
     213              :     use tower::{Service, util::ServiceExt};
     214              :     use utils::id::{TenantId, TimelineId};
     215              : 
     216              :     // see libs/remote_storage/tests/test_real_s3.rs
     217              :     const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
     218              :     const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
     219              :     const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
     220              : 
     221           35 :     async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
     222           35 :         let cancel = CancellationToken::new();
     223           35 :         let (dir, storage) = if var(REAL_S3_ENV).is_err() {
     224              :             // tests execute in parallel and we need a new directory for each of them
     225           35 :             let dir = camino_tempfile::tempdir().unwrap();
     226           35 :             let fs =
     227           35 :                 remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
     228           35 :             (Some(dir), GenericRemoteStorage::LocalFs(fs))
     229              :         } else {
     230              :             // test_real_s3::create_s3_client is hard to reference, reimplementing here
     231            0 :             let millis = SystemTime::now()
     232            0 :                 .duration_since(UNIX_EPOCH)
     233            0 :                 .unwrap()
     234            0 :                 .as_millis();
     235              :             use rand::Rng;
     236            0 :             let random = rand::thread_rng().r#gen::<u32>();
     237              : 
     238            0 :             let s3_config = remote_storage::S3Config {
     239            0 :                 bucket_name: var(REAL_S3_BUCKET).unwrap(),
     240            0 :                 bucket_region: var(REAL_S3_REGION).unwrap(),
     241            0 :                 prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
     242            0 :                 endpoint: None,
     243            0 :                 concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
     244            0 :                 max_keys_per_list_response: None,
     245            0 :                 upload_storage_class: None,
     246            0 :             };
     247            0 :             let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
     248            0 :                 .await
     249            0 :                 .unwrap();
     250            0 :             (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
     251              :         };
     252              : 
     253           35 :         let proxy = Storage {
     254           35 :             auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
     255           35 :             storage,
     256           35 :             cancel: cancel.clone(),
     257           35 :             max_upload_file_limit: usize::MAX,
     258           35 :         };
     259           35 :         check_storage_permissions(&proxy.storage, cancel)
     260           35 :             .await
     261           35 :             .unwrap();
     262           35 :         (proxy, dir)
     263           35 :     }
     264              : 
     265              :     // see libs/utils/src/auth.rs
     266              :     const TEST_PUB_KEY_ED25519: &[u8] = b"
     267              : -----BEGIN PUBLIC KEY-----
     268              : MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
     269              : -----END PUBLIC KEY-----
     270              : ";
     271              : 
     272              :     const TEST_PRIV_KEY_ED25519: &[u8] = br#"
     273              : -----BEGIN PRIVATE KEY-----
     274              : MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
     275              : -----END PRIVATE KEY-----
     276              : "#;
     277              : 
     278           30 :     async fn request(req: Request<Body>) -> Response<Body> {
     279           30 :         let (proxy, _) = proxy().await;
     280           30 :         app(Arc::new(proxy))
     281           30 :             .into_service()
     282           30 :             .oneshot(req)
     283           30 :             .await
     284           30 :             .unwrap()
     285           30 :     }
     286              : 
     287              :     #[testlog(tokio::test)]
     288              :     async fn status() {
     289              :         let res = Request::builder()
     290              :             .uri("/status")
     291              :             .body(Body::empty())
     292              :             .map(request)
     293              :             .unwrap()
     294              :             .await;
     295              :         assert_eq!(res.status(), StatusCode::OK);
     296              :     }
     297              : 
     298            3 :     fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
     299            3 :         iproduct!(
     300            3 :             vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
     301            3 :             vec!["GET", "PUT", "DELETE"]
     302              :         )
     303            3 :     }
     304              : 
     305              :     #[testlog(tokio::test)]
     306              :     async fn no_token() {
     307              :         for (uri, method) in routes() {
     308              :             info!(%uri, %method);
     309              :             let res = Request::builder()
     310              :                 .uri(uri)
     311              :                 .method(method)
     312              :                 .body(Body::empty())
     313              :                 .map(request)
     314              :                 .unwrap()
     315              :                 .await;
     316              :             assert!(matches!(
     317              :                 res.status(),
     318              :                 StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
     319              :             ));
     320              :         }
     321              :     }
     322              : 
     323              :     #[testlog(tokio::test)]
     324              :     async fn invalid_token() {
     325              :         for (uri, method) in routes() {
     326              :             info!(%uri, %method);
     327              :             let status = Request::builder()
     328              :                 .uri(uri)
     329              :                 .header("Authorization", "Bearer 123")
     330              :                 .method(method)
     331              :                 .body(Body::empty())
     332              :                 .map(request)
     333              :                 .unwrap()
     334              :                 .await;
     335              :             assert!(matches!(
     336              :                 status.status(),
     337              :                 StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
     338              :             ));
     339              :         }
     340              :     }
     341              : 
     342              :     const TENANT_ID: TenantId =
     343              :         TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
     344              :     const TIMELINE_ID: TimelineId =
     345              :         TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
     346              :     const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
     347           11 :     fn token() -> String {
     348           11 :         let claims = endpoint_storage::claims::EndpointStorageClaims {
     349           11 :             tenant_id: TENANT_ID,
     350           11 :             timeline_id: TIMELINE_ID,
     351           11 :             endpoint_id: ENDPOINT_ID.into(),
     352           11 :             exp: u64::MAX,
     353           11 :         };
     354           11 :         let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
     355           11 :         let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
     356           11 :         jsonwebtoken::encode(&header, &claims, &key).unwrap()
     357           11 :     }
     358              : 
     359              :     #[testlog(tokio::test)]
     360              :     async fn unauthorized() {
     361              :         let (proxy, _) = proxy().await;
     362              :         let mut app = app(Arc::new(proxy)).into_service();
     363              :         let token = token();
     364              :         let args = itertools::iproduct!(
     365              :             vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
     366              :             vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
     367              :             vec![ENDPOINT_ID, "ep-ololo"]
     368              :         )
     369              :         // first one is fully valid path, second path is valid for GET as
     370              :         // read paths may have different endpoint if tenant and timeline matches
     371              :         // (needed for prewarming RO->RW replica)
     372              :         .skip(2);
     373              : 
     374              :         for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
     375              :             info!(%uri, %method, %tenant, %timeline, %endpoint);
     376              :             let request = Request::builder()
     377              :                 .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
     378              :                 .method(method)
     379              :                 .header("Authorization", format!("Bearer {token}"))
     380              :                 .body(Body::empty())
     381              :                 .unwrap();
     382              :             let status = ServiceExt::ready(&mut app)
     383              :                 .await
     384              :                 .unwrap()
     385              :                 .call(request)
     386              :                 .await
     387              :                 .unwrap()
     388              :                 .status();
     389              :             assert_eq!(status, StatusCode::UNAUTHORIZED);
     390              :         }
     391              :     }
     392              : 
     393              :     #[testlog(tokio::test)]
     394              :     async fn method_not_allowed() {
     395              :         let token = token();
     396              :         let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
     397              :         for (key, method) in iter {
     398              :             let status = Request::builder()
     399              :                 .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
     400              :                 .method(method)
     401              :                 .header("Authorization", format!("Bearer {token}"))
     402              :                 .body(Body::empty())
     403              :                 .map(request)
     404              :                 .unwrap()
     405              :                 .await
     406              :                 .status();
     407              :             assert!(matches!(
     408              :                 status,
     409              :                 StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
     410              :             ));
     411              :         }
     412              :     }
     413              : 
     414            4 :     async fn requests_chain(
     415            4 :         chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
     416            4 :         token: impl Fn(&str) -> String,
     417            4 :     ) {
     418            4 :         let (proxy, _) = proxy().await;
     419            4 :         let mut app = app(Arc::new(proxy)).into_service();
     420           47 :         for (uri, method, body, expected_status, compare_body) in chain {
     421           43 :             info!(%uri, %method, %body, %expected_status);
     422           43 :             let bearer = format!("Bearer {}", token(&uri));
     423           43 :             let request = Request::builder()
     424           43 :                 .uri(uri)
     425           43 :                 .method(method)
     426           43 :                 .header("Authorization", &bearer)
     427           43 :                 .body(Body::from(body))
     428           43 :                 .unwrap();
     429           43 :             let response = ServiceExt::ready(&mut app)
     430           43 :                 .await
     431           43 :                 .unwrap()
     432           43 :                 .call(request)
     433           43 :                 .await
     434           43 :                 .unwrap();
     435           43 :             assert_eq!(response.status(), expected_status);
     436           43 :             if !compare_body {
     437           42 :                 continue;
     438            1 :             }
     439            1 :             let read_body = response.into_body().collect().await.unwrap().to_bytes();
     440            1 :             assert_eq!(body, read_body);
     441              :         }
     442            4 :     }
     443              : 
     444              :     #[testlog(tokio::test)]
     445              :     async fn metrics() {
     446              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
     447              :         let req = vec![
     448              :             (uri.clone(), "PUT", "body", StatusCode::OK, false),
     449              :             (uri.clone(), "DELETE", "", StatusCode::OK, false),
     450              :         ];
     451            2 :         requests_chain(req.into_iter(), |_| token()).await;
     452              : 
     453              :         let res = Request::builder()
     454              :             .uri("/metrics")
     455              :             .body(Body::empty())
     456              :             .map(request)
     457              :             .unwrap()
     458              :             .await;
     459              :         assert_eq!(res.status(), StatusCode::OK);
     460              :         let body = res.into_body().collect().await.unwrap().to_bytes();
     461              :         let body = String::from_utf8_lossy(&body);
     462              :         tracing::debug!(%body);
     463              :         // Storage metrics are not gathered for LocalFs
     464              :         if var(REAL_S3_ENV).is_ok() {
     465              :             assert!(body.contains("remote_storage_s3_deleted_objects_total"));
     466              :         }
     467              : 
     468              :         #[cfg(target_os = "linux")]
     469              :         assert!(body.contains("process_threads"));
     470              :     }
     471              : 
     472              :     #[testlog(tokio::test)]
     473              :     async fn insert_retrieve_remove() {
     474              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
     475              :         let chain = vec![
     476              :             (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
     477              :             (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
     478              :             (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
     479              :             (uri.clone(), "DELETE", "", StatusCode::OK, false),
     480              :             (uri, "GET", "", StatusCode::NOT_FOUND, false),
     481              :         ];
     482            5 :         requests_chain(chain.into_iter(), |_| token()).await;
     483              :     }
     484              : 
     485              :     #[testlog(tokio::test)]
     486              :     async fn read_other_endpoint_data() {
     487              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
     488              :         let chain = vec![
     489              :             (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
     490              :             (uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
     491              :         ];
     492            2 :         requests_chain(chain.into_iter(), |_| token()).await;
     493              :     }
     494              : 
     495           34 :     fn delete_prefix_token(uri: &str) -> String {
     496           34 :         let parts = uri.split("/").collect::<Vec<&str>>();
     497           34 :         let claims = endpoint_storage::claims::DeletePrefixClaims {
     498           34 :             tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
     499           34 :             timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
     500           34 :             endpoint_id: parts.get(3).map(ToString::to_string),
     501              :             exp: u64::MAX,
     502              :         };
     503           34 :         let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
     504           34 :         let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
     505           34 :         jsonwebtoken::encode(&header, &claims, &key).unwrap()
     506           34 :     }
     507              : 
     508              :     // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
     509              :     #[testlog(tokio::test)]
     510              :     async fn delete_prefix() {
     511              :         let tenant_id =
     512              :             TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
     513              :         let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     514              :         let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     515              :         let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     516           33 :         let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
     517              :         // Why extra slash in string literals? Axum is weird with URIs:
     518              :         // /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
     519              :         //  as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
     520              :         // The cost of removing trailing slash is suprisingly hard:
     521              :         // * Add tower dependency with NormalizePath layer
     522              :         // * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
     523              :         // * Rewrite make_service() -> into_make_service()
     524              :         // * Rewrite oneshot() (not available for NormalizePath)
     525              :         // I didn't manage to get it working correctly
     526              :         let chain = vec![
     527              :             // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
     528              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     529              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
     530              :             (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
     531              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     532              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     533              :             (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
     534              :             // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
     535              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     536              :             (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
     537              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     538              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     539              :             (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
     540              :             // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
     541              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     542              :             (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
     543              :             (f(t2, ""), "DELETE", "", StatusCode::OK, false),
     544              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     545              :             (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
     546              :             // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
     547              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     548              :             (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
     549              :             (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
     550              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     551              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     552              :             (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
     553              :             (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
     554              :             // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
     555              :             (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
     556              :             (f(t2, ""), "DELETE", "", StatusCode::OK, false),
     557              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     558              :             (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     559              :             (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
     560              :             (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
     561              :             // delete prefix 1 -> empty
     562              :             (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
     563              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     564              :             (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     565              :             (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
     566              :             (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     567              :         ];
     568              :         requests_chain(chain.into_iter(), delete_prefix_token).await;
     569              :     }
     570              : }
        

Generated by: LCOV version 2.1-beta