Line data Source code
1 : //! This module implements Timeline lifecycle management and contains all the code
2 : //! needed to glue the SafeKeeper together with all other background services.
3 :
4 : use std::cmp::max;
5 : use std::ops::{Deref, DerefMut};
6 : use std::sync::Arc;
7 : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8 : use std::time::Duration;
9 :
10 : use anyhow::{Result, anyhow, bail};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use http_utils::error::ApiError;
13 : use remote_storage::RemotePath;
14 : use safekeeper_api::Term;
15 : use safekeeper_api::membership::Configuration;
16 : use safekeeper_api::models::{
17 : PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
18 : };
19 : use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
20 : use tokio::fs::{self};
21 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, watch};
22 : use tokio::time::Instant;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::*;
25 : use utils::id::{NodeId, TenantId, TenantTimelineId};
26 : use utils::lsn::Lsn;
27 : use utils::sync::gate::Gate;
28 :
29 : use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
30 : use crate::rate_limit::RateLimiter;
31 : use crate::receive_wal::WalReceivers;
32 : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
33 : use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
34 : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
35 : use crate::timeline_guard::ResidenceGuard;
36 : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
37 : use crate::timelines_set::TimelinesSet;
38 : use crate::wal_backup::{self, remote_timeline_path};
39 : use crate::wal_backup_partial::PartialRemoteSegment;
40 : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
41 : use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
42 :
43 0 : fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
44 0 : PeerInfo {
45 0 : sk_id: NodeId(sk_info.safekeeper_id),
46 0 : term: sk_info.term,
47 0 : last_log_term: sk_info.last_log_term,
48 0 : flush_lsn: Lsn(sk_info.flush_lsn),
49 0 : commit_lsn: Lsn(sk_info.commit_lsn),
50 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
51 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
52 0 : http_connstr: sk_info.http_connstr.clone(),
53 0 : ts,
54 0 : }
55 0 : }
56 :
57 : // Vector-based node id -> peer state map providing only the very limited
58 : // functionality we need.
59 : #[derive(Debug, Clone, Default)]
60 : pub struct PeersInfo(pub Vec<PeerInfo>);
61 :
62 : impl PeersInfo {
63 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
64 0 : self.0.iter_mut().find(|p| p.sk_id == id)
65 0 : }
66 :
67 0 : fn upsert(&mut self, p: &PeerInfo) {
68 0 : match self.get(p.sk_id) {
69 0 : Some(rp) => *rp = p.clone(),
70 0 : None => self.0.push(p.clone()),
71 : }
72 0 : }
73 : }
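// Illustrative sketch (not part of the original module): how `PeersInfo::upsert`
// behaves. A peer already present (matched by `sk_id`) is overwritten in place,
// anything else is appended. The field values below are hypothetical; the field
// set mirrors what peer_info_from_sk_info() above populates.
#[cfg(test)]
mod peers_info_example {
    use super::*;

    fn peer(id: u64, term: Term) -> PeerInfo {
        PeerInfo {
            sk_id: NodeId(id),
            term,
            last_log_term: term,
            flush_lsn: Lsn(0),
            commit_lsn: Lsn(0),
            local_start_lsn: Lsn(0),
            pg_connstr: String::new(),
            http_connstr: String::new(),
            ts: Instant::now(),
        }
    }

    #[test]
    fn upsert_replaces_or_appends() {
        let mut peers = PeersInfo::default();
        peers.upsert(&peer(1, 1));
        peers.upsert(&peer(2, 1));
        // Same node id: the existing entry is replaced, not duplicated.
        peers.upsert(&peer(1, 2));
        assert_eq!(peers.0.len(), 2);
        assert_eq!(peers.get(NodeId(1)).unwrap().term, 2);
    }
}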
74 :
75 : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
76 :
77 : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
78 : /// automatically updates `watch::Sender` channels with state on drop.
79 : pub struct WriteGuardSharedState<'a> {
80 : tli: Arc<Timeline>,
81 : guard: RwLockWriteGuard<'a, SharedState>,
82 : }
83 :
84 : impl<'a> WriteGuardSharedState<'a> {
85 1238 : fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
86 1238 : WriteGuardSharedState { tli, guard }
87 1238 : }
88 : }
89 :
90 : impl Deref for WriteGuardSharedState<'_> {
91 : type Target = SharedState;
92 :
93 0 : fn deref(&self) -> &Self::Target {
94 0 : &self.guard
95 0 : }
96 : }
97 :
98 : impl DerefMut for WriteGuardSharedState<'_> {
99 1234 : fn deref_mut(&mut self) -> &mut Self::Target {
100 1234 : &mut self.guard
101 1234 : }
102 : }
103 :
104 : impl Drop for WriteGuardSharedState<'_> {
105 1238 : fn drop(&mut self) {
106 1238 : let term_flush_lsn =
107 1238 : TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
108 1238 : let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
109 1238 :
110 1238 : let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
111 1238 : if *old != term_flush_lsn {
112 610 : *old = term_flush_lsn;
113 610 : true
114 : } else {
115 628 : false
116 : }
117 1238 : });
118 1238 :
119 1238 : let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
120 1238 : if *old != commit_lsn {
121 606 : *old = commit_lsn;
122 606 : true
123 : } else {
124 632 : false
125 : }
126 1238 : });
127 1238 :
128 1238 : // send notification about shared state update
129 1238 : self.tli.shared_state_version_tx.send_modify(|old| {
130 1238 : *old += 1;
131 1238 : });
132 1238 : }
133 : }
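// Illustrative sketch (hypothetical helper, not part of the original module):
// how a background task can rely on the Drop impl above. It only consumes the
// commit_lsn watch channel that every WriteGuardSharedState drop refreshes.
async fn example_wait_for_commit_lsn(tli: &Arc<Timeline>, target: Lsn) -> Result<Lsn> {
    let mut rx = tli.get_commit_lsn_watch_rx();
    loop {
        let current = *rx.borrow();
        if current >= target {
            return Ok(current);
        }
        // Wakes up whenever some write guard is dropped with a changed commit_lsn.
        rx.changed().await?;
    }
}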
134 :
135 : /// This structure is stored in shared state and represents the state of the timeline.
136 : ///
137 : /// Usually it holds a SafeKeeper, but it also supports the offloaded timeline state. In
138 : /// that case the SafeKeeper is not available (because WAL is not present on disk) and all
139 : /// operations can be performed only through the control file.
140 : pub enum StateSK {
141 : Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
142 : Offloaded(Box<TimelineState<control_file::FileStorage>>),
143 : // Not a real state; only a placeholder required for moving between states.
144 : Empty,
145 : }
146 :
147 : impl StateSK {
148 2570 : pub fn flush_lsn(&self) -> Lsn {
149 2570 : match self {
150 2570 : StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
151 0 : StateSK::Offloaded(state) => match state.eviction_state {
152 0 : EvictionState::Offloaded(flush_lsn) => flush_lsn,
153 0 : _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
154 : },
155 0 : StateSK::Empty => unreachable!(),
156 : }
157 2570 : }
158 :
159 : /// Get a reference to the control file's timeline state.
160 2598 : pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
161 2598 : match self {
162 2598 : StateSK::Loaded(sk) => &sk.state,
163 0 : StateSK::Offloaded(s) => s,
164 0 : StateSK::Empty => unreachable!(),
165 : }
166 2598 : }
167 :
168 14 : pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
169 14 : match self {
170 14 : StateSK::Loaded(sk) => &mut sk.state,
171 0 : StateSK::Offloaded(s) => s,
172 0 : StateSK::Empty => unreachable!(),
173 : }
174 14 : }
175 :
176 1285 : pub fn last_log_term(&self) -> Term {
177 1285 : self.state()
178 1285 : .acceptor_state
179 1285 : .get_last_log_term(self.flush_lsn())
180 1285 : }
181 :
182 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
183 0 : self.state_mut().term_bump(to).await
184 0 : }
185 :
186 0 : pub async fn membership_switch(
187 0 : &mut self,
188 0 : to: Configuration,
189 0 : ) -> Result<TimelineMembershipSwitchResponse> {
190 0 : self.state_mut().membership_switch(to).await
191 0 : }
192 :
193 : /// Close open WAL files to release FDs.
194 0 : fn close_wal_store(&mut self) {
195 0 : if let StateSK::Loaded(sk) = self {
196 0 : sk.wal_store.close();
197 0 : }
198 0 : }
199 :
200 : /// Update timeline state with peer safekeeper data.
201 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
202 0 : // update commit_lsn if safekeeper is loaded
203 0 : match self {
204 0 : StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
205 0 : StateSK::Offloaded(_) => {}
206 0 : StateSK::Empty => unreachable!(),
207 : }
208 :
209 : // update everything else, including remote_consistent_lsn and backup_lsn
210 0 : let mut sync_control_file = false;
211 0 : let state = self.state_mut();
212 0 : let wal_seg_size = state.server.wal_seg_size as u64;
213 0 :
214 0 : state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
215 0 : sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
216 0 :
217 0 : state.inmem.remote_consistent_lsn = max(
218 0 : Lsn(sk_info.remote_consistent_lsn),
219 0 : state.inmem.remote_consistent_lsn,
220 0 : );
221 0 : sync_control_file |=
222 0 : state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
223 0 :
224 0 : state.inmem.peer_horizon_lsn =
225 0 : max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
226 0 : sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
227 0 :
228 0 : if sync_control_file {
229 0 : state.flush().await?;
230 0 : }
231 0 : Ok(())
232 0 : }
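    // Illustrative helper (hypothetical, not part of the original impl): the rule
    // used above for deciding when to rewrite the control file. Persistence is
    // deliberately lazy: the in-memory LSN may run ahead of the persisted one by
    // up to one WAL segment before a flush is forced, bounding control file writes.
    fn example_needs_control_file_sync(persisted: Lsn, in_mem: Lsn, wal_seg_size: u64) -> bool {
        persisted + wal_seg_size < in_mem
    }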
233 :
234 : /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
235 0 : pub fn term_start_lsn(&self) -> Lsn {
236 0 : match self {
237 0 : StateSK::Loaded(sk) => sk.term_start_lsn,
238 0 : StateSK::Offloaded(_) => Lsn(0),
239 0 : StateSK::Empty => unreachable!(),
240 : }
241 0 : }
242 :
243 : /// Used for metrics only.
244 0 : pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
245 0 : match self {
246 0 : StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
247 0 : StateSK::Offloaded(_) => WalStorageMetrics::default(),
248 0 : StateSK::Empty => unreachable!(),
249 : }
250 0 : }
251 :
252 : /// Returns WAL storage internal LSNs for debug dump.
253 0 : pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
254 0 : match self {
255 0 : StateSK::Loaded(sk) => sk.wal_store.internal_state(),
256 : StateSK::Offloaded(_) => {
257 0 : let flush_lsn = self.flush_lsn();
258 0 : (flush_lsn, flush_lsn, flush_lsn, false)
259 : }
260 0 : StateSK::Empty => unreachable!(),
261 : }
262 0 : }
263 :
264 : /// Access to the SafeKeeper object. Panics if the timeline is offloaded; safe to use from WalResidentTimeline.
265 1220 : pub fn safekeeper(
266 1220 : &mut self,
267 1220 : ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
268 1220 : match self {
269 1220 : StateSK::Loaded(sk) => sk,
270 : StateSK::Offloaded(_) => {
271 0 : panic!("safekeeper is offloaded, cannot be used")
272 : }
273 0 : StateSK::Empty => unreachable!(),
274 : }
275 1220 : }
276 :
277 : /// Moves control file's state structure out of the enum. Used to switch states.
278 0 : fn take_state(self) -> TimelineState<control_file::FileStorage> {
279 0 : match self {
280 0 : StateSK::Loaded(sk) => sk.state,
281 0 : StateSK::Offloaded(state) => *state,
282 0 : StateSK::Empty => unreachable!(),
283 : }
284 0 : }
285 : }
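// Illustrative sketch (hypothetical helper, not part of the original module):
// why `StateSK::Empty` exists. `take_state()` consumes self, so switching the
// variant behind a `&mut` reference first parks `Empty` in the slot, exactly as
// `switch_to_offloaded()` / `switch_to_present()` do further below. The real
// transitions also update the control file's eviction_state first; that step is
// omitted here.
fn example_offload_in_place(slot: &mut StateSK) {
    // Move the current value out, leaving the placeholder behind.
    let prev = std::mem::replace(slot, StateSK::Empty);
    // Consume it, keeping only the control file state...
    let cfile_state = prev.take_state();
    // ...and put the new variant back into the slot.
    *slot = StateSK::Offloaded(Box::new(cfile_state));
}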
286 :
287 : /// Shared state associated with database instance
288 : pub struct SharedState {
289 : /// Safekeeper object
290 : pub(crate) sk: StateSK,
291 : /// In-memory list of peer states, as reported in the latest messages from them.
292 : pub(crate) peers_info: PeersInfo,
293 : // True value hinders old WAL removal; this is used by snapshotting. We
294 : // could make it a counter, but there is no need to.
295 : pub(crate) wal_removal_on_hold: bool,
296 : }
297 :
298 : impl SharedState {
299 : /// Creates a new SharedState.
300 4 : pub fn new(sk: StateSK) -> Self {
301 4 : Self {
302 4 : sk,
303 4 : peers_info: PeersInfo(vec![]),
304 4 : wal_removal_on_hold: false,
305 4 : }
306 4 : }
307 :
308 : /// Restore SharedState from control file. If file doesn't exist, bails out.
309 0 : pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
310 0 : let timeline_dir = get_timeline_dir(conf, ttid);
311 0 : let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
312 0 : if control_store.server.wal_seg_size == 0 {
313 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
314 0 : }
315 :
316 0 : let sk = match control_store.eviction_state {
317 : EvictionState::Present => {
318 0 : let wal_store = wal_storage::PhysicalStorage::new(
319 0 : ttid,
320 0 : &timeline_dir,
321 0 : &control_store,
322 0 : conf.no_sync,
323 0 : )?;
324 0 : StateSK::Loaded(SafeKeeper::new(
325 0 : TimelineState::new(control_store),
326 0 : wal_store,
327 0 : conf.my_id,
328 0 : )?)
329 : }
330 : EvictionState::Offloaded(_) => {
331 0 : StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
332 : }
333 : };
334 :
335 0 : Ok(Self::new(sk))
336 0 : }
337 :
338 4 : pub(crate) fn get_wal_seg_size(&self) -> usize {
339 4 : self.sk.state().server.wal_seg_size as usize
340 4 : }
341 :
342 0 : fn get_safekeeper_info(
343 0 : &self,
344 0 : ttid: &TenantTimelineId,
345 0 : conf: &SafeKeeperConf,
346 0 : standby_apply_lsn: Lsn,
347 0 : ) -> SafekeeperTimelineInfo {
348 0 : SafekeeperTimelineInfo {
349 0 : safekeeper_id: conf.my_id.0,
350 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
351 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
352 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
353 0 : }),
354 0 : term: self.sk.state().acceptor_state.term,
355 0 : last_log_term: self.sk.last_log_term(),
356 0 : flush_lsn: self.sk.flush_lsn().0,
357 0 : // note: this value is not flushed to control file yet and can be lost
358 0 : commit_lsn: self.sk.state().inmem.commit_lsn.0,
359 0 : remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
360 0 : peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
361 0 : safekeeper_connstr: conf
362 0 : .advertise_pg_addr
363 0 : .to_owned()
364 0 : .unwrap_or(conf.listen_pg_addr.clone()),
365 0 : http_connstr: conf.listen_http_addr.to_owned(),
366 0 : backup_lsn: self.sk.state().inmem.backup_lsn.0,
367 0 : local_start_lsn: self.sk.state().local_start_lsn.0,
368 0 : availability_zone: conf.availability_zone.clone(),
369 0 : standby_horizon: standby_apply_lsn.0,
370 0 : }
371 0 : }
372 :
373 : /// Get our latest view of the status of live peers on the timeline.
374 : /// We pass our own info through the broker as well, so when we have no connection
375 : /// to the broker the returned vec is empty.
376 43 : pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
377 43 : let now = Instant::now();
378 43 : self.peers_info
379 43 : .0
380 43 : .iter()
381 43 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
382 43 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
383 43 : .cloned()
384 43 : .collect()
385 43 : }
386 : }
387 :
388 : #[derive(Debug, thiserror::Error)]
389 : pub enum TimelineError {
390 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
391 : Cancelled(TenantTimelineId),
392 : #[error("Timeline {0} was not found in global map")]
393 : NotFound(TenantTimelineId),
394 : #[error("Timeline {0} creation is in progress")]
395 : CreationInProgress(TenantTimelineId),
396 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
397 : Invalid(TenantTimelineId),
398 : #[error("Timeline {0} is already exists")]
399 : AlreadyExists(TenantTimelineId),
400 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
401 : UninitializedWalSegSize(TenantTimelineId),
402 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
403 : UninitialinzedPgVersion(TenantTimelineId),
404 : }
405 :
406 : // Convert to HTTP API error.
407 : impl From<TimelineError> for ApiError {
408 0 : fn from(te: TimelineError) -> ApiError {
409 0 : match te {
410 0 : TimelineError::NotFound(ttid) => {
411 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
412 : }
413 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
414 : }
415 0 : }
416 : }
417 :
418 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
419 : /// It also holds SharedState and provides mutually exclusive access to it.
420 : pub struct Timeline {
421 : pub ttid: TenantTimelineId,
422 : pub remote_path: RemotePath,
423 :
424 : /// Used to broadcast commit_lsn updates to all background jobs.
425 : commit_lsn_watch_tx: watch::Sender<Lsn>,
426 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
427 :
428 : /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
429 : /// them when sending in recovery mode (to walproposer or peers). Note: this
430 : /// is just a notification, WAL reading should always done with lock held as
431 : /// term can change otherwise.
432 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
433 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
434 :
435 : /// Broadcasts shared state updates.
436 : shared_state_version_tx: watch::Sender<usize>,
437 : shared_state_version_rx: watch::Receiver<usize>,
438 :
439 : /// Safekeeper and other state that should remain consistent and
440 : /// synchronized with the disk. This is a tokio lock as we write WAL to disk
441 : /// while holding it, ensuring that consensus checks are done in order.
442 : mutex: RwLock<SharedState>,
443 : walsenders: Arc<WalSenders>,
444 : walreceivers: Arc<WalReceivers>,
445 : timeline_dir: Utf8PathBuf,
446 : manager_ctl: ManagerCtl,
447 : conf: Arc<SafeKeeperConf>,
448 :
449 : /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
450 : /// this gate, you must respect [`Timeline::cancel`]
451 : pub(crate) gate: Gate,
452 :
453 : /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
454 : pub(crate) cancel: CancellationToken,
455 :
456 : // timeline_manager controlled state
457 : pub(crate) broker_active: AtomicBool,
458 : pub(crate) wal_backup_active: AtomicBool,
459 : pub(crate) last_removed_segno: AtomicU64,
460 : pub(crate) mgr_status: AtomicStatus,
461 : }
462 :
463 : impl Timeline {
464 : /// Constructs a new timeline.
465 4 : pub fn new(
466 4 : ttid: TenantTimelineId,
467 4 : timeline_dir: &Utf8Path,
468 4 : remote_path: &RemotePath,
469 4 : shared_state: SharedState,
470 4 : conf: Arc<SafeKeeperConf>,
471 4 : ) -> Arc<Self> {
472 4 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
473 4 : watch::channel(shared_state.sk.state().commit_lsn);
474 4 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
475 4 : shared_state.sk.last_log_term(),
476 4 : shared_state.sk.flush_lsn(),
477 4 : )));
478 4 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
479 4 :
480 4 : let walreceivers = WalReceivers::new();
481 4 :
482 4 : Arc::new(Self {
483 4 : ttid,
484 4 : remote_path: remote_path.to_owned(),
485 4 : timeline_dir: timeline_dir.to_owned(),
486 4 : commit_lsn_watch_tx,
487 4 : commit_lsn_watch_rx,
488 4 : term_flush_lsn_watch_tx,
489 4 : term_flush_lsn_watch_rx,
490 4 : shared_state_version_tx,
491 4 : shared_state_version_rx,
492 4 : mutex: RwLock::new(shared_state),
493 4 : walsenders: WalSenders::new(walreceivers.clone()),
494 4 : walreceivers,
495 4 : gate: Default::default(),
496 4 : cancel: CancellationToken::default(),
497 4 : manager_ctl: ManagerCtl::new(),
498 4 : conf,
499 4 : broker_active: AtomicBool::new(false),
500 4 : wal_backup_active: AtomicBool::new(false),
501 4 : last_removed_segno: AtomicU64::new(0),
502 4 : mgr_status: AtomicStatus::new(),
503 4 : })
504 4 : }
505 :
506 : /// Load existing timeline from disk.
507 0 : pub fn load_timeline(
508 0 : conf: Arc<SafeKeeperConf>,
509 0 : ttid: TenantTimelineId,
510 0 : ) -> Result<Arc<Timeline>> {
511 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
512 :
513 0 : let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
514 0 : let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
515 0 : let remote_path = remote_timeline_path(&ttid)?;
516 :
517 0 : Ok(Timeline::new(
518 0 : ttid,
519 0 : &timeline_dir,
520 0 : &remote_path,
521 0 : shared_state,
522 0 : conf,
523 0 : ))
524 0 : }
525 :
526 : /// Bootstrap new or existing timeline starting background tasks.
527 4 : pub fn bootstrap(
528 4 : self: &Arc<Timeline>,
529 4 : _shared_state: &mut WriteGuardSharedState<'_>,
530 4 : conf: &SafeKeeperConf,
531 4 : broker_active_set: Arc<TimelinesSet>,
532 4 : partial_backup_rate_limiter: RateLimiter,
533 4 : ) {
534 4 : let (tx, rx) = self.manager_ctl.bootstrap_manager();
535 :
536 4 : let Ok(gate_guard) = self.gate.enter() else {
537 : // Init raced with shutdown
538 0 : return;
539 : };
540 :
541 : // Start manager task which will monitor timeline state and update
542 : // background tasks.
543 4 : tokio::spawn({
544 4 : let this = self.clone();
545 4 : let conf = conf.clone();
546 4 : async move {
547 4 : let _gate_guard = gate_guard;
548 4 : timeline_manager::main_task(
549 4 : ManagerTimeline { tli: this },
550 4 : conf,
551 4 : broker_active_set,
552 4 : tx,
553 4 : rx,
554 4 : partial_backup_rate_limiter,
555 4 : )
556 4 : .await
557 4 : }
558 4 : });
559 4 : }
560 :
561 : /// Cancel the timeline, requesting background activity to stop. Closing
562 : /// the `self.gate` waits for that.
563 0 : pub async fn cancel(&self) {
564 0 : info!("timeline {} shutting down", self.ttid);
565 0 : self.cancel.cancel();
566 0 : }
567 :
568 : /// Background timeline activities (which hold Timeline::gate) will no
569 : /// longer run once this function completes. `Self::cancel` must have been
570 : /// already called.
571 0 : pub async fn close(&self) {
572 0 : assert!(self.cancel.is_cancelled());
573 :
574 : // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
575 : // to read deleted files.
576 0 : self.gate.close().await;
577 0 : }
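    // Illustrative sketch (hypothetical helper, not part of the original impl):
    // the intended shutdown order, per the comments above and on delete() below:
    // cancel() first so background tasks drop out, then close() to drain the gate,
    // and only after that may delete() be called under the write lock.
    async fn example_shutdown(tli: &Arc<Timeline>) {
        tli.cancel().await;
        tli.close().await;
    }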
578 :
579 : /// Delete timeline from disk completely, by removing timeline directory.
580 : ///
581 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
582 : /// deletion API endpoint is retriable.
583 : ///
584 : /// Timeline must be in shut-down state (i.e. call [`Self::close`] first)
585 0 : pub async fn delete(
586 0 : &self,
587 0 : shared_state: &mut WriteGuardSharedState<'_>,
588 0 : only_local: bool,
589 0 : ) -> Result<bool> {
590 0 : // Assert that [`Self::close`] was already called
591 0 : assert!(self.cancel.is_cancelled());
592 0 : assert!(self.gate.close_complete());
593 :
594 0 : info!("deleting timeline {} from disk", self.ttid);
595 :
596 : // Close associated FDs. Nobody will be able to touch timeline data once
597 : // it is cancelled, so WAL storage won't be opened again.
598 0 : shared_state.sk.close_wal_store();
599 0 :
600 0 : if !only_local && self.conf.is_wal_backup_enabled() {
601 : // Note: we concurrently delete remote storage data from multiple
602 : // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
603 : // do some retries anyway.
604 0 : wal_backup::delete_timeline(&self.ttid).await?;
605 0 : }
606 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
607 0 : Ok(dir_existed)
608 0 : }
609 :
610 : /// Returns whether the timeline is cancelled.
611 2460 : pub fn is_cancelled(&self) -> bool {
612 2460 : self.cancel.is_cancelled()
613 2460 : }
614 :
615 : /// Take an exclusive write lock on the timeline's shared_state.
616 1238 : pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
617 1238 : WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
618 1238 : }
619 :
620 59 : pub async fn read_shared_state(&self) -> ReadGuardSharedState {
621 59 : self.mutex.read().await
622 59 : }
623 :
624 : /// Returns commit_lsn watch channel.
625 4 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
626 4 : self.commit_lsn_watch_rx.clone()
627 4 : }
628 :
629 : /// Returns term_flush_lsn watch channel.
630 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
631 0 : self.term_flush_lsn_watch_rx.clone()
632 0 : }
633 :
634 : /// Returns watch channel for SharedState update version.
635 4 : pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
636 4 : self.shared_state_version_rx.clone()
637 4 : }
638 :
639 : /// Returns wal_seg_size.
640 4 : pub async fn get_wal_seg_size(&self) -> usize {
641 4 : self.read_shared_state().await.get_wal_seg_size()
642 4 : }
643 :
644 : /// Returns state of the timeline.
645 8 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
646 8 : let state = self.read_shared_state().await;
647 8 : (
648 8 : state.sk.state().inmem.clone(),
649 8 : TimelinePersistentState::clone(state.sk.state()),
650 8 : )
651 8 : }
652 :
653 : /// Returns latest backup_lsn.
654 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
655 0 : self.read_shared_state().await.sk.state().inmem.backup_lsn
656 0 : }
657 :
658 : /// Sets backup_lsn to the given value.
659 0 : pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
660 0 : if self.is_cancelled() {
661 0 : bail!(TimelineError::Cancelled(self.ttid));
662 0 : }
663 :
664 0 : let mut state = self.write_shared_state().await;
665 0 : state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
666 0 : // we should check whether to shut down offloader, but this will be done
667 0 : // soon by peer communication anyway.
668 0 : Ok(())
669 0 : }
670 :
671 : /// Get safekeeper info for broadcasting to broker and other peers.
672 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
673 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
674 0 : let shared_state = self.read_shared_state().await;
675 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
676 0 : }
677 :
678 : /// Update timeline state with peer safekeeper data.
679 0 : pub async fn record_safekeeper_info(
680 0 : self: &Arc<Self>,
681 0 : sk_info: SafekeeperTimelineInfo,
682 0 : ) -> Result<()> {
683 : {
684 0 : let mut shared_state = self.write_shared_state().await;
685 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
686 0 : let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
687 0 : shared_state.peers_info.upsert(&peer_info);
688 0 : }
689 0 : Ok(())
690 0 : }
691 :
692 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
693 0 : let shared_state = self.read_shared_state().await;
694 0 : shared_state.get_peers(conf.heartbeat_timeout)
695 0 : }
696 :
697 4 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
698 4 : &self.walsenders
699 4 : }
700 :
701 14 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
702 14 : &self.walreceivers
703 14 : }
704 :
705 : /// Returns flush_lsn.
706 0 : pub async fn get_flush_lsn(&self) -> Lsn {
707 0 : self.read_shared_state().await.sk.flush_lsn()
708 0 : }
709 :
710 : /// Gather timeline data for metrics.
711 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
712 0 : if self.is_cancelled() {
713 0 : return None;
714 0 : }
715 0 :
716 0 : let WalSendersTimelineMetricValues {
717 0 : ps_feedback_counter,
718 0 : last_ps_feedback,
719 0 : interpreted_wal_reader_tasks,
720 0 : } = self.walsenders.info_for_metrics();
721 :
722 0 : let state = self.read_shared_state().await;
723 0 : Some(FullTimelineInfo {
724 0 : ttid: self.ttid,
725 0 : ps_feedback_count: ps_feedback_counter,
726 0 : last_ps_feedback,
727 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
728 0 : timeline_is_active: self.broker_active.load(Ordering::Relaxed),
729 0 : num_computes: self.walreceivers.get_num() as u32,
730 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
731 0 : interpreted_wal_reader_tasks,
732 0 : epoch_start_lsn: state.sk.term_start_lsn(),
733 0 : mem_state: state.sk.state().inmem.clone(),
734 0 : persisted_state: TimelinePersistentState::clone(state.sk.state()),
735 0 : flush_lsn: state.sk.flush_lsn(),
736 0 : wal_storage: state.sk.wal_storage_metrics(),
737 0 : })
738 0 : }
739 :
740 : /// Returns in-memory timeline state to build a full debug dump.
741 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
742 0 : let state = self.read_shared_state().await;
743 :
744 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
745 0 : state.sk.wal_storage_internal_state();
746 0 :
747 0 : debug_dump::Memory {
748 0 : is_cancelled: self.is_cancelled(),
749 0 : peers_info_len: state.peers_info.0.len(),
750 0 : walsenders: self.walsenders.get_all_public(),
751 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
752 0 : active: self.broker_active.load(Ordering::Relaxed),
753 0 : num_computes: self.walreceivers.get_num() as u32,
754 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
755 0 : epoch_start_lsn: state.sk.term_start_lsn(),
756 0 : mem_state: state.sk.state().inmem.clone(),
757 0 : mgr_status: self.mgr_status.get(),
758 0 : write_lsn,
759 0 : write_record_lsn,
760 0 : flush_lsn,
761 0 : file_open,
762 0 : }
763 0 : }
764 :
765 : /// Apply a function to the control file state and persist it.
766 0 : pub async fn map_control_file<T>(
767 0 : self: &Arc<Self>,
768 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
769 0 : ) -> Result<T> {
770 0 : let mut state = self.write_shared_state().await;
771 0 : let mut persistent_state = state.sk.state_mut().start_change();
772 : // If f returns error, we abort the change and don't persist anything.
773 0 : let res = f(&mut persistent_state)?;
774 : // If persisting fails, we abort the change and return error.
775 0 : state
776 0 : .sk
777 0 : .state_mut()
778 0 : .finish_change(&persistent_state)
779 0 : .await?;
780 0 : Ok(res)
781 0 : }
782 :
783 0 : pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
784 0 : let mut state = self.write_shared_state().await;
785 0 : state.sk.term_bump(to).await
786 0 : }
787 :
788 0 : pub async fn membership_switch(
789 0 : self: &Arc<Self>,
790 0 : to: Configuration,
791 0 : ) -> Result<TimelineMembershipSwitchResponse> {
792 0 : let mut state = self.write_shared_state().await;
793 0 : state.sk.membership_switch(to).await
794 0 : }
795 :
796 : /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
797 8 : async fn do_wal_residence_guard(
798 8 : self: &Arc<Self>,
799 8 : block: bool,
800 8 : ) -> Result<Option<WalResidentTimeline>> {
801 8 : let op_label = if block {
802 8 : "wal_residence_guard"
803 : } else {
804 0 : "try_wal_residence_guard"
805 : };
806 :
807 8 : if self.is_cancelled() {
808 0 : bail!(TimelineError::Cancelled(self.ttid));
809 8 : }
810 8 :
811 8 : debug!("requesting WalResidentTimeline guard");
812 8 : let started_at = Instant::now();
813 8 : let status_before = self.mgr_status.get();
814 :
815 : // Wait 30 seconds for the guard to be acquired. It can time out if someone is
816 : // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
817 : // is stuck.
818 8 : let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
819 8 : if block {
820 8 : self.manager_ctl.wal_residence_guard().await.map(Some)
821 : } else {
822 0 : self.manager_ctl.try_wal_residence_guard().await
823 : }
824 8 : })
825 8 : .await;
826 :
827 8 : let guard = match res {
828 8 : Ok(Ok(guard)) => {
829 8 : let finished_at = Instant::now();
830 8 : let elapsed = finished_at - started_at;
831 8 : MISC_OPERATION_SECONDS
832 8 : .with_label_values(&[op_label])
833 8 : .observe(elapsed.as_secs_f64());
834 8 :
835 8 : guard
836 : }
837 0 : Ok(Err(e)) => {
838 0 : warn!(
839 0 : "error acquiring in {op_label}, statuses {:?} => {:?}",
840 0 : status_before,
841 0 : self.mgr_status.get()
842 : );
843 0 : return Err(e);
844 : }
845 : Err(_) => {
846 0 : warn!(
847 0 : "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
848 0 : status_before,
849 0 : self.mgr_status.get()
850 : );
851 0 : anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
852 : }
853 : };
854 :
855 8 : Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
856 8 : }
857 :
858 : /// Get the timeline guard for reading/writing WAL files.
859 : /// If WAL files are not present on disk (evicted), they will be automatically
860 : /// downloaded from remote storage. This is done in the manager task, which is
861 : /// responsible for issuing all guards.
862 : ///
863 : /// NB: don't use this function from timeline_manager, it will deadlock.
864 : /// NB: don't use this function while holding shared_state lock.
865 8 : pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
866 8 : self.do_wal_residence_guard(true)
867 8 : .await
868 8 : .map(|m| m.expect("Always get Some in block=true mode"))
869 8 : }
870 :
871 : /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
872 : /// else return None
873 0 : pub(crate) async fn try_wal_residence_guard(
874 0 : self: &Arc<Self>,
875 0 : ) -> Result<Option<WalResidentTimeline>> {
876 0 : self.do_wal_residence_guard(false).await
877 0 : }
878 :
879 0 : pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
880 0 : self.manager_ctl.backup_partial_reset().await
881 0 : }
882 : }
883 :
884 : /// This is a guard that allows reading/writing the on-disk timeline state.
885 : /// All tasks that read/write WAL on disk should use this guard.
886 : pub struct WalResidentTimeline {
887 : pub tli: Arc<Timeline>,
888 : _guard: ResidenceGuard,
889 : }
890 :
891 : impl WalResidentTimeline {
892 12 : pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
893 12 : WalResidentTimeline { tli, _guard }
894 12 : }
895 : }
896 :
897 : impl Deref for WalResidentTimeline {
898 : type Target = Arc<Timeline>;
899 :
900 5566 : fn deref(&self) -> &Self::Target {
901 5566 : &self.tli
902 5566 : }
903 : }
904 :
905 : impl WalResidentTimeline {
906 : /// Returns true if the walsender should stop sending WAL to the pageserver. We
907 : /// terminate it if remote_consistent_lsn has reached commit_lsn and there are no
908 : /// computes. While there might already be nothing to stream, we learn about
909 : /// remote_consistent_lsn updates through replication feedback, and we want
910 : /// to stop pushing to the broker once the pageserver is fully caught up.
911 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
912 0 : if self.is_cancelled() {
913 0 : return true;
914 0 : }
915 0 : let shared_state = self.read_shared_state().await;
916 0 : if self.walreceivers.get_num() == 0 {
917 0 : return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
918 0 : reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
919 0 : }
920 0 : false
921 0 : }
922 :
923 : /// Ensure that current term is t, erroring otherwise, and lock the state.
924 0 : pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
925 0 : let ss = self.read_shared_state().await;
926 0 : if ss.sk.state().acceptor_state.term != t {
927 0 : bail!(
928 0 : "failed to acquire term {}, current term {}",
929 0 : t,
930 0 : ss.sk.state().acceptor_state.term
931 0 : );
932 0 : }
933 0 : Ok(ss)
934 0 : }
935 :
936 : /// Pass arrived message to the safekeeper.
937 1220 : pub async fn process_msg(
938 1220 : &self,
939 1220 : msg: &ProposerAcceptorMessage,
940 1220 : ) -> Result<Option<AcceptorProposerMessage>> {
941 1220 : if self.is_cancelled() {
942 0 : bail!(TimelineError::Cancelled(self.ttid));
943 1220 : }
944 :
945 : let mut rmsg: Option<AcceptorProposerMessage>;
946 : {
947 1220 : let mut shared_state = self.write_shared_state().await;
948 1220 : rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
949 :
950 : // if this is AppendResponse, fill in proper hot standby feedback.
951 610 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
952 610 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
953 610 : }
954 : }
955 1220 : Ok(rmsg)
956 1220 : }
957 :
958 8 : pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
959 8 : let (_, persisted_state) = self.get_state().await;
960 8 : let enable_remote_read = self.conf.is_wal_backup_enabled();
961 8 :
962 8 : WalReader::new(
963 8 : &self.ttid,
964 8 : self.timeline_dir.clone(),
965 8 : &persisted_state,
966 8 : start_lsn,
967 8 : enable_remote_read,
968 8 : )
969 8 : }
970 :
971 0 : pub fn get_timeline_dir(&self) -> Utf8PathBuf {
972 0 : self.timeline_dir.clone()
973 0 : }
974 :
975 : /// Update in memory remote consistent lsn.
976 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
977 0 : let mut shared_state = self.write_shared_state().await;
978 0 : shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
979 0 : shared_state.sk.state().inmem.remote_consistent_lsn,
980 0 : candidate,
981 0 : );
982 0 : }
983 : }
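// Illustrative usage sketch (hypothetical caller, not part of the original
// module): the intended flow for reading WAL. First pin residency through the
// guard so the manager cannot evict WAL files mid-read (downloading them from
// remote storage if they were evicted), then open a reader from the pinned
// timeline.
async fn example_open_wal_reader(tli: &Arc<Timeline>, start_lsn: Lsn) -> Result<WalReader> {
    // May block for up to ~30s (see do_wal_residence_guard) before erroring out.
    let resident: WalResidentTimeline = tli.wal_residence_guard().await?;
    resident.get_walreader(start_lsn).await
}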
984 :
985 : /// This struct contains methods that are used by timeline manager task.
986 : pub(crate) struct ManagerTimeline {
987 : pub(crate) tli: Arc<Timeline>,
988 : }
989 :
990 : impl Deref for ManagerTimeline {
991 : type Target = Arc<Timeline>;
992 :
993 445 : fn deref(&self) -> &Self::Target {
994 445 : &self.tli
995 445 : }
996 : }
997 :
998 : impl ManagerTimeline {
999 0 : pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
1000 0 : &self.tli.timeline_dir
1001 0 : }
1002 :
1003 : /// Manager requests this state on startup.
1004 4 : pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
1005 4 : let shared_state = self.read_shared_state().await;
1006 4 : let is_offloaded = matches!(
1007 4 : shared_state.sk.state().eviction_state,
1008 : EvictionState::Offloaded(_)
1009 : );
1010 4 : let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
1011 4 :
1012 4 : (is_offloaded, partial_backup_uploaded)
1013 4 : }
1014 :
1015 : /// Try to switch state Present->Offloaded.
1016 0 : pub(crate) async fn switch_to_offloaded(
1017 0 : &self,
1018 0 : partial: &PartialRemoteSegment,
1019 0 : ) -> anyhow::Result<()> {
1020 0 : let mut shared = self.write_shared_state().await;
1021 :
1022 : // updating control file
1023 0 : let mut pstate = shared.sk.state_mut().start_change();
1024 :
1025 0 : if !matches!(pstate.eviction_state, EvictionState::Present) {
1026 0 : bail!(
1027 0 : "cannot switch to offloaded state, current state is {:?}",
1028 0 : pstate.eviction_state
1029 0 : );
1030 0 : }
1031 0 :
1032 0 : if partial.flush_lsn != shared.sk.flush_lsn() {
1033 0 : bail!(
1034 0 : "flush_lsn mismatch in partial backup, expected {}, got {}",
1035 0 : shared.sk.flush_lsn(),
1036 0 : partial.flush_lsn
1037 0 : );
1038 0 : }
1039 0 :
1040 0 : if partial.commit_lsn != pstate.commit_lsn {
1041 0 : bail!(
1042 0 : "commit_lsn mismatch in partial backup, expected {}, got {}",
1043 0 : pstate.commit_lsn,
1044 0 : partial.commit_lsn
1045 0 : );
1046 0 : }
1047 0 :
1048 0 : if partial.term != shared.sk.last_log_term() {
1049 0 : bail!(
1050 0 : "term mismatch in partial backup, expected {}, got {}",
1051 0 : shared.sk.last_log_term(),
1052 0 : partial.term
1053 0 : );
1054 0 : }
1055 0 :
1056 0 : pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
1057 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1058 : // control file is now switched to Offloaded state
1059 :
1060 : // now we can switch shared.sk to Offloaded, shouldn't fail
1061 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1062 0 : let cfile_state = prev_sk.take_state();
1063 0 : shared.sk = StateSK::Offloaded(Box::new(cfile_state));
1064 0 :
1065 0 : Ok(())
1066 0 : }
1067 :
1068 : /// Try to switch state Offloaded->Present.
1069 0 : pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
1070 0 : let mut shared = self.write_shared_state().await;
1071 :
1072 : // trying to restore WAL storage
1073 0 : let wal_store = wal_storage::PhysicalStorage::new(
1074 0 : &self.ttid,
1075 0 : &self.timeline_dir,
1076 0 : shared.sk.state(),
1077 0 : self.conf.no_sync,
1078 0 : )?;
1079 :
1080 : // updating control file
1081 0 : let mut pstate = shared.sk.state_mut().start_change();
1082 :
1083 0 : if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
1084 0 : bail!(
1085 0 : "cannot switch to present state, current state is {:?}",
1086 0 : pstate.eviction_state
1087 0 : );
1088 0 : }
1089 0 :
1090 0 : if wal_store.flush_lsn() != shared.sk.flush_lsn() {
1091 0 : bail!(
1092 0 : "flush_lsn mismatch in restored WAL, expected {}, got {}",
1093 0 : shared.sk.flush_lsn(),
1094 0 : wal_store.flush_lsn()
1095 0 : );
1096 0 : }
1097 0 :
1098 0 : pstate.eviction_state = EvictionState::Present;
1099 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1100 :
1101 : // now we can switch shared.sk to Present, shouldn't fail
1102 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1103 0 : let cfile_state = prev_sk.take_state();
1104 0 : shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
1105 :
1106 0 : Ok(())
1107 0 : }
1108 :
1109 : /// Update current manager state, useful for debugging manager deadlocks.
1110 239 : pub(crate) fn set_status(&self, status: timeline_manager::Status) {
1111 239 : self.mgr_status.store(status, Ordering::Relaxed);
1112 239 : }
1113 : }
1114 :
1115 : /// Deletes a directory and its contents. Returns false if the directory does not exist.
1116 0 : pub async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
1117 0 : match fs::remove_dir_all(path).await {
1118 0 : Ok(_) => Ok(true),
1119 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
1120 0 : Err(e) => Err(e.into()),
1121 : }
1122 0 : }
1123 :
1124 : /// Get a path to the tenant directory. If you just need to get a timeline directory,
1125 : /// use WalResidentTimeline::get_timeline_dir instead.
1126 8 : pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
1127 8 : conf.workdir.join(tenant_id.to_string())
1128 8 : }
1129 :
1130 : /// Get a path to the timeline directory. If you need to read WAL files from disk,
1131 : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
1132 : /// timeline eviction status and WAL files might not be present on disk.
1133 8 : pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
1134 8 : get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
1135 8 : }
|