Line data Source code
1 : use anyhow::anyhow;
2 : use axum::body::{Body, Bytes};
3 : use axum::response::{IntoResponse, Response};
4 : use axum::{Router, http::StatusCode};
5 : use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
6 : use remote_storage::TimeoutOrCancel;
7 : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
8 : use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::{error, info};
11 : use utils::backoff::retry;
12 :
13 34 : pub fn app(state: Arc<Storage>) -> Router<()> {
14 : use axum::routing::{delete as _delete, get as _get};
15 34 : let delete_prefix = _delete(delete_prefix);
16 34 : Router::new()
17 34 : .route(
18 34 : "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
19 34 : _get(get).put(set).delete(delete),
20 34 : )
21 34 : .route(
22 34 : "/{tenant_id}/{timeline_id}/{endpoint_id}",
23 34 : delete_prefix.clone(),
24 34 : )
25 34 : .route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
26 34 : .route("/{tenant_id}", delete_prefix)
27 34 : .route("/metrics", _get(metrics))
28 34 : .route("/status", _get(async || StatusCode::OK.into_response()))
29 34 : .with_state(state)
30 34 : }
31 :
/// Handler result: both the success and the failure side are ready-made HTTP responses.
type Result = anyhow::Result<Response, Response>;
/// Shared application state as extracted by axum in every handler.
type State = axum::extract::State<Arc<Storage>>;

/// Header name set on successful download responses.
const CONTENT_TYPE: &str = "content-type";
/// Header value set on successful download responses.
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
// The two values below are forwarded to `utils::backoff::retry` by every storage
// operation in this file. NOTE(review): presumably the attempt count after which
// failures are logged as warnings, and the attempt cap — confirm against `retry`.
const WARN_THRESHOLD: u32 = 3;
const MAX_RETRIES: u32 = 10;
39 :
40 1 : async fn metrics() -> Result {
41 1 : prometheus::TextEncoder::new()
42 1 : .encode_to_string(&prometheus::gather())
43 1 : .map(|s| s.into_response())
44 1 : .map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
45 1 : }
46 :
47 20 : async fn get(S3Path { path }: S3Path, state: State) -> Result {
48 20 : info!(%path, "downloading");
49 20 : let download_err = |e| {
50 14 : if let DownloadError::NotFound = e {
51 14 : info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
52 14 : return not_found(&path);
53 0 : }
54 0 : internal_error(e, &path, "downloading")
55 14 : };
56 20 : let cancel = state.cancel.clone();
57 20 : let opts = &DownloadOpts::default();
58 :
59 20 : let stream = retry(
60 20 : async || state.storage.download(&path, opts, &cancel).await,
61 20 : DownloadError::is_permanent,
62 20 : WARN_THRESHOLD,
63 20 : MAX_RETRIES,
64 20 : "downloading",
65 20 : &cancel,
66 20 : )
67 20 : .await
68 20 : .unwrap_or(Err(DownloadError::Cancelled))
69 20 : .map_err(download_err)?
70 : .download_stream;
71 :
72 6 : Response::builder()
73 6 : .status(StatusCode::OK)
74 6 : .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
75 6 : .body(Body::from_stream(stream))
76 6 : .map_err(|e| internal_error(e, path, "reading response"))
77 20 : }
78 :
79 : // Best solution for files is multipart upload, but remote_storage doesn't support it,
80 : // so we can either read Bytes in memory and push at once or forward BodyDataStream to
81 : // remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
82 : // guaranteed size() which may produce issues while uploading to s3.
83 : // So, currently we're going with an in-memory copy plus a boundary to prevent uploading
84 : // very large files.
85 13 : async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
86 13 : info!(%path, "uploading");
87 13 : let request_len = bytes.len();
88 13 : let max_len = state.max_upload_file_limit;
89 13 : if request_len > max_len {
90 0 : return Err(bad_request(
91 0 : anyhow!("File size {request_len} exceeds max {max_len}"),
92 0 : "uploading",
93 0 : ));
94 13 : }
95 13 :
96 13 : let cancel = state.cancel.clone();
97 13 : let fun = async || {
98 0 : let stream = bytes_to_stream(bytes.clone());
99 0 : state
100 0 : .storage
101 0 : .upload(stream, request_len, &path, None, &cancel)
102 0 : .await
103 13 : };
104 13 : retry(
105 13 : fun,
106 13 : TimeoutOrCancel::caused_by_cancel,
107 13 : WARN_THRESHOLD,
108 13 : MAX_RETRIES,
109 13 : "uploading",
110 13 : &cancel,
111 13 : )
112 13 : .await
113 13 : .unwrap_or(Err(anyhow!("uploading cancelled")))
114 13 : .map_err(|e| internal_error(e, path, "reading response"))?;
115 13 : Ok(ok())
116 13 : }
117 :
118 2 : async fn delete(S3Path { path }: S3Path, state: State) -> Result {
119 2 : info!(%path, "deleting");
120 2 : let cancel = state.cancel.clone();
121 2 : retry(
122 2 : async || state.storage.delete(&path, &cancel).await,
123 2 : TimeoutOrCancel::caused_by_cancel,
124 2 : WARN_THRESHOLD,
125 2 : MAX_RETRIES,
126 2 : "deleting",
127 2 : &cancel,
128 2 : )
129 2 : .await
130 2 : .unwrap_or(Err(anyhow!("deleting cancelled")))
131 2 : .map_err(|e| internal_error(e, path, "deleting"))?;
132 2 : Ok(ok())
133 2 : }
134 :
135 6 : async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
136 6 : info!(%path, "deleting prefix");
137 6 : let cancel = state.cancel.clone();
138 6 : retry(
139 6 : async || state.storage.delete_prefix(&path, &cancel).await,
140 6 : TimeoutOrCancel::caused_by_cancel,
141 6 : WARN_THRESHOLD,
142 6 : MAX_RETRIES,
143 6 : "deleting prefix",
144 6 : &cancel,
145 6 : )
146 6 : .await
147 6 : .unwrap_or(Err(anyhow!("deleting prefix cancelled")))
148 6 : .map_err(|e| internal_error(e, path, "deleting prefix"))?;
149 6 : Ok(ok())
150 6 : }
151 :
152 34 : pub async fn check_storage_permissions(
153 34 : client: &GenericRemoteStorage,
154 34 : cancel: CancellationToken,
155 34 : ) -> anyhow::Result<()> {
156 34 : info!("storage permissions check");
157 :
158 : // as_nanos() as multiple instances proxying same bucket may be started at once
159 34 : let now = SystemTime::now()
160 34 : .duration_since(UNIX_EPOCH)?
161 34 : .as_nanos()
162 34 : .to_string();
163 :
164 34 : let path = RemotePath::from_string(&format!("write_access_{now}"))?;
165 34 : info!(%path, "uploading");
166 :
167 34 : let body = now.to_string();
168 34 : let stream = bytes_to_stream(Bytes::from(body.clone()));
169 34 : client
170 34 : .upload(stream, body.len(), &path, None, &cancel)
171 34 : .await?;
172 :
173 : use tokio::io::AsyncReadExt;
174 34 : info!(%path, "downloading");
175 34 : let download_opts = DownloadOpts {
176 34 : kind: remote_storage::DownloadKind::Small,
177 34 : ..Default::default()
178 34 : };
179 34 : let mut body_read_buf = Vec::new();
180 34 : let stream = client
181 34 : .download(&path, &download_opts, &cancel)
182 34 : .await?
183 : .download_stream;
184 34 : tokio_util::io::StreamReader::new(stream)
185 34 : .read_to_end(&mut body_read_buf)
186 34 : .await?;
187 34 : let body_read = String::from_utf8(body_read_buf)?;
188 34 : if body != body_read {
189 0 : error!(%body, %body_read, "File contents do not match");
190 0 : anyhow::bail!("Read back file doesn't match original")
191 34 : }
192 34 :
193 34 : info!(%path, "removing");
194 34 : client.delete(&path, &cancel).await
195 34 : }
196 :
197 47 : fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
198 47 : futures::stream::once(futures::future::ready(Ok(bytes)))
199 47 : }
200 :
201 : #[cfg(test)]
202 : mod tests {
203 : use super::*;
204 : use axum::{body::Body, extract::Request, response::Response};
205 : use http_body_util::BodyExt;
206 : use itertools::iproduct;
207 : use std::env::var;
208 : use std::sync::Arc;
209 : use std::time::Duration;
210 : use test_log::test as testlog;
211 : use tower::{Service, util::ServiceExt};
212 : use utils::id::{TenantId, TimelineId};
213 :
// see libs/remote_storage/tests/test_real_s3.rs
// When this variable is set, `proxy()` builds an S3-backed storage instead of LocalFs.
const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
// Names of the environment variables holding the bucket name and region for that mode.
const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
218 :
219 34 : async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
220 34 : let cancel = CancellationToken::new();
221 34 : let (dir, storage) = if var(REAL_S3_ENV).is_err() {
222 : // tests execute in parallel and we need a new directory for each of them
223 34 : let dir = camino_tempfile::tempdir().unwrap();
224 34 : let fs =
225 34 : remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
226 34 : (Some(dir), GenericRemoteStorage::LocalFs(fs))
227 : } else {
228 : // test_real_s3::create_s3_client is hard to reference, reimplementing here
229 0 : let millis = SystemTime::now()
230 0 : .duration_since(UNIX_EPOCH)
231 0 : .unwrap()
232 0 : .as_millis();
233 : use rand::Rng;
234 0 : let random = rand::thread_rng().r#gen::<u32>();
235 0 :
236 0 : let s3_config = remote_storage::S3Config {
237 0 : bucket_name: var(REAL_S3_BUCKET).unwrap(),
238 0 : bucket_region: var(REAL_S3_REGION).unwrap(),
239 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
240 0 : endpoint: None,
241 0 : concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
242 0 : max_keys_per_list_response: None,
243 0 : upload_storage_class: None,
244 0 : };
245 0 : let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
246 0 : .await
247 0 : .unwrap();
248 0 : (None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
249 : };
250 :
251 34 : let proxy = Storage {
252 34 : auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
253 34 : storage,
254 34 : cancel: cancel.clone(),
255 34 : max_upload_file_limit: usize::MAX,
256 34 : };
257 34 : check_storage_permissions(&proxy.storage, cancel)
258 34 : .await
259 34 : .unwrap();
260 34 : (proxy, dir)
261 34 : }
262 :
// see libs/utils/src/auth.rs
// Ed25519 public key (PEM) handed to `JwtAuth::new` in `proxy()`.
const TEST_PUB_KEY_ED25519: &[u8] = b"
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
-----END PUBLIC KEY-----
";

// Ed25519 private key (PEM) used by the token helpers below to sign test JWTs.
const TEST_PRIV_KEY_ED25519: &[u8] = br#"
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
-----END PRIVATE KEY-----
"#;
275 :
276 30 : async fn request(req: Request<Body>) -> Response<Body> {
277 30 : let (proxy, _) = proxy().await;
278 30 : app(Arc::new(proxy))
279 30 : .into_service()
280 30 : .oneshot(req)
281 30 : .await
282 30 : .unwrap()
283 30 : }
284 :
285 2 : #[testlog(tokio::test)]
286 : async fn status() {
287 : let res = Request::builder()
288 : .uri("/status")
289 : .body(Body::empty())
290 : .map(request)
291 : .unwrap()
292 : .await;
293 : assert_eq!(res.status(), StatusCode::OK);
294 : }
295 :
// Every (uri, method) pair probed by the negative tests: each URI is combined with each
// method, URIs varying slowest.
fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
    static URIS: [&str; 4] = ["/1", "/1/2", "/1/2/3", "/1/2/3/4"];
    static METHODS: [&str; 3] = ["GET", "PUT", "DELETE"];
    URIS.iter()
        .flat_map(|&uri| METHODS.iter().map(move |&method| (uri, method)))
}
302 :
303 2 : #[testlog(tokio::test)]
304 : async fn no_token() {
305 : for (uri, method) in routes() {
306 : info!(%uri, %method);
307 : let res = Request::builder()
308 : .uri(uri)
309 : .method(method)
310 : .body(Body::empty())
311 : .map(request)
312 : .unwrap()
313 : .await;
314 : assert!(matches!(
315 : res.status(),
316 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
317 : ));
318 : }
319 : }
320 :
321 2 : #[testlog(tokio::test)]
322 : async fn invalid_token() {
323 : for (uri, method) in routes() {
324 : info!(%uri, %method);
325 : let status = Request::builder()
326 : .uri(uri)
327 : .header("Authorization", "Bearer 123")
328 : .method(method)
329 : .body(Body::empty())
330 : .map(request)
331 : .unwrap()
332 : .await;
333 : assert!(matches!(
334 : status.status(),
335 : StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
336 : ));
337 : }
338 : }
339 :
// Identity placed into the claims of the token produced by `token()`; the two id
// arrays differ only in their last byte.
const TENANT_ID: TenantId =
    TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
    TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
345 9 : fn token() -> String {
346 9 : let claims = object_storage::Claims {
347 9 : tenant_id: TENANT_ID,
348 9 : timeline_id: TIMELINE_ID,
349 9 : endpoint_id: ENDPOINT_ID.into(),
350 9 : exp: u64::MAX,
351 9 : };
352 9 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
353 9 : let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
354 9 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
355 9 : }
356 :
357 2 : #[testlog(tokio::test)]
358 : async fn unauthorized() {
359 : let (proxy, _) = proxy().await;
360 : let mut app = app(Arc::new(proxy)).into_service();
361 : let token = token();
362 : let args = itertools::iproduct!(
363 : vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
364 : vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
365 : vec![ENDPOINT_ID, "ep-ololo"]
366 : )
367 : .skip(1);
368 :
369 : for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
370 : info!(%uri, %method, %tenant, %timeline, %endpoint);
371 : let request = Request::builder()
372 : .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
373 : .method(method)
374 : .header("Authorization", format!("Bearer {}", token))
375 : .body(Body::empty())
376 : .unwrap();
377 : let status = ServiceExt::ready(&mut app)
378 : .await
379 : .unwrap()
380 : .call(request)
381 : .await
382 : .unwrap()
383 : .status();
384 : assert_eq!(status, StatusCode::UNAUTHORIZED);
385 : }
386 : }
387 :
388 2 : #[testlog(tokio::test)]
389 : async fn method_not_allowed() {
390 : let token = token();
391 : let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
392 : for (key, method) in iter {
393 : let status = Request::builder()
394 : .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
395 : .method(method)
396 : .header("Authorization", format!("Bearer {token}"))
397 : .body(Body::empty())
398 : .map(request)
399 : .unwrap()
400 : .await
401 : .status();
402 : assert!(matches!(
403 : status,
404 : StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
405 : ));
406 : }
407 : }
408 :
409 3 : async fn requests_chain(
410 3 : chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
411 3 : token: impl Fn(&str) -> String,
412 3 : ) {
413 3 : let (proxy, _) = proxy().await;
414 3 : let mut app = app(Arc::new(proxy)).into_service();
415 44 : for (uri, method, body, expected_status, compare_body) in chain {
416 41 : info!(%uri, %method, %body, %expected_status);
417 41 : let bearer = format!("Bearer {}", token(&uri));
418 41 : let request = Request::builder()
419 41 : .uri(uri)
420 41 : .method(method)
421 41 : .header("Authorization", &bearer)
422 41 : .body(Body::from(body))
423 41 : .unwrap();
424 41 : let response = ServiceExt::ready(&mut app)
425 41 : .await
426 41 : .unwrap()
427 41 : .call(request)
428 41 : .await
429 41 : .unwrap();
430 41 : assert_eq!(response.status(), expected_status);
431 41 : if !compare_body {
432 40 : continue;
433 1 : }
434 1 : let read_body = response.into_body().collect().await.unwrap().to_bytes();
435 1 : assert_eq!(body, read_body);
436 : }
437 3 : }
438 :
439 2 : #[testlog(tokio::test)]
440 : async fn metrics() {
441 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
442 : let req = vec![
443 : (uri.clone(), "PUT", "body", StatusCode::OK, false),
444 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
445 : ];
446 2 : requests_chain(req.into_iter(), |_| token()).await;
447 :
448 : let res = Request::builder()
449 : .uri("/metrics")
450 : .body(Body::empty())
451 : .map(request)
452 : .unwrap()
453 : .await;
454 : assert_eq!(res.status(), StatusCode::OK);
455 : let body = res.into_body().collect().await.unwrap().to_bytes();
456 : let body = String::from_utf8_lossy(&body);
457 : tracing::debug!(%body);
458 : // Storage metrics are not gathered for LocalFs
459 : if var(REAL_S3_ENV).is_ok() {
460 : assert!(body.contains("remote_storage_s3_deleted_objects_total"));
461 : }
462 : assert!(body.contains("process_threads"));
463 : }
464 :
465 2 : #[testlog(tokio::test)]
466 : async fn insert_retrieve_remove() {
467 : let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
468 : let chain = vec![
469 : (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
470 : (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
471 : (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
472 : (uri.clone(), "DELETE", "", StatusCode::OK, false),
473 : (uri, "GET", "", StatusCode::NOT_FOUND, false),
474 : ];
475 5 : requests_chain(chain.into_iter(), |_| token()).await;
476 : }
477 :
478 34 : fn delete_prefix_token(uri: &str) -> String {
479 : use serde::Serialize;
480 34 : let parts = uri.split("/").collect::<Vec<&str>>();
481 : #[derive(Serialize)]
482 : struct PrefixClaims {
483 : tenant_id: TenantId,
484 : timeline_id: Option<TimelineId>,
485 : endpoint_id: Option<object_storage::EndpointId>,
486 : exp: u64,
487 : }
488 34 : let claims = PrefixClaims {
489 34 : tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
490 34 : timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
491 34 : endpoint_id: parts.get(3).map(ToString::to_string),
492 34 : exp: u64::MAX,
493 34 : };
494 34 : let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
495 34 : let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
496 34 : jsonwebtoken::encode(&header, &claims, &key).unwrap()
497 34 : }
498 :
499 : // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
500 2 : #[testlog(tokio::test)]
501 : async fn delete_prefix() {
502 : let tenant_id =
503 : TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
504 : let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
505 : let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
506 : let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
507 33 : let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
508 : // Why extra slash in string literals? Axum is weird with URIs:
509 : // /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
510 : // as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
511 : // The cost of removing trailing slash is suprisingly hard:
512 : // * Add tower dependency with NormalizePath layer
513 : // * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
514 : // * Rewrite make_service() -> into_make_service()
515 : // * Rewrite oneshot() (not available for NormalizePath)
516 : // I didn't manage to get it working correctly
517 : let chain = vec![
518 : // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
519 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
520 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
521 : (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
522 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
523 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
524 : (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
525 : // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
526 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
527 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
528 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
529 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
530 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
531 : // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
532 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
533 : (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
534 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
535 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
536 : (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
537 : // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
538 : (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
539 : (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
540 : (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
541 : (f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
542 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
543 : (f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
544 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
545 : // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
546 : (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
547 : (f(t2, ""), "DELETE", "", StatusCode::OK, false),
548 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
549 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
550 : (f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
551 : (f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
552 : // delete prefix 1 -> empty
553 : (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
554 : (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
555 : (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
556 : (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
557 : (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
558 : ];
559 : requests_chain(chain.into_iter(), delete_prefix_token).await;
560 : }
561 : }
|