Line data Source code
1 : use anyhow::anyhow;
2 : use axum::body::{Body, Bytes};
3 : use axum::response::{IntoResponse, Response};
4 : use axum::{Router, http::StatusCode};
5 : use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
6 : use remote_storage::TimeoutOrCancel;
7 : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
8 : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::{error, info};
11 : use utils::backoff::retry;
12 :
13 35 : pub fn app(state: Arc<Storage>) -> Router<()> {
14 : use axum::routing::{delete as _delete, get as _get};
15 35 : let delete_prefix = _delete(delete_prefix);
16 35 : Router::new()
17 35 : .route(
18 35 : "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
19 35 : _get(get).put(set).delete(delete),
20 35 : )
21 35 : .route(
22 35 : "/{tenant_id}/{timeline_id}/{endpoint_id}",
23 35 : delete_prefix.clone(),
24 35 : )
25 35 : .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
26 35 : .route("/{tenant_id}", delete_prefix)
27 35 : .route("/metrics", _get(metrics))
28 35 : .route("/status", _get(async || StatusCode::OK.into_response()))
29 35 : .with_state(state)
30 35 : }
31 :
32 : type Result = anyhow::Result<Response, Response>;
33 : type State = axum::extract::State<Arc<Storage>>;
34 :
35 : const CONTENT_TYPE: &str = "content-type";
36 : const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
37 : const WARN_THRESHOLD: u32 = 3;
38 : const MAX_RETRIES: u32 = 10;
39 :
40 1 : async fn metrics() -> Result {
41 1 : prometheus::TextEncoder::new()
42 1 : .encode_to_string(&prometheus::gather())
43 1 : .map(|s| s.into_response())
44 1 : .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
45 1 : }
46 :
47 21 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
48 21 : info!(%path, "downloading");
49 21 : let download_err = |err| {
50 15 : if let DownloadError::NotFound = err {
51 15 : info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
52 15 : return not_found(&path);
53 0 : }
54 0 : internal_error(err, &path, "downloading")
55 15 : };
56 21 : let cancel = state.cancel.clone();
57 21 : let opts = &DownloadOpts::default();
58 :
59 21 : let stream = retry(
60 21 : async || state.storage.download(&path, opts, &cancel).await,
61 21 : DownloadError::is_permanent,
62 21 : WARN_THRESHOLD,
63 21 : MAX_RETRIES,
64 21 : "downloading",
65 21 : &cancel,
66 21 : )
67 21 : .await
68 21 : .unwrap_or(Err(DownloadError::Cancelled))
69 21 : .map_err(download_err)?
70 : .download_stream;
71 :
72 6 : Response::builder()
73 6 : .status(StatusCode::OK)
74 6 : .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
75 6 : .body(Body::from_stream(stream))
76 6 : .map_err(|e| internal_error(e, path, "reading response"))
77 21 : }
78 :
79 : // The best solution for uploading files is multipart upload, but remote_storage doesn't
80 : // support it, so we can either read the Bytes into memory and push them at once, or forward
81 : // a BodyDataStream to remote_storage. The latter may seem more performant, but BodyDataStream
82 : // doesn't have a guaranteed size(), which may cause issues while uploading to S3.
83 : // So, currently we go with an in-memory copy plus a size limit to prevent uploading
84 : // very large files.
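// For illustration only, a hedged sketch of how the limit surfaces to a caller. It reuses
// the test-harness names from the bottom of this file (TENANT_ID, TIMELINE_ID, ENDPOINT_ID,
// token(), app); the oversized body and the expected 400 are assumptions, not an existing test.
//
//   let limit = 1024; // stands in for state.max_upload_file_limit
//   let req = Request::builder()
//       .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/big-file"))
//       .method("PUT")
//       .header("Authorization", format!("Bearer {}", token()))
//       .body(Body::from(vec![0u8; limit + 1])) // one byte over the configured limit
//       .unwrap();
//   // app(state).oneshot(req).await is expected to answer with bad_request(), i.e. 400.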
85 13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
86 13 : info!(%path, "uploading");
87 13 : let request_len = bytes.len();
88 13 : let max_len = state.max_upload_file_limit;
89 13 : if request_len > max_len {
90 0 : return Err(bad_request(
91 0 : anyhow!("File size {request_len} exceeds max {max_len}"),
92 0 : "uploading",
93 0 : ));
94 13 : }
95 13 :
96 13 : let cancel = state.cancel.clone();
97 13 : let fun = async || {
98 0 : let stream = bytes_to_stream(bytes.clone());
99 0 : state
100 0 : .storage
101 0 : .upload(stream, request_len, &path, None, &cancel)
102 0 : .await
103 13 : };
104 13 : retry(
105 13 : fun,
106 13 : TimeoutOrCancel::caused_by_cancel,
107 13 : WARN_THRESHOLD,
108 13 : MAX_RETRIES,
109 13 : "uploading",
110 13 : &cancel,
111 13 : )
112 13 : .await
113 13 : .unwrap_or(Err(anyhow!("uploading cancelled")))
114 13 : .map_err(|e| internal_error(e, path, "uploading"))?;
115 13 : Ok(ok())
116 13 : }
117 :
118 2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
119 2 : info!(%path, "deleting");
120 2 : let cancel = state.cancel.clone();
121 2 : retry(
122 2 : async || state.storage.delete(&path, &cancel).await,
123 2 : TimeoutOrCancel::caused_by_cancel,
124 2 : WARN_THRESHOLD,
125 2 : MAX_RETRIES,
126 2 : "deleting",
127 2 : &cancel,
128 2 : )
129 2 : .await
130 2 : .unwrap_or(Err(anyhow!("deleting cancelled")))
131 2 : .map_err(|e| internal_error(e, path, "deleting"))?;
132 2 : Ok(ok())
133 2 : }
134 :
135 6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
136 6 : info!(%path, "deleting prefix");
137 6 : let cancel = state.cancel.clone();
138 6 : retry(
139 6 : async || state.storage.delete_prefix(&path, &cancel).await,
140 6 : TimeoutOrCancel::caused_by_cancel,
141 6 : WARN_THRESHOLD,
142 6 : MAX_RETRIES,
143 6 : "deleting prefix",
144 6 : &cancel,
145 6 : )
146 6 : .await
147 6 : .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
148 6 : .map_err(|e| internal_error(e, path, "deleting prefix"))?;
149 6 : Ok(ok())
150 6 : }
151 :
152 35 : pub async fn check_storage_permissions(
153 35 : client: &GenericRemoteStorage,
154 35 : cancel: CancellationToken,
155 35 : ) -> anyhow::Result<()> {
156 35 : info!("storage permissions check");
157 :
158 : // use as_nanos() because multiple instances proxying the same bucket may be started at once
159 35 : let now = SystemTime::now()
160 35 : .duration_since(UNIX_EPOCH)?
161 35 : .as_nanos()
162 35 : .to_string();
163 :
164 35 : let path = RemotePath::from_string(&format!("write_access_{now}"))?;
165 35 : info!(%path, "uploading");
166 :
167 35 : let body = now.to_string();
168 35 : let stream = bytes_to_stream(Bytes::from(body.clone()));
169 35 : client
170 35 : .upload(stream, body.len(), &path, None, &cancel)
171 35 : .await?;
172 :
173 : use tokio::io::AsyncReadExt;
174 35 : info!(%path, "downloading");
175 35 : let download_opts = DownloadOpts {
176 35 : kind: remote_storage::DownloadKind::Small,
177 35 : ..Default::default()
178 35 : };
179 35 : let mut body_read_buf = Vec::new();
180 35 : let stream = client
181 35 : .download(&path, &download_opts, &cancel)
182 35 : .await?
183 : .download_stream;
184 35 : tokio_util::io::StreamReader::new(stream)
185 35 : .read_to_end(&mut body_read_buf)
186 35 : .await?;
187 35 : let body_read = String::from_utf8(body_read_buf)?;
188 35 : if body != body_read {
189 0 : error!(%body, %body_read, "File contents do not match");
190 0 : anyhow::bail!("Read back file doesn't match original")
191 35 : }
192 35 :
193 35 : info!(%path, "removing");
194 35 : client.delete(&path, &cancel).await
195 35 : }
196 :
197 48 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
198 48 : futures::stream::once(futures::future::ready(Ok(bytes)))
199 48 : }
200 :
201 : #[cfg(test)]
202 : mod tests {
203 : use super::*;
204 : use axum::{body::Body, extract::Request, response::Response};
205 : use http_body_util::BodyExt;
206 : use itertools::iproduct;
207 : use std::env::var;
208 : use std::sync::Arc;
209 : use std::time::Duration;
210 : use test_log::test as testlog;
211 : use tower::{Service, util::ServiceExt};
212 : use utils::id::{TenantId, TimelineId};
213 :
214 : // see libs/remote_storage/tests/test_real_s3.rs
215 : const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
216 : const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
217 : const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
218 :
219 35 : async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
220 35 : let cancel = CancellationToken::new();
221 35 : let (dir, storage) = if var(REAL_S3_ENV).is_err() {
222 : // tests execute in parallel and we need a new directory for each of them
223 35 : let dir = camino_tempfile::tempdir().unwrap();
224 35 : let fs =
225 35 : remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
226 35 : (Some(dir), GenericRemoteStorage::LocalFs(fs))
227 : } else {
228 : // test_real_s3::create_s3_client is hard to reference, reimplementing here
229 0 : let millis = SystemTime::now()
230 0 : .duration_since(UNIX_EPOCH)
231 0 : .unwrap()
232 0 : .as_millis();
233 : use rand::Rng;
234 0 : let random = rand::thread_rng().r#gen::<u32>();
235 0 :
236 0 : let s3_config = remote_storage::S3Config {
237 0 : bucket_name: var(REAL_S3_BUCKET).unwrap(),
238 0 : bucket_region: var(REAL_S3_REGION).unwrap(),
239 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
240 0 : endpoint: None,
241 0 : concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
242 0 : max_keys_per_list_response: None,
243 0 : upload_storage_class: None,
244 0 : };
245 0 : let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
246 0 : .await
247 0 : .unwrap();
248 0 : (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
249 : };
250 :
251 35 : let proxy = Storage {
252 35 : auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
253 35 : storage,
254 35 : cancel: cancel.clone(),
255 35 : max_upload_file_limit: usize::MAX,
256 35 : };
257 35 : check_storage_permissions(&proxy.storage, cancel)
258 35 : .await
259 35 : .unwrap();
260 35 : (proxy, dir)
261 35 : }
262 :
263 : // see libs/utils/src/auth.rs
264 : const TEST_PUB_KEY_ED25519: &[u8] = b"
265 : -----BEGIN PUBLIC KEY-----
266 : MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
267 : -----END PUBLIC KEY-----
268 : ";
269 :
270 : const TEST_PRIV_KEY_ED25519: &[u8] = br#"
271 : -----BEGIN PRIVATE KEY-----
272 : MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
273 : -----END PRIVATE KEY-----
274 : "#;
275 :
276 30 : async fn request(req: Request<Body>) -> Response<Body> {
277 30 : let (proxy, _) = proxy().await;
278 30 : app(Arc::new(proxy))
279 30 : .into_service()
280 30 : .oneshot(req)
281 30 : .await
282 30 : .unwrap()
283 30 : }
284 :
285 2 : #[testlog(tokio::test)]
286 : async fn status() {
287 : let res = Request::builder()
288 : .uri("/status")
289 : .body(Body::empty())
290 : .map(request)
291 : .unwrap()
292 : .await;
293 : assert_eq!(res.status(), StatusCode::OK);
294 : }
295 :
296 3 : fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
297 3 : iproduct!(
298 3 : vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
299 3 : vec!["GET", "PUT", "DELETE"]
300 3 : )
301 3 : }
302 :
303 2 : #[testlog(tokio::test)]
304 : async fn no_token() {
305 : for (uri, method) in routes() {
306 : info!(%uri, %method);
307 : let res = Request::builder()
308 : .uri(uri)
309 : .method(method)
310 : .body(Body::empty())
311 : .map(request)
312 : .unwrap()
313 : .await;
314 : assert!(matches!(
315 : res.status(),
316 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
317 : ));
318 : }
319 : }
320 :
321 2 : #[testlog(tokio::test)]
322 : async fn invalid_token() {
323 : for (uri, method) in routes() {
324 : info!(%uri, %method);
325 : let status = Request::builder()
326 : .uri(uri)
327 : .header("Authorization", "Bearer 123")
328 : .method(method)
329 : .body(Body::empty())
330 : .map(request)
331 : .unwrap()
332 : .await;
333 : assert!(matches!(
334 : status.status(),
335 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
336 : ));
337 : }
338 : }
339 :
340 : const TENANT_ID: TenantId =
341 : TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
342 : const TIMELINE_ID: TimelineId =
343 : TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
344 : const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
345 11 : fn token() -> String {
346 11 : let claims = endpoint_storage::Claims {
347 11 : tenant_id: TENANT_ID,
348 11 : timeline_id: TIMELINE_ID,
349 11 : endpoint_id: ENDPOINT_ID.into(),
350 11 : exp: u64::MAX,
351 11 : };
352 11 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
353 11 : let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
354 11 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
355 11 : }
356 :
357 2 : #[testlog(tokio::test)]
358 : async fn unauthorized() {
359 : let (proxy, _) = proxy().await;
360 : let mut app = app(Arc::new(proxy)).into_service();
361 : let token = token();
362 : let args = itertools::iproduct!(
363 : vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
364 : vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
365 : vec![ENDPOINT_ID, "ep-ololo"]
366 : )
367 : // the first one is a fully valid path; the second is valid for GET, as
368 : // read paths may have a different endpoint if tenant and timeline match
369 : // (needed for prewarming an RO->RW replica)
370 : .skip(2);
371 :
372 : for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
373 : info!(%uri, %method, %tenant, %timeline, %endpoint);
374 : let request = Request::builder()
375 : .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
376 : .method(method)
377 : .header("Authorization", format!("Bearer {}", token))
378 : .body(Body::empty())
379 : .unwrap();
380 : let status = ServiceExt::ready(&mut app)
381 : .await
382 : .unwrap()
383 : .call(request)
384 : .await
385 : .unwrap()
386 : .status();
387 : assert_eq!(status, StatusCode::UNAUTHORIZED);
388 : }
389 : }
390 :
391 2 : #[testlog(tokio::test)]
392 : async fn method_not_allowed() {
393 : let token = token();
394 : let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
395 : for (key, method) in iter {
396 : let status = Request::builder()
397 : .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
398 : .method(method)
399 : .header("Authorization", format!("Bearer {token}"))
400 : .body(Body::empty())
401 : .map(request)
402 : .unwrap()
403 : .await
404 : .status();
405 : assert!(matches!(
406 : status,
407 : StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
408 : ));
409 : }
410 : }
411 :
412 4 : async fn requests_chain(
413 4 : chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
414 4 : token: impl Fn(&str) -> String,
415 4 : ) {
416 4 : let (proxy, _) = proxy().await;
417 4 : let mut app = app(Arc::new(proxy)).into_service();
418 47 : for (uri, method, body, expected_status, compare_body) in chain {
419 43 : info!(%uri, %method, %body, %expected_status);
420 43 : let bearer = format!("Bearer {}", token(&uri));
421 43 : let request = Request::builder()
422 43 : .uri(uri)
423 43 : .method(method)
424 43 : .header("Authorization", &bearer)
425 43 : .body(Body::from(body))
426 43 : .unwrap();
427 43 : let response = ServiceExt::ready(&mut app)
428 43 : .await
429 43 : .unwrap()
430 43 : .call(request)
431 43 : .await
432 43 : .unwrap();
433 43 : assert_eq!(response.status(), expected_status);
434 43 : if !compare_body {
435 42 : continue;
436 1 : }
437 1 : let read_body = response.into_body().collect().await.unwrap().to_bytes();
438 1 : assert_eq!(body, read_body);
439 : }
440 4 : }
441 :
442 2 : #[testlog(tokio::test)]
443 : async fn metrics() {
444 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
445 : let req = vec![
446 : (uri.clone(), "PUT", "body", StatusCode::OK, false),
447 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
448 : ];
449 2 : requests_chain(req.into_iter(), |_| token()).await;
450 :
451 : let res = Request::builder()
452 : .uri("/metrics")
453 : .body(Body::empty())
454 : .map(request)
455 : .unwrap()
456 : .await;
457 : assert_eq!(res.status(), StatusCode::OK);
458 : let body = res.into_body().collect().await.unwrap().to_bytes();
459 : let body = String::from_utf8_lossy(&body);
460 : tracing::debug!(%body);
461 : // Storage metrics are not gathered for LocalFs
462 : if var(REAL_S3_ENV).is_ok() {
463 : assert!(body.contains("remote_storage_s3_deleted_objects_total"));
464 : }
465 : assert!(body.contains("process_threads"));
466 : }
467 :
468 2 : #[testlog(tokio::test)]
469 : async fn insert_retrieve_remove() {
470 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
471 : let chain = vec![
472 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
473 : (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
474 : (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
475 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
476 : (uri, "GET", "", StatusCode::NOT_FOUND, false),
477 : ];
478 5 : requests_chain(chain.into_iter(), |_| token()).await;
479 : }
480 :
481 2 : #[testlog(tokio::test)]
482 : async fn read_other_endpoint_data() {
483 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
484 : let chain = vec![
485 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
486 : (uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
487 : ];
488 2 : requests_chain(chain.into_iter(), |_| token()).await;
489 : }
490 :
491 34 : fn delete_prefix_token(uri: &str) -> String {
492 : use serde::Serialize;
493 34 : let parts = uri.split("/").collect::<Vec<&str>>();
494 : #[derive(Serialize)]
495 : struct PrefixClaims {
496 : tenant_id: TenantId,
497 : timeline_id: Option<TimelineId>,
498 : endpoint_id: Option<endpoint_storage::EndpointId>,
499 : exp: u64,
500 : }
501 34 : let claims = PrefixClaims {
502 34 : tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
503 34 : timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
504 34 : endpoint_id: parts.get(3).map(ToString::to_string),
505 34 : exp: u64::MAX,
506 34 : };
507 34 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
508 34 : let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
509 34 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
510 34 : }
511 :
512 : // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
513 2 : #[testlog(tokio::test)]
514 : async fn delete_prefix() {
515 : let tenant_id =
516 : TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
517 : let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
518 : let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
519 : let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
520 33 : let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
521 : // Why the extra slash in string literals? Axum is weird with URIs:
522 : // /1/2 and /1/2/ match different routes, so the first yields OK and the second NOT_FOUND
523 : // as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
524 : // Removing the trailing slash is surprisingly hard:
525 : // * Add the tower-http dependency for its NormalizePath layer
526 : // * Wrap Router<()> in that layer, https://github.com/tokio-rs/axum/discussions/2377
527 : // * Rewrite make_service() -> into_make_service()
528 : // * Rewrite oneshot() (not available for NormalizePath)
529 : // I didn't manage to get it working correctly; a commented sketch follows below.
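// A minimal commented sketch of that approach, for reference only: it assumes
// tower-http's NormalizePathLayer and axum's ServiceExt, and `state`/`listener`
// would come from the real server setup, not from this test.
//
//   use axum::{ServiceExt, extract::Request};
//   use tower::Layer;
//   use tower_http::normalize_path::NormalizePathLayer;
//
//   // Strip trailing slashes before routing, so /1/2 and /1/2/ hit the same route.
//   let app = NormalizePathLayer::trim_trailing_slash().layer(app(state));
//   // The wrapped service is no longer a Router, so into_make_service() moves here:
//   axum::serve(listener, ServiceExt::<Request>::into_make_service(app)).await?;
//
// Because the wrapper is not a Router, Router::into_service() and tower's oneshot()
// used throughout these tests would also need rewriting.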
530 : let chain = vec![
531 : // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
532 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
533 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
534 : (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
535 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
536 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
537 : (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
538 : // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
539 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
540 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
541 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
542 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
543 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
544 : // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
545 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
546 : (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
547 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
548 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
549 : (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
550 : // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
551 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
552 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
553 : (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
554 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
555 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
556 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
557 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
558 : // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
559 : (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
560 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
561 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
562 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
563 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
564 : (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
565 : // delete prefix 1 -> empty
566 : (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
567 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
568 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
569 : (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
570 : (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
571 : ];
572 : requests_chain(chain.into_iter(), delete_prefix_token).await;
573 : }
574 : }