Line data Source code
1 : //! This module implements Timeline lifecycle management and contains all the code
2 : //! needed to glue together SafeKeeper and all the other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use remote_storage::RemotePath;
7 : use safekeeper_api::membership::Configuration;
8 : use safekeeper_api::models::{
9 : PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
10 : };
11 : use safekeeper_api::Term;
12 : use tokio::fs::{self};
13 : use tokio_util::sync::CancellationToken;
14 : use utils::id::TenantId;
15 : use utils::sync::gate::Gate;
16 :
17 : use http_utils::error::ApiError;
18 : use std::cmp::max;
19 : use std::ops::{Deref, DerefMut};
20 : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21 : use std::sync::Arc;
22 : use std::time::Duration;
23 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
24 : use tokio::{sync::watch, time::Instant};
25 : use tracing::*;
26 : use utils::{
27 : id::{NodeId, TenantTimelineId},
28 : lsn::Lsn,
29 : };
30 :
31 : use storage_broker::proto::SafekeeperTimelineInfo;
32 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
33 :
34 : use crate::control_file;
35 : use crate::rate_limit::RateLimiter;
36 : use crate::receive_wal::WalReceivers;
37 : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
38 : use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
39 : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
40 : use crate::timeline_guard::ResidenceGuard;
41 : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
42 : use crate::timelines_set::TimelinesSet;
43 : use crate::wal_backup::{self, remote_timeline_path};
44 : use crate::wal_backup_partial::PartialRemoteSegment;
45 :
46 : use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
47 : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
48 : use crate::SafeKeeperConf;
49 : use crate::{debug_dump, timeline_manager, wal_storage};
50 :
51 0 : fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
52 0 : PeerInfo {
53 0 : sk_id: NodeId(sk_info.safekeeper_id),
54 0 : term: sk_info.term,
55 0 : last_log_term: sk_info.last_log_term,
56 0 : flush_lsn: Lsn(sk_info.flush_lsn),
57 0 : commit_lsn: Lsn(sk_info.commit_lsn),
58 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
59 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
60 0 : http_connstr: sk_info.http_connstr.clone(),
61 0 : ts,
62 0 : }
63 0 : }
64 :
65 : // Vector-based node id -> peer state map, with only the very limited
66 : // functionality we need.
67 : #[derive(Debug, Clone, Default)]
68 : pub struct PeersInfo(pub Vec<PeerInfo>);
69 :
70 : impl PeersInfo {
71 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
72 0 : self.0.iter_mut().find(|p| p.sk_id == id)
73 0 : }
74 :
75 0 : fn upsert(&mut self, p: &PeerInfo) {
76 0 : match self.get(p.sk_id) {
77 0 : Some(rp) => *rp = p.clone(),
78 0 : None => self.0.push(p.clone()),
79 : }
80 0 : }
81 : }
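// A minimal in-module usage sketch (hypothetical): `upsert` keeps at most one
// entry per NodeId, replacing an existing entry in place or appending a new one.
//
//     let mut peers = PeersInfo::default();
//     peers.upsert(&info); // first message from this safekeeper: appended
//     peers.upsert(&info); // later message from the same id: replaced in place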
82 :
83 : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
84 :
85 : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
86 : /// automatically updates `watch::Sender` channels with state on drop.
87 : pub struct WriteGuardSharedState<'a> {
88 : tli: Arc<Timeline>,
89 : guard: RwLockWriteGuard<'a, SharedState>,
90 : }
91 :
92 : impl<'a> WriteGuardSharedState<'a> {
93 1212 : fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
94 1212 : WriteGuardSharedState { tli, guard }
95 1212 : }
96 : }
97 :
98 : impl Deref for WriteGuardSharedState<'_> {
99 : type Target = SharedState;
100 :
101 0 : fn deref(&self) -> &Self::Target {
102 0 : &self.guard
103 0 : }
104 : }
105 :
106 : impl DerefMut for WriteGuardSharedState<'_> {
107 1209 : fn deref_mut(&mut self) -> &mut Self::Target {
108 1209 : &mut self.guard
109 1209 : }
110 : }
111 :
112 : impl Drop for WriteGuardSharedState<'_> {
113 1212 : fn drop(&mut self) {
114 1212 : let term_flush_lsn =
115 1212 : TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
116 1212 : let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
117 1212 :
118 1212 : let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
119 1212 : if *old != term_flush_lsn {
120 600 : *old = term_flush_lsn;
121 600 : true
122 : } else {
123 612 : false
124 : }
125 1212 : });
126 1212 :
127 1212 : let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
128 1212 : if *old != commit_lsn {
129 597 : *old = commit_lsn;
130 597 : true
131 : } else {
132 615 : false
133 : }
134 1212 : });
135 1212 :
136 1212 : // send notification about shared state update
137 1212 : self.tli.shared_state_version_tx.send_modify(|old| {
138 1212 : *old += 1;
139 1212 : });
140 1212 : }
141 : }
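// Sketch of the guard's contract (hypothetical caller): any mutation made while
// holding the write guard is broadcast when it is dropped, so callers never
// update the commit_lsn / term_flush_lsn / version watch channels themselves.
//
//     let mut guard = tli.write_shared_state().await;
//     // ... mutate guard.sk here ...
//     drop(guard); // watchers are notified exactly here, in Drop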
142 :
143 : /// This structure is stored in shared state and represents the state of the timeline.
144 : ///
145 : /// Usually it holds SafeKeeper, but it also supports the offloaded timeline state. In
146 : /// that case, SafeKeeper is not available (because WAL is not present on disk) and all
147 : /// operations can be done only with the control file.
148 : pub enum StateSK {
149 : Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
150 : Offloaded(Box<TimelineState<control_file::FileStorage>>),
151 : // Not a real state; only used as a placeholder while moving between states.
152 : Empty,
153 : }
154 :
155 : impl StateSK {
156 2488 : pub fn flush_lsn(&self) -> Lsn {
157 2488 : match self {
158 2488 : StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
159 0 : StateSK::Offloaded(state) => match state.eviction_state {
160 0 : EvictionState::Offloaded(flush_lsn) => flush_lsn,
161 0 : _ => panic!("StateSK::Offloaded does not match eviction_state from control_file"),
162 : },
163 0 : StateSK::Empty => unreachable!(),
164 : }
165 2488 : }
166 :
167 : /// Get a reference to the control file's timeline state.
168 2507 : pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
169 2507 : match self {
170 2507 : StateSK::Loaded(sk) => &sk.state,
171 0 : StateSK::Offloaded(ref s) => s,
172 0 : StateSK::Empty => unreachable!(),
173 : }
174 2507 : }
175 :
176 9 : pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
177 9 : match self {
178 9 : StateSK::Loaded(sk) => &mut sk.state,
179 0 : StateSK::Offloaded(ref mut s) => s,
180 0 : StateSK::Empty => unreachable!(),
181 : }
182 9 : }
183 :
184 1244 : pub fn last_log_term(&self) -> Term {
185 1244 : self.state()
186 1244 : .acceptor_state
187 1244 : .get_last_log_term(self.flush_lsn())
188 1244 : }
189 :
190 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
191 0 : self.state_mut().term_bump(to).await
192 0 : }
193 :
194 0 : pub async fn membership_switch(
195 0 : &mut self,
196 0 : to: Configuration,
197 0 : ) -> Result<TimelineMembershipSwitchResponse> {
198 0 : self.state_mut().membership_switch(to).await
199 0 : }
200 :
201 : /// Close open WAL files to release FDs.
202 0 : fn close_wal_store(&mut self) {
203 0 : if let StateSK::Loaded(sk) = self {
204 0 : sk.wal_store.close();
205 0 : }
206 0 : }
207 :
208 : /// Update timeline state with peer safekeeper data.
209 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
210 0 : // update commit_lsn if safekeeper is loaded
211 0 : match self {
212 0 : StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
213 0 : StateSK::Offloaded(_) => {}
214 0 : StateSK::Empty => unreachable!(),
215 : }
216 :
217 : // update everything else, including remote_consistent_lsn and backup_lsn
218 0 : let mut sync_control_file = false;
219 0 : let state = self.state_mut();
220 0 : let wal_seg_size = state.server.wal_seg_size as u64;
221 0 :
222 0 : state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
223 0 : sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
224 0 :
225 0 : state.inmem.remote_consistent_lsn = max(
226 0 : Lsn(sk_info.remote_consistent_lsn),
227 0 : state.inmem.remote_consistent_lsn,
228 0 : );
229 0 : sync_control_file |=
230 0 : state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
231 0 :
232 0 : state.inmem.peer_horizon_lsn =
233 0 : max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
234 0 : sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
235 0 :
236 0 : if sync_control_file {
237 0 : state.flush().await?;
238 0 : }
239 0 : Ok(())
240 0 : }
241 :
242 : /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
243 0 : pub fn term_start_lsn(&self) -> Lsn {
244 0 : match self {
245 0 : StateSK::Loaded(sk) => sk.term_start_lsn,
246 0 : StateSK::Offloaded(_) => Lsn(0),
247 0 : StateSK::Empty => unreachable!(),
248 : }
249 0 : }
250 :
251 : /// Used for metrics only.
252 0 : pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
253 0 : match self {
254 0 : StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
255 0 : StateSK::Offloaded(_) => WalStorageMetrics::default(),
256 0 : StateSK::Empty => unreachable!(),
257 : }
258 0 : }
259 :
260 : /// Returns WAL storage internal LSNs for debug dump.
261 0 : pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
262 0 : match self {
263 0 : StateSK::Loaded(sk) => sk.wal_store.internal_state(),
264 : StateSK::Offloaded(_) => {
265 0 : let flush_lsn = self.flush_lsn();
266 0 : (flush_lsn, flush_lsn, flush_lsn, false)
267 : }
268 0 : StateSK::Empty => unreachable!(),
269 : }
270 0 : }
271 :
272 : /// Access to SafeKeeper object. Panics if offloaded, should be good to use from WalResidentTimeline.
273 1200 : pub fn safekeeper(
274 1200 : &mut self,
275 1200 : ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
276 1200 : match self {
277 1200 : StateSK::Loaded(sk) => sk,
278 : StateSK::Offloaded(_) => {
279 0 : panic!("safekeeper is offloaded, cannot be used")
280 : }
281 0 : StateSK::Empty => unreachable!(),
282 : }
283 1200 : }
284 :
285 : /// Moves control file's state structure out of the enum. Used to switch states.
286 0 : fn take_state(self) -> TimelineState<control_file::FileStorage> {
287 0 : match self {
288 0 : StateSK::Loaded(sk) => sk.state,
289 0 : StateSK::Offloaded(state) => *state,
290 0 : StateSK::Empty => unreachable!(),
291 : }
292 0 : }
293 : }
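// State-transition sketch: `take_state` consumes the enum, so switching between
// Loaded and Offloaded goes through the Empty placeholder (this mirrors
// switch_to_offloaded/switch_to_present below):
//
//     let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
//     let cfile_state = prev_sk.take_state();
//     shared.sk = StateSK::Offloaded(Box::new(cfile_state));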
294 :
295 : /// Shared state associated with a database instance.
296 : pub struct SharedState {
297 : /// Safekeeper object
298 : pub(crate) sk: StateSK,
299 : /// In-memory list containing the state of peers, as sent in their latest messages.
300 : pub(crate) peers_info: PeersInfo,
301 : // A true value blocks old WAL removal; this is used by snapshotting. We
302 : // could make it a counter, but there is no need to.
303 : pub(crate) wal_removal_on_hold: bool,
304 : }
305 :
306 : impl SharedState {
307 : /// Creates a new SharedState.
308 3 : pub fn new(sk: StateSK) -> Self {
309 3 : Self {
310 3 : sk,
311 3 : peers_info: PeersInfo(vec![]),
312 3 : wal_removal_on_hold: false,
313 3 : }
314 3 : }
315 :
316 : /// Restore SharedState from control file. If file doesn't exist, bails out.
317 0 : pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
318 0 : let timeline_dir = get_timeline_dir(conf, ttid);
319 0 : let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
320 0 : if control_store.server.wal_seg_size == 0 {
321 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
322 0 : }
323 :
324 0 : let sk = match control_store.eviction_state {
325 : EvictionState::Present => {
326 0 : let wal_store = wal_storage::PhysicalStorage::new(
327 0 : ttid,
328 0 : &timeline_dir,
329 0 : &control_store,
330 0 : conf.no_sync,
331 0 : )?;
332 0 : StateSK::Loaded(SafeKeeper::new(
333 0 : TimelineState::new(control_store),
334 0 : wal_store,
335 0 : conf.my_id,
336 0 : )?)
337 : }
338 : EvictionState::Offloaded(_) => {
339 0 : StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
340 : }
341 : };
342 :
343 0 : Ok(Self::new(sk))
344 0 : }
345 :
346 3 : pub(crate) fn get_wal_seg_size(&self) -> usize {
347 3 : self.sk.state().server.wal_seg_size as usize
348 3 : }
349 :
350 0 : fn get_safekeeper_info(
351 0 : &self,
352 0 : ttid: &TenantTimelineId,
353 0 : conf: &SafeKeeperConf,
354 0 : standby_apply_lsn: Lsn,
355 0 : ) -> SafekeeperTimelineInfo {
356 0 : SafekeeperTimelineInfo {
357 0 : safekeeper_id: conf.my_id.0,
358 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
359 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
360 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
361 0 : }),
362 0 : term: self.sk.state().acceptor_state.term,
363 0 : last_log_term: self.sk.last_log_term(),
364 0 : flush_lsn: self.sk.flush_lsn().0,
365 0 : // note: this value is not flushed to control file yet and can be lost
366 0 : commit_lsn: self.sk.state().inmem.commit_lsn.0,
367 0 : remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
368 0 : peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
369 0 : safekeeper_connstr: conf
370 0 : .advertise_pg_addr
371 0 : .to_owned()
372 0 : .unwrap_or(conf.listen_pg_addr.clone()),
373 0 : http_connstr: conf.listen_http_addr.to_owned(),
374 0 : backup_lsn: self.sk.state().inmem.backup_lsn.0,
375 0 : local_start_lsn: self.sk.state().local_start_lsn.0,
376 0 : availability_zone: conf.availability_zone.clone(),
377 0 : standby_horizon: standby_apply_lsn.0,
378 0 : }
379 0 : }
380 :
381 : /// Get our latest view of alive peers status on the timeline.
382 : /// We pass our own info through the broker as well, so when we don't have connection
383 : /// to the broker returned vec is empty.
384 29 : pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
385 29 : let now = Instant::now();
386 29 : self.peers_info
387 29 : .0
388 29 : .iter()
389 29 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
390 29 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
391 29 : .cloned()
392 29 : .collect()
393 29 : }
394 : }
395 :
396 : #[derive(Debug, thiserror::Error)]
397 : pub enum TimelineError {
398 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
399 : Cancelled(TenantTimelineId),
400 : #[error("Timeline {0} was not found in global map")]
401 : NotFound(TenantTimelineId),
402 : #[error("Timeline {0} creation is in progress")]
403 : CreationInProgress(TenantTimelineId),
404 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
405 : Invalid(TenantTimelineId),
406 : #[error("Timeline {0} is already exists")]
407 : AlreadyExists(TenantTimelineId),
408 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
409 : UninitializedWalSegSize(TenantTimelineId),
410 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
411 : UninitializedPgVersion(TenantTimelineId),
412 : }
413 :
414 : // Convert to HTTP API error.
415 : impl From<TimelineError> for ApiError {
416 0 : fn from(te: TimelineError) -> ApiError {
417 0 : match te {
418 0 : TimelineError::NotFound(ttid) => {
419 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
420 : }
421 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
422 : }
423 0 : }
424 : }
425 :
426 : /// The Timeline struct manages the lifecycle (creation, deletion, restore) of a safekeeper timeline.
427 : /// It also holds the SharedState and provides synchronized access to it.
428 : pub struct Timeline {
429 : pub ttid: TenantTimelineId,
430 : pub remote_path: RemotePath,
431 :
432 : /// Used to broadcast commit_lsn updates to all background jobs.
433 : commit_lsn_watch_tx: watch::Sender<Lsn>,
434 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
435 :
436 : /// Broadcasts (current term, flush_lsn) updates; walsender is interested in
437 : /// them when sending in recovery mode (to walproposer or peers). Note: this
438 : /// is just a notification; WAL reading should always be done with the lock
439 : /// held, as the term can change otherwise.
440 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
441 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
442 :
443 : /// Broadcasts shared state updates.
444 : shared_state_version_tx: watch::Sender<usize>,
445 : shared_state_version_rx: watch::Receiver<usize>,
446 :
447 : /// Safekeeper and other state that should remain consistent and
448 : /// synchronized with the disk. This is a tokio RwLock, as we write WAL to
449 : /// disk while holding it, ensuring that consensus checks are in order.
450 : mutex: RwLock<SharedState>,
451 : walsenders: Arc<WalSenders>,
452 : walreceivers: Arc<WalReceivers>,
453 : timeline_dir: Utf8PathBuf,
454 : manager_ctl: ManagerCtl,
455 : conf: Arc<SafeKeeperConf>,
456 :
457 : /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
458 : /// this gate, you must respect [`Timeline::cancel`]
459 : pub(crate) gate: Gate,
460 :
461 : /// Delete/cancel will trigger this; background tasks should drop out as soon as it fires
462 : pub(crate) cancel: CancellationToken,
463 :
464 : // timeline_manager controlled state
465 : pub(crate) broker_active: AtomicBool,
466 : pub(crate) wal_backup_active: AtomicBool,
467 : pub(crate) last_removed_segno: AtomicU64,
468 : pub(crate) mgr_status: AtomicStatus,
469 : }
470 :
471 : impl Timeline {
472 : /// Constructs a new timeline.
473 3 : pub fn new(
474 3 : ttid: TenantTimelineId,
475 3 : timeline_dir: &Utf8Path,
476 3 : remote_path: &RemotePath,
477 3 : shared_state: SharedState,
478 3 : conf: Arc<SafeKeeperConf>,
479 3 : ) -> Arc<Self> {
480 3 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
481 3 : watch::channel(shared_state.sk.state().commit_lsn);
482 3 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
483 3 : shared_state.sk.last_log_term(),
484 3 : shared_state.sk.flush_lsn(),
485 3 : )));
486 3 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
487 3 :
488 3 : let walreceivers = WalReceivers::new();
489 3 :
490 3 : Arc::new(Self {
491 3 : ttid,
492 3 : remote_path: remote_path.to_owned(),
493 3 : timeline_dir: timeline_dir.to_owned(),
494 3 : commit_lsn_watch_tx,
495 3 : commit_lsn_watch_rx,
496 3 : term_flush_lsn_watch_tx,
497 3 : term_flush_lsn_watch_rx,
498 3 : shared_state_version_tx,
499 3 : shared_state_version_rx,
500 3 : mutex: RwLock::new(shared_state),
501 3 : walsenders: WalSenders::new(walreceivers.clone()),
502 3 : walreceivers,
503 3 : gate: Default::default(),
504 3 : cancel: CancellationToken::default(),
505 3 : manager_ctl: ManagerCtl::new(),
506 3 : conf,
507 3 : broker_active: AtomicBool::new(false),
508 3 : wal_backup_active: AtomicBool::new(false),
509 3 : last_removed_segno: AtomicU64::new(0),
510 3 : mgr_status: AtomicStatus::new(),
511 3 : })
512 3 : }
513 :
514 : /// Load existing timeline from disk.
515 0 : pub fn load_timeline(
516 0 : conf: Arc<SafeKeeperConf>,
517 0 : ttid: TenantTimelineId,
518 0 : ) -> Result<Arc<Timeline>> {
519 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
520 :
521 0 : let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
522 0 : let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
523 0 : let remote_path = remote_timeline_path(&ttid)?;
524 :
525 0 : Ok(Timeline::new(
526 0 : ttid,
527 0 : &timeline_dir,
528 0 : &remote_path,
529 0 : shared_state,
530 0 : conf,
531 0 : ))
532 0 : }
533 :
534 : /// Bootstrap a new or existing timeline, starting its background tasks.
535 3 : pub fn bootstrap(
536 3 : self: &Arc<Timeline>,
537 3 : _shared_state: &mut WriteGuardSharedState<'_>,
538 3 : conf: &SafeKeeperConf,
539 3 : broker_active_set: Arc<TimelinesSet>,
540 3 : partial_backup_rate_limiter: RateLimiter,
541 3 : ) {
542 3 : let (tx, rx) = self.manager_ctl.bootstrap_manager();
543 :
544 3 : let Ok(gate_guard) = self.gate.enter() else {
545 : // Init raced with shutdown
546 0 : return;
547 : };
548 :
549 : // Start manager task which will monitor timeline state and update
550 : // background tasks.
551 3 : tokio::spawn({
552 3 : let this = self.clone();
553 3 : let conf = conf.clone();
554 3 : async move {
555 3 : let _gate_guard = gate_guard;
556 3 : timeline_manager::main_task(
557 3 : ManagerTimeline { tli: this },
558 3 : conf,
559 3 : broker_active_set,
560 3 : tx,
561 3 : rx,
562 3 : partial_backup_rate_limiter,
563 3 : )
564 3 : .await
565 3 : }
566 3 : });
567 3 : }
568 :
569 : /// Background timeline activities (which hold Timeline::gate) will no
570 : /// longer run once this function completes.
571 0 : pub async fn shutdown(&self) {
572 0 : info!("timeline {} shutting down", self.ttid);
573 0 : self.cancel.cancel();
574 0 :
575 0 : // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
576 0 : // to read deleted files.
577 0 : self.gate.close().await;
578 0 : }
579 :
580 : /// Delete timeline from disk completely, by removing timeline directory.
581 : ///
582 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
583 : /// deletion API endpoint is retriable.
584 : ///
585 : /// Timeline must be in shut-down state (i.e. call [`Self::shutdown`] first)
586 0 : pub async fn delete(
587 0 : &self,
588 0 : shared_state: &mut WriteGuardSharedState<'_>,
589 0 : only_local: bool,
590 0 : ) -> Result<bool> {
591 0 : // Assert that [`Self::shutdown`] was already called
592 0 : assert!(self.cancel.is_cancelled());
593 0 : assert!(self.gate.close_complete());
594 :
595 0 : info!("deleting timeline {} from disk", self.ttid);
596 :
597 : // Close associated FDs. Nobody will be able to touch timeline data once
598 : // it is cancelled, so WAL storage won't be opened again.
599 0 : shared_state.sk.close_wal_store();
600 0 :
601 0 : if !only_local && self.conf.is_wal_backup_enabled() {
602 : // Note: we concurrently delete remote storage data from multiple
603 : // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
604 : // do some retries anyway.
605 0 : wal_backup::delete_timeline(&self.ttid).await?;
606 0 : }
607 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
608 0 : Ok(dir_existed)
609 0 : }
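// Deletion-order sketch (hypothetical caller): the assertions above require the
// cancellation token to be fired and the gate closed, so shutdown() must have
// completed before delete() is called.
//
//     tli.shutdown().await;
//     let mut shared = tli.write_shared_state().await;
//     let dir_existed = tli.delete(&mut shared, /* only_local */ false).await?;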
610 :
611 : /// Returns whether the timeline is cancelled.
612 2415 : pub fn is_cancelled(&self) -> bool {
613 2415 : self.cancel.is_cancelled()
614 2415 : }
615 :
616 : /// Take an exclusive write lock on the timeline's shared_state.
617 1212 : pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
618 1212 : WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
619 1212 : }
620 :
621 40 : pub async fn read_shared_state(&self) -> ReadGuardSharedState {
622 40 : self.mutex.read().await
623 40 : }
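// Locking sketch: reads take the shared lock and may run concurrently; only
// mutations need the write guard (which notifies watchers on drop). For example:
//
//     let flush_lsn = tli.read_shared_state().await.sk.flush_lsn();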
624 :
625 : /// Returns commit_lsn watch channel.
626 3 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
627 3 : self.commit_lsn_watch_rx.clone()
628 3 : }
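// Watch-channel sketch (hypothetical subscriber): background jobs can wait for
// commit_lsn to advance instead of polling the shared state.
//
//     let mut rx = tli.get_commit_lsn_watch_rx();
//     rx.changed().await?; // woken by WriteGuardSharedState::drop
//     let commit_lsn = *rx.borrow();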
629 :
630 : /// Returns term_flush_lsn watch channel.
631 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
632 0 : self.term_flush_lsn_watch_rx.clone()
633 0 : }
634 :
635 : /// Returns watch channel for SharedState update version.
636 3 : pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
637 3 : self.shared_state_version_rx.clone()
638 3 : }
639 :
640 : /// Returns wal_seg_size.
641 3 : pub async fn get_wal_seg_size(&self) -> usize {
642 3 : self.read_shared_state().await.get_wal_seg_size()
643 3 : }
644 :
645 : /// Returns state of the timeline.
646 5 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
647 5 : let state = self.read_shared_state().await;
648 5 : (
649 5 : state.sk.state().inmem.clone(),
650 5 : TimelinePersistentState::clone(state.sk.state()),
651 5 : )
652 5 : }
653 :
654 : /// Returns latest backup_lsn.
655 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
656 0 : self.read_shared_state().await.sk.state().inmem.backup_lsn
657 0 : }
658 :
659 : /// Sets backup_lsn to the given value.
660 0 : pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
661 0 : if self.is_cancelled() {
662 0 : bail!(TimelineError::Cancelled(self.ttid));
663 0 : }
664 :
665 0 : let mut state = self.write_shared_state().await;
666 0 : state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
667 0 : // we should check whether to shut down offloader, but this will be done
668 0 : // soon by peer communication anyway.
669 0 : Ok(())
670 0 : }
671 :
672 : /// Get safekeeper info for broadcasting to broker and other peers.
673 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
674 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
675 0 : let shared_state = self.read_shared_state().await;
676 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
677 0 : }
678 :
679 : /// Update timeline state with peer safekeeper data.
680 0 : pub async fn record_safekeeper_info(
681 0 : self: &Arc<Self>,
682 0 : sk_info: SafekeeperTimelineInfo,
683 0 : ) -> Result<()> {
684 : {
685 0 : let mut shared_state = self.write_shared_state().await;
686 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
687 0 : let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
688 0 : shared_state.peers_info.upsert(&peer_info);
689 0 : }
690 0 : Ok(())
691 0 : }
692 :
693 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
694 0 : let shared_state = self.read_shared_state().await;
695 0 : shared_state.get_peers(conf.heartbeat_timeout)
696 0 : }
697 :
698 3 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
699 3 : &self.walsenders
700 3 : }
701 :
702 9 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
703 9 : &self.walreceivers
704 9 : }
705 :
706 : /// Returns flush_lsn.
707 0 : pub async fn get_flush_lsn(&self) -> Lsn {
708 0 : self.read_shared_state().await.sk.flush_lsn()
709 0 : }
710 :
711 : /// Gather timeline data for metrics.
712 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
713 0 : if self.is_cancelled() {
714 0 : return None;
715 0 : }
716 0 :
717 0 : let WalSendersTimelineMetricValues {
718 0 : ps_feedback_counter,
719 0 : last_ps_feedback,
720 0 : interpreted_wal_reader_tasks,
721 0 : } = self.walsenders.info_for_metrics();
722 :
723 0 : let state = self.read_shared_state().await;
724 0 : Some(FullTimelineInfo {
725 0 : ttid: self.ttid,
726 0 : ps_feedback_count: ps_feedback_counter,
727 0 : last_ps_feedback,
728 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
729 0 : timeline_is_active: self.broker_active.load(Ordering::Relaxed),
730 0 : num_computes: self.walreceivers.get_num() as u32,
731 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
732 0 : interpreted_wal_reader_tasks,
733 0 : epoch_start_lsn: state.sk.term_start_lsn(),
734 0 : mem_state: state.sk.state().inmem.clone(),
735 0 : persisted_state: TimelinePersistentState::clone(state.sk.state()),
736 0 : flush_lsn: state.sk.flush_lsn(),
737 0 : wal_storage: state.sk.wal_storage_metrics(),
738 0 : })
739 0 : }
740 :
741 : /// Returns in-memory timeline state to build a full debug dump.
742 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
743 0 : let state = self.read_shared_state().await;
744 :
745 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
746 0 : state.sk.wal_storage_internal_state();
747 0 :
748 0 : debug_dump::Memory {
749 0 : is_cancelled: self.is_cancelled(),
750 0 : peers_info_len: state.peers_info.0.len(),
751 0 : walsenders: self.walsenders.get_all_public(),
752 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
753 0 : active: self.broker_active.load(Ordering::Relaxed),
754 0 : num_computes: self.walreceivers.get_num() as u32,
755 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
756 0 : epoch_start_lsn: state.sk.term_start_lsn(),
757 0 : mem_state: state.sk.state().inmem.clone(),
758 0 : mgr_status: self.mgr_status.get(),
759 0 : write_lsn,
760 0 : write_record_lsn,
761 0 : flush_lsn,
762 0 : file_open,
763 0 : }
764 0 : }
765 :
766 : /// Apply a function to the control file state and persist it.
767 0 : pub async fn map_control_file<T>(
768 0 : self: &Arc<Self>,
769 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
770 0 : ) -> Result<T> {
771 0 : let mut state = self.write_shared_state().await;
772 0 : let mut persistent_state = state.sk.state_mut().start_change();
773 : // If f returns error, we abort the change and don't persist anything.
774 0 : let res = f(&mut persistent_state)?;
775 : // If persisting fails, we abort the change and return error.
776 0 : state
777 0 : .sk
778 0 : .state_mut()
779 0 : .finish_change(&persistent_state)
780 0 : .await?;
781 0 : Ok(res)
782 0 : }
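// Usage sketch (hypothetical caller; the mutation shown is illustrative): the
// change is transactional, so if the closure or the flush fails, nothing is
// persisted.
//
//     tli.map_control_file(|state| {
//         state.commit_lsn = new_commit_lsn; // hypothetical mutation
//         Ok(())
//     })
//     .await?;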
783 :
784 0 : pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
785 0 : let mut state = self.write_shared_state().await;
786 0 : state.sk.term_bump(to).await
787 0 : }
788 :
789 0 : pub async fn membership_switch(
790 0 : self: &Arc<Self>,
791 0 : to: Configuration,
792 0 : ) -> Result<TimelineMembershipSwitchResponse> {
793 0 : let mut state = self.write_shared_state().await;
794 0 : state.sk.membership_switch(to).await
795 0 : }
796 :
797 : /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
798 6 : async fn do_wal_residence_guard(
799 6 : self: &Arc<Self>,
800 6 : block: bool,
801 6 : ) -> Result<Option<WalResidentTimeline>> {
802 6 : let op_label = if block {
803 6 : "wal_residence_guard"
804 : } else {
805 0 : "try_wal_residence_guard"
806 : };
807 :
808 6 : if self.is_cancelled() {
809 0 : bail!(TimelineError::Cancelled(self.ttid));
810 6 : }
811 6 :
812 6 : debug!("requesting WalResidentTimeline guard");
813 6 : let started_at = Instant::now();
814 6 : let status_before = self.mgr_status.get();
815 :
816 : // Wait 30 seconds for the guard to be acquired. It can time out if someone is
817 : // holding the lock (e.g. during `SafeKeeper::process_msg()`) or the manager
818 : // task is stuck.
819 6 : let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
820 6 : if block {
821 6 : self.manager_ctl.wal_residence_guard().await.map(Some)
822 : } else {
823 0 : self.manager_ctl.try_wal_residence_guard().await
824 : }
825 6 : })
826 6 : .await;
827 :
828 6 : let guard = match res {
829 6 : Ok(Ok(guard)) => {
830 6 : let finished_at = Instant::now();
831 6 : let elapsed = finished_at - started_at;
832 6 : MISC_OPERATION_SECONDS
833 6 : .with_label_values(&[op_label])
834 6 : .observe(elapsed.as_secs_f64());
835 6 :
836 6 : guard
837 : }
838 0 : Ok(Err(e)) => {
839 0 : warn!(
840 0 : "error acquiring in {op_label}, statuses {:?} => {:?}",
841 0 : status_before,
842 0 : self.mgr_status.get()
843 : );
844 0 : return Err(e);
845 : }
846 : Err(_) => {
847 0 : warn!(
848 0 : "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
849 0 : status_before,
850 0 : self.mgr_status.get()
851 : );
852 0 : anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
853 : }
854 : };
855 :
856 6 : Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
857 6 : }
858 :
859 : /// Get the timeline guard for reading/writing WAL files.
860 : /// If WAL files are not present on disk (evicted), they will be automatically
861 : /// downloaded from remote storage. This is done in the manager task, which is
862 : /// responsible for issuing all guards.
863 : ///
864 : /// NB: don't use this function from timeline_manager, it will deadlock.
865 : /// NB: don't use this function while holding shared_state lock.
866 6 : pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
867 6 : self.do_wal_residence_guard(true)
868 6 : .await
869 6 : .map(|m| m.expect("Always get Some in block=true mode"))
870 6 : }
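// Caller sketch: obtain the residence guard before touching WAL on disk; if the
// timeline was evicted, the manager downloads the WAL again before issuing the
// guard.
//
//     let resident: WalResidentTimeline = tli.wal_residence_guard().await?;
//     let reader = resident.get_walreader(start_lsn).await?; // WAL is resident now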
871 :
872 : /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
873 : /// else return None
874 0 : pub(crate) async fn try_wal_residence_guard(
875 0 : self: &Arc<Self>,
876 0 : ) -> Result<Option<WalResidentTimeline>> {
877 0 : self.do_wal_residence_guard(false).await
878 0 : }
879 :
880 0 : pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
881 0 : self.manager_ctl.backup_partial_reset().await
882 0 : }
883 : }
884 :
885 : /// This is a guard that allows reading/writing the timeline state on disk.
886 : /// All tasks that read or write WAL on disk should use this guard.
887 : pub struct WalResidentTimeline {
888 : pub tli: Arc<Timeline>,
889 : _guard: ResidenceGuard,
890 : }
891 :
892 : impl WalResidentTimeline {
893 9 : pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
894 9 : WalResidentTimeline { tli, _guard }
895 9 : }
896 : }
897 :
898 : impl Deref for WalResidentTimeline {
899 : type Target = Arc<Timeline>;
900 :
901 5450 : fn deref(&self) -> &Self::Target {
902 5450 : &self.tli
903 5450 : }
904 : }
905 :
906 : impl WalResidentTimeline {
907 : /// Returns true if the walsender should stop sending WAL to the pageserver. We
908 : /// terminate it if remote_consistent_lsn has reached commit_lsn and there are
909 : /// no computes. While there might already be nothing to stream, we learn about
910 : /// remote_consistent_lsn updates through replication feedback, and we want
911 : /// to stop pushing to the broker if the pageserver is fully caught up.
912 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
913 0 : if self.is_cancelled() {
914 0 : return true;
915 0 : }
916 0 : let shared_state = self.read_shared_state().await;
917 0 : if self.walreceivers.get_num() == 0 {
918 0 : return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
919 0 : reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
920 0 : }
921 0 : false
922 0 : }
923 :
924 : /// Ensure that the current term is `t`, erroring otherwise, and lock the state.
925 0 : pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
926 0 : let ss = self.read_shared_state().await;
927 0 : if ss.sk.state().acceptor_state.term != t {
928 0 : bail!(
929 0 : "failed to acquire term {}, current term {}",
930 0 : t,
931 0 : ss.sk.state().acceptor_state.term
932 0 : );
933 0 : }
934 0 : Ok(ss)
935 0 : }
936 :
937 : /// Pass arrived message to the safekeeper.
938 1200 : pub async fn process_msg(
939 1200 : &self,
940 1200 : msg: &ProposerAcceptorMessage,
941 1200 : ) -> Result<Option<AcceptorProposerMessage>> {
942 1200 : if self.is_cancelled() {
943 0 : bail!(TimelineError::Cancelled(self.ttid));
944 1200 : }
945 :
946 : let mut rmsg: Option<AcceptorProposerMessage>;
947 : {
948 1200 : let mut shared_state = self.write_shared_state().await;
949 1200 : rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
950 :
951 : // if this is AppendResponse, fill in proper hot standby feedback.
952 600 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
953 600 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
954 600 : }
955 : }
956 1200 : Ok(rmsg)
957 1200 : }
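// Receive-loop sketch (hypothetical message source and sink): the WAL receiver
// forwards each proposer message and sends acceptor replies back to the compute.
//
//     while let Some(msg) = recv_proposer_msg().await? { // hypothetical source
//         if let Some(reply) = resident_tli.process_msg(&msg).await? {
//             send_acceptor_reply(reply).await?; // hypothetical sink
//         }
//     }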
958 :
959 5 : pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
960 5 : let (_, persisted_state) = self.get_state().await;
961 5 : let enable_remote_read = self.conf.is_wal_backup_enabled();
962 5 :
963 5 : WalReader::new(
964 5 : &self.ttid,
965 5 : self.timeline_dir.clone(),
966 5 : &persisted_state,
967 5 : start_lsn,
968 5 : enable_remote_read,
969 5 : )
970 5 : }
971 :
972 0 : pub fn get_timeline_dir(&self) -> Utf8PathBuf {
973 0 : self.timeline_dir.clone()
974 0 : }
975 :
976 : /// Update in memory remote consistent lsn.
977 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
978 0 : let mut shared_state = self.write_shared_state().await;
979 0 : shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
980 0 : shared_state.sk.state().inmem.remote_consistent_lsn,
981 0 : candidate,
982 0 : );
983 0 : }
984 : }
985 :
986 : /// This struct contains methods that are used by the timeline manager task.
987 : pub(crate) struct ManagerTimeline {
988 : pub(crate) tli: Arc<Timeline>,
989 : }
990 :
991 : impl Deref for ManagerTimeline {
992 : type Target = Arc<Timeline>;
993 :
994 316 : fn deref(&self) -> &Self::Target {
995 316 : &self.tli
996 316 : }
997 : }
998 :
999 : impl ManagerTimeline {
1000 0 : pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
1001 0 : &self.tli.timeline_dir
1002 0 : }
1003 :
1004 : /// Manager requests this state on startup.
1005 3 : pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
1006 3 : let shared_state = self.read_shared_state().await;
1007 3 : let is_offloaded = matches!(
1008 3 : shared_state.sk.state().eviction_state,
1009 : EvictionState::Offloaded(_)
1010 : );
1011 3 : let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
1012 3 :
1013 3 : (is_offloaded, partial_backup_uploaded)
1014 3 : }
1015 :
1016 : /// Try to switch state Present->Offloaded.
1017 0 : pub(crate) async fn switch_to_offloaded(
1018 0 : &self,
1019 0 : partial: &PartialRemoteSegment,
1020 0 : ) -> anyhow::Result<()> {
1021 0 : let mut shared = self.write_shared_state().await;
1022 :
1023 : // updating control file
1024 0 : let mut pstate = shared.sk.state_mut().start_change();
1025 :
1026 0 : if !matches!(pstate.eviction_state, EvictionState::Present) {
1027 0 : bail!(
1028 0 : "cannot switch to offloaded state, current state is {:?}",
1029 0 : pstate.eviction_state
1030 0 : );
1031 0 : }
1032 0 :
1033 0 : if partial.flush_lsn != shared.sk.flush_lsn() {
1034 0 : bail!(
1035 0 : "flush_lsn mismatch in partial backup, expected {}, got {}",
1036 0 : shared.sk.flush_lsn(),
1037 0 : partial.flush_lsn
1038 0 : );
1039 0 : }
1040 0 :
1041 0 : if partial.commit_lsn != pstate.commit_lsn {
1042 0 : bail!(
1043 0 : "commit_lsn mismatch in partial backup, expected {}, got {}",
1044 0 : pstate.commit_lsn,
1045 0 : partial.commit_lsn
1046 0 : );
1047 0 : }
1048 0 :
1049 0 : if partial.term != shared.sk.last_log_term() {
1050 0 : bail!(
1051 0 : "term mismatch in partial backup, expected {}, got {}",
1052 0 : shared.sk.last_log_term(),
1053 0 : partial.term
1054 0 : );
1055 0 : }
1056 0 :
1057 0 : pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
1058 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1059 : // control file is now switched to Offloaded state
1060 :
1061 : // now we can switch shared.sk to Offloaded, shouldn't fail
1062 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1063 0 : let cfile_state = prev_sk.take_state();
1064 0 : shared.sk = StateSK::Offloaded(Box::new(cfile_state));
1065 0 :
1066 0 : Ok(())
1067 0 : }
1068 :
1069 : /// Try to switch state Offloaded->Present.
1070 0 : pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
1071 0 : let mut shared = self.write_shared_state().await;
1072 :
1073 : // trying to restore WAL storage
1074 0 : let wal_store = wal_storage::PhysicalStorage::new(
1075 0 : &self.ttid,
1076 0 : &self.timeline_dir,
1077 0 : shared.sk.state(),
1078 0 : self.conf.no_sync,
1079 0 : )?;
1080 :
1081 : // updating control file
1082 0 : let mut pstate = shared.sk.state_mut().start_change();
1083 :
1084 0 : if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
1085 0 : bail!(
1086 0 : "cannot switch to present state, current state is {:?}",
1087 0 : pstate.eviction_state
1088 0 : );
1089 0 : }
1090 0 :
1091 0 : if wal_store.flush_lsn() != shared.sk.flush_lsn() {
1092 0 : bail!(
1093 0 : "flush_lsn mismatch in restored WAL, expected {}, got {}",
1094 0 : shared.sk.flush_lsn(),
1095 0 : wal_store.flush_lsn()
1096 0 : );
1097 0 : }
1098 0 :
1099 0 : pstate.eviction_state = EvictionState::Present;
1100 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1101 :
1102 : // now we can switch shared.sk to Present, shouldn't fail
1103 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1104 0 : let cfile_state = prev_sk.take_state();
1105 0 : shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
1106 :
1107 0 : Ok(())
1108 0 : }
1109 :
1110 : /// Update current manager state, useful for debugging manager deadlocks.
1111 170 : pub(crate) fn set_status(&self, status: timeline_manager::Status) {
1112 170 : self.mgr_status.store(status, Ordering::Relaxed);
1113 170 : }
1114 : }
1115 :
1116 : /// Deletes the directory and its contents. Returns false if the directory does not exist.
1117 0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
1118 0 : match fs::remove_dir_all(path).await {
1119 0 : Ok(_) => Ok(true),
1120 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
1121 0 : Err(e) => Err(e.into()),
1122 : }
1123 0 : }
1124 :
1125 : /// Get a path to the tenant directory. If you just need to get a timeline directory,
1126 : /// use WalResidentTimeline::get_timeline_dir instead.
1127 6 : pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
1128 6 : conf.workdir.join(tenant_id.to_string())
1129 6 : }
1130 :
1131 : /// Get a path to the timeline directory. If you need to read WAL files from disk,
1132 : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
1133 : /// timeline eviction status and WAL files might not be present on disk.
1134 6 : pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
1135 6 : get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
1136 6 : }
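// Layout sketch: the resulting on-disk path is <workdir>/<tenant_id>/<timeline_id>,
// with both ids rendered as hex strings (hypothetical example):
//
//     <workdir>/9e4c8f36.../7d03a6be...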