Line data Source code
1 : //! This module implements Timeline lifecycle management and has all necessary code
2 : //! to glue together SafeKeeper and all other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use remote_storage::RemotePath;
7 : use safekeeper_api::models::TimelineTermBumpResponse;
8 : use serde::{Deserialize, Serialize};
9 : use tokio::fs::{self};
10 : use tokio_util::sync::CancellationToken;
11 : use utils::id::TenantId;
12 : use utils::sync::gate::Gate;
13 :
14 : use std::cmp::max;
15 : use std::ops::{Deref, DerefMut};
16 : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17 : use std::sync::Arc;
18 : use std::time::Duration;
19 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
20 : use tokio::{sync::watch, time::Instant};
21 : use tracing::*;
22 : use utils::http::error::ApiError;
23 : use utils::{
24 : id::{NodeId, TenantTimelineId},
25 : lsn::Lsn,
26 : };
27 :
28 : use storage_broker::proto::SafekeeperTimelineInfo;
29 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
30 :
31 : use crate::control_file;
32 : use crate::rate_limit::RateLimiter;
33 : use crate::receive_wal::WalReceivers;
34 : use crate::safekeeper::{
35 : AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn,
36 : };
37 : use crate::send_wal::WalSenders;
38 : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
39 : use crate::timeline_guard::ResidenceGuard;
40 : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
41 : use crate::timelines_set::TimelinesSet;
42 : use crate::wal_backup::{self, remote_timeline_path};
43 : use crate::wal_backup_partial::PartialRemoteSegment;
44 :
45 : use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
46 : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
47 : use crate::{debug_dump, timeline_manager, wal_storage};
48 : use crate::{GlobalTimelines, SafeKeeperConf};
49 :
50 : /// Things safekeeper should know about timeline state on peers.
51 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
52 : pub struct PeerInfo {
53 : pub sk_id: NodeId,
54 : pub term: Term,
55 : /// Term of the last entry.
56 : pub last_log_term: Term,
57 : /// LSN of the last record.
58 : pub flush_lsn: Lsn,
59 : pub commit_lsn: Lsn,
60 : /// Since which LSN safekeeper has WAL.
61 : pub local_start_lsn: Lsn,
62 : /// When info was received. Serde annotations are not very useful but make
63 : /// the code compile -- we don't rely on this field externally.
64 : #[serde(skip)]
65 : #[serde(default = "Instant::now")]
66 : ts: Instant,
67 : pub pg_connstr: String,
68 : pub http_connstr: String,
69 : }
70 :
71 : impl PeerInfo {
72 0 : fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
73 0 : PeerInfo {
74 0 : sk_id: NodeId(sk_info.safekeeper_id),
75 0 : term: sk_info.term,
76 0 : last_log_term: sk_info.last_log_term,
77 0 : flush_lsn: Lsn(sk_info.flush_lsn),
78 0 : commit_lsn: Lsn(sk_info.commit_lsn),
79 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
80 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
81 0 : http_connstr: sk_info.http_connstr.clone(),
82 0 : ts,
83 0 : }
84 0 : }
85 : }
86 :
87 : // vector-based node id -> peer state map with very limited functionality we
88 : // need.
89 : #[derive(Debug, Clone, Default)]
90 : pub struct PeersInfo(pub Vec<PeerInfo>);
91 :
92 : impl PeersInfo {
93 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
94 0 : self.0.iter_mut().find(|p| p.sk_id == id)
95 0 : }
96 :
97 0 : fn upsert(&mut self, p: &PeerInfo) {
98 0 : match self.get(p.sk_id) {
99 0 : Some(rp) => *rp = p.clone(),
100 0 : None => self.0.push(p.clone()),
101 : }
102 0 : }
103 : }
104 :
105 : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
106 :
107 : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
108 : /// automatically updates `watch::Sender` channels with state on drop.
109 : pub struct WriteGuardSharedState<'a> {
110 : tli: Arc<Timeline>,
111 : guard: RwLockWriteGuard<'a, SharedState>,
112 : }
113 :
114 : impl<'a> WriteGuardSharedState<'a> {
115 0 : fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
116 0 : WriteGuardSharedState { tli, guard }
117 0 : }
118 : }
119 :
120 : impl Deref for WriteGuardSharedState<'_> {
121 : type Target = SharedState;
122 :
123 0 : fn deref(&self) -> &Self::Target {
124 0 : &self.guard
125 0 : }
126 : }
127 :
128 : impl DerefMut for WriteGuardSharedState<'_> {
129 0 : fn deref_mut(&mut self) -> &mut Self::Target {
130 0 : &mut self.guard
131 0 : }
132 : }
133 :
134 : impl Drop for WriteGuardSharedState<'_> {
135 0 : fn drop(&mut self) {
136 0 : let term_flush_lsn =
137 0 : TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
138 0 : let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
139 0 :
140 0 : let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
141 0 : if *old != term_flush_lsn {
142 0 : *old = term_flush_lsn;
143 0 : true
144 : } else {
145 0 : false
146 : }
147 0 : });
148 0 :
149 0 : let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
150 0 : if *old != commit_lsn {
151 0 : *old = commit_lsn;
152 0 : true
153 : } else {
154 0 : false
155 : }
156 0 : });
157 0 :
158 0 : // send notification about shared state update
159 0 : self.tli.shared_state_version_tx.send_modify(|old| {
160 0 : *old += 1;
161 0 : });
162 0 : }
163 : }
164 :
165 : /// This structure is stored in shared state and represents the state of the timeline.
166 : ///
167 : /// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
168 : /// case, SafeKeeper is not available (because WAL is not present on disk) and all
169 : /// operations can be done only with control file.
170 : pub enum StateSK {
171 : Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
172 : Offloaded(Box<TimelineState<control_file::FileStorage>>),
173 : // Not used, required for moving between states.
174 : Empty,
175 : }
176 :
177 : impl StateSK {
178 0 : pub fn flush_lsn(&self) -> Lsn {
179 0 : match self {
180 0 : StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
181 0 : StateSK::Offloaded(state) => match state.eviction_state {
182 0 : EvictionState::Offloaded(flush_lsn) => flush_lsn,
183 0 : _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
184 : },
185 0 : StateSK::Empty => unreachable!(),
186 : }
187 0 : }
188 :
189 : /// Get a reference to the control file's timeline state.
190 0 : pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
191 0 : match self {
192 0 : StateSK::Loaded(sk) => &sk.state,
193 0 : StateSK::Offloaded(ref s) => s,
194 0 : StateSK::Empty => unreachable!(),
195 : }
196 0 : }
197 :
198 0 : pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
199 0 : match self {
200 0 : StateSK::Loaded(sk) => &mut sk.state,
201 0 : StateSK::Offloaded(ref mut s) => s,
202 0 : StateSK::Empty => unreachable!(),
203 : }
204 0 : }
205 :
206 0 : pub fn last_log_term(&self) -> Term {
207 0 : self.state()
208 0 : .acceptor_state
209 0 : .get_last_log_term(self.flush_lsn())
210 0 : }
211 :
212 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
213 0 : self.state_mut().term_bump(to).await
214 0 : }
215 :
216 : /// Close open WAL files to release FDs.
217 0 : fn close_wal_store(&mut self) {
218 0 : if let StateSK::Loaded(sk) = self {
219 0 : sk.wal_store.close();
220 0 : }
221 0 : }
222 :
223 : /// Update timeline state with peer safekeeper data.
224 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
225 0 : // update commit_lsn if safekeeper is loaded
226 0 : match self {
227 0 : StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
228 0 : StateSK::Offloaded(_) => {}
229 0 : StateSK::Empty => unreachable!(),
230 : }
231 :
232 : // update everything else, including remote_consistent_lsn and backup_lsn
233 0 : let mut sync_control_file = false;
234 0 : let state = self.state_mut();
235 0 : let wal_seg_size = state.server.wal_seg_size as u64;
236 0 :
237 0 : state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
238 0 : sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
239 0 :
240 0 : state.inmem.remote_consistent_lsn = max(
241 0 : Lsn(sk_info.remote_consistent_lsn),
242 0 : state.inmem.remote_consistent_lsn,
243 0 : );
244 0 : sync_control_file |=
245 0 : state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
246 0 :
247 0 : state.inmem.peer_horizon_lsn =
248 0 : max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
249 0 : sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
250 0 :
251 0 : if sync_control_file {
252 0 : state.flush().await?;
253 0 : }
254 0 : Ok(())
255 0 : }
256 :
257 : /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
258 0 : pub fn term_start_lsn(&self) -> Lsn {
259 0 : match self {
260 0 : StateSK::Loaded(sk) => sk.term_start_lsn,
261 0 : StateSK::Offloaded(_) => Lsn(0),
262 0 : StateSK::Empty => unreachable!(),
263 : }
264 0 : }
265 :
266 : /// Used for metrics only.
267 0 : pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
268 0 : match self {
269 0 : StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
270 0 : StateSK::Offloaded(_) => WalStorageMetrics::default(),
271 0 : StateSK::Empty => unreachable!(),
272 : }
273 0 : }
274 :
275 : /// Returns WAL storage internal LSNs for debug dump.
276 0 : pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
277 0 : match self {
278 0 : StateSK::Loaded(sk) => sk.wal_store.internal_state(),
279 : StateSK::Offloaded(_) => {
280 0 : let flush_lsn = self.flush_lsn();
281 0 : (flush_lsn, flush_lsn, flush_lsn, false)
282 : }
283 0 : StateSK::Empty => unreachable!(),
284 : }
285 0 : }
286 :
287 : /// Access to SafeKeeper object. Panics if offloaded, should be good to use from WalResidentTimeline.
288 0 : pub fn safekeeper(
289 0 : &mut self,
290 0 : ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
291 0 : match self {
292 0 : StateSK::Loaded(sk) => sk,
293 : StateSK::Offloaded(_) => {
294 0 : panic!("safekeeper is offloaded, cannot be used")
295 : }
296 0 : StateSK::Empty => unreachable!(),
297 : }
298 0 : }
299 :
300 : /// Moves control file's state structure out of the enum. Used to switch states.
301 0 : fn take_state(self) -> TimelineState<control_file::FileStorage> {
302 0 : match self {
303 0 : StateSK::Loaded(sk) => sk.state,
304 0 : StateSK::Offloaded(state) => *state,
305 0 : StateSK::Empty => unreachable!(),
306 : }
307 0 : }
308 : }
309 :
310 : /// Shared state associated with database instance
311 : pub struct SharedState {
312 : /// Safekeeper object
313 : pub(crate) sk: StateSK,
314 : /// In memory list containing state of peers sent in latest messages from them.
315 : pub(crate) peers_info: PeersInfo,
316 : // True value hinders old WAL removal; this is used by snapshotting. We
317 : // could make it a counter, but there is no need to.
318 : pub(crate) wal_removal_on_hold: bool,
319 : }
320 :
321 : impl SharedState {
322 : /// Creates a new SharedState.
323 0 : pub fn new(sk: StateSK) -> Self {
324 0 : Self {
325 0 : sk,
326 0 : peers_info: PeersInfo(vec![]),
327 0 : wal_removal_on_hold: false,
328 0 : }
329 0 : }
330 :
331 : /// Restore SharedState from control file. If file doesn't exist, bails out.
332 0 : pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
333 0 : let timeline_dir = get_timeline_dir(conf, ttid);
334 0 : let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
335 0 : if control_store.server.wal_seg_size == 0 {
336 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
337 0 : }
338 :
339 0 : let sk = match control_store.eviction_state {
340 : EvictionState::Present => {
341 0 : let wal_store = wal_storage::PhysicalStorage::new(
342 0 : ttid,
343 0 : &timeline_dir,
344 0 : &control_store,
345 0 : conf.no_sync,
346 0 : )?;
347 0 : StateSK::Loaded(SafeKeeper::new(
348 0 : TimelineState::new(control_store),
349 0 : wal_store,
350 0 : conf.my_id,
351 0 : )?)
352 : }
353 : EvictionState::Offloaded(_) => {
354 0 : StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
355 : }
356 : };
357 :
358 0 : Ok(Self::new(sk))
359 0 : }
360 :
361 0 : pub(crate) fn get_wal_seg_size(&self) -> usize {
362 0 : self.sk.state().server.wal_seg_size as usize
363 0 : }
364 :
365 0 : fn get_safekeeper_info(
366 0 : &self,
367 0 : ttid: &TenantTimelineId,
368 0 : conf: &SafeKeeperConf,
369 0 : standby_apply_lsn: Lsn,
370 0 : ) -> SafekeeperTimelineInfo {
371 0 : SafekeeperTimelineInfo {
372 0 : safekeeper_id: conf.my_id.0,
373 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
374 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
375 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
376 0 : }),
377 0 : term: self.sk.state().acceptor_state.term,
378 0 : last_log_term: self.sk.last_log_term(),
379 0 : flush_lsn: self.sk.flush_lsn().0,
380 0 : // note: this value is not flushed to control file yet and can be lost
381 0 : commit_lsn: self.sk.state().inmem.commit_lsn.0,
382 0 : remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
383 0 : peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
384 0 : safekeeper_connstr: conf
385 0 : .advertise_pg_addr
386 0 : .to_owned()
387 0 : .unwrap_or(conf.listen_pg_addr.clone()),
388 0 : http_connstr: conf.listen_http_addr.to_owned(),
389 0 : backup_lsn: self.sk.state().inmem.backup_lsn.0,
390 0 : local_start_lsn: self.sk.state().local_start_lsn.0,
391 0 : availability_zone: conf.availability_zone.clone(),
392 0 : standby_horizon: standby_apply_lsn.0,
393 0 : }
394 0 : }
395 :
396 : /// Get our latest view of alive peers status on the timeline.
397 : /// We pass our own info through the broker as well, so when we don't have connection
398 : /// to the broker returned vec is empty.
399 0 : pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
400 0 : let now = Instant::now();
401 0 : self.peers_info
402 0 : .0
403 0 : .iter()
404 0 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
405 0 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
406 0 : .cloned()
407 0 : .collect()
408 0 : }
409 : }
410 :
411 0 : #[derive(Debug, thiserror::Error)]
412 : pub enum TimelineError {
413 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
414 : Cancelled(TenantTimelineId),
415 : #[error("Timeline {0} was not found in global map")]
416 : NotFound(TenantTimelineId),
417 : #[error("Timeline {0} creation is in progress")]
418 : CreationInProgress(TenantTimelineId),
419 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
420 : Invalid(TenantTimelineId),
421 : #[error("Timeline {0} is already exists")]
422 : AlreadyExists(TenantTimelineId),
423 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
424 : UninitializedWalSegSize(TenantTimelineId),
425 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
426 : UninitialinzedPgVersion(TenantTimelineId),
427 : }
428 :
429 : // Convert to HTTP API error.
430 : impl From<TimelineError> for ApiError {
431 0 : fn from(te: TimelineError) -> ApiError {
432 0 : match te {
433 0 : TimelineError::NotFound(ttid) => {
434 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
435 : }
436 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
437 : }
438 0 : }
439 : }
440 :
441 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
442 : /// It also holds SharedState and provides mutually exclusive access to it.
443 : pub struct Timeline {
444 : pub ttid: TenantTimelineId,
445 : pub remote_path: RemotePath,
446 :
447 : /// Used to broadcast commit_lsn updates to all background jobs.
448 : commit_lsn_watch_tx: watch::Sender<Lsn>,
449 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
450 :
451 : /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
452 : /// them when sending in recovery mode (to walproposer or peers). Note: this
453 : /// is just a notification, WAL reading should always done with lock held as
454 : /// term can change otherwise.
455 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
456 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
457 :
458 : /// Broadcasts shared state updates.
459 : shared_state_version_tx: watch::Sender<usize>,
460 : shared_state_version_rx: watch::Receiver<usize>,
461 :
462 : /// Safekeeper and other state, that should remain consistent and
463 : /// synchronized with the disk. This is tokio mutex as we write WAL to disk
464 : /// while holding it, ensuring that consensus checks are in order.
465 : mutex: RwLock<SharedState>,
466 : walsenders: Arc<WalSenders>,
467 : walreceivers: Arc<WalReceivers>,
468 : timeline_dir: Utf8PathBuf,
469 : manager_ctl: ManagerCtl,
470 :
471 : /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
472 : /// this gate, you must respect [`Timeline::cancel`]
473 : pub(crate) gate: Gate,
474 :
475 : /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
476 : pub(crate) cancel: CancellationToken,
477 :
478 : // timeline_manager controlled state
479 : pub(crate) broker_active: AtomicBool,
480 : pub(crate) wal_backup_active: AtomicBool,
481 : pub(crate) last_removed_segno: AtomicU64,
482 : pub(crate) mgr_status: AtomicStatus,
483 : }
484 :
485 : impl Timeline {
486 : /// Constructs a new timeline.
487 0 : pub fn new(
488 0 : ttid: TenantTimelineId,
489 0 : timeline_dir: &Utf8Path,
490 0 : remote_path: &RemotePath,
491 0 : shared_state: SharedState,
492 0 : ) -> Arc<Self> {
493 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
494 0 : watch::channel(shared_state.sk.state().commit_lsn);
495 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
496 0 : shared_state.sk.last_log_term(),
497 0 : shared_state.sk.flush_lsn(),
498 0 : )));
499 0 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
500 0 :
501 0 : let walreceivers = WalReceivers::new();
502 0 :
503 0 : Arc::new(Self {
504 0 : ttid,
505 0 : remote_path: remote_path.to_owned(),
506 0 : timeline_dir: timeline_dir.to_owned(),
507 0 : commit_lsn_watch_tx,
508 0 : commit_lsn_watch_rx,
509 0 : term_flush_lsn_watch_tx,
510 0 : term_flush_lsn_watch_rx,
511 0 : shared_state_version_tx,
512 0 : shared_state_version_rx,
513 0 : mutex: RwLock::new(shared_state),
514 0 : walsenders: WalSenders::new(walreceivers.clone()),
515 0 : walreceivers,
516 0 : gate: Default::default(),
517 0 : cancel: CancellationToken::default(),
518 0 : manager_ctl: ManagerCtl::new(),
519 0 : broker_active: AtomicBool::new(false),
520 0 : wal_backup_active: AtomicBool::new(false),
521 0 : last_removed_segno: AtomicU64::new(0),
522 0 : mgr_status: AtomicStatus::new(),
523 0 : })
524 0 : }
525 :
526 : /// Load existing timeline from disk.
527 0 : pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
528 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
529 :
530 0 : let shared_state = SharedState::restore(conf, &ttid)?;
531 0 : let timeline_dir = get_timeline_dir(conf, &ttid);
532 0 : let remote_path = remote_timeline_path(&ttid)?;
533 :
534 0 : Ok(Timeline::new(
535 0 : ttid,
536 0 : &timeline_dir,
537 0 : &remote_path,
538 0 : shared_state,
539 0 : ))
540 0 : }
541 :
542 : /// Bootstrap new or existing timeline starting background tasks.
543 0 : pub fn bootstrap(
544 0 : self: &Arc<Timeline>,
545 0 : _shared_state: &mut WriteGuardSharedState<'_>,
546 0 : conf: &SafeKeeperConf,
547 0 : broker_active_set: Arc<TimelinesSet>,
548 0 : partial_backup_rate_limiter: RateLimiter,
549 0 : ) {
550 0 : let (tx, rx) = self.manager_ctl.bootstrap_manager();
551 :
552 0 : let Ok(gate_guard) = self.gate.enter() else {
553 : // Init raced with shutdown
554 0 : return;
555 : };
556 :
557 : // Start manager task which will monitor timeline state and update
558 : // background tasks.
559 0 : tokio::spawn({
560 0 : let this = self.clone();
561 0 : let conf = conf.clone();
562 0 : async move {
563 0 : let _gate_guard = gate_guard;
564 0 : timeline_manager::main_task(
565 0 : ManagerTimeline { tli: this },
566 0 : conf,
567 0 : broker_active_set,
568 0 : tx,
569 0 : rx,
570 0 : partial_backup_rate_limiter,
571 0 : )
572 0 : .await
573 0 : }
574 0 : });
575 0 : }
576 :
577 : /// Background timeline activities (which hold Timeline::gate) will no
578 : /// longer run once this function completes.
579 0 : pub async fn shutdown(&self) {
580 0 : info!("timeline {} shutting down", self.ttid);
581 0 : self.cancel.cancel();
582 0 :
583 0 : // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
584 0 : // to read deleted files.
585 0 : self.gate.close().await;
586 0 : }
587 :
588 : /// Delete timeline from disk completely, by removing timeline directory.
589 : ///
590 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
591 : /// deletion API endpoint is retriable.
592 : ///
593 : /// Timeline must be in shut-down state (i.e. call [`Self::shutdown`] first)
594 0 : pub async fn delete(
595 0 : &self,
596 0 : shared_state: &mut WriteGuardSharedState<'_>,
597 0 : only_local: bool,
598 0 : ) -> Result<bool> {
599 0 : // Assert that [`Self::shutdown`] was already called
600 0 : assert!(self.cancel.is_cancelled());
601 0 : assert!(self.gate.close_complete());
602 :
603 : // Close associated FDs. Nobody will be able to touch timeline data once
604 : // it is cancelled, so WAL storage won't be opened again.
605 0 : shared_state.sk.close_wal_store();
606 0 :
607 0 : let conf = GlobalTimelines::get_global_config();
608 0 : if !only_local && conf.is_wal_backup_enabled() {
609 : // Note: we concurrently delete remote storage data from multiple
610 : // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
611 : // do some retries anyway.
612 0 : wal_backup::delete_timeline(&self.ttid).await?;
613 0 : }
614 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
615 0 : Ok(dir_existed)
616 0 : }
617 :
618 : /// Returns if timeline is cancelled.
619 0 : pub fn is_cancelled(&self) -> bool {
620 0 : self.cancel.is_cancelled()
621 0 : }
622 :
623 : /// Take a writing mutual exclusive lock on timeline shared_state.
624 0 : pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
625 0 : WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
626 0 : }
627 :
628 0 : pub async fn read_shared_state(&self) -> ReadGuardSharedState {
629 0 : self.mutex.read().await
630 0 : }
631 :
632 : /// Returns commit_lsn watch channel.
633 0 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
634 0 : self.commit_lsn_watch_rx.clone()
635 0 : }
636 :
637 : /// Returns term_flush_lsn watch channel.
638 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
639 0 : self.term_flush_lsn_watch_rx.clone()
640 0 : }
641 :
642 : /// Returns watch channel for SharedState update version.
643 0 : pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
644 0 : self.shared_state_version_rx.clone()
645 0 : }
646 :
647 : /// Returns wal_seg_size.
648 0 : pub async fn get_wal_seg_size(&self) -> usize {
649 0 : self.read_shared_state().await.get_wal_seg_size()
650 0 : }
651 :
652 : /// Returns state of the timeline.
653 0 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
654 0 : let state = self.read_shared_state().await;
655 0 : (
656 0 : state.sk.state().inmem.clone(),
657 0 : TimelinePersistentState::clone(state.sk.state()),
658 0 : )
659 0 : }
660 :
661 : /// Returns latest backup_lsn.
662 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
663 0 : self.read_shared_state().await.sk.state().inmem.backup_lsn
664 0 : }
665 :
666 : /// Sets backup_lsn to the given value.
667 0 : pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
668 0 : if self.is_cancelled() {
669 0 : bail!(TimelineError::Cancelled(self.ttid));
670 0 : }
671 :
672 0 : let mut state = self.write_shared_state().await;
673 0 : state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
674 0 : // we should check whether to shut down offloader, but this will be done
675 0 : // soon by peer communication anyway.
676 0 : Ok(())
677 0 : }
678 :
679 : /// Get safekeeper info for broadcasting to broker and other peers.
680 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
681 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
682 0 : let shared_state = self.read_shared_state().await;
683 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
684 0 : }
685 :
686 : /// Update timeline state with peer safekeeper data.
687 0 : pub async fn record_safekeeper_info(
688 0 : self: &Arc<Self>,
689 0 : sk_info: SafekeeperTimelineInfo,
690 0 : ) -> Result<()> {
691 : {
692 0 : let mut shared_state = self.write_shared_state().await;
693 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
694 0 : let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
695 0 : shared_state.peers_info.upsert(&peer_info);
696 0 : }
697 0 : Ok(())
698 0 : }
699 :
700 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
701 0 : let shared_state = self.read_shared_state().await;
702 0 : shared_state.get_peers(conf.heartbeat_timeout)
703 0 : }
704 :
705 0 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
706 0 : &self.walsenders
707 0 : }
708 :
709 0 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
710 0 : &self.walreceivers
711 0 : }
712 :
713 : /// Returns flush_lsn.
714 0 : pub async fn get_flush_lsn(&self) -> Lsn {
715 0 : self.read_shared_state().await.sk.flush_lsn()
716 0 : }
717 :
718 : /// Gather timeline data for metrics.
719 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
720 0 : if self.is_cancelled() {
721 0 : return None;
722 0 : }
723 0 :
724 0 : let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
725 0 : let state = self.read_shared_state().await;
726 0 : Some(FullTimelineInfo {
727 0 : ttid: self.ttid,
728 0 : ps_feedback_count,
729 0 : last_ps_feedback,
730 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
731 0 : timeline_is_active: self.broker_active.load(Ordering::Relaxed),
732 0 : num_computes: self.walreceivers.get_num() as u32,
733 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
734 0 : epoch_start_lsn: state.sk.term_start_lsn(),
735 0 : mem_state: state.sk.state().inmem.clone(),
736 0 : persisted_state: TimelinePersistentState::clone(state.sk.state()),
737 0 : flush_lsn: state.sk.flush_lsn(),
738 0 : wal_storage: state.sk.wal_storage_metrics(),
739 0 : })
740 0 : }
741 :
742 : /// Returns in-memory timeline state to build a full debug dump.
743 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
744 0 : let state = self.read_shared_state().await;
745 :
746 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
747 0 : state.sk.wal_storage_internal_state();
748 0 :
749 0 : debug_dump::Memory {
750 0 : is_cancelled: self.is_cancelled(),
751 0 : peers_info_len: state.peers_info.0.len(),
752 0 : walsenders: self.walsenders.get_all(),
753 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
754 0 : active: self.broker_active.load(Ordering::Relaxed),
755 0 : num_computes: self.walreceivers.get_num() as u32,
756 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
757 0 : epoch_start_lsn: state.sk.term_start_lsn(),
758 0 : mem_state: state.sk.state().inmem.clone(),
759 0 : mgr_status: self.mgr_status.get(),
760 0 : write_lsn,
761 0 : write_record_lsn,
762 0 : flush_lsn,
763 0 : file_open,
764 0 : }
765 0 : }
766 :
767 : /// Apply a function to the control file state and persist it.
768 0 : pub async fn map_control_file<T>(
769 0 : self: &Arc<Self>,
770 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
771 0 : ) -> Result<T> {
772 0 : let mut state = self.write_shared_state().await;
773 0 : let mut persistent_state = state.sk.state_mut().start_change();
774 : // If f returns error, we abort the change and don't persist anything.
775 0 : let res = f(&mut persistent_state)?;
776 : // If persisting fails, we abort the change and return error.
777 0 : state
778 0 : .sk
779 0 : .state_mut()
780 0 : .finish_change(&persistent_state)
781 0 : .await?;
782 0 : Ok(res)
783 0 : }
784 :
785 0 : pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
786 0 : let mut state = self.write_shared_state().await;
787 0 : state.sk.term_bump(to).await
788 0 : }
789 :
790 : /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
791 0 : async fn do_wal_residence_guard(
792 0 : self: &Arc<Self>,
793 0 : block: bool,
794 0 : ) -> Result<Option<WalResidentTimeline>> {
795 0 : let op_label = if block {
796 0 : "wal_residence_guard"
797 : } else {
798 0 : "try_wal_residence_guard"
799 : };
800 :
801 0 : if self.is_cancelled() {
802 0 : bail!(TimelineError::Cancelled(self.ttid));
803 0 : }
804 0 :
805 0 : debug!("requesting WalResidentTimeline guard");
806 0 : let started_at = Instant::now();
807 0 : let status_before = self.mgr_status.get();
808 :
809 : // Wait 30 seconds for the guard to be acquired. It can time out if someone is
810 : // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
811 : // is stuck.
812 0 : let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
813 0 : if block {
814 0 : self.manager_ctl.wal_residence_guard().await.map(Some)
815 : } else {
816 0 : self.manager_ctl.try_wal_residence_guard().await
817 : }
818 0 : })
819 0 : .await;
820 :
821 0 : let guard = match res {
822 0 : Ok(Ok(guard)) => {
823 0 : let finished_at = Instant::now();
824 0 : let elapsed = finished_at - started_at;
825 0 : MISC_OPERATION_SECONDS
826 0 : .with_label_values(&[op_label])
827 0 : .observe(elapsed.as_secs_f64());
828 0 :
829 0 : guard
830 : }
831 0 : Ok(Err(e)) => {
832 0 : warn!(
833 0 : "error acquiring in {op_label}, statuses {:?} => {:?}",
834 0 : status_before,
835 0 : self.mgr_status.get()
836 : );
837 0 : return Err(e);
838 : }
839 : Err(_) => {
840 0 : warn!(
841 0 : "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
842 0 : status_before,
843 0 : self.mgr_status.get()
844 : );
845 0 : anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
846 : }
847 : };
848 :
849 0 : Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
850 0 : }
851 :
852 : /// Get the timeline guard for reading/writing WAL files.
853 : /// If WAL files are not present on disk (evicted), they will be automatically
854 : /// downloaded from remote storage. This is done in the manager task, which is
855 : /// responsible for issuing all guards.
856 : ///
857 : /// NB: don't use this function from timeline_manager, it will deadlock.
858 : /// NB: don't use this function while holding shared_state lock.
859 0 : pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
860 0 : self.do_wal_residence_guard(true)
861 0 : .await
862 0 : .map(|m| m.expect("Always get Some in block=true mode"))
863 0 : }
864 :
865 : /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
866 : /// else return None
867 0 : pub(crate) async fn try_wal_residence_guard(
868 0 : self: &Arc<Self>,
869 0 : ) -> Result<Option<WalResidentTimeline>> {
870 0 : self.do_wal_residence_guard(false).await
871 0 : }
872 :
873 0 : pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
874 0 : self.manager_ctl.backup_partial_reset().await
875 0 : }
876 : }
877 :
878 : /// This is a guard that allows to read/write disk timeline state.
879 : /// All tasks that are trying to read/write WAL from disk should use this guard.
880 : pub struct WalResidentTimeline {
881 : pub tli: Arc<Timeline>,
882 : _guard: ResidenceGuard,
883 : }
884 :
885 : impl WalResidentTimeline {
886 0 : pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
887 0 : WalResidentTimeline { tli, _guard }
888 0 : }
889 : }
890 :
891 : impl Deref for WalResidentTimeline {
892 : type Target = Arc<Timeline>;
893 :
894 0 : fn deref(&self) -> &Self::Target {
895 0 : &self.tli
896 0 : }
897 : }
898 :
899 : impl WalResidentTimeline {
900 : /// Returns true if walsender should stop sending WAL to pageserver. We
901 : /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
902 : /// computes. While there might be nothing to stream already, we learn about
903 : /// remote_consistent_lsn update through replication feedback, and we want
904 : /// to stop pushing to the broker if pageserver is fully caughtup.
905 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
906 0 : if self.is_cancelled() {
907 0 : return true;
908 0 : }
909 0 : let shared_state = self.read_shared_state().await;
910 0 : if self.walreceivers.get_num() == 0 {
911 0 : return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
912 0 : reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
913 0 : }
914 0 : false
915 0 : }
916 :
917 : /// Ensure that current term is t, erroring otherwise, and lock the state.
918 0 : pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
919 0 : let ss = self.read_shared_state().await;
920 0 : if ss.sk.state().acceptor_state.term != t {
921 0 : bail!(
922 0 : "failed to acquire term {}, current term {}",
923 0 : t,
924 0 : ss.sk.state().acceptor_state.term
925 0 : );
926 0 : }
927 0 : Ok(ss)
928 0 : }
929 :
930 : /// Pass arrived message to the safekeeper.
931 0 : pub async fn process_msg(
932 0 : &self,
933 0 : msg: &ProposerAcceptorMessage,
934 0 : ) -> Result<Option<AcceptorProposerMessage>> {
935 0 : if self.is_cancelled() {
936 0 : bail!(TimelineError::Cancelled(self.ttid));
937 0 : }
938 :
939 : let mut rmsg: Option<AcceptorProposerMessage>;
940 : {
941 0 : let mut shared_state = self.write_shared_state().await;
942 0 : rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
943 :
944 : // if this is AppendResponse, fill in proper hot standby feedback.
945 0 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
946 0 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
947 0 : }
948 : }
949 0 : Ok(rmsg)
950 0 : }
951 :
952 0 : pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
953 0 : let (_, persisted_state) = self.get_state().await;
954 0 : let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
955 0 :
956 0 : WalReader::new(
957 0 : &self.ttid,
958 0 : self.timeline_dir.clone(),
959 0 : &persisted_state,
960 0 : start_lsn,
961 0 : enable_remote_read,
962 0 : )
963 0 : }
964 :
965 0 : pub fn get_timeline_dir(&self) -> Utf8PathBuf {
966 0 : self.timeline_dir.clone()
967 0 : }
968 :
969 : /// Update in memory remote consistent lsn.
970 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
971 0 : let mut shared_state = self.write_shared_state().await;
972 0 : shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
973 0 : shared_state.sk.state().inmem.remote_consistent_lsn,
974 0 : candidate,
975 0 : );
976 0 : }
977 : }
978 :
979 : /// This struct contains methods that are used by timeline manager task.
980 : pub(crate) struct ManagerTimeline {
981 : pub(crate) tli: Arc<Timeline>,
982 : }
983 :
984 : impl Deref for ManagerTimeline {
985 : type Target = Arc<Timeline>;
986 :
987 0 : fn deref(&self) -> &Self::Target {
988 0 : &self.tli
989 0 : }
990 : }
991 :
992 : impl ManagerTimeline {
993 0 : pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
994 0 : &self.tli.timeline_dir
995 0 : }
996 :
997 : /// Manager requests this state on startup.
998 0 : pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
999 0 : let shared_state = self.read_shared_state().await;
1000 0 : let is_offloaded = matches!(
1001 0 : shared_state.sk.state().eviction_state,
1002 : EvictionState::Offloaded(_)
1003 : );
1004 0 : let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
1005 0 :
1006 0 : (is_offloaded, partial_backup_uploaded)
1007 0 : }
1008 :
1009 : /// Try to switch state Present->Offloaded.
1010 0 : pub(crate) async fn switch_to_offloaded(
1011 0 : &self,
1012 0 : partial: &PartialRemoteSegment,
1013 0 : ) -> anyhow::Result<()> {
1014 0 : let mut shared = self.write_shared_state().await;
1015 :
1016 : // updating control file
1017 0 : let mut pstate = shared.sk.state_mut().start_change();
1018 :
1019 0 : if !matches!(pstate.eviction_state, EvictionState::Present) {
1020 0 : bail!(
1021 0 : "cannot switch to offloaded state, current state is {:?}",
1022 0 : pstate.eviction_state
1023 0 : );
1024 0 : }
1025 0 :
1026 0 : if partial.flush_lsn != shared.sk.flush_lsn() {
1027 0 : bail!(
1028 0 : "flush_lsn mismatch in partial backup, expected {}, got {}",
1029 0 : shared.sk.flush_lsn(),
1030 0 : partial.flush_lsn
1031 0 : );
1032 0 : }
1033 0 :
1034 0 : if partial.commit_lsn != pstate.commit_lsn {
1035 0 : bail!(
1036 0 : "commit_lsn mismatch in partial backup, expected {}, got {}",
1037 0 : pstate.commit_lsn,
1038 0 : partial.commit_lsn
1039 0 : );
1040 0 : }
1041 0 :
1042 0 : if partial.term != shared.sk.last_log_term() {
1043 0 : bail!(
1044 0 : "term mismatch in partial backup, expected {}, got {}",
1045 0 : shared.sk.last_log_term(),
1046 0 : partial.term
1047 0 : );
1048 0 : }
1049 0 :
1050 0 : pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
1051 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1052 : // control file is now switched to Offloaded state
1053 :
1054 : // now we can switch shared.sk to Offloaded, shouldn't fail
1055 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1056 0 : let cfile_state = prev_sk.take_state();
1057 0 : shared.sk = StateSK::Offloaded(Box::new(cfile_state));
1058 0 :
1059 0 : Ok(())
1060 0 : }
1061 :
1062 : /// Try to switch state Offloaded->Present.
1063 0 : pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
1064 0 : let conf = GlobalTimelines::get_global_config();
1065 0 : let mut shared = self.write_shared_state().await;
1066 :
1067 : // trying to restore WAL storage
1068 0 : let wal_store = wal_storage::PhysicalStorage::new(
1069 0 : &self.ttid,
1070 0 : &self.timeline_dir,
1071 0 : shared.sk.state(),
1072 0 : conf.no_sync,
1073 0 : )?;
1074 :
1075 : // updating control file
1076 0 : let mut pstate = shared.sk.state_mut().start_change();
1077 :
1078 0 : if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
1079 0 : bail!(
1080 0 : "cannot switch to present state, current state is {:?}",
1081 0 : pstate.eviction_state
1082 0 : );
1083 0 : }
1084 0 :
1085 0 : if wal_store.flush_lsn() != shared.sk.flush_lsn() {
1086 0 : bail!(
1087 0 : "flush_lsn mismatch in restored WAL, expected {}, got {}",
1088 0 : shared.sk.flush_lsn(),
1089 0 : wal_store.flush_lsn()
1090 0 : );
1091 0 : }
1092 0 :
1093 0 : pstate.eviction_state = EvictionState::Present;
1094 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1095 :
1096 : // now we can switch shared.sk to Present, shouldn't fail
1097 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1098 0 : let cfile_state = prev_sk.take_state();
1099 0 : shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
1100 :
1101 0 : Ok(())
1102 0 : }
1103 :
1104 : /// Update current manager state, useful for debugging manager deadlocks.
1105 0 : pub(crate) fn set_status(&self, status: timeline_manager::Status) {
1106 0 : self.mgr_status.store(status, Ordering::Relaxed);
1107 0 : }
1108 : }
1109 :
1110 : /// Deletes directory and it's contents. Returns false if directory does not exist.
1111 0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
1112 0 : match fs::remove_dir_all(path).await {
1113 0 : Ok(_) => Ok(true),
1114 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
1115 0 : Err(e) => Err(e.into()),
1116 : }
1117 0 : }
1118 :
1119 : /// Get a path to the tenant directory. If you just need to get a timeline directory,
1120 : /// use WalResidentTimeline::get_timeline_dir instead.
1121 0 : pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
1122 0 : conf.workdir.join(tenant_id.to_string())
1123 0 : }
1124 :
1125 : /// Get a path to the timeline directory. If you need to read WAL files from disk,
1126 : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
1127 : /// timeline eviction status and WAL files might not be present on disk.
1128 0 : pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
1129 0 : get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
1130 0 : }
|