Line data Source code
1 : use crate::{background_process, local_env::LocalEnv};
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use hyper::Method;
4 : use pageserver_api::{
5 : models::{
6 : ShardParameters, TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
7 : TimelineCreateRequest, TimelineInfo,
8 : },
9 : shard::TenantShardId,
10 : };
11 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
12 : use postgres_backend::AuthType;
13 : use serde::{de::DeserializeOwned, Deserialize, Serialize};
14 : use std::str::FromStr;
15 : use tokio::process::Command;
16 : use tracing::instrument;
17 : use url::Url;
18 : use utils::{
19 : auth::{Claims, Scope},
20 : id::{NodeId, TenantId},
21 : };
22 :
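/// neon_local's handle for the attachment service: constructed from a `LocalEnv`
/// (`from_env`), it manages the service process and its captive postgres instance
/// (`start`/`stop`) and wraps the service's HTTP API via `dispatch` and the typed
/// methods below.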
23 : pub struct AttachmentService {
24 : env: LocalEnv,
25 : listen: String,
26 : path: Utf8PathBuf,
27 : jwt_token: Option<String>,
28 : public_key: Option<String>,
29 : postgres_port: u16,
30 : client: reqwest::Client,
31 : }
32 :
33 : const COMMAND: &str = "attachment_service";
34 :
35 : const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
36 :
37 1035 : #[derive(Serialize, Deserialize)]
38 : pub struct AttachHookRequest {
39 : pub tenant_shard_id: TenantShardId,
40 : pub node_id: Option<NodeId>,
41 : }
42 :
43 207 : #[derive(Serialize, Deserialize)]
44 : pub struct AttachHookResponse {
45 : pub gen: Option<u32>,
46 : }
47 :
48 219 : #[derive(Serialize, Deserialize)]
49 : pub struct InspectRequest {
50 : pub tenant_shard_id: TenantShardId,
51 : }
52 :
53 73 : #[derive(Serialize, Deserialize)]
54 : pub struct InspectResponse {
55 : pub attachment: Option<(u32, NodeId)>,
56 : }
57 :
58 3402 : #[derive(Serialize, Deserialize)]
59 : pub struct TenantCreateResponseShard {
60 : pub shard_id: TenantShardId,
61 : pub node_id: NodeId,
62 : pub generation: u32,
63 : }
64 :
65 1377 : #[derive(Serialize, Deserialize)]
66 : pub struct TenantCreateResponse {
67 : pub shards: Vec<TenantCreateResponseShard>,
68 : }
69 :
70 6864 : #[derive(Serialize, Deserialize)]
71 : pub struct NodeRegisterRequest {
72 : pub node_id: NodeId,
73 :
74 : pub listen_pg_addr: String,
75 : pub listen_pg_port: u16,
76 :
77 : pub listen_http_addr: String,
78 : pub listen_http_port: u16,
79 : }
80 :
81 20 : #[derive(Serialize, Deserialize)]
82 : pub struct NodeConfigureRequest {
83 : pub node_id: NodeId,
84 :
85 : pub availability: Option<NodeAvailability>,
86 : pub scheduling: Option<NodeSchedulingPolicy>,
87 : }
88 :
89 8255 : #[derive(Serialize, Deserialize, Debug)]
90 : pub struct TenantLocateResponseShard {
91 : pub shard_id: TenantShardId,
92 : pub node_id: NodeId,
93 :
94 : pub listen_pg_addr: String,
95 : pub listen_pg_port: u16,
96 :
97 : pub listen_http_addr: String,
98 : pub listen_http_port: u16,
99 : }
100 :
101 2860 : #[derive(Serialize, Deserialize)]
102 : pub struct TenantLocateResponse {
103 : pub shards: Vec<TenantLocateResponseShard>,
104 : pub shard_params: ShardParameters,
105 : }
106 :
107 : /// Explicitly migrating a particular shard is a low-level operation.
108 : /// TODO: add a higher-level "Reschedule tenant" operation where the request
109 : /// specifies some constraints, e.g. asking it to move off particular node(s).
110 20 : #[derive(Serialize, Deserialize, Debug)]
111 : pub struct TenantShardMigrateRequest {
112 : pub tenant_shard_id: TenantShardId,
113 : pub node_id: NodeId,
114 : }
115 :
116 961 : #[derive(Serialize, Deserialize, Clone, Copy)]
117 : pub enum NodeAvailability {
118 : // Normal, happy state
119 : Active,
120 : // Offline: Tenants shouldn't try to attach here, but they may assume that their
121 : // secondary locations on this node still exist. Newly added nodes are in this
122 : // state until we successfully contact them.
123 : Offline,
124 : }
125 :
126 : impl FromStr for NodeAvailability {
127 : type Err = anyhow::Error;
128 :
129 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
130 0 : match s {
131 0 : "active" => Ok(Self::Active),
132 0 : "offline" => Ok(Self::Offline),
133 0 : _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
134 : }
135 0 : }
136 : }
137 :
138 : /// FIXME: this is a duplicate of the type in the attachment_service crate, where the
139 : /// type needs to be defined with diesel traits.
140 961 : #[derive(Serialize, Deserialize, Clone, Copy)]
141 : pub enum NodeSchedulingPolicy {
142 : Active,
143 : Filling,
144 : Pause,
145 : Draining,
146 : }
147 :
148 : impl FromStr for NodeSchedulingPolicy {
149 : type Err = anyhow::Error;
150 :
151 21 : fn from_str(s: &str) -> Result<Self, Self::Err> {
152 21 : match s {
153 21 : "active" => Ok(Self::Active),
154 21 : "filling" => Ok(Self::Filling),
155 0 : "pause" => Ok(Self::Pause),
156 0 : "draining" => Ok(Self::Draining),
157 0 : _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
158 : }
159 21 : }
160 : }
161 :
162 : impl From<NodeSchedulingPolicy> for String {
163 410 : fn from(value: NodeSchedulingPolicy) -> String {
164 410 : use NodeSchedulingPolicy::*;
165 410 : match value {
166 0 : Active => "active",
167 410 : Filling => "filling",
168 0 : Pause => "pause",
169 0 : Draining => "draining",
170 : }
171 410 : .to_string()
172 410 : }
173 : }
174 :
175 4 : #[derive(Serialize, Deserialize, Debug)]
176 : pub struct TenantShardMigrateResponse {}
177 :
178 : impl AttachmentService {
179 2731 : pub fn from_env(env: &LocalEnv) -> Self {
180 2731 : let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
181 2731 : .unwrap()
182 2731 : .join("attachments.json");
183 2731 :
184 2731 : // It makes no sense to construct this if pageservers aren't going to use it: assume
185 2731 : // pageservers have the control plane API set.
186 2731 : let listen_url = env.control_plane_api.clone().unwrap();
187 2731 :
188 2731 : let listen = format!(
189 2731 : "{}:{}",
190 2731 : listen_url.host_str().unwrap(),
191 2731 : listen_url.port().unwrap()
192 2731 : );
193 2731 :
194 2731 : // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
195 2731 : // port, for use by our captive postgres.
196 2731 : let postgres_port = listen_url
197 2731 : .port()
198 2731 : .expect("Control plane API setting should always have a port")
199 2731 : + 1;
200 2731 :
201 2731 : // Assume all pageservers have symmetric auth configuration: this service
202 2731 : // expects to use one JWT token to talk to all of them.
203 2731 : let ps_conf = env
204 2731 : .pageservers
205 2731 : .first()
206 2731 : .expect("Config is validated to contain at least one pageserver");
207 2731 : let (jwt_token, public_key) = match ps_conf.http_auth_type {
208 2661 : AuthType::Trust => (None, None),
209 : AuthType::NeonJWT => {
210 70 : let jwt_token = env
211 70 : .generate_auth_token(&Claims::new(None, Scope::PageServerApi))
212 70 : .unwrap();
213 70 :
214 70 : // If pageserver auth is enabled, this implicitly enables auth for this service,
215 70 : // using the same credentials.
216 70 : let public_key_path =
217 70 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
218 70 : .unwrap();
219 :
220 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
221 70 : let public_key = if std::fs::metadata(&public_key_path)
222 70 : .expect("Can't stat public key")
223 70 : .is_dir()
224 : {
225 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
226 : // keys. We only use one key at a time, so arbitrarily load the first one in the directory.
227 1 : let mut dir =
228 1 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
229 1 : let dent = dir
230 1 : .next()
231 1 : .expect("Empty key dir")
232 1 : .expect("Error reading key dir");
233 1 :
234 1 : std::fs::read_to_string(dent.path()).expect("Can't read public key")
235 : } else {
236 69 : std::fs::read_to_string(&public_key_path).expect("Can't read public key")
237 : };
238 70 : (Some(jwt_token), Some(public_key))
239 : }
240 : };
241 :
242 2731 : Self {
243 2731 : env: env.clone(),
244 2731 : path,
245 2731 : listen,
246 2731 : jwt_token,
247 2731 : public_key,
248 2731 : postgres_port,
249 2731 : client: reqwest::ClientBuilder::new()
250 2731 : .build()
251 2731 : .expect("Failed to construct http client"),
252 2731 : }
253 2731 : }
254 :
255 734 : fn pid_file(&self) -> Utf8PathBuf {
256 734 : Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
257 734 : .expect("non-Unicode path")
258 734 : }
259 :
260 : /// PIDFile for the postgres instance used to store attachment service state
261 366 : fn postgres_pid_file(&self) -> Utf8PathBuf {
262 366 : Utf8PathBuf::from_path_buf(
263 366 : self.env
264 366 : .base_data_dir
265 366 : .join("attachment_service_postgres.pid"),
266 366 : )
267 366 : .expect("non-Unicode path")
268 366 : }
269 :
270 : /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
271 : ///
272 : /// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
273 : /// to other versions if that one isn't found. Some automated tests create circumstances
274 : /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
275 1100 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
276 1100 : let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
277 :
278 1106 : for v in prefer_versions {
279 1106 : let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
280 1106 : if tokio::fs::try_exists(&path).await? {
281 1100 : return Ok(path);
282 6 : }
283 : }
284 :
285 : // Fall through
286 0 : anyhow::bail!(
287 0 : "Postgres binaries not found in {}",
288 0 : self.env.pg_distrib_dir.display()
289 0 : );
290 1100 : }
291 :
292 : /// Readiness check for our postgres process
293 736 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
294 736 : let bin_path = pg_bin_dir.join("pg_isready");
295 736 : let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
296 1099 : let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
297 :
298 736 : Ok(exitcode.success())
299 736 : }
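// For reference, the readiness probe above is the same as running e.g.
// `pg_isready -h localhost -p 1235` by hand, where 1235 stands in for the captive
// postgres port (control_plane_api port + 1); the port value here is illustrative.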
300 :
301 : /// Create our database if it doesn't exist, and run migrations.
302 : ///
303 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
304 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
305 : /// who just want to run `cargo neon_local` without knowing about diesel.
306 : ///
307 : /// Returns the database url
308 366 : pub async fn setup_database(&self) -> anyhow::Result<String> {
309 366 : const DB_NAME: &str = "attachment_service";
310 366 : let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
311 :
312 368 : let pg_bin_dir = self.get_pg_bin_dir().await?;
313 366 : let createdb_path = pg_bin_dir.join("createdb");
314 366 : let output = Command::new(&createdb_path)
315 366 : .args([
316 366 : "-h",
317 366 : "localhost",
318 366 : "-p",
319 366 : &format!("{}", self.postgres_port),
320 366 : &DB_NAME,
321 366 : ])
322 366 : .output()
323 381 : .await
324 366 : .expect("Failed to spawn createdb");
325 366 :
326 366 : if !output.status.success() {
327 8 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
328 8 : if stderr.contains("already exists") {
329 0 : tracing::info!("Database {DB_NAME} already exists");
330 : } else {
331 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
332 : }
333 358 : }
334 :
335 366 : Ok(database_url)
336 366 : }
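// As noted in the doc comment, this is a hand-rolled equivalent of `diesel setup`; an
// illustrative manual invocation against the same database would be something like
// `diesel setup --database-url postgresql://localhost:1235/attachment_service`
// (the port shown is illustrative).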
337 :
338 366 : pub async fn start(&self) -> anyhow::Result<()> {
339 366 : // Start a vanilla Postgres process used by the attachment service for persistence.
340 366 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
341 366 : .unwrap()
342 366 : .join("attachment_service_db");
343 368 : let pg_bin_dir = self.get_pg_bin_dir().await?;
344 366 : let pg_log_path = pg_data_path.join("postgres.log");
345 366 :
346 366 : if !tokio::fs::try_exists(&pg_data_path).await? {
347 : // Initialize empty database
348 358 : let initdb_path = pg_bin_dir.join("initdb");
349 358 : let mut child = Command::new(&initdb_path)
350 358 : .args(["-D", pg_data_path.as_ref()])
351 358 : .spawn()
352 358 : .expect("Failed to spawn initdb");
353 358 : let status = child.wait().await?;
354 358 : if !status.success() {
355 0 : anyhow::bail!("initdb failed with status {status}");
356 358 : }
357 358 :
358 358 : tokio::fs::write(
359 358 : &pg_data_path.join("postgresql.conf"),
360 358 : format!("port = {}", self.postgres_port),
361 358 : )
362 358 : .await?;
363 8 : };
364 :
365 366 : println!("Starting attachment service database...");
366 366 : let db_start_args = [
367 366 : "-w",
368 366 : "-D",
369 366 : pg_data_path.as_ref(),
370 366 : "-l",
371 366 : pg_log_path.as_ref(),
372 366 : "start",
373 366 : ];
374 366 :
375 366 : background_process::start_process(
376 366 : "attachment_service_db",
377 366 : &self.env.base_data_dir,
378 366 : pg_bin_dir.join("pg_ctl").as_std_path(),
379 366 : db_start_args,
380 366 : [],
381 366 : background_process::InitialPidFile::Create(self.postgres_pid_file()),
382 736 : || self.pg_isready(&pg_bin_dir),
383 366 : )
384 1099 : .await?;
385 :
386 : // Run migrations on every startup, in case something changed.
387 749 : let database_url = self.setup_database().await?;
388 :
389 366 : let mut args = vec![
390 366 : "-l",
391 366 : &self.listen,
392 366 : "-p",
393 366 : self.path.as_ref(),
394 366 : "--database-url",
395 366 : &database_url,
396 366 : ]
397 366 : .into_iter()
398 2196 : .map(|s| s.to_string())
399 366 : .collect::<Vec<_>>();
400 366 : if let Some(jwt_token) = &self.jwt_token {
401 11 : args.push(format!("--jwt-token={jwt_token}"));
402 355 : }
403 :
404 366 : if let Some(public_key) = &self.public_key {
405 11 : args.push(format!("--public-key=\"{public_key}\""));
406 355 : }
407 :
408 366 : if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
409 2 : args.push(format!(
410 2 : "--compute-hook-url={control_plane_compute_hook_api}"
411 2 : ));
412 364 : }
413 :
414 366 : background_process::start_process(
415 366 : COMMAND,
416 366 : &self.env.base_data_dir,
417 366 : &self.env.attachment_service_bin(),
418 366 : args,
419 366 : [(
420 366 : "NEON_REPO_DIR".to_string(),
421 366 : self.env.base_data_dir.to_string_lossy().to_string(),
422 366 : )],
423 366 : background_process::InitialPidFile::Create(self.pid_file()),
424 778 : || async {
425 1510 : match self.status().await {
426 366 : Ok(_) => Ok(true),
427 412 : Err(_) => Ok(false),
428 : }
429 1556 : },
430 366 : )
431 1510 : .await?;
432 :
433 366 : Ok(())
434 366 : }
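// Roughly, a successful start() boils down to the following sequence (paths and ports are
// illustrative; both long-running processes are supervised via background_process and get
// pid files under the base data dir):
//
//     initdb -D <base>/attachment_service_db                          # first start only
//     pg_ctl -w -D <base>/attachment_service_db -l <...>/postgres.log start
//     createdb -h localhost -p <postgres_port> attachment_service
//     <attachment_service_bin> -l <listen> -p <base>/attachments.json \
//         --database-url postgresql://localhost:<postgres_port>/attachment_service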
435 :
436 368 : pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
437 368 : background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
438 :
439 368 : let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
440 370 : let pg_bin_dir = self.get_pg_bin_dir().await?;
441 :
442 368 : println!("Stopping attachment service database...");
443 368 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
444 368 : let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
445 368 : .args(pg_stop_args)
446 368 : .spawn()?
447 368 : .wait()
448 368 : .await?;
449 368 : if !stop_status.success() {
450 2 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
451 2 : let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
452 2 : .args(pg_status_args)
453 2 : .spawn()?
454 2 : .wait()
455 2 : .await?;
456 :
457 : // pg_ctl status returns this exit code if postgres is not running: in this case it is
458 : // fine that stop failed. Otherwise it is an error that stop failed.
459 : const PG_STATUS_NOT_RUNNING: i32 = 3;
460 2 : if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
461 2 : println!("Attachment service data base is already stopped");
462 2 : return Ok(());
463 : } else {
464 0 : anyhow::bail!("Failed to stop attachment service database: {stop_status}")
465 : }
466 366 : }
467 366 :
468 366 : Ok(())
469 368 : }
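// A minimal usage sketch of the lifecycle implemented above, assuming the caller already
// has a `LocalEnv` in hand (how it is loaded is outside this file) and is inside a tokio
// runtime returning `anyhow::Result<()>`:
//
//     let svc = AttachmentService::from_env(&env);
//     svc.start().await?;            // captive postgres + attachment_service process
//     svc.status().await?;           // HTTP readiness check against the service
//     svc.stop(false).await?;        // graceful shutdown (true = immediate)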
470 :
471 : /// Simple HTTP request wrapper for calling into attachment service
472 3234 : async fn dispatch<RQ, RS>(
473 3234 : &self,
474 3234 : method: hyper::Method,
475 3234 : path: String,
476 3234 : body: Option<RQ>,
477 3234 : ) -> anyhow::Result<RS>
478 3234 : where
479 3234 : RQ: Serialize + Sized,
480 3234 : RS: DeserializeOwned + Sized,
481 3234 : {
482 3234 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
483 3234 : // for general purpose API access.
484 3234 : let listen_url = self.env.control_plane_api.clone().unwrap();
485 3234 : let url = Url::from_str(&format!(
486 3234 : "http://{}:{}/{path}",
487 3234 : listen_url.host_str().unwrap(),
488 3234 : listen_url.port().unwrap()
489 3234 : ))
490 3234 : .unwrap();
491 3234 :
492 3234 : let mut builder = self.client.request(method, url);
493 3234 : if let Some(body) = body {
494 1884 : builder = builder.json(&body)
495 1350 : }
496 3234 : if let Some(jwt_token) = &self.jwt_token {
497 82 : builder = builder.header(
498 82 : reqwest::header::AUTHORIZATION,
499 82 : format!("Bearer {jwt_token}"),
500 82 : );
501 3152 : }
502 :
503 7960 : let response = builder.send().await?;
504 2822 : let response = response.error_from_body().await?;
505 :
506 2817 : Ok(response
507 2817 : .json()
508 0 : .await
509 2817 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
510 3234 : }
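// Example of the URL handling above: if control_plane_api were configured as, say,
// http://127.0.0.1:1234/upcall/ (value illustrative), a dispatch of (GET, "status") goes
// to http://127.0.0.1:1234/status; only the host and port of the configured URL are
// reused, and the pageserver-facing path prefix is dropped.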
511 :
512 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
513 0 : #[instrument(skip(self))]
514 : pub async fn attach_hook(
515 : &self,
516 : tenant_shard_id: TenantShardId,
517 : pageserver_id: NodeId,
518 : ) -> anyhow::Result<Option<u32>> {
519 : let request = AttachHookRequest {
520 : tenant_shard_id,
521 : node_id: Some(pageserver_id),
522 : };
523 :
524 : let response = self
525 : .dispatch::<_, AttachHookResponse>(
526 : Method::POST,
527 : "debug/v1/attach-hook".to_string(),
528 : Some(request),
529 : )
530 : .await?;
531 :
532 : Ok(response.gen)
533 : }
534 :
535 0 : #[instrument(skip(self))]
536 : pub async fn inspect(
537 : &self,
538 : tenant_shard_id: TenantShardId,
539 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
540 : let request = InspectRequest { tenant_shard_id };
541 :
542 : let response = self
543 : .dispatch::<_, InspectResponse>(
544 : Method::POST,
545 : "debug/v1/inspect".to_string(),
546 : Some(request),
547 : )
548 : .await?;
549 :
550 : Ok(response.attachment)
551 : }
552 :
553 1380 : #[instrument(skip(self))]
554 : pub async fn tenant_create(
555 : &self,
556 : req: TenantCreateRequest,
557 : ) -> anyhow::Result<TenantCreateResponse> {
558 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
559 : .await
560 : }
561 :
562 1716 : #[instrument(skip(self))]
563 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
564 : self.dispatch::<(), _>(
565 : Method::GET,
566 : format!("control/v1/tenant/{tenant_id}/locate"),
567 : None,
568 : )
569 : .await
570 : }
571 :
572 12 : #[instrument(skip(self))]
573 : pub async fn tenant_migrate(
574 : &self,
575 : tenant_shard_id: TenantShardId,
576 : node_id: NodeId,
577 : ) -> anyhow::Result<TenantShardMigrateResponse> {
578 : self.dispatch(
579 : Method::PUT,
580 : format!("control/v1/tenant/{tenant_shard_id}/migrate"),
581 : Some(TenantShardMigrateRequest {
582 : tenant_shard_id,
583 : node_id,
584 : }),
585 : )
586 : .await
587 : }
588 :
589 0 : #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
590 : pub async fn tenant_split(
591 : &self,
592 : tenant_id: TenantId,
593 : new_shard_count: u8,
594 : ) -> anyhow::Result<TenantShardSplitResponse> {
595 : self.dispatch(
596 : Method::PUT,
597 : format!("control/v1/tenant/{tenant_id}/shard_split"),
598 : Some(TenantShardSplitRequest { new_shard_count }),
599 : )
600 : .await
601 : }
602 :
603 1248 : #[instrument(skip_all, fields(node_id=%req.node_id))]
604 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
605 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
606 : .await
607 : }
608 :
609 0 : #[instrument(skip_all, fields(node_id=%req.node_id))]
610 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
611 : self.dispatch::<_, ()>(
612 : Method::PUT,
613 : format!("control/v1/node/{}/config", req.node_id),
614 : Some(req),
615 : )
616 : .await
617 : }
618 :
619 1556 : #[instrument(skip(self))]
620 : pub async fn status(&self) -> anyhow::Result<()> {
621 : self.dispatch::<(), ()>(Method::GET, "status".to_string(), None)
622 : .await
623 : }
624 :
625 1592 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
626 : pub async fn tenant_timeline_create(
627 : &self,
628 : tenant_id: TenantId,
629 : req: TimelineCreateRequest,
630 : ) -> anyhow::Result<TimelineInfo> {
631 : self.dispatch(
632 : Method::POST,
633 : format!("v1/tenant/{tenant_id}/timeline"),
634 : Some(req),
635 : )
636 : .await
637 : }
638 : }