Line data Source code
1 : use anyhow::anyhow;
2 : use axum::body::{Body, Bytes};
3 : use axum::response::{IntoResponse, Response};
4 : use axum::{Router, http::StatusCode};
5 : use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
6 : use remote_storage::TimeoutOrCancel;
7 : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
8 : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::{error, info};
11 : use utils::backoff::retry;
12 :
13 35 : pub fn app(state: Arc<Storage>) -> Router<()> {
14 : use axum::routing::{delete as _delete, get as _get};
15 35 : let delete_prefix = _delete(delete_prefix);
16 : // NB: when changing these routes, don't forget to update the OpenAPI spec
17 : // in /endpoint_storage/src/openapi_spec.yml.
18 35 : Router::new()
19 35 : .route(
20 35 : "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
21 35 : _get(get).put(set).delete(delete),
22 : )
23 35 : .route(
24 35 : "/{tenant_id}/{timeline_id}/{endpoint_id}",
25 35 : delete_prefix.clone(),
26 : )
27 35 : .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
28 35 : .route("/{tenant_id}", delete_prefix)
29 35 : .route("/metrics", _get(metrics))
30 35 : .route("/status", _get(async || StatusCode::OK.into_response()))
31 35 : .with_state(state)
32 35 : }
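// Illustrative only (not part of the original file): a minimal sketch of how this router
// could be served, assuming an already-constructed `Storage` and a Tokio runtime; the bind
// address and variable names here are made up for the example.
//
//   let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
//   axum::serve(listener, app(Arc::new(storage))).await?;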
33 :
34 : type Result = anyhow::Result<Response, Response>;
35 : type State = axum::extract::State<Arc<Storage>>;
36 :
37 : const CONTENT_TYPE: &str = "content-type";
38 : const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
39 : const WARN_THRESHOLD: u32 = 3;
40 : const MAX_RETRIES: u32 = 10;
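// `utils::backoff::retry`, used by every handler below, takes an async operation, a
// predicate deciding which errors should not be retried, the warn threshold and retry cap
// above, a description for logging, and a cancellation token; it resolves to `None` when
// cancelled, hence the `unwrap_or(Err(..))` at each call site.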
41 :
42 1 : async fn metrics() -> Result {
43 1 : prometheus::TextEncoder::new()
44 1 : .encode_to_string(&prometheus::gather())
45 1 : .map(|s| s.into_response())
46 1 : .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
47 1 : }
48 :
49 21 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
50 21 : info!(%path, "downloading");
51 21 : let download_err = |err| {
52 15 : if let DownloadError::NotFound = err {
53 15 : info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
54 15 : return not_found(&path);
55 0 : }
56 0 : internal_error(err, &path, "downloading")
57 15 : };
58 21 : let cancel = state.cancel.clone();
59 21 : let opts = &DownloadOpts::default();
60 :
61 21 : let stream = retry(
62 0 : async || state.storage.download(&path, opts, &cancel).await,
63 : DownloadError::is_permanent,
64 : WARN_THRESHOLD,
65 : MAX_RETRIES,
66 21 : "downloading",
67 21 : &cancel,
68 : )
69 21 : .await
70 21 : .unwrap_or(Err(DownloadError::Cancelled))
71 21 : .map_err(download_err)?
72 : .download_stream;
73 :
74 6 : Response::builder()
75 6 : .status(StatusCode::OK)
76 6 : .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
77 6 : .body(Body::from_stream(stream))
78 6 : .map_err(|e| internal_error(e, path, "reading response"))
79 21 : }
80 :
81 : // The best solution for large files is multipart upload, but remote_storage doesn't support it,
82 : // so we can either read the Bytes into memory and push them at once or forward a BodyDataStream to
83 : // remote_storage. The latter may seem more performant, but BodyDataStream doesn't have a
84 : // guaranteed size(), which may cause issues when uploading to S3.
85 : // So, currently we go with an in-memory copy plus a size limit to prevent uploading
86 : // very large files.
87 13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
88 13 : info!(%path, "uploading");
89 13 : let request_len = bytes.len();
90 13 : let max_len = state.max_upload_file_limit;
91 13 : if request_len > max_len {
92 0 : return Err(bad_request(
93 0 : anyhow!("File size {request_len} exceeds max {max_len}"),
94 0 : "uploading",
95 0 : ));
96 13 : }
97 :
98 13 : let cancel = state.cancel.clone();
99 13 : let fun = async || {
100 0 : let stream = bytes_to_stream(bytes.clone());
101 0 : state
102 0 : .storage
103 0 : .upload(stream, request_len, &path, None, &cancel)
104 0 : .await
105 13 : };
106 13 : retry(
107 13 : fun,
108 13 : TimeoutOrCancel::caused_by_cancel,
109 13 : WARN_THRESHOLD,
110 13 : MAX_RETRIES,
111 13 : "uploading",
112 13 : &cancel,
113 13 : )
114 13 : .await
115 13 : .unwrap_or(Err(anyhow!("uploading cancelled")))
116 13 : .map_err(|e| internal_error(e, path, "uploading"))?;
117 13 : Ok(ok())
118 13 : }
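// Worked example of the limit above (hypothetical numbers): with max_upload_file_limit set
// to 10 MiB, an 11 MiB body is rejected with 400 Bad Request before anything is sent to
// remote storage, while a 10 MiB body is buffered in memory and uploaded in one put.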
119 :
120 2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
121 2 : info!(%path, "deleting");
122 2 : let cancel = state.cancel.clone();
123 2 : retry(
124 0 : async || state.storage.delete(&path, &cancel).await,
125 : TimeoutOrCancel::caused_by_cancel,
126 : WARN_THRESHOLD,
127 : MAX_RETRIES,
128 2 : "deleting",
129 2 : &cancel,
130 : )
131 2 : .await
132 2 : .unwrap_or(Err(anyhow!("deleting cancelled")))
133 2 : .map_err(|e| internal_error(e, path, "deleting"))?;
134 2 : Ok(ok())
135 2 : }
136 :
137 6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
138 6 : info!(%path, "deleting prefix");
139 6 : let cancel = state.cancel.clone();
140 6 : retry(
141 0 : async || state.storage.delete_prefix(&path, &cancel).await,
142 : TimeoutOrCancel::caused_by_cancel,
143 : WARN_THRESHOLD,
144 : MAX_RETRIES,
145 6 : "deleting prefix",
146 6 : &cancel,
147 : )
148 6 : .await
149 6 : .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
150 6 : .map_err(|e| internal_error(e, path, "deleting prefix"))?;
151 6 : Ok(ok())
152 6 : }
153 :
154 35 : pub async fn check_storage_permissions(
155 35 : client: &GenericRemoteStorage,
156 35 : cancel: CancellationToken,
157 35 : ) -> anyhow::Result<()> {
158 35 : info!("storage permissions check");
159 :
160 : // use as_nanos() since multiple instances proxying the same bucket may be started at once
161 35 : let now = SystemTime::now()
162 35 : .duration_since(UNIX_EPOCH)?
163 35 : .as_nanos()
164 35 : .to_string();
165 :
166 35 : let path = RemotePath::from_string(&format!("write_access_{now}"))?;
167 35 : info!(%path, "uploading");
168 :
169 35 : let body = now.to_string();
170 35 : let stream = bytes_to_stream(Bytes::from(body.clone()));
171 35 : client
172 35 : .upload(stream, body.len(), &path, None, &cancel)
173 35 : .await?;
174 :
175 : use tokio::io::AsyncReadExt;
176 35 : info!(%path, "downloading");
177 35 : let download_opts = DownloadOpts {
178 35 : kind: remote_storage::DownloadKind::Small,
179 35 : ..Default::default()
180 35 : };
181 35 : let mut body_read_buf = Vec::new();
182 35 : let stream = client
183 35 : .download(&path, &download_opts, &cancel)
184 35 : .await?
185 : .download_stream;
186 35 : tokio_util::io::StreamReader::new(stream)
187 35 : .read_to_end(&mut body_read_buf)
188 35 : .await?;
189 35 : let body_read = String::from_utf8(body_read_buf)?;
190 35 : if body != body_read {
191 0 : error!(%body, %body_read, "File contents do not match");
192 0 : anyhow::bail!("Read back file doesn't match original")
193 35 : }
194 :
195 35 : info!(%path, "removing");
196 35 : client.delete(&path, &cancel).await
197 35 : }
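// Illustrative usage (not from the original file), e.g. at service startup, assuming a
// configured `GenericRemoteStorage` and a process-wide cancellation token:
//
//   check_storage_permissions(&storage, cancel.clone()).await?;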
198 :
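// Wraps the whole payload in a single-chunk stream, since remote_storage's upload() accepts
// a stream of io::Result<Bytes> rather than a contiguous buffer.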
199 48 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
200 48 : futures::stream::once(futures::future::ready(Ok(bytes)))
201 48 : }
202 :
203 : #[cfg(test)]
204 : mod tests {
205 : use super::*;
206 : use axum::{body::Body, extract::Request, response::Response};
207 : use http_body_util::BodyExt;
208 : use itertools::iproduct;
209 : use std::env::var;
210 : use std::sync::Arc;
211 : use std::time::Duration;
212 : use test_log::test as testlog;
213 : use tower::{Service, util::ServiceExt};
214 : use utils::id::{TenantId, TimelineId};
215 :
216 : // see libs/remote_storage/tests/test_real_s3.rs
217 : const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
218 : const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
219 : const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
220 :
221 35 : async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
222 35 : let cancel = CancellationToken::new();
223 35 : let (dir, storage) = if var(REAL_S3_ENV).is_err() {
224 : // tests execute in parallel and we need a new directory for each of them
225 35 : let dir = camino_tempfile::tempdir().unwrap();
226 35 : let fs =
227 35 : remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
228 35 : (Some(dir), GenericRemoteStorage::LocalFs(fs))
229 : } else {
230 : // test_real_s3::create_s3_client is hard to reference, so reimplement it here
231 0 : let millis = SystemTime::now()
232 0 : .duration_since(UNIX_EPOCH)
233 0 : .unwrap()
234 0 : .as_millis();
235 : use rand::Rng;
236 0 : let random = rand::thread_rng().r#gen::<u32>();
237 :
238 0 : let s3_config = remote_storage::S3Config {
239 0 : bucket_name: var(REAL_S3_BUCKET).unwrap(),
240 0 : bucket_region: var(REAL_S3_REGION).unwrap(),
241 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
242 0 : endpoint: None,
243 0 : concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
244 0 : max_keys_per_list_response: None,
245 0 : upload_storage_class: None,
246 0 : };
247 0 : let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
248 0 : .await
249 0 : .unwrap();
250 0 : (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
251 : };
252 :
253 35 : let proxy = Storage {
254 35 : auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
255 35 : storage,
256 35 : cancel: cancel.clone(),
257 35 : max_upload_file_limit: usize::MAX,
258 35 : };
259 35 : check_storage_permissions(&proxy.storage, cancel)
260 35 : .await
261 35 : .unwrap();
262 35 : (proxy, dir)
263 35 : }
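// To exercise the real-S3 branch above, set ENABLE_REAL_S3_REMOTE_STORAGE together with
// REMOTE_STORAGE_S3_BUCKET and REMOTE_STORAGE_S3_REGION before running the tests, e.g.
// (hypothetical invocation; bucket, region and package name are placeholders):
//
//   ENABLE_REAL_S3_REMOTE_STORAGE=1 REMOTE_STORAGE_S3_BUCKET=my-bucket \
//   REMOTE_STORAGE_S3_REGION=us-east-1 cargo test -p endpoint_storage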
264 :
265 : // see libs/utils/src/auth.rs
266 : const TEST_PUB_KEY_ED25519: &[u8] = b"
267 : -----BEGIN PUBLIC KEY-----
268 : MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
269 : -----END PUBLIC KEY-----
270 : ";
271 :
272 : const TEST_PRIV_KEY_ED25519: &[u8] = br#"
273 : -----BEGIN PRIVATE KEY-----
274 : MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
275 : -----END PRIVATE KEY-----
276 : "#;
277 :
278 30 : async fn request(req: Request<Body>) -> Response<Body> {
279 30 : let (proxy, _) = proxy().await;
280 30 : app(Arc::new(proxy))
281 30 : .into_service()
282 30 : .oneshot(req)
283 30 : .await
284 30 : .unwrap()
285 30 : }
286 :
287 : #[testlog(tokio::test)]
288 : async fn status() {
289 : let res = Request::builder()
290 : .uri("/status")
291 : .body(Body::empty())
292 : .map(request)
293 : .unwrap()
294 : .await;
295 : assert_eq!(res.status(), StatusCode::OK);
296 : }
297 :
298 3 : fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
299 3 : iproduct!(
300 3 : vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
301 3 : vec!["GET", "PUT", "DELETE"]
302 : )
303 3 : }
304 :
305 : #[testlog(tokio::test)]
306 : async fn no_token() {
307 : for (uri, method) in routes() {
308 : info!(%uri, %method);
309 : let res = Request::builder()
310 : .uri(uri)
311 : .method(method)
312 : .body(Body::empty())
313 : .map(request)
314 : .unwrap()
315 : .await;
316 : assert!(matches!(
317 : res.status(),
318 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
319 : ));
320 : }
321 : }
322 :
323 : #[testlog(tokio::test)]
324 : async fn invalid_token() {
325 : for (uri, method) in routes() {
326 : info!(%uri, %method);
327 : let status = Request::builder()
328 : .uri(uri)
329 : .header("Authorization", "Bearer 123")
330 : .method(method)
331 : .body(Body::empty())
332 : .map(request)
333 : .unwrap()
334 : .await;
335 : assert!(matches!(
336 : status.status(),
337 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
338 : ));
339 : }
340 : }
341 :
342 : const TENANT_ID: TenantId =
343 : TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
344 : const TIMELINE_ID: TimelineId =
345 : TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
346 : const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
347 11 : fn token() -> String {
348 11 : let claims = endpoint_storage::claims::EndpointStorageClaims {
349 11 : tenant_id: TENANT_ID,
350 11 : timeline_id: TIMELINE_ID,
351 11 : endpoint_id: ENDPOINT_ID.into(),
352 11 : exp: u64::MAX,
353 11 : };
354 11 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
355 11 : let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
356 11 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
357 11 : }
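// The claims above must line up with the path segments being accessed: a token minted for
// (TENANT_ID, TIMELINE_ID, ENDPOINT_ID) authorizes /{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/...,
// with a relaxation for read paths on the endpoint segment, exercised by `unauthorized`
// and `read_other_endpoint_data` below.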
358 :
359 : #[testlog(tokio::test)]
360 : async fn unauthorized() {
361 : let (proxy, _) = proxy().await;
362 : let mut app = app(Arc::new(proxy)).into_service();
363 : let token = token();
364 : let args = itertools::iproduct!(
365 : vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
366 : vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
367 : vec![ENDPOINT_ID, "ep-ololo"]
368 : )
369 : // the first one is a fully valid path; the second is valid for GET, as
370 : // read paths may use a different endpoint if tenant and timeline match
371 : // (needed for prewarming an RO->RW replica)
372 : .skip(2);
373 :
374 : for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
375 : info!(%uri, %method, %tenant, %timeline, %endpoint);
376 : let request = Request::builder()
377 : .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
378 : .method(method)
379 : .header("Authorization", format!("Bearer {token}"))
380 : .body(Body::empty())
381 : .unwrap();
382 : let status = ServiceExt::ready(&mut app)
383 : .await
384 : .unwrap()
385 : .call(request)
386 : .await
387 : .unwrap()
388 : .status();
389 : assert_eq!(status, StatusCode::UNAUTHORIZED);
390 : }
391 : }
392 :
393 : #[testlog(tokio::test)]
394 : async fn method_not_allowed() {
395 : let token = token();
396 : let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
397 : for (key, method) in iter {
398 : let status = Request::builder()
399 : .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
400 : .method(method)
401 : .header("Authorization", format!("Bearer {token}"))
402 : .body(Body::empty())
403 : .map(request)
404 : .unwrap()
405 : .await
406 : .status();
407 : assert!(matches!(
408 : status,
409 : StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
410 : ));
411 : }
412 : }
413 :
414 4 : async fn requests_chain(
415 4 : chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
416 4 : token: impl Fn(&str) -> String,
417 4 : ) {
418 4 : let (proxy, _) = proxy().await;
419 4 : let mut app = app(Arc::new(proxy)).into_service();
420 47 : for (uri, method, body, expected_status, compare_body) in chain {
421 43 : info!(%uri, %method, %body, %expected_status);
422 43 : let bearer = format!("Bearer {}", token(&uri));
423 43 : let request = Request::builder()
424 43 : .uri(uri)
425 43 : .method(method)
426 43 : .header("Authorization", &bearer)
427 43 : .body(Body::from(body))
428 43 : .unwrap();
429 43 : let response = ServiceExt::ready(&mut app)
430 43 : .await
431 43 : .unwrap()
432 43 : .call(request)
433 43 : .await
434 43 : .unwrap();
435 43 : assert_eq!(response.status(), expected_status);
436 43 : if !compare_body {
437 42 : continue;
438 1 : }
439 1 : let read_body = response.into_body().collect().await.unwrap().to_bytes();
440 1 : assert_eq!(body, read_body);
441 : }
442 4 : }
443 :
444 : #[testlog(tokio::test)]
445 : async fn metrics() {
446 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
447 : let req = vec![
448 : (uri.clone(), "PUT", "body", StatusCode::OK, false),
449 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
450 : ];
451 2 : requests_chain(req.into_iter(), |_| token()).await;
452 :
453 : let res = Request::builder()
454 : .uri("/metrics")
455 : .body(Body::empty())
456 : .map(request)
457 : .unwrap()
458 : .await;
459 : assert_eq!(res.status(), StatusCode::OK);
460 : let body = res.into_body().collect().await.unwrap().to_bytes();
461 : let body = String::from_utf8_lossy(&body);
462 : tracing::debug!(%body);
463 : // Storage metrics are not gathered for LocalFs
464 : if var(REAL_S3_ENV).is_ok() {
465 : assert!(body.contains("remote_storage_s3_deleted_objects_total"));
466 : }
467 :
468 : #[cfg(target_os = "linux")]
469 : assert!(body.contains("process_threads"));
470 : }
471 :
472 : #[testlog(tokio::test)]
473 : async fn insert_retrieve_remove() {
474 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
475 : let chain = vec![
476 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
477 : (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
478 : (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
479 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
480 : (uri, "GET", "", StatusCode::NOT_FOUND, false),
481 : ];
482 5 : requests_chain(chain.into_iter(), |_| token()).await;
483 : }
484 :
485 : #[testlog(tokio::test)]
486 : async fn read_other_endpoint_data() {
487 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
488 : let chain = vec![
489 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
490 : (uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
491 : ];
492 2 : requests_chain(chain.into_iter(), |_| token()).await;
493 : }
494 :
495 34 : fn delete_prefix_token(uri: &str) -> String {
496 34 : let parts = uri.split("/").collect::<Vec<&str>>();
497 34 : let claims = endpoint_storage::claims::DeletePrefixClaims {
498 34 : tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
499 34 : timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
500 34 : endpoint_id: parts.get(3).map(ToString::to_string),
501 : exp: u64::MAX,
502 : };
503 34 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
504 34 : let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
505 34 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
506 34 : }
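// For example, "/<tenant>/<timeline>/<endpoint>" splits into ["", tenant, timeline, endpoint],
// so parts 1..=3 populate the claims; a shorter prefix such as "/<tenant>" leaves timeline_id
// and endpoint_id as None, matching the wider delete-prefix routes.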
507 :
508 : // Can't use single-digit numbers as they won't validate as TimelineId and EndpointId
509 : #[testlog(tokio::test)]
510 : async fn delete_prefix() {
511 : let tenant_id =
512 : TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
513 : let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
514 : let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
515 : let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
516 33 : let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
517 : // Why the extra slash in the string literals? Axum is weird with URIs:
518 : // /1/2 and /1/2/ match different routes, so the first yields OK and the second NOT_FOUND
519 : // as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
520 : // Removing the trailing slash is surprisingly hard:
521 : // * add a tower dependency with the NormalizePath layer
522 : // * wrap Router<()> in this layer, see https://github.com/tokio-rs/axum/discussions/2377
523 : // * rewrite make_service() -> into_make_service()
524 : // * rewrite oneshot() (not available for NormalizePath)
525 : // I didn't manage to get it working correctly; a rough sketch of the approach follows below.
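// Roughly what that approach would look like (an untested sketch, assuming tower-http's
// NormalizePathLayer; kept as a comment for the reasons listed above):
//
//   use tower::Layer;
//   use tower_http::normalize_path::NormalizePathLayer;
//   let app = NormalizePathLayer::trim_trailing_slash().layer(app(state));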
526 : let chain = vec![
527 : // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
528 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
529 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
530 : (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
531 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
532 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
533 : (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
534 : // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
535 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
536 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
537 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
538 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
539 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
540 : // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
541 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
542 : (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
543 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
544 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
545 : (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
546 : // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
547 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
548 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
549 : (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
550 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
551 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
552 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
553 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
554 : // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
555 : (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
556 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
557 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
558 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
559 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
560 : (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
561 : // delete prefix 1 -> empty
562 : (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
563 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
564 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
565 : (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
566 : (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
567 : ];
568 : requests_chain(chain.into_iter(), delete_prefix_token).await;
569 : }
570 : }