LCOV - code coverage report
Current view: top level - endpoint_storage/src - app.rs (source / functions) Coverage Total Hit
Test: a1cc1f33dc9899e4da66eb51e44e911a4b3bd648.info Lines: 86.0 % 243 209
Test Date: 2025-07-31 11:35:14 Functions: 79.2 % 48 38

            Line data    Source code
       1              : use anyhow::anyhow;
       2              : use axum::body::{Body, Bytes};
       3              : use axum::response::{IntoResponse, Response};
       4              : use axum::{Router, http::StatusCode};
       5              : use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
       6              : use remote_storage::TimeoutOrCancel;
       7              : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
       8              : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::{error, info};
      11              : use utils::backoff::retry;
      12              : 
      13           35 : pub fn app(state: Arc<Storage>) -> Router<()> {
      14              :     use axum::routing::{delete as _delete, get as _get};
      15           35 :     let delete_prefix = _delete(delete_prefix);
      16              :     // NB: On any changes do not forget to update the OpenAPI spec
      17              :     // in /endpoint_storage/src/openapi_spec.yml.
      18           35 :     Router::new()
      19           35 :         .route(
      20           35 :             "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
      21           35 :             _get(get).put(set).delete(delete),
      22              :         )
      23           35 :         .route(
      24           35 :             "/{tenant_id}/{timeline_id}/{endpoint_id}",
      25           35 :             delete_prefix.clone(),
      26              :         )
      27           35 :         .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
      28           35 :         .route("/{tenant_id}", delete_prefix)
      29           35 :         .route("/metrics", _get(metrics))
      30           35 :         .route("/status", _get(async || StatusCode::OK.into_response()))
      31           35 :         .with_state(state)
      32           35 : }
      33              : 
      34              : type Result = anyhow::Result<Response, Response>;
      35              : type State = axum::extract::State<Arc<Storage>>;
      36              : 
      37              : const CONTENT_TYPE: &str = "content-type";
      38              : const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
      39              : const WARN_THRESHOLD: u32 = 3;
      40              : const MAX_RETRIES: u32 = 10;
      41              : 
      42            1 : async fn metrics() -> Result {
      43            1 :     prometheus::TextEncoder::new()
      44            1 :         .encode_to_string(&prometheus::gather())
      45            1 :         .map(|s| s.into_response())
      46            1 :         .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
      47            1 : }
      48              : 
      49           21 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
      50           21 :     info!(%path, "downloading");
      51           21 :     let download_err = |err| {
      52           15 :         if let DownloadError::NotFound = err {
      53           15 :             info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
      54           15 :             return not_found(&path);
      55            0 :         }
      56            0 :         internal_error(err, &path, "downloading")
      57           15 :     };
      58           21 :     let cancel = state.cancel.clone();
      59           21 :     let opts = &DownloadOpts::default();
      60              : 
      61           21 :     let stream = retry(
      62            0 :         async || state.storage.download(&path, opts, &cancel).await,
      63              :         DownloadError::is_permanent,
      64              :         WARN_THRESHOLD,
      65              :         MAX_RETRIES,
      66           21 :         "downloading",
      67           21 :         &cancel,
      68              :     )
      69           21 :     .await
      70           21 :     .unwrap_or(Err(DownloadError::Cancelled))
      71           21 :     .map_err(download_err)?
      72              :     .download_stream;
      73              : 
      74            6 :     Response::builder()
      75            6 :         .status(StatusCode::OK)
      76            6 :         .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
      77            6 :         .body(Body::from_stream(stream))
      78            6 :         .map_err(|e| internal_error(e, path, "reading response"))
      79           21 : }
      80              : 
      81              : // Best solution for files is multipart upload, but remote_storage doesn't support it,
      82              : // so we can either read Bytes in memory and push at once or forward BodyDataStream to
      83              : // remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
      84              : // guaranteed size() which may produce issues while uploading to s3.
      85              : // So, currently we're going with an in-memory copy plus a boundary to prevent uploading
      86              : // very large files.
      87           13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
      88           13 :     info!(%path, "uploading");
      89           13 :     let request_len = bytes.len();
      90           13 :     let max_len = state.max_upload_file_limit;
      91           13 :     if request_len > max_len {
      92            0 :         return Err(bad_request(
      93            0 :             anyhow!("File size {request_len} exceeds max {max_len}"),
      94            0 :             "uploading",
      95            0 :         ));
      96           13 :     }
      97              : 
      98           13 :     let cancel = state.cancel.clone();
      99           13 :     let fun = async || {
     100            0 :         let stream = bytes_to_stream(bytes.clone());
     101            0 :         state
     102            0 :             .storage
     103            0 :             .upload(stream, request_len, &path, None, &cancel)
     104            0 :             .await
     105           13 :     };
     106           13 :     retry(
     107           13 :         fun,
     108           13 :         TimeoutOrCancel::caused_by_cancel,
     109           13 :         WARN_THRESHOLD,
     110           13 :         MAX_RETRIES,
     111           13 :         "uploading",
     112           13 :         &cancel,
     113           13 :     )
     114           13 :     .await
     115           13 :     .unwrap_or(Err(anyhow!("uploading cancelled")))
     116           13 :     .map_err(|e| internal_error(e, path, "reading response"))?;
     117           13 :     Ok(ok())
     118           13 : }
     119              : 
     120            2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
     121            2 :     info!(%path, "deleting");
     122            2 :     let cancel = state.cancel.clone();
     123            2 :     retry(
     124            0 :         async || state.storage.delete(&path, &cancel).await,
     125              :         TimeoutOrCancel::caused_by_cancel,
     126              :         WARN_THRESHOLD,
     127              :         MAX_RETRIES,
     128            2 :         "deleting",
     129            2 :         &cancel,
     130              :     )
     131            2 :     .await
     132            2 :     .unwrap_or(Err(anyhow!("deleting cancelled")))
     133            2 :     .map_err(|e| internal_error(e, path, "deleting"))?;
     134            2 :     Ok(ok())
     135            2 : }
     136              : 
     137            6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
     138            6 :     info!(%path, "deleting prefix");
     139            6 :     let cancel = state.cancel.clone();
     140            6 :     retry(
     141            0 :         async || state.storage.delete_prefix(&path, &cancel).await,
     142              :         TimeoutOrCancel::caused_by_cancel,
     143              :         WARN_THRESHOLD,
     144              :         MAX_RETRIES,
     145            6 :         "deleting prefix",
     146            6 :         &cancel,
     147              :     )
     148            6 :     .await
     149            6 :     .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
     150            6 :     .map_err(|e| internal_error(e, path, "deleting prefix"))?;
     151            6 :     Ok(ok())
     152            6 : }
     153              : 
     154           35 : pub async fn check_storage_permissions(
     155           35 :     client: &GenericRemoteStorage,
     156           35 :     cancel: CancellationToken,
     157           35 : ) -> anyhow::Result<()> {
     158           35 :     info!("storage permissions check");
     159              : 
     160              :     // as_nanos() as multiple instances proxying same bucket may be started at once
     161           35 :     let now = SystemTime::now()
     162           35 :         .duration_since(UNIX_EPOCH)?
     163           35 :         .as_nanos()
     164           35 :         .to_string();
     165              : 
     166           35 :     let path = RemotePath::from_string(&format!("write_access_{now}"))?;
     167           35 :     info!(%path, "uploading");
     168              : 
     169           35 :     let body = now.to_string();
     170           35 :     let stream = bytes_to_stream(Bytes::from(body.clone()));
     171           35 :     client
     172           35 :         .upload(stream, body.len(), &path, None, &cancel)
     173           35 :         .await?;
     174              : 
     175              :     use tokio::io::AsyncReadExt;
     176           35 :     info!(%path, "downloading");
     177           35 :     let download_opts = DownloadOpts {
     178           35 :         kind: remote_storage::DownloadKind::Small,
     179           35 :         ..Default::default()
     180           35 :     };
     181           35 :     let mut body_read_buf = Vec::new();
     182           35 :     let stream = client
     183           35 :         .download(&path, &download_opts, &cancel)
     184           35 :         .await?
     185              :         .download_stream;
     186           35 :     tokio_util::io::StreamReader::new(stream)
     187           35 :         .read_to_end(&mut body_read_buf)
     188           35 :         .await?;
     189           35 :     let body_read = String::from_utf8(body_read_buf)?;
     190           35 :     if body != body_read {
     191            0 :         error!(%body, %body_read, "File contents do not match");
     192            0 :         anyhow::bail!("Read back file doesn't match original")
     193           35 :     }
     194              : 
     195           35 :     info!(%path, "removing");
     196           35 :     client.delete(&path, &cancel).await
     197           35 : }
     198              : 
     199           48 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
     200           48 :     futures::stream::once(futures::future::ready(Ok(bytes)))
     201           48 : }
     202              : 
     203              : #[cfg(test)]
     204              : mod tests {
     205              :     use super::*;
     206              :     use axum::{body::Body, extract::Request, response::Response};
     207              :     use http_body_util::BodyExt;
     208              :     use itertools::iproduct;
     209              :     use jsonwebtoken::DecodingKey;
     210              :     use std::env::var;
     211              :     use std::sync::Arc;
     212              :     use std::time::Duration;
     213              :     use test_log::test as testlog;
     214              :     use tower::{Service, util::ServiceExt};
     215              :     use utils::{
     216              :         auth::JwtAuth,
     217              :         id::{TenantId, TimelineId},
     218              :     };
     219              : 
     220              :     // see libs/remote_storage/tests/test_real_s3.rs
     221              :     const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
     222              :     const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
     223              :     const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
     224              : 
     225           35 :     async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
     226           35 :         let cancel = CancellationToken::new();
     227           35 :         let (dir, storage) = if var(REAL_S3_ENV).is_err() {
     228              :             // tests execute in parallel and we need a new directory for each of them
     229           35 :             let dir = camino_tempfile::tempdir().unwrap();
     230           35 :             let fs =
     231           35 :                 remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
     232           35 :             (Some(dir), GenericRemoteStorage::LocalFs(fs))
     233              :         } else {
     234              :             // test_real_s3::create_s3_client is hard to reference, reimplementing here
     235            0 :             let millis = SystemTime::now()
     236            0 :                 .duration_since(UNIX_EPOCH)
     237            0 :                 .unwrap()
     238            0 :                 .as_millis();
     239              :             use rand::Rng;
     240            0 :             let random = rand::rng().random::<u32>();
     241              : 
     242            0 :             let s3_config = remote_storage::S3Config {
     243            0 :                 bucket_name: var(REAL_S3_BUCKET).unwrap(),
     244            0 :                 bucket_region: var(REAL_S3_REGION).unwrap(),
     245            0 :                 prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
     246            0 :                 endpoint: None,
     247            0 :                 concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
     248            0 :                 max_keys_per_list_response: None,
     249            0 :                 upload_storage_class: None,
     250            0 :             };
     251            0 :             let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
     252            0 :                 .await
     253            0 :                 .unwrap();
     254            0 :             (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
     255              :         };
     256              : 
     257           35 :         let proxy = Storage {
     258           35 :             auth: JwtAuth::new(vec![
     259           35 :                 DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
     260           35 :             ]),
     261           35 :             storage,
     262           35 :             cancel: cancel.clone(),
     263           35 :             max_upload_file_limit: usize::MAX,
     264           35 :         };
     265           35 :         check_storage_permissions(&proxy.storage, cancel)
     266           35 :             .await
     267           35 :             .unwrap();
     268           35 :         (proxy, dir)
     269           35 :     }
     270              : 
     271              :     // see libs/utils/src/auth.rs
     272              :     const TEST_PUB_KEY_ED25519: &[u8] = b"
     273              : -----BEGIN PUBLIC KEY-----
     274              : MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
     275              : -----END PUBLIC KEY-----
     276              : ";
     277              : 
     278              :     const TEST_PRIV_KEY_ED25519: &[u8] = br#"
     279              : -----BEGIN PRIVATE KEY-----
     280              : MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
     281              : -----END PRIVATE KEY-----
     282              : "#;
     283              : 
     284           30 :     async fn request(req: Request<Body>) -> Response<Body> {
     285           30 :         let (proxy, _) = proxy().await;
     286           30 :         app(Arc::new(proxy))
     287           30 :             .into_service()
     288           30 :             .oneshot(req)
     289           30 :             .await
     290           30 :             .unwrap()
     291           30 :     }
     292              : 
     293              :     #[testlog(tokio::test)]
     294              :     async fn status() {
     295              :         let res = Request::builder()
     296              :             .uri("/status")
     297              :             .body(Body::empty())
     298              :             .map(request)
     299              :             .unwrap()
     300              :             .await;
     301              :         assert_eq!(res.status(), StatusCode::OK);
     302              :     }
     303              : 
     304            3 :     fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
     305            3 :         iproduct!(
     306            3 :             vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
     307            3 :             vec!["GET", "PUT", "DELETE"]
     308              :         )
     309            3 :     }
     310              : 
     311              :     #[testlog(tokio::test)]
     312              :     async fn no_token() {
     313              :         for (uri, method) in routes() {
     314              :             info!(%uri, %method);
     315              :             let res = Request::builder()
     316              :                 .uri(uri)
     317              :                 .method(method)
     318              :                 .body(Body::empty())
     319              :                 .map(request)
     320              :                 .unwrap()
     321              :                 .await;
     322              :             assert!(matches!(
     323              :                 res.status(),
     324              :                 StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
     325              :             ));
     326              :         }
     327              :     }
     328              : 
     329              :     #[testlog(tokio::test)]
     330              :     async fn invalid_token() {
     331              :         for (uri, method) in routes() {
     332              :             info!(%uri, %method);
     333              :             let status = Request::builder()
     334              :                 .uri(uri)
     335              :                 .header("Authorization", "Bearer 123")
     336              :                 .method(method)
     337              :                 .body(Body::empty())
     338              :                 .map(request)
     339              :                 .unwrap()
     340              :                 .await;
     341              :             assert!(matches!(
     342              :                 status.status(),
     343              :                 StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
     344              :             ));
     345              :         }
     346              :     }
     347              : 
     348              :     const TENANT_ID: TenantId =
     349              :         TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
     350              :     const TIMELINE_ID: TimelineId =
     351              :         TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
     352              :     const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
     353           11 :     fn token() -> String {
     354           11 :         let claims = endpoint_storage::claims::EndpointStorageClaims {
     355           11 :             tenant_id: TENANT_ID,
     356           11 :             timeline_id: TIMELINE_ID,
     357           11 :             endpoint_id: ENDPOINT_ID.into(),
     358           11 :             exp: u64::MAX,
     359           11 :         };
     360           11 :         let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
     361           11 :         let header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::EdDSA);
     362           11 :         jsonwebtoken::encode(&header, &claims, &key).unwrap()
     363           11 :     }
     364              : 
     365              :     #[testlog(tokio::test)]
     366              :     async fn unauthorized() {
     367              :         let (proxy, _) = proxy().await;
     368              :         let mut app = app(Arc::new(proxy)).into_service();
     369              :         let token = token();
     370              :         let args = itertools::iproduct!(
     371              :             vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
     372              :             vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
     373              :             vec![ENDPOINT_ID, "ep-ololo"]
     374              :         )
     375              :         // first one is fully valid path, second path is valid for GET as
     376              :         // read paths may have different endpoint if tenant and timeline matches
     377              :         // (needed for prewarming RO->RW replica)
     378              :         .skip(2);
     379              : 
     380              :         for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
     381              :             info!(%uri, %method, %tenant, %timeline, %endpoint);
     382              :             let request = Request::builder()
     383              :                 .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
     384              :                 .method(method)
     385              :                 .header("Authorization", format!("Bearer {token}"))
     386              :                 .body(Body::empty())
     387              :                 .unwrap();
     388              :             let status = ServiceExt::ready(&mut app)
     389              :                 .await
     390              :                 .unwrap()
     391              :                 .call(request)
     392              :                 .await
     393              :                 .unwrap()
     394              :                 .status();
     395              :             assert_eq!(status, StatusCode::UNAUTHORIZED);
     396              :         }
     397              :     }
     398              : 
     399              :     #[testlog(tokio::test)]
     400              :     async fn method_not_allowed() {
     401              :         let token = token();
     402              :         let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
     403              :         for (key, method) in iter {
     404              :             let status = Request::builder()
     405              :                 .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
     406              :                 .method(method)
     407              :                 .header("Authorization", format!("Bearer {token}"))
     408              :                 .body(Body::empty())
     409              :                 .map(request)
     410              :                 .unwrap()
     411              :                 .await
     412              :                 .status();
     413              :             assert!(matches!(
     414              :                 status,
     415              :                 StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
     416              :             ));
     417              :         }
     418              :     }
     419              : 
     420            4 :     async fn requests_chain(
     421            4 :         chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
     422            4 :         token: impl Fn(&str) -> String,
     423            4 :     ) {
     424            4 :         let (proxy, _) = proxy().await;
     425            4 :         let mut app = app(Arc::new(proxy)).into_service();
     426           47 :         for (uri, method, body, expected_status, compare_body) in chain {
     427           43 :             info!(%uri, %method, %body, %expected_status);
     428           43 :             let bearer = format!("Bearer {}", token(&uri));
     429           43 :             let request = Request::builder()
     430           43 :                 .uri(uri)
     431           43 :                 .method(method)
     432           43 :                 .header("Authorization", &bearer)
     433           43 :                 .body(Body::from(body))
     434           43 :                 .unwrap();
     435           43 :             let response = ServiceExt::ready(&mut app)
     436           43 :                 .await
     437           43 :                 .unwrap()
     438           43 :                 .call(request)
     439           43 :                 .await
     440           43 :                 .unwrap();
     441           43 :             assert_eq!(response.status(), expected_status);
     442           43 :             if !compare_body {
     443           42 :                 continue;
     444            1 :             }
     445            1 :             let read_body = response.into_body().collect().await.unwrap().to_bytes();
     446            1 :             assert_eq!(body, read_body);
     447              :         }
     448            4 :     }
     449              : 
     450              :     #[testlog(tokio::test)]
     451              :     async fn metrics() {
     452              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
     453              :         let req = vec![
     454              :             (uri.clone(), "PUT", "body", StatusCode::OK, false),
     455              :             (uri.clone(), "DELETE", "", StatusCode::OK, false),
     456              :         ];
     457            2 :         requests_chain(req.into_iter(), |_| token()).await;
     458              : 
     459              :         let res = Request::builder()
     460              :             .uri("/metrics")
     461              :             .body(Body::empty())
     462              :             .map(request)
     463              :             .unwrap()
     464              :             .await;
     465              :         assert_eq!(res.status(), StatusCode::OK);
     466              :         let body = res.into_body().collect().await.unwrap().to_bytes();
     467              :         let body = String::from_utf8_lossy(&body);
     468              :         tracing::debug!(%body);
     469              :         // Storage metrics are not gathered for LocalFs
     470              :         if var(REAL_S3_ENV).is_ok() {
     471              :             assert!(body.contains("remote_storage_s3_deleted_objects_total"));
     472              :         }
     473              : 
     474              :         #[cfg(target_os = "linux")]
     475              :         assert!(body.contains("process_threads"));
     476              :     }
     477              : 
     478              :     #[testlog(tokio::test)]
     479              :     async fn insert_retrieve_remove() {
     480              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
     481              :         let chain = vec![
     482              :             (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
     483              :             (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
     484              :             (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
     485              :             (uri.clone(), "DELETE", "", StatusCode::OK, false),
     486              :             (uri, "GET", "", StatusCode::NOT_FOUND, false),
     487              :         ];
     488            5 :         requests_chain(chain.into_iter(), |_| token()).await;
     489              :     }
     490              : 
     491              :     #[testlog(tokio::test)]
     492              :     async fn read_other_endpoint_data() {
     493              :         let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
     494              :         let chain = vec![
     495              :             (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
     496              :             (uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
     497              :         ];
     498            2 :         requests_chain(chain.into_iter(), |_| token()).await;
     499              :     }
     500              : 
     501           34 :     fn delete_prefix_token(uri: &str) -> String {
     502           34 :         let parts = uri.split("/").collect::<Vec<&str>>();
     503           34 :         let claims = endpoint_storage::claims::DeletePrefixClaims {
     504           34 :             tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
     505           34 :             timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
     506           34 :             endpoint_id: parts.get(3).map(ToString::to_string),
     507              :             exp: u64::MAX,
     508              :         };
     509           34 :         let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
     510           34 :         let header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::EdDSA);
     511           34 :         jsonwebtoken::encode(&header, &claims, &key).unwrap()
     512           34 :     }
     513              : 
     514              :     // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
     515              :     #[testlog(tokio::test)]
     516              :     async fn delete_prefix() {
     517              :         let tenant_id =
     518              :             TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
     519              :         let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     520              :         let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     521              :         let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
     522           33 :         let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
     523              :         // Why extra slash in string literals? Axum is weird with URIs:
     524              :         // /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
     525              :         //  as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
     526              :         // The cost of removing trailing slash is suprisingly hard:
     527              :         // * Add tower dependency with NormalizePath layer
     528              :         // * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
     529              :         // * Rewrite make_service() -> into_make_service()
     530              :         // * Rewrite oneshot() (not available for NormalizePath)
     531              :         // I didn't manage to get it working correctly
     532              :         let chain = vec![
     533              :             // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
     534              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     535              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
     536              :             (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
     537              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     538              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     539              :             (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
     540              :             // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
     541              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     542              :             (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
     543              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     544              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     545              :             (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
     546              :             // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
     547              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     548              :             (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
     549              :             (f(t2, ""), "DELETE", "", StatusCode::OK, false),
     550              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     551              :             (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
     552              :             // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
     553              :             (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
     554              :             (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
     555              :             (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
     556              :             (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
     557              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     558              :             (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
     559              :             (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
     560              :             // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
     561              :             (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
     562              :             (f(t2, ""), "DELETE", "", StatusCode::OK, false),
     563              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     564              :             (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     565              :             (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
     566              :             (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
     567              :             // delete prefix 1 -> empty
     568              :             (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
     569              :             (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
     570              :             (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     571              :             (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
     572              :             (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
     573              :         ];
     574              :         requests_chain(chain.into_iter(), delete_prefix_token).await;
     575              :     }
     576              : }
        

Generated by: LCOV version 2.1-beta