LCOV - code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit
Test: 496e96cdfff2df79370229591d6427cda12fde29.info Lines: 0.0 % 588 0
Test Date: 2024-05-21 18:28:29 Functions: 0.0 % 93 0

            Line data    Source code
       1              : //! This module implements Timeline lifecycle management and has all necessary code
       2              : //! to glue together SafeKeeper and all other background services.
       3              : 
       4              : use anyhow::{anyhow, bail, Result};
       5              : use camino::Utf8PathBuf;
       6              : use postgres_ffi::XLogSegNo;
       7              : use serde::{Deserialize, Serialize};
       8              : use tokio::fs;
       9              : 
      10              : use std::cmp::max;
      11              : use std::sync::Arc;
      12              : use std::time::Duration;
      13              : use tokio::sync::{Mutex, MutexGuard};
      14              : use tokio::{
      15              :     sync::{mpsc::Sender, watch},
      16              :     time::Instant,
      17              : };
      18              : use tracing::*;
      19              : use utils::http::error::ApiError;
      20              : use utils::{
      21              :     id::{NodeId, TenantTimelineId},
      22              :     lsn::Lsn,
      23              : };
      24              : 
      25              : use storage_broker::proto::SafekeeperTimelineInfo;
      26              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      27              : 
      28              : use crate::receive_wal::WalReceivers;
      29              : use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
      30              : use crate::safekeeper::{
      31              :     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
      32              :     INVALID_TERM,
      33              : };
      34              : use crate::send_wal::WalSenders;
      35              : use crate::state::{TimelineMemState, TimelinePersistentState};
      36              : use crate::wal_backup::{self};
      37              : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
      38              : 
      39              : use crate::metrics::FullTimelineInfo;
      40              : use crate::wal_storage::Storage as wal_storage_iface;
      41              : use crate::{debug_dump, wal_backup_partial, wal_storage};
      42              : use crate::{GlobalTimelines, SafeKeeperConf};
      43              : 
      44              : /// Things safekeeper should know about timeline state on peers.
      45            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      46              : pub struct PeerInfo {
      47              :     pub sk_id: NodeId,
      48              :     pub term: Term,
      49              :     /// Term of the last entry.
      50              :     pub last_log_term: Term,
      51              :     /// LSN of the last record.
      52              :     pub flush_lsn: Lsn,
      53              :     pub commit_lsn: Lsn,
      54              :     /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
      55              :     /// sk since backup_lsn.
      56              :     pub local_start_lsn: Lsn,
      57              :     /// When info was received. Serde annotations are not very useful but make
      58              :     /// the code compile -- we don't rely on this field externally.
      59              :     #[serde(skip)]
      60              :     #[serde(default = "Instant::now")]
      61              :     ts: Instant,
      62              :     pub pg_connstr: String,
      63              :     pub http_connstr: String,
      64              : }
      65              : 
      66              : impl PeerInfo {
      67            0 :     fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      68            0 :         PeerInfo {
      69            0 :             sk_id: NodeId(sk_info.safekeeper_id),
      70            0 :             term: sk_info.term,
      71            0 :             last_log_term: sk_info.last_log_term,
      72            0 :             flush_lsn: Lsn(sk_info.flush_lsn),
      73            0 :             commit_lsn: Lsn(sk_info.commit_lsn),
      74            0 :             local_start_lsn: Lsn(sk_info.local_start_lsn),
      75            0 :             pg_connstr: sk_info.safekeeper_connstr.clone(),
      76            0 :             http_connstr: sk_info.http_connstr.clone(),
      77            0 :             ts,
      78            0 :         }
      79            0 :     }
      80              : }
      81              : 
      82              : // vector-based node id -> peer state map with very limited functionality we
      83              : // need.
      84              : #[derive(Debug, Clone, Default)]
      85              : pub struct PeersInfo(pub Vec<PeerInfo>);
      86              : 
      87              : impl PeersInfo {
      88            0 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      89            0 :         self.0.iter_mut().find(|p| p.sk_id == id)
      90            0 :     }
      91              : 
      92            0 :     fn upsert(&mut self, p: &PeerInfo) {
      93            0 :         match self.get(p.sk_id) {
      94            0 :             Some(rp) => *rp = p.clone(),
      95            0 :             None => self.0.push(p.clone()),
      96              :         }
      97            0 :     }
      98              : }
      99              : 
     100              : /// Shared state associated with database instance
     101              : pub struct SharedState {
     102              :     /// Safekeeper object
     103              :     sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
     104              :     /// In memory list containing state of peers sent in latest messages from them.
     105              :     peers_info: PeersInfo,
     106              :     /// True when WAL backup launcher oversees the timeline, making sure WAL is
     107              :     /// offloaded, allows to bother launcher less.
     108              :     wal_backup_active: bool,
     109              :     /// True whenever there is at least some pending activity on timeline: live
     110              :     /// compute connection, pageserver is not caughtup (it must have latest WAL
     111              :     /// for new compute start) or WAL backuping is not finished. Practically it
     112              :     /// means safekeepers broadcast info to peers about the timeline, old WAL is
     113              :     /// trimmed.
     114              :     ///
     115              :     /// TODO: it might be better to remove tli completely from GlobalTimelines
     116              :     /// when tli is inactive instead of having this flag.
     117              :     active: bool,
     118              :     last_removed_segno: XLogSegNo,
     119              : }
     120              : 
     121              : impl SharedState {
     122              :     /// Initialize fresh timeline state without persisting anything to disk.
     123            0 :     fn create_new(
     124            0 :         conf: &SafeKeeperConf,
     125            0 :         ttid: &TenantTimelineId,
     126            0 :         state: TimelinePersistentState,
     127            0 :     ) -> Result<Self> {
     128            0 :         if state.server.wal_seg_size == 0 {
     129            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     130            0 :         }
     131            0 : 
     132            0 :         if state.server.pg_version == UNKNOWN_SERVER_VERSION {
     133            0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     134            0 :         }
     135            0 : 
     136            0 :         if state.commit_lsn < state.local_start_lsn {
     137            0 :             bail!(
     138            0 :                 "commit_lsn {} is higher than local_start_lsn {}",
     139            0 :                 state.commit_lsn,
     140            0 :                 state.local_start_lsn
     141            0 :             );
     142            0 :         }
     143            0 : 
     144            0 :         // We don't want to write anything to disk, because we may have existing timeline there.
     145            0 :         // These functions should not change anything on disk.
     146            0 :         let timeline_dir = conf.timeline_dir(ttid);
     147            0 :         let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
     148            0 :         let wal_store =
     149            0 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     150            0 :         let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
     151              : 
     152            0 :         Ok(Self {
     153            0 :             sk,
     154            0 :             peers_info: PeersInfo(vec![]),
     155            0 :             wal_backup_active: false,
     156            0 :             active: false,
     157            0 :             last_removed_segno: 0,
     158            0 :         })
     159            0 :     }
     160              : 
     161              :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     162            0 :     fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     163            0 :         let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
     164            0 :         if control_store.server.wal_seg_size == 0 {
     165            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     166            0 :         }
     167              : 
     168            0 :         let wal_store =
     169            0 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     170              : 
     171              :         Ok(Self {
     172            0 :             sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
     173            0 :             peers_info: PeersInfo(vec![]),
     174              :             wal_backup_active: false,
     175              :             active: false,
     176              :             last_removed_segno: 0,
     177              :         })
     178            0 :     }
     179              : 
     180            0 :     fn is_active(&self, num_computes: usize) -> bool {
     181            0 :         self.is_wal_backup_required(num_computes)
     182              :             // FIXME: add tracking of relevant pageservers and check them here individually,
     183              :             // otherwise migration won't work (we suspend too early).
     184            0 :             || self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn
     185            0 :     }
     186              : 
     187              :     /// Mark timeline active/inactive and return whether s3 offloading requires
     188              :     /// start/stop action. If timeline is deactivated, control file is persisted
     189              :     /// as maintenance task does that only for active timelines.
     190            0 :     async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool {
     191            0 :         let is_active = self.is_active(num_computes);
     192            0 :         if self.active != is_active {
     193            0 :             info!(
     194            0 :                 "timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
     195              :                 ttid,
     196              :                 is_active,
     197              :                 self.sk.state.inmem.remote_consistent_lsn,
     198              :                 self.sk.state.inmem.commit_lsn
     199              :             );
     200            0 :             if !is_active {
     201            0 :                 if let Err(e) = self.sk.state.flush().await {
     202            0 :                     warn!("control file save in update_status failed: {:?}", e);
     203            0 :                 }
     204            0 :             }
     205            0 :         }
     206            0 :         self.active = is_active;
     207            0 :         self.is_wal_backup_action_pending(num_computes)
     208            0 :     }
     209              : 
     210              :     /// Should we run s3 offloading in current state?
     211            0 :     fn is_wal_backup_required(&self, num_computes: usize) -> bool {
     212            0 :         let seg_size = self.get_wal_seg_size();
     213            0 :         num_computes > 0 ||
     214              :         // Currently only the whole segment is offloaded, so compare segment numbers.
     215            0 :             (self.sk.state.inmem.commit_lsn.segment_number(seg_size) >
     216            0 :              self.sk.state.inmem.backup_lsn.segment_number(seg_size))
     217            0 :     }
     218              : 
     219              :     /// Is current state of s3 offloading is not what it ought to be?
     220            0 :     fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
     221            0 :         let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
     222            0 :         if res {
     223            0 :             let action_pending = if self.is_wal_backup_required(num_computes) {
     224            0 :                 "start"
     225              :             } else {
     226            0 :                 "stop"
     227              :             };
     228            0 :             trace!(
     229            0 :                 "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
     230            0 :                 self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn
     231              :             );
     232            0 :         }
     233            0 :         res
     234            0 :     }
     235              : 
     236              :     /// Returns whether s3 offloading is required and sets current status as
     237              :     /// matching.
     238            0 :     fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
     239            0 :         self.wal_backup_active = self.is_wal_backup_required(num_computes);
     240            0 :         self.wal_backup_active
     241            0 :     }
     242              : 
     243            0 :     fn get_wal_seg_size(&self) -> usize {
     244            0 :         self.sk.state.server.wal_seg_size as usize
     245            0 :     }
     246              : 
     247            0 :     fn get_safekeeper_info(
     248            0 :         &self,
     249            0 :         ttid: &TenantTimelineId,
     250            0 :         conf: &SafeKeeperConf,
     251            0 :         standby_apply_lsn: Lsn,
     252            0 :     ) -> SafekeeperTimelineInfo {
     253            0 :         SafekeeperTimelineInfo {
     254            0 :             safekeeper_id: conf.my_id.0,
     255            0 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     256            0 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     257            0 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     258            0 :             }),
     259            0 :             term: self.sk.state.acceptor_state.term,
     260            0 :             last_log_term: self.sk.get_epoch(),
     261            0 :             flush_lsn: self.sk.flush_lsn().0,
     262            0 :             // note: this value is not flushed to control file yet and can be lost
     263            0 :             commit_lsn: self.sk.state.inmem.commit_lsn.0,
     264            0 :             remote_consistent_lsn: self.sk.state.inmem.remote_consistent_lsn.0,
     265            0 :             peer_horizon_lsn: self.sk.state.inmem.peer_horizon_lsn.0,
     266            0 :             safekeeper_connstr: conf
     267            0 :                 .advertise_pg_addr
     268            0 :                 .to_owned()
     269            0 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     270            0 :             http_connstr: conf.listen_http_addr.to_owned(),
     271            0 :             backup_lsn: self.sk.state.inmem.backup_lsn.0,
     272            0 :             local_start_lsn: self.sk.state.local_start_lsn.0,
     273            0 :             availability_zone: conf.availability_zone.clone(),
     274            0 :             standby_horizon: standby_apply_lsn.0,
     275            0 :         }
     276            0 :     }
     277              : 
     278              :     /// Get our latest view of alive peers status on the timeline.
     279              :     /// We pass our own info through the broker as well, so when we don't have connection
     280              :     /// to the broker returned vec is empty.
     281            0 :     fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
     282            0 :         let now = Instant::now();
     283            0 :         self.peers_info
     284            0 :             .0
     285            0 :             .iter()
     286            0 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     287            0 :             .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
     288            0 :             .cloned()
     289            0 :             .collect()
     290            0 :     }
     291              : 
     292              :     /// Get oldest segno we still need to keep. We hold WAL till it is consumed
     293              :     /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
     294              :     /// offloading.
     295              :     /// While it is safe to use inmem values for determining horizon,
     296              :     /// we use persistent to make possible normal states less surprising.
     297            0 :     fn get_horizon_segno(
     298            0 :         &self,
     299            0 :         wal_backup_enabled: bool,
     300            0 :         extra_horizon_lsn: Option<Lsn>,
     301            0 :     ) -> XLogSegNo {
     302            0 :         let state = &self.sk.state;
     303            0 : 
     304            0 :         use std::cmp::min;
     305            0 :         let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
     306            0 :         if wal_backup_enabled {
     307            0 :             horizon_lsn = min(horizon_lsn, state.backup_lsn);
     308            0 :         }
     309            0 :         if let Some(extra_horizon_lsn) = extra_horizon_lsn {
     310            0 :             horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
     311            0 :         }
     312            0 :         horizon_lsn.segment_number(state.server.wal_seg_size as usize)
     313            0 :     }
     314              : }
     315              : 
     316            0 : #[derive(Debug, thiserror::Error)]
     317              : pub enum TimelineError {
     318              :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     319              :     Cancelled(TenantTimelineId),
     320              :     #[error("Timeline {0} was not found in global map")]
     321              :     NotFound(TenantTimelineId),
     322              :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     323              :     Invalid(TenantTimelineId),
     324              :     #[error("Timeline {0} is already exists")]
     325              :     AlreadyExists(TenantTimelineId),
     326              :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     327              :     UninitializedWalSegSize(TenantTimelineId),
     328              :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     329              :     UninitialinzedPgVersion(TenantTimelineId),
     330              : }
     331              : 
     332              : // Convert to HTTP API error.
     333              : impl From<TimelineError> for ApiError {
     334            0 :     fn from(te: TimelineError) -> ApiError {
     335            0 :         match te {
     336            0 :             TimelineError::NotFound(ttid) => {
     337            0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     338              :             }
     339            0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     340              :         }
     341            0 :     }
     342              : }
     343              : 
     344              : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     345              : /// It also holds SharedState and provides mutually exclusive access to it.
     346              : pub struct Timeline {
     347              :     pub ttid: TenantTimelineId,
     348              : 
     349              :     /// Sending here asks for wal backup launcher attention (start/stop
     350              :     /// offloading). Sending ttid instead of concrete command allows to do
     351              :     /// sending without timeline lock.
     352              :     pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
     353              : 
     354              :     /// Used to broadcast commit_lsn updates to all background jobs.
     355              :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     356              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     357              : 
     358              :     /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
     359              :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     360              :     /// is just a notification, WAL reading should always done with lock held as
     361              :     /// term can change otherwise.
     362              :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     363              :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     364              : 
     365              :     /// Safekeeper and other state, that should remain consistent and
     366              :     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     367              :     /// while holding it, ensuring that consensus checks are in order.
     368              :     mutex: Mutex<SharedState>,
     369              :     walsenders: Arc<WalSenders>,
     370              :     walreceivers: Arc<WalReceivers>,
     371              : 
     372              :     /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
     373              :     cancellation_tx: watch::Sender<bool>,
     374              : 
     375              :     /// Timeline should not be used after cancellation. Background tasks should
     376              :     /// monitor this channel and stop eventually after receiving `true` from this channel.
     377              :     cancellation_rx: watch::Receiver<bool>,
     378              : 
     379              :     /// Directory where timeline state is stored.
     380              :     pub timeline_dir: Utf8PathBuf,
     381              : 
     382              :     /// Should we keep WAL on disk for active replication connections.
     383              :     /// Especially useful for sharding, when different shards process WAL
     384              :     /// with different speed.
     385              :     // TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
     386              :     walsenders_keep_horizon: bool,
     387              : }
     388              : 
     389              : impl Timeline {
     390              :     /// Load existing timeline from disk.
     391            0 :     pub fn load_timeline(
     392            0 :         conf: &SafeKeeperConf,
     393            0 :         ttid: TenantTimelineId,
     394            0 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     395            0 :     ) -> Result<Timeline> {
     396            0 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     397              : 
     398            0 :         let shared_state = SharedState::restore(conf, &ttid)?;
     399            0 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     400            0 :             watch::channel(shared_state.sk.state.commit_lsn);
     401            0 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     402            0 :             shared_state.sk.get_term(),
     403            0 :             shared_state.sk.flush_lsn(),
     404            0 :         )));
     405            0 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     406            0 : 
     407            0 :         let walreceivers = WalReceivers::new();
     408            0 :         Ok(Timeline {
     409            0 :             ttid,
     410            0 :             wal_backup_launcher_tx,
     411            0 :             commit_lsn_watch_tx,
     412            0 :             commit_lsn_watch_rx,
     413            0 :             term_flush_lsn_watch_tx,
     414            0 :             term_flush_lsn_watch_rx,
     415            0 :             mutex: Mutex::new(shared_state),
     416            0 :             walsenders: WalSenders::new(walreceivers.clone()),
     417            0 :             walreceivers,
     418            0 :             cancellation_rx,
     419            0 :             cancellation_tx,
     420            0 :             timeline_dir: conf.timeline_dir(&ttid),
     421            0 :             walsenders_keep_horizon: conf.walsenders_keep_horizon,
     422            0 :         })
     423            0 :     }
     424              : 
     425              :     /// Create a new timeline, which is not yet persisted to disk.
     426            0 :     pub fn create_empty(
     427            0 :         conf: &SafeKeeperConf,
     428            0 :         ttid: TenantTimelineId,
     429            0 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     430            0 :         server_info: ServerInfo,
     431            0 :         commit_lsn: Lsn,
     432            0 :         local_start_lsn: Lsn,
     433            0 :     ) -> Result<Timeline> {
     434            0 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
     435            0 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
     436            0 :             watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
     437            0 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     438            0 :         let state =
     439            0 :             TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
     440            0 : 
     441            0 :         let walreceivers = WalReceivers::new();
     442            0 :         Ok(Timeline {
     443            0 :             ttid,
     444            0 :             wal_backup_launcher_tx,
     445            0 :             commit_lsn_watch_tx,
     446            0 :             commit_lsn_watch_rx,
     447            0 :             term_flush_lsn_watch_tx,
     448            0 :             term_flush_lsn_watch_rx,
     449            0 :             mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
     450            0 :             walsenders: WalSenders::new(walreceivers.clone()),
     451            0 :             walreceivers,
     452            0 :             cancellation_rx,
     453            0 :             cancellation_tx,
     454            0 :             timeline_dir: conf.timeline_dir(&ttid),
     455            0 :             walsenders_keep_horizon: conf.walsenders_keep_horizon,
     456              :         })
     457            0 :     }
     458              : 
     459              :     /// Initialize fresh timeline on disk and start background tasks. If init
     460              :     /// fails, timeline is cancelled and cannot be used anymore.
     461              :     ///
     462              :     /// Init is transactional, so if it fails, created files will be deleted,
     463              :     /// and state on disk should remain unchanged.
     464            0 :     pub async fn init_new(
     465            0 :         self: &Arc<Timeline>,
     466            0 :         shared_state: &mut MutexGuard<'_, SharedState>,
     467            0 :         conf: &SafeKeeperConf,
     468            0 :     ) -> Result<()> {
     469            0 :         match fs::metadata(&self.timeline_dir).await {
     470              :             Ok(_) => {
     471              :                 // Timeline directory exists on disk, we should leave state unchanged
     472              :                 // and return error.
     473            0 :                 bail!(TimelineError::Invalid(self.ttid));
     474              :             }
     475            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     476            0 :             Err(e) => {
     477            0 :                 return Err(e.into());
     478              :             }
     479              :         }
     480              : 
     481              :         // Create timeline directory.
     482            0 :         fs::create_dir_all(&self.timeline_dir).await?;
     483              : 
     484              :         // Write timeline to disk and start background tasks.
     485            0 :         if let Err(e) = shared_state.sk.state.flush().await {
     486              :             // Bootstrap failed, cancel timeline and remove timeline directory.
     487            0 :             self.cancel(shared_state);
     488              : 
     489            0 :             if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
     490            0 :                 warn!(
     491            0 :                     "failed to remove timeline {} directory after bootstrap failure: {}",
     492            0 :                     self.ttid, fs_err
     493              :                 );
     494            0 :             }
     495              : 
     496            0 :             return Err(e);
     497            0 :         }
     498            0 :         self.bootstrap(conf);
     499            0 :         Ok(())
     500            0 :     }
     501              : 
     502              :     /// Bootstrap new or existing timeline starting background stasks.
     503            0 :     pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
     504            0 :         // Start recovery task which always runs on the timeline.
     505            0 :         if conf.peer_recovery_enabled {
     506            0 :             tokio::spawn(recovery_main(self.clone(), conf.clone()));
     507            0 :         }
     508            0 :         if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
     509            0 :             tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
     510            0 :         }
     511            0 :     }
     512              : 
     513              :     /// Delete timeline from disk completely, by removing timeline directory.
     514              :     /// Background timeline activities will stop eventually.
     515              :     ///
     516              :     /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
     517              :     /// deletion API endpoint is retriable.
     518            0 :     pub async fn delete(
     519            0 :         &self,
     520            0 :         shared_state: &mut MutexGuard<'_, SharedState>,
     521            0 :         only_local: bool,
     522            0 :     ) -> Result<(bool, bool)> {
     523            0 :         let was_active = shared_state.active;
     524            0 :         self.cancel(shared_state);
     525            0 : 
     526            0 :         // TODO: It's better to wait for s3 offloader termination before
     527            0 :         // removing data from s3. Though since s3 doesn't have transactions it
     528            0 :         // still wouldn't guarantee absense of data after removal.
     529            0 :         let conf = GlobalTimelines::get_global_config();
     530            0 :         if !only_local && conf.is_wal_backup_enabled() {
     531              :             // Note: we concurrently delete remote storage data from multiple
     532              :             // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
     533              :             // do some retries anyway.
     534            0 :             wal_backup::delete_timeline(&self.ttid).await?;
     535            0 :         }
     536            0 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     537            0 :         Ok((dir_existed, was_active))
     538            0 :     }
     539              : 
     540              :     /// Cancel timeline to prevent further usage. Background tasks will stop
     541              :     /// eventually after receiving cancellation signal.
     542              :     ///
     543              :     /// Note that we can't notify backup launcher here while holding
     544              :     /// shared_state lock, as this is a potential deadlock: caller is
     545              :     /// responsible for that. Generally we should probably make WAL backup tasks
     546              :     /// to shut down on their own, checking once in a while whether it is the
     547              :     /// time.
     548            0 :     fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
     549            0 :         info!("timeline {} is cancelled", self.ttid);
     550            0 :         let _ = self.cancellation_tx.send(true);
     551            0 :         // Close associated FDs. Nobody will be able to touch timeline data once
     552            0 :         // it is cancelled, so WAL storage won't be opened again.
     553            0 :         shared_state.sk.wal_store.close();
     554            0 :     }
     555              : 
     556              :     /// Returns if timeline is cancelled.
     557            0 :     pub fn is_cancelled(&self) -> bool {
     558            0 :         *self.cancellation_rx.borrow()
     559            0 :     }
     560              : 
     561              :     /// Returns watch channel which gets value when timeline is cancelled. It is
     562              :     /// guaranteed to have not cancelled value observed (errors otherwise).
     563            0 :     pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
     564            0 :         let rx = self.cancellation_rx.clone();
     565            0 :         if *rx.borrow() {
     566            0 :             bail!(TimelineError::Cancelled(self.ttid));
     567            0 :         }
     568            0 :         Ok(rx)
     569            0 :     }
     570              : 
     571              :     /// Take a writing mutual exclusive lock on timeline shared_state.
     572            0 :     pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
     573            0 :         self.mutex.lock().await
     574            0 :     }
     575              : 
     576            0 :     async fn update_status(&self, shared_state: &mut SharedState) -> bool {
     577            0 :         shared_state
     578            0 :             .update_status(self.walreceivers.get_num(), self.ttid)
     579            0 :             .await
     580            0 :     }
     581              : 
     582              :     /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
     583            0 :     pub async fn update_status_notify(&self) -> Result<()> {
     584            0 :         if self.is_cancelled() {
     585            0 :             bail!(TimelineError::Cancelled(self.ttid));
     586            0 :         }
     587            0 :         let is_wal_backup_action_pending: bool = {
     588            0 :             let mut shared_state = self.write_shared_state().await;
     589            0 :             self.update_status(&mut shared_state).await
     590              :         };
     591            0 :         if is_wal_backup_action_pending {
     592              :             // Can fail only if channel to a static thread got closed, which is not normal at all.
     593            0 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     594            0 :         }
     595            0 :         Ok(())
     596            0 :     }
     597              : 
     598              :     /// Returns true if walsender should stop sending WAL to pageserver. We
     599              :     /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
     600              :     /// computes. While there might be nothing to stream already, we learn about
     601              :     /// remote_consistent_lsn update through replication feedback, and we want
     602              :     /// to stop pushing to the broker if pageserver is fully caughtup.
     603            0 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     604            0 :         if self.is_cancelled() {
     605            0 :             return true;
     606            0 :         }
     607            0 :         let shared_state = self.write_shared_state().await;
     608            0 :         if self.walreceivers.get_num() == 0 {
     609            0 :             return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
     610            0 :             reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
     611            0 :         }
     612            0 :         false
     613            0 :     }
     614              : 
     615              :     /// Ensure taht current term is t, erroring otherwise, and lock the state.
     616            0 :     pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
     617            0 :         let ss = self.write_shared_state().await;
     618            0 :         if ss.sk.state.acceptor_state.term != t {
     619            0 :             bail!(
     620            0 :                 "failed to acquire term {}, current term {}",
     621            0 :                 t,
     622            0 :                 ss.sk.state.acceptor_state.term
     623            0 :             );
     624            0 :         }
     625            0 :         Ok(ss)
     626            0 :     }
     627              : 
     628              :     /// Returns whether s3 offloading is required and sets current status as
     629              :     /// matching it.
     630            0 :     pub async fn wal_backup_attend(&self) -> bool {
     631            0 :         if self.is_cancelled() {
     632            0 :             return false;
     633            0 :         }
     634            0 : 
     635            0 :         self.write_shared_state()
     636            0 :             .await
     637            0 :             .wal_backup_attend(self.walreceivers.get_num())
     638            0 :     }
     639              : 
     640              :     /// Returns commit_lsn watch channel.
     641            0 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     642            0 :         self.commit_lsn_watch_rx.clone()
     643            0 :     }
     644              : 
     645              :     /// Returns term_flush_lsn watch channel.
     646            0 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     647            0 :         self.term_flush_lsn_watch_rx.clone()
     648            0 :     }
     649              : 
     650              :     /// Pass arrived message to the safekeeper.
     651            0 :     pub async fn process_msg(
     652            0 :         &self,
     653            0 :         msg: &ProposerAcceptorMessage,
     654            0 :     ) -> Result<Option<AcceptorProposerMessage>> {
     655            0 :         if self.is_cancelled() {
     656            0 :             bail!(TimelineError::Cancelled(self.ttid));
     657            0 :         }
     658              : 
     659              :         let mut rmsg: Option<AcceptorProposerMessage>;
     660              :         let commit_lsn: Lsn;
     661              :         let term_flush_lsn: TermLsn;
     662              :         {
     663            0 :             let mut shared_state = self.write_shared_state().await;
     664            0 :             rmsg = shared_state.sk.process_msg(msg).await?;
     665              : 
     666              :             // if this is AppendResponse, fill in proper hot standby feedback.
     667            0 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
     668            0 :                 resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
     669            0 :             }
     670              : 
     671            0 :             commit_lsn = shared_state.sk.state.inmem.commit_lsn;
     672            0 :             term_flush_lsn =
     673            0 :                 TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
     674            0 :         }
     675            0 :         self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
     676            0 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     677            0 :         Ok(rmsg)
     678            0 :     }
     679              : 
     680              :     /// Returns wal_seg_size.
     681            0 :     pub async fn get_wal_seg_size(&self) -> usize {
     682            0 :         self.write_shared_state().await.get_wal_seg_size()
     683            0 :     }
     684              : 
     685              :     /// Returns true only if the timeline is loaded and active.
     686            0 :     pub async fn is_active(&self) -> bool {
     687            0 :         if self.is_cancelled() {
     688            0 :             return false;
     689            0 :         }
     690            0 : 
     691            0 :         self.write_shared_state().await.active
     692            0 :     }
     693              : 
     694              :     /// Returns state of the timeline.
     695            0 :     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
     696            0 :         let state = self.write_shared_state().await;
     697            0 :         (state.sk.state.inmem.clone(), state.sk.state.clone())
     698            0 :     }
     699              : 
     700              :     /// Returns latest backup_lsn.
     701            0 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     702            0 :         self.write_shared_state().await.sk.state.inmem.backup_lsn
     703            0 :     }
     704              : 
     705              :     /// Sets backup_lsn to the given value.
     706            0 :     pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
     707            0 :         if self.is_cancelled() {
     708            0 :             bail!(TimelineError::Cancelled(self.ttid));
     709            0 :         }
     710              : 
     711            0 :         let mut state = self.write_shared_state().await;
     712            0 :         state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
     713            0 :         // we should check whether to shut down offloader, but this will be done
     714            0 :         // soon by peer communication anyway.
     715            0 :         Ok(())
     716            0 :     }
     717              : 
     718              :     /// Get safekeeper info for broadcasting to broker and other peers.
     719            0 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     720            0 :         let shared_state = self.write_shared_state().await;
     721            0 :         let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
     722            0 :         shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
     723            0 :     }
     724              : 
     725              :     /// Update timeline state with peer safekeeper data.
     726            0 :     pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> {
     727              :         let is_wal_backup_action_pending: bool;
     728              :         let commit_lsn: Lsn;
     729              :         {
     730            0 :             let mut shared_state = self.write_shared_state().await;
     731            0 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     732            0 :             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
     733            0 :             shared_state.peers_info.upsert(&peer_info);
     734            0 :             is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
     735            0 :             commit_lsn = shared_state.sk.state.inmem.commit_lsn;
     736            0 :         }
     737            0 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     738              :         // Wake up wal backup launcher, if it is time to stop the offloading.
     739            0 :         if is_wal_backup_action_pending {
     740            0 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     741            0 :         }
     742            0 :         Ok(())
     743            0 :     }
     744              : 
     745              :     /// Update in memory remote consistent lsn.
     746            0 :     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
     747            0 :         let mut shared_state = self.write_shared_state().await;
     748            0 :         shared_state.sk.state.inmem.remote_consistent_lsn =
     749            0 :             max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
     750            0 :     }
     751              : 
     752            0 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     753            0 :         let shared_state = self.write_shared_state().await;
     754            0 :         shared_state.get_peers(conf.heartbeat_timeout)
     755            0 :     }
     756              : 
     757              :     /// Should we start fetching WAL from a peer safekeeper, and if yes, from
     758              :     /// which? Answer is yes, i.e. .donors is not empty if 1) there is something
     759              :     /// to fetch, and we can do that without running elections; 2) there is no
     760              :     /// actively streaming compute, as we don't want to compete with it.
     761              :     ///
     762              :     /// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
     763              :     /// to its last_log_term so we are sure such a leader ever had been elected.
     764              :     ///
     765              :     /// All possible donors are returned so that we could keep connection to the
     766              :     /// current one if it is good even if it slightly lags behind.
     767              :     ///
     768              :     /// Note that term conditions above might be not met, but safekeepers are
     769              :     /// still not aligned on last flush_lsn. Generally in this case until
     770              :     /// elections are run it is not possible to say which safekeeper should
     771              :     /// recover from which one -- history which would be committed is different
     772              :     /// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
     773              :     /// Thus we don't try to predict it here.
     774            0 :     pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
     775            0 :         let ss = self.write_shared_state().await;
     776            0 :         let term = ss.sk.state.acceptor_state.term;
     777            0 :         let last_log_term = ss.sk.get_epoch();
     778            0 :         let flush_lsn = ss.sk.flush_lsn();
     779            0 :         // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
     780            0 :         let mut peers = ss.get_peers(heartbeat_timeout);
     781            0 :         // Sort by <last log term, lsn> pairs.
     782            0 :         peers.sort_by(|p1, p2| {
     783            0 :             let tl1 = TermLsn {
     784            0 :                 term: p1.last_log_term,
     785            0 :                 lsn: p1.flush_lsn,
     786            0 :             };
     787            0 :             let tl2 = TermLsn {
     788            0 :                 term: p2.last_log_term,
     789            0 :                 lsn: p2.flush_lsn,
     790            0 :             };
     791            0 :             tl2.cmp(&tl1) // desc
     792            0 :         });
     793            0 :         let num_streaming_computes = self.walreceivers.get_num_streaming();
     794            0 :         let donors = if num_streaming_computes > 0 {
     795            0 :             vec![] // If there is a streaming compute, don't try to recover to not intervene.
     796              :         } else {
     797            0 :             peers
     798            0 :                 .iter()
     799            0 :                 .filter_map(|candidate| {
     800            0 :                     // Are we interested in this candidate?
     801            0 :                     let candidate_tl = TermLsn {
     802            0 :                         term: candidate.last_log_term,
     803            0 :                         lsn: candidate.flush_lsn,
     804            0 :                     };
     805            0 :                     let my_tl = TermLsn {
     806            0 :                         term: last_log_term,
     807            0 :                         lsn: flush_lsn,
     808            0 :                     };
     809            0 :                     if my_tl < candidate_tl {
     810              :                         // Yes, we are interested. Can we pull from it without
     811              :                         // (re)running elections? It is possible if 1) his term
     812              :                         // is equal to his last_log_term so we could act on
     813              :                         // behalf of leader of this term (we must be sure he was
     814              :                         // ever elected) and 2) our term is not higher, or we'll refuse data.
     815            0 :                         if candidate.term == candidate.last_log_term && candidate.term >= term {
     816            0 :                             Some(Donor::from(candidate))
     817              :                         } else {
     818            0 :                             None
     819              :                         }
     820              :                     } else {
     821            0 :                         None
     822              :                     }
     823            0 :                 })
     824            0 :                 .collect()
     825              :         };
     826            0 :         RecoveryNeededInfo {
     827            0 :             term,
     828            0 :             last_log_term,
     829            0 :             flush_lsn,
     830            0 :             peers,
     831            0 :             num_streaming_computes,
     832            0 :             donors,
     833            0 :         }
     834            0 :     }
     835              : 
     836            0 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     837            0 :         &self.walsenders
     838            0 :     }
     839              : 
     840            0 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     841            0 :         &self.walreceivers
     842            0 :     }
     843              : 
     844              :     /// Returns flush_lsn.
     845            0 :     pub async fn get_flush_lsn(&self) -> Lsn {
     846            0 :         self.write_shared_state().await.sk.wal_store.flush_lsn()
     847            0 :     }
     848              : 
     849              :     /// Delete WAL segments from disk that are no longer needed. This is determined
     850              :     /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
     851            0 :     pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
     852            0 :         if self.is_cancelled() {
     853            0 :             bail!(TimelineError::Cancelled(self.ttid));
     854            0 :         }
     855              : 
     856              :         // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
     857              :         // This allows to get better read speed for pageservers that are lagging behind,
     858              :         // at the cost of keeping more WAL on disk.
     859            0 :         let replication_horizon_lsn = if self.walsenders_keep_horizon {
     860            0 :             self.walsenders.laggard_lsn()
     861              :         } else {
     862            0 :             None
     863              :         };
     864              : 
     865              :         let horizon_segno: XLogSegNo;
     866            0 :         let remover = {
     867            0 :             let shared_state = self.write_shared_state().await;
     868            0 :             horizon_segno =
     869            0 :                 shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
     870            0 :             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
     871            0 :                 return Ok(()); // nothing to do
     872            0 :             }
     873            0 : 
     874            0 :             // release the lock before removing
     875            0 :             shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
     876            0 :         };
     877            0 : 
     878            0 :         // delete old WAL files
     879            0 :         remover.await?;
     880              : 
     881              :         // update last_removed_segno
     882            0 :         let mut shared_state = self.write_shared_state().await;
     883            0 :         shared_state.last_removed_segno = horizon_segno;
     884            0 :         Ok(())
     885            0 :     }
     886              : 
     887              :     /// Persist control file if there is something to save and enough time
     888              :     /// passed after the last save. This helps to keep remote_consistent_lsn up
     889              :     /// to date so that storage nodes restart doesn't cause many pageserver ->
     890              :     /// safekeeper reconnections.
     891            0 :     pub async fn maybe_persist_control_file(&self) -> Result<()> {
     892            0 :         self.write_shared_state()
     893            0 :             .await
     894              :             .sk
     895            0 :             .maybe_persist_inmem_control_file()
     896            0 :             .await
     897            0 :     }
     898              : 
     899              :     /// Gather timeline data for metrics. If the timeline is not active, returns
     900              :     /// None, we do not collect these.
     901            0 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     902            0 :         if self.is_cancelled() {
     903            0 :             return None;
     904            0 :         }
     905            0 : 
     906            0 :         let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
     907            0 :         let state = self.write_shared_state().await;
     908            0 :         if state.active {
     909            0 :             Some(FullTimelineInfo {
     910            0 :                 ttid: self.ttid,
     911            0 :                 ps_feedback_count,
     912            0 :                 last_ps_feedback,
     913            0 :                 wal_backup_active: state.wal_backup_active,
     914            0 :                 timeline_is_active: state.active,
     915            0 :                 num_computes: self.walreceivers.get_num() as u32,
     916            0 :                 last_removed_segno: state.last_removed_segno,
     917            0 :                 epoch_start_lsn: state.sk.epoch_start_lsn,
     918            0 :                 mem_state: state.sk.state.inmem.clone(),
     919            0 :                 persisted_state: state.sk.state.clone(),
     920            0 :                 flush_lsn: state.sk.wal_store.flush_lsn(),
     921            0 :                 wal_storage: state.sk.wal_store.get_metrics(),
     922            0 :             })
     923              :         } else {
     924            0 :             None
     925              :         }
     926            0 :     }
     927              : 
     928              :     /// Returns in-memory timeline state to build a full debug dump.
     929            0 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     930            0 :         let state = self.write_shared_state().await;
     931              : 
     932            0 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     933            0 :             state.sk.wal_store.internal_state();
     934            0 : 
     935            0 :         debug_dump::Memory {
     936            0 :             is_cancelled: self.is_cancelled(),
     937            0 :             peers_info_len: state.peers_info.0.len(),
     938            0 :             walsenders: self.walsenders.get_all(),
     939            0 :             wal_backup_active: state.wal_backup_active,
     940            0 :             active: state.active,
     941            0 :             num_computes: self.walreceivers.get_num() as u32,
     942            0 :             last_removed_segno: state.last_removed_segno,
     943            0 :             epoch_start_lsn: state.sk.epoch_start_lsn,
     944            0 :             mem_state: state.sk.state.inmem.clone(),
     945            0 :             write_lsn,
     946            0 :             write_record_lsn,
     947            0 :             flush_lsn,
     948            0 :             file_open,
     949            0 :         }
     950            0 :     }
     951              : 
     952              :     /// Apply a function to the control file state and persist it.
     953            0 :     pub async fn map_control_file<T>(
     954            0 :         &self,
     955            0 :         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     956            0 :     ) -> Result<T> {
     957            0 :         let mut state = self.write_shared_state().await;
     958            0 :         let mut persistent_state = state.sk.state.start_change();
     959              :         // If f returns error, we abort the change and don't persist anything.
     960            0 :         let res = f(&mut persistent_state)?;
     961              :         // If persisting fails, we abort the change and return error.
     962            0 :         state.sk.state.finish_change(&persistent_state).await?;
     963            0 :         Ok(res)
     964            0 :     }
     965              : }
     966              : 
     967              : /// Deletes directory and it's contents. Returns false if directory does not exist.
     968            0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
     969            0 :     match fs::remove_dir_all(path).await {
     970            0 :         Ok(_) => Ok(true),
     971            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     972            0 :         Err(e) => Err(e.into()),
     973              :     }
     974            0 : }
        

Generated by: LCOV version 2.1-beta