Line data Source code
1 : use anyhow::anyhow;
2 : use axum::body::{Body, Bytes};
3 : use axum::response::{IntoResponse, Response};
4 : use axum::{Router, http::StatusCode};
5 : use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
6 : use remote_storage::TimeoutOrCancel;
7 : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
8 : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::{error, info};
11 : use utils::backoff::retry;
12 :
13 35 : pub fn app(state: Arc<Storage>) -> Router<()> {
14 : use axum::routing::{delete as _delete, get as _get};
15 35 : let delete_prefix = _delete(delete_prefix);
16 : // NB: On any changes do not forget to update the OpenAPI spec
17 : // in /endpoint_storage/src/openapi_spec.yml.
18 35 : Router::new()
19 35 : .route(
20 35 : "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
21 35 : _get(get).put(set).delete(delete),
22 : )
23 35 : .route(
24 35 : "/{tenant_id}/{timeline_id}/{endpoint_id}",
25 35 : delete_prefix.clone(),
26 : )
27 35 : .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
28 35 : .route("/{tenant_id}", delete_prefix)
29 35 : .route("/metrics", _get(metrics))
30 35 : .route("/status", _get(async || StatusCode::OK.into_response()))
31 35 : .with_state(state)
32 35 : }
33 :
34 : type Result = anyhow::Result<Response, Response>;
35 : type State = axum::extract::State<Arc<Storage>>;
36 :
37 : const CONTENT_TYPE: &str = "content-type";
38 : const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
39 : const WARN_THRESHOLD: u32 = 3;
40 : const MAX_RETRIES: u32 = 10;
41 :
42 1 : async fn metrics() -> Result {
43 1 : prometheus::TextEncoder::new()
44 1 : .encode_to_string(&prometheus::gather())
45 1 : .map(|s| s.into_response())
46 1 : .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
47 1 : }
48 :
49 21 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
50 21 : info!(%path, "downloading");
51 21 : let download_err = |err| {
52 15 : if let DownloadError::NotFound = err {
53 15 : info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
54 15 : return not_found(&path);
55 0 : }
56 0 : internal_error(err, &path, "downloading")
57 15 : };
58 21 : let cancel = state.cancel.clone();
59 21 : let opts = &DownloadOpts::default();
60 :
61 21 : let stream = retry(
62 0 : async || state.storage.download(&path, opts, &cancel).await,
63 : DownloadError::is_permanent,
64 : WARN_THRESHOLD,
65 : MAX_RETRIES,
66 21 : "downloading",
67 21 : &cancel,
68 : )
69 21 : .await
70 21 : .unwrap_or(Err(DownloadError::Cancelled))
71 21 : .map_err(download_err)?
72 : .download_stream;
73 :
74 6 : Response::builder()
75 6 : .status(StatusCode::OK)
76 6 : .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
77 6 : .body(Body::from_stream(stream))
78 6 : .map_err(|e| internal_error(e, path, "reading response"))
79 21 : }
80 :
81 : // Best solution for files is multipart upload, but remote_storage doesn't support it,
82 : // so we can either read Bytes in memory and push at once or forward BodyDataStream to
83 : // remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
84 : // guaranteed size() which may produce issues while uploading to s3.
85 : // So, currently we're going with an in-memory copy plus a boundary to prevent uploading
86 : // very large files.
87 13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
88 13 : info!(%path, "uploading");
89 13 : let request_len = bytes.len();
90 13 : let max_len = state.max_upload_file_limit;
91 13 : if request_len > max_len {
92 0 : return Err(bad_request(
93 0 : anyhow!("File size {request_len} exceeds max {max_len}"),
94 0 : "uploading",
95 0 : ));
96 13 : }
97 :
98 13 : let cancel = state.cancel.clone();
99 13 : let fun = async || {
100 0 : let stream = bytes_to_stream(bytes.clone());
101 0 : state
102 0 : .storage
103 0 : .upload(stream, request_len, &path, None, &cancel)
104 0 : .await
105 13 : };
106 13 : retry(
107 13 : fun,
108 13 : TimeoutOrCancel::caused_by_cancel,
109 13 : WARN_THRESHOLD,
110 13 : MAX_RETRIES,
111 13 : "uploading",
112 13 : &cancel,
113 13 : )
114 13 : .await
115 13 : .unwrap_or(Err(anyhow!("uploading cancelled")))
116 13 : .map_err(|e| internal_error(e, path, "reading response"))?;
117 13 : Ok(ok())
118 13 : }
119 :
120 2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
121 2 : info!(%path, "deleting");
122 2 : let cancel = state.cancel.clone();
123 2 : retry(
124 0 : async || state.storage.delete(&path, &cancel).await,
125 : TimeoutOrCancel::caused_by_cancel,
126 : WARN_THRESHOLD,
127 : MAX_RETRIES,
128 2 : "deleting",
129 2 : &cancel,
130 : )
131 2 : .await
132 2 : .unwrap_or(Err(anyhow!("deleting cancelled")))
133 2 : .map_err(|e| internal_error(e, path, "deleting"))?;
134 2 : Ok(ok())
135 2 : }
136 :
137 6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
138 6 : info!(%path, "deleting prefix");
139 6 : let cancel = state.cancel.clone();
140 6 : retry(
141 0 : async || state.storage.delete_prefix(&path, &cancel).await,
142 : TimeoutOrCancel::caused_by_cancel,
143 : WARN_THRESHOLD,
144 : MAX_RETRIES,
145 6 : "deleting prefix",
146 6 : &cancel,
147 : )
148 6 : .await
149 6 : .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
150 6 : .map_err(|e| internal_error(e, path, "deleting prefix"))?;
151 6 : Ok(ok())
152 6 : }
153 :
154 35 : pub async fn check_storage_permissions(
155 35 : client: &GenericRemoteStorage,
156 35 : cancel: CancellationToken,
157 35 : ) -> anyhow::Result<()> {
158 35 : info!("storage permissions check");
159 :
160 : // as_nanos() as multiple instances proxying same bucket may be started at once
161 35 : let now = SystemTime::now()
162 35 : .duration_since(UNIX_EPOCH)?
163 35 : .as_nanos()
164 35 : .to_string();
165 :
166 35 : let path = RemotePath::from_string(&format!("write_access_{now}"))?;
167 35 : info!(%path, "uploading");
168 :
169 35 : let body = now.to_string();
170 35 : let stream = bytes_to_stream(Bytes::from(body.clone()));
171 35 : client
172 35 : .upload(stream, body.len(), &path, None, &cancel)
173 35 : .await?;
174 :
175 : use tokio::io::AsyncReadExt;
176 35 : info!(%path, "downloading");
177 35 : let download_opts = DownloadOpts {
178 35 : kind: remote_storage::DownloadKind::Small,
179 35 : ..Default::default()
180 35 : };
181 35 : let mut body_read_buf = Vec::new();
182 35 : let stream = client
183 35 : .download(&path, &download_opts, &cancel)
184 35 : .await?
185 : .download_stream;
186 35 : tokio_util::io::StreamReader::new(stream)
187 35 : .read_to_end(&mut body_read_buf)
188 35 : .await?;
189 35 : let body_read = String::from_utf8(body_read_buf)?;
190 35 : if body != body_read {
191 0 : error!(%body, %body_read, "File contents do not match");
192 0 : anyhow::bail!("Read back file doesn't match original")
193 35 : }
194 :
195 35 : info!(%path, "removing");
196 35 : client.delete(&path, &cancel).await
197 35 : }
198 :
199 48 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
200 48 : futures::stream::once(futures::future::ready(Ok(bytes)))
201 48 : }
202 :
203 : #[cfg(test)]
204 : mod tests {
205 : use super::*;
206 : use axum::{body::Body, extract::Request, response::Response};
207 : use http_body_util::BodyExt;
208 : use itertools::iproduct;
209 : use jsonwebtoken::DecodingKey;
210 : use std::env::var;
211 : use std::sync::Arc;
212 : use std::time::Duration;
213 : use test_log::test as testlog;
214 : use tower::{Service, util::ServiceExt};
215 : use utils::{
216 : auth::JwtAuth,
217 : id::{TenantId, TimelineId},
218 : };
219 :
220 : // see libs/remote_storage/tests/test_real_s3.rs
221 : const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
222 : const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
223 : const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
224 :
225 35 : async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
226 35 : let cancel = CancellationToken::new();
227 35 : let (dir, storage) = if var(REAL_S3_ENV).is_err() {
228 : // tests execute in parallel and we need a new directory for each of them
229 35 : let dir = camino_tempfile::tempdir().unwrap();
230 35 : let fs =
231 35 : remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
232 35 : (Some(dir), GenericRemoteStorage::LocalFs(fs))
233 : } else {
234 : // test_real_s3::create_s3_client is hard to reference, reimplementing here
235 0 : let millis = SystemTime::now()
236 0 : .duration_since(UNIX_EPOCH)
237 0 : .unwrap()
238 0 : .as_millis();
239 : use rand::Rng;
240 0 : let random = rand::rng().random::<u32>();
241 :
242 0 : let s3_config = remote_storage::S3Config {
243 0 : bucket_name: var(REAL_S3_BUCKET).unwrap(),
244 0 : bucket_region: var(REAL_S3_REGION).unwrap(),
245 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
246 0 : endpoint: None,
247 0 : concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
248 0 : max_keys_per_list_response: None,
249 0 : upload_storage_class: None,
250 0 : };
251 0 : let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
252 0 : .await
253 0 : .unwrap();
254 0 : (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
255 : };
256 :
257 35 : let proxy = Storage {
258 35 : auth: JwtAuth::new(vec![
259 35 : DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
260 35 : ]),
261 35 : storage,
262 35 : cancel: cancel.clone(),
263 35 : max_upload_file_limit: usize::MAX,
264 35 : };
265 35 : check_storage_permissions(&proxy.storage, cancel)
266 35 : .await
267 35 : .unwrap();
268 35 : (proxy, dir)
269 35 : }
270 :
271 : // see libs/utils/src/auth.rs
272 : const TEST_PUB_KEY_ED25519: &[u8] = b"
273 : -----BEGIN PUBLIC KEY-----
274 : MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
275 : -----END PUBLIC KEY-----
276 : ";
277 :
278 : const TEST_PRIV_KEY_ED25519: &[u8] = br#"
279 : -----BEGIN PRIVATE KEY-----
280 : MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
281 : -----END PRIVATE KEY-----
282 : "#;
283 :
284 30 : async fn request(req: Request<Body>) -> Response<Body> {
285 30 : let (proxy, _) = proxy().await;
286 30 : app(Arc::new(proxy))
287 30 : .into_service()
288 30 : .oneshot(req)
289 30 : .await
290 30 : .unwrap()
291 30 : }
292 :
293 : #[testlog(tokio::test)]
294 : async fn status() {
295 : let res = Request::builder()
296 : .uri("/status")
297 : .body(Body::empty())
298 : .map(request)
299 : .unwrap()
300 : .await;
301 : assert_eq!(res.status(), StatusCode::OK);
302 : }
303 :
304 3 : fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
305 3 : iproduct!(
306 3 : vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
307 3 : vec!["GET", "PUT", "DELETE"]
308 : )
309 3 : }
310 :
311 : #[testlog(tokio::test)]
312 : async fn no_token() {
313 : for (uri, method) in routes() {
314 : info!(%uri, %method);
315 : let res = Request::builder()
316 : .uri(uri)
317 : .method(method)
318 : .body(Body::empty())
319 : .map(request)
320 : .unwrap()
321 : .await;
322 : assert!(matches!(
323 : res.status(),
324 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
325 : ));
326 : }
327 : }
328 :
329 : #[testlog(tokio::test)]
330 : async fn invalid_token() {
331 : for (uri, method) in routes() {
332 : info!(%uri, %method);
333 : let status = Request::builder()
334 : .uri(uri)
335 : .header("Authorization", "Bearer 123")
336 : .method(method)
337 : .body(Body::empty())
338 : .map(request)
339 : .unwrap()
340 : .await;
341 : assert!(matches!(
342 : status.status(),
343 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
344 : ));
345 : }
346 : }
347 :
348 : const TENANT_ID: TenantId =
349 : TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
350 : const TIMELINE_ID: TimelineId =
351 : TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
352 : const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
353 11 : fn token() -> String {
354 11 : let claims = endpoint_storage::claims::EndpointStorageClaims {
355 11 : tenant_id: TENANT_ID,
356 11 : timeline_id: TIMELINE_ID,
357 11 : endpoint_id: ENDPOINT_ID.into(),
358 11 : exp: u64::MAX,
359 11 : };
360 11 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
361 11 : let header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::EdDSA);
362 11 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
363 11 : }
364 :
365 : #[testlog(tokio::test)]
366 : async fn unauthorized() {
367 : let (proxy, _) = proxy().await;
368 : let mut app = app(Arc::new(proxy)).into_service();
369 : let token = token();
370 : let args = itertools::iproduct!(
371 : vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
372 : vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
373 : vec![ENDPOINT_ID, "ep-ololo"]
374 : )
375 : // first one is fully valid path, second path is valid for GET as
376 : // read paths may have different endpoint if tenant and timeline matches
377 : // (needed for prewarming RO->RW replica)
378 : .skip(2);
379 :
380 : for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
381 : info!(%uri, %method, %tenant, %timeline, %endpoint);
382 : let request = Request::builder()
383 : .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
384 : .method(method)
385 : .header("Authorization", format!("Bearer {token}"))
386 : .body(Body::empty())
387 : .unwrap();
388 : let status = ServiceExt::ready(&mut app)
389 : .await
390 : .unwrap()
391 : .call(request)
392 : .await
393 : .unwrap()
394 : .status();
395 : assert_eq!(status, StatusCode::UNAUTHORIZED);
396 : }
397 : }
398 :
399 : #[testlog(tokio::test)]
400 : async fn method_not_allowed() {
401 : let token = token();
402 : let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
403 : for (key, method) in iter {
404 : let status = Request::builder()
405 : .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
406 : .method(method)
407 : .header("Authorization", format!("Bearer {token}"))
408 : .body(Body::empty())
409 : .map(request)
410 : .unwrap()
411 : .await
412 : .status();
413 : assert!(matches!(
414 : status,
415 : StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
416 : ));
417 : }
418 : }
419 :
420 4 : async fn requests_chain(
421 4 : chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
422 4 : token: impl Fn(&str) -> String,
423 4 : ) {
424 4 : let (proxy, _) = proxy().await;
425 4 : let mut app = app(Arc::new(proxy)).into_service();
426 47 : for (uri, method, body, expected_status, compare_body) in chain {
427 43 : info!(%uri, %method, %body, %expected_status);
428 43 : let bearer = format!("Bearer {}", token(&uri));
429 43 : let request = Request::builder()
430 43 : .uri(uri)
431 43 : .method(method)
432 43 : .header("Authorization", &bearer)
433 43 : .body(Body::from(body))
434 43 : .unwrap();
435 43 : let response = ServiceExt::ready(&mut app)
436 43 : .await
437 43 : .unwrap()
438 43 : .call(request)
439 43 : .await
440 43 : .unwrap();
441 43 : assert_eq!(response.status(), expected_status);
442 43 : if !compare_body {
443 42 : continue;
444 1 : }
445 1 : let read_body = response.into_body().collect().await.unwrap().to_bytes();
446 1 : assert_eq!(body, read_body);
447 : }
448 4 : }
449 :
450 : #[testlog(tokio::test)]
451 : async fn metrics() {
452 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
453 : let req = vec![
454 : (uri.clone(), "PUT", "body", StatusCode::OK, false),
455 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
456 : ];
457 2 : requests_chain(req.into_iter(), |_| token()).await;
458 :
459 : let res = Request::builder()
460 : .uri("/metrics")
461 : .body(Body::empty())
462 : .map(request)
463 : .unwrap()
464 : .await;
465 : assert_eq!(res.status(), StatusCode::OK);
466 : let body = res.into_body().collect().await.unwrap().to_bytes();
467 : let body = String::from_utf8_lossy(&body);
468 : tracing::debug!(%body);
469 : // Storage metrics are not gathered for LocalFs
470 : if var(REAL_S3_ENV).is_ok() {
471 : assert!(body.contains("remote_storage_s3_deleted_objects_total"));
472 : }
473 :
474 : #[cfg(target_os = "linux")]
475 : assert!(body.contains("process_threads"));
476 : }
477 :
478 : #[testlog(tokio::test)]
479 : async fn insert_retrieve_remove() {
480 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
481 : let chain = vec![
482 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
483 : (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
484 : (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
485 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
486 : (uri, "GET", "", StatusCode::NOT_FOUND, false),
487 : ];
488 5 : requests_chain(chain.into_iter(), |_| token()).await;
489 : }
490 :
491 : #[testlog(tokio::test)]
492 : async fn read_other_endpoint_data() {
493 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
494 : let chain = vec![
495 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
496 : (uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
497 : ];
498 2 : requests_chain(chain.into_iter(), |_| token()).await;
499 : }
500 :
501 34 : fn delete_prefix_token(uri: &str) -> String {
502 34 : let parts = uri.split("/").collect::<Vec<&str>>();
503 34 : let claims = endpoint_storage::claims::DeletePrefixClaims {
504 34 : tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
505 34 : timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
506 34 : endpoint_id: parts.get(3).map(ToString::to_string),
507 : exp: u64::MAX,
508 : };
509 34 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
510 34 : let header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::EdDSA);
511 34 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
512 34 : }
513 :
514 : // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
515 : #[testlog(tokio::test)]
516 : async fn delete_prefix() {
517 : let tenant_id =
518 : TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
519 : let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
520 : let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
521 : let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
522 33 : let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
523 : // Why extra slash in string literals? Axum is weird with URIs:
524 : // /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
525 : // as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
526 : // The cost of removing trailing slash is suprisingly hard:
527 : // * Add tower dependency with NormalizePath layer
528 : // * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
529 : // * Rewrite make_service() -> into_make_service()
530 : // * Rewrite oneshot() (not available for NormalizePath)
531 : // I didn't manage to get it working correctly
532 : let chain = vec![
533 : // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
534 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
535 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
536 : (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
537 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
538 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
539 : (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
540 : // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
541 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
542 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
543 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
544 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
545 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
546 : // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
547 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
548 : (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
549 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
550 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
551 : (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
552 : // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
553 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
554 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
555 : (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
556 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
557 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
558 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
559 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
560 : // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
561 : (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
562 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
563 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
564 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
565 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
566 : (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
567 : // delete prefix 1 -> empty
568 : (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
569 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
570 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
571 : (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
572 : (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
573 : ];
574 : requests_chain(chain.into_iter(), delete_prefix_token).await;
575 : }
576 : }
|