Line data Source code
1 : //! This module implements Timeline lifecycle management and contains all the
2 : //! code needed to glue together SafeKeeper and the other background services.
3 :
4 : use std::cmp::max;
5 : use std::ops::{Deref, DerefMut};
6 : use std::sync::Arc;
7 : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8 : use std::time::Duration;
9 :
10 : use anyhow::{Result, anyhow, bail};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use http_utils::error::ApiError;
13 : use remote_storage::RemotePath;
14 : use safekeeper_api::Term;
15 : use safekeeper_api::membership::Configuration;
16 : use safekeeper_api::models::{
17 : PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
18 : };
19 : use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
20 : use tokio::fs::{self};
21 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, watch};
22 : use tokio::time::Instant;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::*;
25 : use utils::id::{NodeId, TenantId, TenantTimelineId};
26 : use utils::lsn::Lsn;
27 : use utils::sync::gate::Gate;
28 :
29 : use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
30 : use crate::rate_limit::RateLimiter;
31 : use crate::receive_wal::WalReceivers;
32 : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
33 : use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
34 : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
35 : use crate::timeline_guard::ResidenceGuard;
36 : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
37 : use crate::timelines_set::TimelinesSet;
38 : use crate::wal_backup::{self, remote_timeline_path};
39 : use crate::wal_backup_partial::PartialRemoteSegment;
40 : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
41 : use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
42 :
43 0 : fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
44 0 : PeerInfo {
45 0 : sk_id: NodeId(sk_info.safekeeper_id),
46 0 : term: sk_info.term,
47 0 : last_log_term: sk_info.last_log_term,
48 0 : flush_lsn: Lsn(sk_info.flush_lsn),
49 0 : commit_lsn: Lsn(sk_info.commit_lsn),
50 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
51 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
52 0 : http_connstr: sk_info.http_connstr.clone(),
53 0 : https_connstr: sk_info.https_connstr.clone(),
54 0 : ts,
55 0 : }
56 0 : }
57 :
58 : // Vector-based node id -> peer state map, with only the very limited
59 : // functionality we need.
60 : #[derive(Debug, Clone, Default)]
61 : pub struct PeersInfo(pub Vec<PeerInfo>);
62 :
63 : impl PeersInfo {
64 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
65 0 : self.0.iter_mut().find(|p| p.sk_id == id)
66 0 : }
67 :
68 0 : fn upsert(&mut self, p: &PeerInfo) {
69 0 : match self.get(p.sk_id) {
70 0 : Some(rp) => *rp = p.clone(),
71 0 : None => self.0.push(p.clone()),
72 : }
73 0 : }
74 : }
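// A minimal usage sketch of the upsert semantics above: at most one entry is
// kept per node id, so repeated updates from the same safekeeper replace the
// previous entry in place. `a` and `b` are assumed to be `PeerInfo` values
// carrying the same `sk_id`.
#[allow(dead_code)]
fn peers_info_upsert_sketch(a: &PeerInfo, b: &PeerInfo) {
    let mut peers = PeersInfo::default();
    peers.upsert(a);
    peers.upsert(b); // same sk_id: replaces the entry added for `a`
    assert_eq!(peers.0.len(), 1);
}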
75 :
76 : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
77 :
78 : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
79 : /// automatically updates `watch::Sender` channels with state on drop.
80 : pub struct WriteGuardSharedState<'a> {
81 : tli: Arc<Timeline>,
82 : guard: RwLockWriteGuard<'a, SharedState>,
83 : }
84 :
85 : impl<'a> WriteGuardSharedState<'a> {
86 1289 : fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
87 1289 : WriteGuardSharedState { tli, guard }
88 1289 : }
89 : }
90 :
91 : impl Deref for WriteGuardSharedState<'_> {
92 : type Target = SharedState;
93 :
94 0 : fn deref(&self) -> &Self::Target {
95 0 : &self.guard
96 0 : }
97 : }
98 :
99 : impl DerefMut for WriteGuardSharedState<'_> {
100 1284 : fn deref_mut(&mut self) -> &mut Self::Target {
101 1284 : &mut self.guard
102 1284 : }
103 : }
104 :
105 : impl Drop for WriteGuardSharedState<'_> {
106 1289 : fn drop(&mut self) {
107 1289 : let term_flush_lsn =
108 1289 : TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
109 1289 : let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
110 1289 :
111 1289 : let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
112 1289 : if *old != term_flush_lsn {
113 620 : *old = term_flush_lsn;
114 620 : true
115 : } else {
116 669 : false
117 : }
118 1289 : });
119 1289 :
120 1289 : let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
121 1289 : if *old != commit_lsn {
122 615 : *old = commit_lsn;
123 615 : true
124 : } else {
125 674 : false
126 : }
127 1289 : });
128 1289 :
129 1289 : // send notification about shared state update
130 1289 : self.tli.shared_state_version_tx.send_modify(|old| {
131 1289 : *old += 1;
132 1289 : });
133 1289 : }
134 : }
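// Sketch of how a background task can consume the notifications sent from
// `drop` above, assuming a live `tli: Arc<Timeline>`; `wait_for` returns as
// soon as the watched commit_lsn reaches `target`.
#[allow(dead_code)]
async fn wait_for_commit_lsn_sketch(tli: Arc<Timeline>, target: Lsn) -> Result<Lsn> {
    let mut rx = tli.get_commit_lsn_watch_rx();
    // Resolves immediately if the current value already satisfies the predicate.
    let lsn = *rx.wait_for(|lsn| *lsn >= target).await?;
    Ok(lsn)
}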
135 :
136 : /// This structure is stored in shared state and represents the state of the timeline.
137 : ///
138 : /// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
139 : /// case, SafeKeeper is not available (because WAL is not present on disk) and all
140 : /// operations can be performed only through the control file.
141 : #[allow(clippy::large_enum_variant, reason = "TODO")]
142 : pub enum StateSK {
143 : Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
144 : Offloaded(Box<TimelineState<control_file::FileStorage>>),
145 : // Not a valid state on its own; used only transiently when moving between states (see sketch below).
146 : Empty,
147 : }
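// Sketch of what `Empty` is for (this is the pattern used by
// switch_to_offloaded/switch_to_present further below): the live variant is
// moved out by value with `mem::replace` so its contents can be consumed,
// then the new variant is written back in its place.
#[allow(dead_code)]
fn offload_in_place_sketch(sk: &mut StateSK) {
    let prev = std::mem::replace(sk, StateSK::Empty);
    let cfile_state = prev.take_state();
    *sk = StateSK::Offloaded(Box::new(cfile_state));
}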
148 :
149 : impl StateSK {
150 2752 : pub fn flush_lsn(&self) -> Lsn {
151 2752 : match self {
152 2752 : StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
153 0 : StateSK::Offloaded(state) => match state.eviction_state {
154 0 : EvictionState::Offloaded(flush_lsn) => flush_lsn,
155 0 : _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
156 : },
157 0 : StateSK::Empty => unreachable!(),
158 : }
159 2752 : }
160 :
161 : /// Get a reference to the control file's timeline state.
162 2785 : pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
163 2785 : match self {
164 2785 : StateSK::Loaded(sk) => &sk.state,
165 0 : StateSK::Offloaded(s) => s,
166 0 : StateSK::Empty => unreachable!(),
167 : }
168 2785 : }
169 :
170 44 : pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
171 44 : match self {
172 44 : StateSK::Loaded(sk) => &mut sk.state,
173 0 : StateSK::Offloaded(s) => s,
174 0 : StateSK::Empty => unreachable!(),
175 : }
176 44 : }
177 :
178 1376 : pub fn last_log_term(&self) -> Term {
179 1376 : self.state()
180 1376 : .acceptor_state
181 1376 : .get_last_log_term(self.flush_lsn())
182 1376 : }
183 :
184 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
185 0 : self.state_mut().term_bump(to).await
186 0 : }
187 :
188 0 : pub async fn membership_switch(
189 0 : &mut self,
190 0 : to: Configuration,
191 0 : ) -> Result<TimelineMembershipSwitchResponse> {
192 0 : self.state_mut().membership_switch(to).await
193 0 : }
194 :
195 : /// Close open WAL files to release FDs.
196 0 : fn close_wal_store(&mut self) {
197 0 : if let StateSK::Loaded(sk) = self {
198 0 : sk.wal_store.close();
199 0 : }
200 0 : }
201 :
202 : /// Update timeline state with peer safekeeper data.
203 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
204 0 : // update commit_lsn if safekeeper is loaded
205 0 : match self {
206 0 : StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
207 0 : StateSK::Offloaded(_) => {}
208 0 : StateSK::Empty => unreachable!(),
209 : }
210 :
211 : // update everything else, including remote_consistent_lsn and backup_lsn
212 0 : let mut sync_control_file = false;
213 0 : let state = self.state_mut();
214 0 : let wal_seg_size = state.server.wal_seg_size as u64;
215 0 :
216 0 : state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
217 0 : sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
218 0 :
219 0 : state.inmem.remote_consistent_lsn = max(
220 0 : Lsn(sk_info.remote_consistent_lsn),
221 0 : state.inmem.remote_consistent_lsn,
222 0 : );
223 0 : sync_control_file |=
224 0 : state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
225 0 :
226 0 : state.inmem.peer_horizon_lsn =
227 0 : max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
228 0 : sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
229 0 :
230 0 : if sync_control_file {
231 0 : state.flush().await?;
232 0 : }
233 0 : Ok(())
234 0 : }
235 :
236 : /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
237 0 : pub fn term_start_lsn(&self) -> Lsn {
238 0 : match self {
239 0 : StateSK::Loaded(sk) => sk.term_start_lsn,
240 0 : StateSK::Offloaded(_) => Lsn(0),
241 0 : StateSK::Empty => unreachable!(),
242 : }
243 0 : }
244 :
245 : /// Used for metrics only.
246 0 : pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
247 0 : match self {
248 0 : StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
249 0 : StateSK::Offloaded(_) => WalStorageMetrics::default(),
250 0 : StateSK::Empty => unreachable!(),
251 : }
252 0 : }
253 :
254 : /// Returns WAL storage internal LSNs for debug dump.
255 0 : pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
256 0 : match self {
257 0 : StateSK::Loaded(sk) => sk.wal_store.internal_state(),
258 : StateSK::Offloaded(_) => {
259 0 : let flush_lsn = self.flush_lsn();
260 0 : (flush_lsn, flush_lsn, flush_lsn, false)
261 : }
262 0 : StateSK::Empty => unreachable!(),
263 : }
264 0 : }
265 :
266 : /// Access the SafeKeeper object. Panics if offloaded; safe to use from WalResidentTimeline.
267 1240 : pub fn safekeeper(
268 1240 : &mut self,
269 1240 : ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
270 1240 : match self {
271 1240 : StateSK::Loaded(sk) => sk,
272 : StateSK::Offloaded(_) => {
273 0 : panic!("safekeeper is offloaded, cannot be used")
274 : }
275 0 : StateSK::Empty => unreachable!(),
276 : }
277 1240 : }
278 :
279 : /// Moves control file's state structure out of the enum. Used to switch states.
280 0 : fn take_state(self) -> TimelineState<control_file::FileStorage> {
281 0 : match self {
282 0 : StateSK::Loaded(sk) => sk.state,
283 0 : StateSK::Offloaded(state) => *state,
284 0 : StateSK::Empty => unreachable!(),
285 : }
286 0 : }
287 : }
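// Sketch of the control-file sync hysteresis used by record_safekeeper_info
// above: the persistent value is rewritten (costing an fsync) only once the
// in-memory value has advanced more than one WAL segment past it, bounding
// control file write frequency. The function name here is illustrative.
#[allow(dead_code)]
fn needs_control_file_sync(persistent: Lsn, inmem: Lsn, wal_seg_size: u64) -> bool {
    persistent + wal_seg_size < inmem
}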
288 :
289 : /// Shared state associated with a database instance.
290 : pub struct SharedState {
291 : /// Safekeeper object
292 : pub(crate) sk: StateSK,
293 : /// In-memory list of peer states, as sent in the latest messages from them.
294 : pub(crate) peers_info: PeersInfo,
295 : // A true value prevents old WAL removal; this is used by snapshotting. We
296 : // could make it a counter, but there is no need to.
297 : pub(crate) wal_removal_on_hold: bool,
298 : }
299 :
300 : impl SharedState {
301 : /// Creates a new SharedState.
302 5 : pub fn new(sk: StateSK) -> Self {
303 5 : Self {
304 5 : sk,
305 5 : peers_info: PeersInfo(vec![]),
306 5 : wal_removal_on_hold: false,
307 5 : }
308 5 : }
309 :
310 : /// Restore SharedState from the control file. Bails out if the file doesn't exist.
311 0 : pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
312 0 : let timeline_dir = get_timeline_dir(conf, ttid);
313 0 : let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
314 0 : if control_store.server.wal_seg_size == 0 {
315 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
316 0 : }
317 :
318 0 : let sk = match control_store.eviction_state {
319 : EvictionState::Present => {
320 0 : let wal_store = wal_storage::PhysicalStorage::new(
321 0 : ttid,
322 0 : &timeline_dir,
323 0 : &control_store,
324 0 : conf.no_sync,
325 0 : )?;
326 0 : StateSK::Loaded(SafeKeeper::new(
327 0 : TimelineState::new(control_store),
328 0 : wal_store,
329 0 : conf.my_id,
330 0 : )?)
331 : }
332 : EvictionState::Offloaded(_) => {
333 0 : StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
334 : }
335 : };
336 :
337 0 : Ok(Self::new(sk))
338 0 : }
339 :
340 5 : pub(crate) fn get_wal_seg_size(&self) -> usize {
341 5 : self.sk.state().server.wal_seg_size as usize
342 5 : }
343 :
344 0 : fn get_safekeeper_info(
345 0 : &self,
346 0 : ttid: &TenantTimelineId,
347 0 : conf: &SafeKeeperConf,
348 0 : standby_apply_lsn: Lsn,
349 0 : ) -> SafekeeperTimelineInfo {
350 0 : SafekeeperTimelineInfo {
351 0 : safekeeper_id: conf.my_id.0,
352 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
353 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
354 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
355 0 : }),
356 0 : term: self.sk.state().acceptor_state.term,
357 0 : last_log_term: self.sk.last_log_term(),
358 0 : flush_lsn: self.sk.flush_lsn().0,
359 0 : // note: this value is not flushed to control file yet and can be lost
360 0 : commit_lsn: self.sk.state().inmem.commit_lsn.0,
361 0 : remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
362 0 : peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
363 0 : safekeeper_connstr: conf
364 0 : .advertise_pg_addr
365 0 : .to_owned()
366 0 : .unwrap_or(conf.listen_pg_addr.clone()),
367 0 : http_connstr: conf.listen_http_addr.to_owned(),
368 0 : https_connstr: conf.listen_https_addr.to_owned(),
369 0 : backup_lsn: self.sk.state().inmem.backup_lsn.0,
370 0 : local_start_lsn: self.sk.state().local_start_lsn.0,
371 0 : availability_zone: conf.availability_zone.clone(),
372 0 : standby_horizon: standby_apply_lsn.0,
373 0 : }
374 0 : }
375 :
376 : /// Get our latest view of the status of alive peers on the timeline.
377 : /// We pass our own info through the broker as well, so when we have no connection
378 : /// to the broker the returned vec is empty.
379 82 : pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
380 82 : let now = Instant::now();
381 82 : self.peers_info
382 82 : .0
383 82 : .iter()
384 82 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
385 82 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
386 82 : .cloned()
387 82 : .collect()
388 82 : }
389 : }
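// Sketch of the broker round-trip these methods serve, assuming a live
// `tli: Arc<Timeline>`: each safekeeper periodically publishes the result of
// get_safekeeper_info(), and updates received from peers are fed into
// Timeline::record_safekeeper_info (defined below), which upserts PeersInfo.
#[allow(dead_code)]
async fn broker_roundtrip_sketch(
    tli: Arc<Timeline>,
    conf: &SafeKeeperConf,
    incoming: SafekeeperTimelineInfo,
) -> Result<()> {
    let _outgoing = tli.get_safekeeper_info(conf).await; // would be published to the broker
    tli.record_safekeeper_info(incoming).await // records a peer's update
}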
390 :
391 : #[derive(Debug, thiserror::Error)]
392 : pub enum TimelineError {
393 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
394 : Cancelled(TenantTimelineId),
395 : #[error("Timeline {0} was not found in global map")]
396 : NotFound(TenantTimelineId),
397 : #[error("Timeline {0} creation is in progress")]
398 : CreationInProgress(TenantTimelineId),
399 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
400 : Invalid(TenantTimelineId),
401 : #[error("Timeline {0} is already exists")]
402 : AlreadyExists(TenantTimelineId),
403 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
404 : UninitializedWalSegSize(TenantTimelineId),
405 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
406 : UninitializedPgVersion(TenantTimelineId),
407 : }
408 :
409 : // Convert to HTTP API error.
410 : impl From<TimelineError> for ApiError {
411 0 : fn from(te: TimelineError) -> ApiError {
412 0 : match te {
413 0 : TimelineError::NotFound(ttid) => {
414 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
415 : }
416 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
417 : }
418 0 : }
419 : }
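// Sketch: thanks to the From impl above, HTTP handlers can propagate
// TimelineError with `?` and have NotFound map to a 404 automatically.
// `lookup_result` is a hypothetical input, not something this module provides.
#[allow(dead_code)]
fn handler_sketch(
    lookup_result: Result<Arc<Timeline>, TimelineError>,
) -> Result<Arc<Timeline>, ApiError> {
    Ok(lookup_result?) // TimelineError -> ApiError via the From impl
}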
420 :
421 : /// We run remote deletion in a background task, this is how it sends its results back.
422 : type RemoteDeletionReceiver = tokio::sync::watch::Receiver<Option<anyhow::Result<()>>>;
423 :
424 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
425 : /// It also holds SharedState and provides mutually exclusive access to it.
426 : pub struct Timeline {
427 : pub ttid: TenantTimelineId,
428 : pub remote_path: RemotePath,
429 :
430 : /// Used to broadcast commit_lsn updates to all background jobs.
431 : commit_lsn_watch_tx: watch::Sender<Lsn>,
432 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
433 :
434 : /// Broadcasts (current term, flush_lsn) updates. The walsender is interested in
435 : /// them when sending in recovery mode (to walproposer or peers). Note: this
436 : /// is just a notification; WAL reading should always be done with the lock
437 : /// held, as the term can change otherwise.
438 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
439 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
440 :
441 : /// Broadcasts shared state updates.
442 : shared_state_version_tx: watch::Sender<usize>,
443 : shared_state_version_rx: watch::Receiver<usize>,
444 :
445 : /// Safekeeper and other state that should remain consistent and
446 : /// synchronized with the disk. This is a tokio mutex, as we write WAL to disk
447 : /// while holding it, ensuring that consensus checks are in order.
448 : mutex: RwLock<SharedState>,
449 : walsenders: Arc<WalSenders>,
450 : walreceivers: Arc<WalReceivers>,
451 : timeline_dir: Utf8PathBuf,
452 : manager_ctl: ManagerCtl,
453 : conf: Arc<SafeKeeperConf>,
454 :
455 : remote_deletion: std::sync::Mutex<Option<RemoteDeletionReceiver>>,
456 :
457 : /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
458 : /// this gate, you must respect [`Timeline::cancel`].
459 : pub(crate) gate: Gate,
460 :
461 : /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
462 : pub(crate) cancel: CancellationToken,
463 :
464 : // timeline_manager controlled state
465 : pub(crate) broker_active: AtomicBool,
466 : pub(crate) wal_backup_active: AtomicBool,
467 : pub(crate) last_removed_segno: AtomicU64,
468 : pub(crate) mgr_status: AtomicStatus,
469 : }
470 :
471 : impl Timeline {
472 : /// Constructs a new timeline.
473 5 : pub fn new(
474 5 : ttid: TenantTimelineId,
475 5 : timeline_dir: &Utf8Path,
476 5 : remote_path: &RemotePath,
477 5 : shared_state: SharedState,
478 5 : conf: Arc<SafeKeeperConf>,
479 5 : ) -> Arc<Self> {
480 5 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
481 5 : watch::channel(shared_state.sk.state().commit_lsn);
482 5 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
483 5 : shared_state.sk.last_log_term(),
484 5 : shared_state.sk.flush_lsn(),
485 5 : )));
486 5 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
487 5 :
488 5 : let walreceivers = WalReceivers::new();
489 5 :
490 5 : Arc::new(Self {
491 5 : ttid,
492 5 : remote_path: remote_path.to_owned(),
493 5 : timeline_dir: timeline_dir.to_owned(),
494 5 : commit_lsn_watch_tx,
495 5 : commit_lsn_watch_rx,
496 5 : term_flush_lsn_watch_tx,
497 5 : term_flush_lsn_watch_rx,
498 5 : shared_state_version_tx,
499 5 : shared_state_version_rx,
500 5 : mutex: RwLock::new(shared_state),
501 5 : walsenders: WalSenders::new(walreceivers.clone()),
502 5 : walreceivers,
503 5 : gate: Default::default(),
504 5 : cancel: CancellationToken::default(),
505 5 : remote_deletion: std::sync::Mutex::new(None),
506 5 : manager_ctl: ManagerCtl::new(),
507 5 : conf,
508 5 : broker_active: AtomicBool::new(false),
509 5 : wal_backup_active: AtomicBool::new(false),
510 5 : last_removed_segno: AtomicU64::new(0),
511 5 : mgr_status: AtomicStatus::new(),
512 5 : })
513 5 : }
514 :
515 : /// Load existing timeline from disk.
516 0 : pub fn load_timeline(
517 0 : conf: Arc<SafeKeeperConf>,
518 0 : ttid: TenantTimelineId,
519 0 : ) -> Result<Arc<Timeline>> {
520 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
521 :
522 0 : let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
523 0 : let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
524 0 : let remote_path = remote_timeline_path(&ttid)?;
525 :
526 0 : Ok(Timeline::new(
527 0 : ttid,
528 0 : &timeline_dir,
529 0 : &remote_path,
530 0 : shared_state,
531 0 : conf,
532 0 : ))
533 0 : }
534 :
535 : /// Bootstrap new or existing timeline starting background tasks.
536 5 : pub fn bootstrap(
537 5 : self: &Arc<Timeline>,
538 5 : _shared_state: &mut WriteGuardSharedState<'_>,
539 5 : conf: &SafeKeeperConf,
540 5 : broker_active_set: Arc<TimelinesSet>,
541 5 : partial_backup_rate_limiter: RateLimiter,
542 5 : ) {
543 5 : let (tx, rx) = self.manager_ctl.bootstrap_manager();
544 :
545 5 : let Ok(gate_guard) = self.gate.enter() else {
546 : // Init raced with shutdown
547 0 : return;
548 : };
549 :
550 : // Start manager task which will monitor timeline state and update
551 : // background tasks.
552 5 : tokio::spawn({
553 5 : let this = self.clone();
554 5 : let conf = conf.clone();
555 5 : async move {
556 5 : let _gate_guard = gate_guard;
557 5 : timeline_manager::main_task(
558 5 : ManagerTimeline { tli: this },
559 5 : conf,
560 5 : broker_active_set,
561 5 : tx,
562 5 : rx,
563 5 : partial_backup_rate_limiter,
564 5 : )
565 5 : .await
566 5 : }
567 5 : });
568 5 : }
569 :
570 : /// Cancel the timeline, requesting background activity to stop. Closing
571 : /// the `self.gate` waits for that.
572 0 : pub async fn cancel(&self) {
573 0 : info!("timeline {} shutting down", self.ttid);
574 0 : self.cancel.cancel();
575 0 : }
576 :
577 : /// Background timeline activities (which hold Timeline::gate) will no
578 : /// longer run once this function completes. `Self::cancel` must already
579 : /// have been called.
580 0 : pub async fn close(&self) {
581 0 : assert!(self.cancel.is_cancelled());
582 :
583 : // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
584 : // to read deleted files.
585 0 : self.gate.close().await;
586 0 : }
587 :
588 : /// Delete timeline from disk completely, by removing timeline directory.
589 : ///
590 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but the
591 : /// deletion API endpoint is retriable.
592 : ///
593 : /// Timeline must be in shut-down state (i.e. call [`Self::close`] first)
594 0 : pub async fn delete(
595 0 : &self,
596 0 : shared_state: &mut WriteGuardSharedState<'_>,
597 0 : only_local: bool,
598 0 : ) -> Result<bool> {
599 0 : // Assert that [`Self::close`] was already called
600 0 : assert!(self.cancel.is_cancelled());
601 0 : assert!(self.gate.close_complete());
602 :
603 0 : info!("deleting timeline {} from disk", self.ttid);
604 :
605 : // Close associated FDs. Nobody will be able to touch timeline data once
606 : // it is cancelled, so WAL storage won't be opened again.
607 0 : shared_state.sk.close_wal_store();
608 0 :
609 0 : if !only_local && self.conf.is_wal_backup_enabled() {
610 0 : self.remote_delete().await?;
611 0 : }
612 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
613 0 : Ok(dir_existed)
614 0 : }
615 :
616 : /// Delete timeline content from remote storage. If the returned future is dropped,
617 : /// deletion will continue in the background.
618 : ///
619 : /// This function ordinarily spawns a task and stashes a result receiver into [`Self::remote_deletion`]. If
620 : /// deletion is already happening, it may simply wait for an existing task's result.
621 : ///
622 : /// Note: we concurrently delete remote storage data from multiple
623 : /// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
624 : /// do some retries anyway.
625 0 : async fn remote_delete(&self) -> Result<()> {
626 : // We will start a background task to do the deletion, so that it proceeds even if our
627 : // API request is dropped. Future requests will see the existing deletion task and wait
628 : // for it to complete.
629 0 : let mut result_rx = {
630 0 : let mut remote_deletion_state = self.remote_deletion.lock().unwrap();
631 0 : let result_rx = if let Some(result_rx) = remote_deletion_state.as_ref() {
632 0 : if let Some(result) = result_rx.borrow().as_ref() {
633 0 : if let Err(e) = result {
634 : // A previous remote deletion failed: we will start a new one
635 0 : tracing::error!("remote deletion failed, will retry ({e})");
636 0 : None
637 : } else {
638 : // A previous remote deletion call already succeeded
639 0 : return Ok(());
640 : }
641 : } else {
642 : // Remote deletion is still in flight
643 0 : Some(result_rx.clone())
644 : }
645 : } else {
646 : // Remote deletion was not attempted yet, start it now.
647 0 : None
648 : };
649 :
650 0 : match result_rx {
651 0 : Some(result_rx) => result_rx,
652 0 : None => self.start_remote_delete(&mut remote_deletion_state),
653 : }
654 : };
655 :
656 : // Wait for a result
657 0 : let Ok(result) = result_rx.wait_for(|v| v.is_some()).await else {
658 : // Unexpected: sender should always send a result before dropping the channel, even if it has an error
659 0 : return Err(anyhow::anyhow!(
660 0 : "remote deletion task future was dropped without sending a result"
661 0 : ));
662 : };
663 :
664 0 : result
665 0 : .as_ref()
666 0 : .expect("We did a wait_for on this being Some above")
667 0 : .as_ref()
668 0 : .map(|_| ())
669 0 : .map_err(|e| anyhow::anyhow!("remote deletion failed: {e}"))
670 0 : }
671 :
672 : /// Spawn background task to do remote deletion, return a receiver for its outcome
673 0 : fn start_remote_delete(
674 0 : &self,
675 0 : guard: &mut std::sync::MutexGuard<Option<RemoteDeletionReceiver>>,
676 0 : ) -> RemoteDeletionReceiver {
677 0 : tracing::info!("starting remote deletion");
678 0 : let (result_tx, result_rx) = tokio::sync::watch::channel(None);
679 0 : let ttid = self.ttid;
680 0 : tokio::task::spawn(
681 0 : async move {
682 0 : let r = wal_backup::delete_timeline(&ttid).await;
683 0 : if let Err(e) = &r {
684 : // Log error here in case nobody ever listens for our result (e.g. dropped API request)
685 0 : tracing::error!("remote deletion failed: {e}");
686 0 : }
687 :
688 : // Ignore send results: it's legal for the Timeline to give up waiting for us.
689 0 : let _ = result_tx.send(Some(r));
690 0 : }
691 0 : .instrument(info_span!("remote_delete", timeline = %self.ttid)),
692 : );
693 :
694 0 : **guard = Some(result_rx.clone());
695 0 :
696 0 : result_rx
697 0 : }
698 :
699 : /// Returns whether the timeline is cancelled.
700 2508 : pub fn is_cancelled(&self) -> bool {
701 2508 : self.cancel.is_cancelled()
702 2508 : }
703 :
704 : /// Take a mutually exclusive write lock on the timeline shared_state.
705 1289 : pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
706 1289 : WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
707 1289 : }
708 :
709 101 : pub async fn read_shared_state(&self) -> ReadGuardSharedState {
710 101 : self.mutex.read().await
711 101 : }
712 :
713 : /// Returns commit_lsn watch channel.
714 5 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
715 5 : self.commit_lsn_watch_rx.clone()
716 5 : }
717 :
718 : /// Returns term_flush_lsn watch channel.
719 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
720 0 : self.term_flush_lsn_watch_rx.clone()
721 0 : }
722 :
723 : /// Returns watch channel for SharedState update version.
724 5 : pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
725 5 : self.shared_state_version_rx.clone()
726 5 : }
727 :
728 : /// Returns wal_seg_size.
729 5 : pub async fn get_wal_seg_size(&self) -> usize {
730 5 : self.read_shared_state().await.get_wal_seg_size()
731 5 : }
732 :
733 : /// Returns state of the timeline.
734 9 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
735 9 : let state = self.read_shared_state().await;
736 9 : (
737 9 : state.sk.state().inmem.clone(),
738 9 : TimelinePersistentState::clone(state.sk.state()),
739 9 : )
740 9 : }
741 :
742 : /// Returns latest backup_lsn.
743 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
744 0 : self.read_shared_state().await.sk.state().inmem.backup_lsn
745 0 : }
746 :
747 : /// Sets backup_lsn to the given value.
748 0 : pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
749 0 : if self.is_cancelled() {
750 0 : bail!(TimelineError::Cancelled(self.ttid));
751 0 : }
752 :
753 0 : let mut state = self.write_shared_state().await;
754 0 : state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
755 0 : // we should check whether to shut down offloader, but this will be done
756 0 : // soon by peer communication anyway.
757 0 : Ok(())
758 0 : }
759 :
760 : /// Get safekeeper info for broadcasting to broker and other peers.
761 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
762 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
763 0 : let shared_state = self.read_shared_state().await;
764 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
765 0 : }
766 :
767 : /// Update timeline state with peer safekeeper data.
768 0 : pub async fn record_safekeeper_info(
769 0 : self: &Arc<Self>,
770 0 : sk_info: SafekeeperTimelineInfo,
771 0 : ) -> Result<()> {
772 : {
773 0 : let mut shared_state = self.write_shared_state().await;
774 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
775 0 : let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
776 0 : shared_state.peers_info.upsert(&peer_info);
777 0 : }
778 0 : Ok(())
779 0 : }
780 :
781 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
782 0 : let shared_state = self.read_shared_state().await;
783 0 : shared_state.get_peers(conf.heartbeat_timeout)
784 0 : }
785 :
786 5 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
787 5 : &self.walsenders
788 5 : }
789 :
790 25 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
791 25 : &self.walreceivers
792 25 : }
793 :
794 : /// Returns flush_lsn.
795 0 : pub async fn get_flush_lsn(&self) -> Lsn {
796 0 : self.read_shared_state().await.sk.flush_lsn()
797 0 : }
798 :
799 : /// Gather timeline data for metrics.
800 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
801 0 : if self.is_cancelled() {
802 0 : return None;
803 0 : }
804 0 :
805 0 : let WalSendersTimelineMetricValues {
806 0 : ps_feedback_counter,
807 0 : last_ps_feedback,
808 0 : interpreted_wal_reader_tasks,
809 0 : } = self.walsenders.info_for_metrics();
810 :
811 0 : let state = self.read_shared_state().await;
812 0 : Some(FullTimelineInfo {
813 0 : ttid: self.ttid,
814 0 : ps_feedback_count: ps_feedback_counter,
815 0 : last_ps_feedback,
816 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
817 0 : timeline_is_active: self.broker_active.load(Ordering::Relaxed),
818 0 : num_computes: self.walreceivers.get_num() as u32,
819 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
820 0 : interpreted_wal_reader_tasks,
821 0 : epoch_start_lsn: state.sk.term_start_lsn(),
822 0 : mem_state: state.sk.state().inmem.clone(),
823 0 : persisted_state: TimelinePersistentState::clone(state.sk.state()),
824 0 : flush_lsn: state.sk.flush_lsn(),
825 0 : wal_storage: state.sk.wal_storage_metrics(),
826 0 : })
827 0 : }
828 :
829 : /// Returns in-memory timeline state to build a full debug dump.
830 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
831 0 : let state = self.read_shared_state().await;
832 :
833 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
834 0 : state.sk.wal_storage_internal_state();
835 0 :
836 0 : debug_dump::Memory {
837 0 : is_cancelled: self.is_cancelled(),
838 0 : peers_info_len: state.peers_info.0.len(),
839 0 : walsenders: self.walsenders.get_all_public(),
840 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
841 0 : active: self.broker_active.load(Ordering::Relaxed),
842 0 : num_computes: self.walreceivers.get_num() as u32,
843 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
844 0 : epoch_start_lsn: state.sk.term_start_lsn(),
845 0 : mem_state: state.sk.state().inmem.clone(),
846 0 : mgr_status: self.mgr_status.get(),
847 0 : write_lsn,
848 0 : write_record_lsn,
849 0 : flush_lsn,
850 0 : file_open,
851 0 : }
852 0 : }
853 :
854 : /// Apply a function to the control file state and persist it.
855 0 : pub async fn map_control_file<T>(
856 0 : self: &Arc<Self>,
857 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
858 0 : ) -> Result<T> {
859 0 : let mut state = self.write_shared_state().await;
860 0 : let mut persistent_state = state.sk.state_mut().start_change();
861 : // If f returns error, we abort the change and don't persist anything.
862 0 : let res = f(&mut persistent_state)?;
863 : // If persisting fails, we abort the change and return error.
864 0 : state
865 0 : .sk
866 0 : .state_mut()
867 0 : .finish_change(&persistent_state)
868 0 : .await?;
869 0 : Ok(res)
870 0 : }
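    // Usage sketch (hypothetical caller): mutate and persist atomically; if
    // the closure or the flush fails, no partial state is kept.
    //
    //   tli.map_control_file(|state| {
    //       state.commit_lsn = new_commit_lsn; // hypothetical new value
    //       Ok(())
    //   })
    //   .await?;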
871 :
872 0 : pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
873 0 : let mut state = self.write_shared_state().await;
874 0 : state.sk.term_bump(to).await
875 0 : }
876 :
877 0 : pub async fn membership_switch(
878 0 : self: &Arc<Self>,
879 0 : to: Configuration,
880 0 : ) -> Result<TimelineMembershipSwitchResponse> {
881 0 : let mut state = self.write_shared_state().await;
882 0 : state.sk.membership_switch(to).await
883 0 : }
884 :
885 : /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
886 10 : async fn do_wal_residence_guard(
887 10 : self: &Arc<Self>,
888 10 : block: bool,
889 10 : ) -> Result<Option<WalResidentTimeline>> {
890 10 : let op_label = if block {
891 10 : "wal_residence_guard"
892 : } else {
893 0 : "try_wal_residence_guard"
894 : };
895 :
896 10 : if self.is_cancelled() {
897 0 : bail!(TimelineError::Cancelled(self.ttid));
898 10 : }
899 10 :
900 10 : debug!("requesting WalResidentTimeline guard");
901 10 : let started_at = Instant::now();
902 10 : let status_before = self.mgr_status.get();
903 :
904 : // Wait 30 seconds for the guard to be acquired. It can time out if someone is
905 : // holding the lock (e.g. during `SafeKeeper::process_msg()`) or the manager task
906 : // is stuck.
907 10 : let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
908 10 : if block {
909 10 : self.manager_ctl.wal_residence_guard().await.map(Some)
910 : } else {
911 0 : self.manager_ctl.try_wal_residence_guard().await
912 : }
913 10 : })
914 10 : .await;
915 :
916 10 : let guard = match res {
917 10 : Ok(Ok(guard)) => {
918 10 : let finished_at = Instant::now();
919 10 : let elapsed = finished_at - started_at;
920 10 : MISC_OPERATION_SECONDS
921 10 : .with_label_values(&[op_label])
922 10 : .observe(elapsed.as_secs_f64());
923 10 :
924 10 : guard
925 : }
926 0 : Ok(Err(e)) => {
927 0 : warn!(
928 0 : "error acquiring in {op_label}, statuses {:?} => {:?}",
929 0 : status_before,
930 0 : self.mgr_status.get()
931 : );
932 0 : return Err(e);
933 : }
934 : Err(_) => {
935 0 : warn!(
936 0 : "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
937 0 : status_before,
938 0 : self.mgr_status.get()
939 : );
940 0 : anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
941 : }
942 : };
943 :
944 10 : Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
945 10 : }
946 :
947 : /// Get the timeline guard for reading/writing WAL files.
948 : /// If WAL files are not present on disk (evicted), they will be automatically
949 : /// downloaded from remote storage. This is done in the manager task, which is
950 : /// responsible for issuing all guards.
951 : ///
952 : /// NB: don't use this function from timeline_manager, it will deadlock.
953 : /// NB: don't use this function while holding shared_state lock.
954 10 : pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
955 10 : self.do_wal_residence_guard(true)
956 10 : .await
957 10 : .map(|m| m.expect("Always get Some in block=true mode"))
958 10 : }
959 :
960 : /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
961 : /// else return None.
962 0 : pub(crate) async fn try_wal_residence_guard(
963 0 : self: &Arc<Self>,
964 0 : ) -> Result<Option<WalResidentTimeline>> {
965 0 : self.do_wal_residence_guard(false).await
966 0 : }
967 :
968 0 : pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
969 0 : self.manager_ctl.backup_partial_reset().await
970 0 : }
971 : }
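// Sketch of the shutdown-then-delete ordering required by the assertions in
// delete() above, assuming a live `tli: Arc<Timeline>`: cancel requests
// background work to stop, close waits for it to drain, and only then may
// on-disk and remote data be deleted.
#[allow(dead_code)]
async fn delete_timeline_sketch(tli: Arc<Timeline>, only_local: bool) -> Result<bool> {
    tli.cancel().await;
    tli.close().await;
    let mut shared = tli.write_shared_state().await;
    tli.delete(&mut shared, only_local).await
}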
972 :
973 : /// This is a guard that allows reading/writing the on-disk timeline state.
974 : /// All tasks that read or write WAL on disk should use this guard.
975 : pub struct WalResidentTimeline {
976 : pub tli: Arc<Timeline>,
977 : _guard: ResidenceGuard,
978 : }
979 :
980 : impl WalResidentTimeline {
981 15 : pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
982 15 : WalResidentTimeline { tli, _guard }
983 15 : }
984 : }
985 :
986 : impl Deref for WalResidentTimeline {
987 : type Target = Arc<Timeline>;
988 :
989 5692 : fn deref(&self) -> &Self::Target {
990 5692 : &self.tli
991 5692 : }
992 : }
993 :
994 : impl WalResidentTimeline {
995 : /// Returns true if the walsender should stop sending WAL to the pageserver. We
996 : /// terminate it if remote_consistent_lsn has reached commit_lsn and there are no
997 : /// computes. While there might already be nothing to stream, we learn about
998 : /// remote_consistent_lsn updates through replication feedback, and we want
999 : /// to stop pushing to the broker once the pageserver is fully caught up.
1000 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
1001 0 : if self.is_cancelled() {
1002 0 : return true;
1003 0 : }
1004 0 : let shared_state = self.read_shared_state().await;
1005 0 : if self.walreceivers.get_num() == 0 {
1006 0 : return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
1007 0 : reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
1008 0 : }
1009 0 : false
1010 0 : }
1011 :
1012 : /// Ensure that the current term is `t`, erroring otherwise, and lock the state.
1013 0 : pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
1014 0 : let ss = self.read_shared_state().await;
1015 0 : if ss.sk.state().acceptor_state.term != t {
1016 0 : bail!(
1017 0 : "failed to acquire term {}, current term {}",
1018 0 : t,
1019 0 : ss.sk.state().acceptor_state.term
1020 0 : );
1021 0 : }
1022 0 : Ok(ss)
1023 0 : }
1024 :
1025 : /// Pass arrived message to the safekeeper.
1026 1240 : pub async fn process_msg(
1027 1240 : &self,
1028 1240 : msg: &ProposerAcceptorMessage,
1029 1240 : ) -> Result<Option<AcceptorProposerMessage>> {
1030 1240 : if self.is_cancelled() {
1031 0 : bail!(TimelineError::Cancelled(self.ttid));
1032 1240 : }
1033 :
1034 : let mut rmsg: Option<AcceptorProposerMessage>;
1035 : {
1036 1240 : let mut shared_state = self.write_shared_state().await;
1037 1240 : rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
1038 :
1039 : // if this is AppendResponse, fill in proper hot standby feedback.
1040 620 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
1041 620 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
1042 620 : }
1043 : }
1044 1240 : Ok(rmsg)
1045 1240 : }
1046 :
1047 9 : pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
1048 9 : let (_, persisted_state) = self.get_state().await;
1049 9 : let enable_remote_read = self.conf.is_wal_backup_enabled();
1050 9 :
1051 9 : WalReader::new(
1052 9 : &self.ttid,
1053 9 : self.timeline_dir.clone(),
1054 9 : &persisted_state,
1055 9 : start_lsn,
1056 9 : enable_remote_read,
1057 9 : )
1058 9 : }
1059 :
1060 0 : pub fn get_timeline_dir(&self) -> Utf8PathBuf {
1061 0 : self.timeline_dir.clone()
1062 0 : }
1063 :
1064 : /// Update in memory remote consistent lsn.
1065 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
1066 0 : let mut shared_state = self.write_shared_state().await;
1067 0 : shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
1068 0 : shared_state.sk.state().inmem.remote_consistent_lsn,
1069 0 : candidate,
1070 0 : );
1071 0 : }
1072 : }
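// Sketch of the guarded read path, assuming a live `tli: Arc<Timeline>`: the
// residence guard must be held for as long as WAL files are being read, since
// it is what keeps the timeline from being evicted under the reader.
#[allow(dead_code)]
async fn read_wal_sketch(tli: Arc<Timeline>, start_lsn: Lsn) -> Result<()> {
    let resident: WalResidentTimeline = tli.wal_residence_guard().await?;
    let _reader = resident.get_walreader(start_lsn).await?;
    // ... stream WAL via `_reader` while `resident` stays alive ...
    Ok(())
}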
1073 :
1074 : /// This struct contains methods that are used by the timeline manager task.
1075 : pub(crate) struct ManagerTimeline {
1076 : pub(crate) tli: Arc<Timeline>,
1077 : }
1078 :
1079 : impl Deref for ManagerTimeline {
1080 : type Target = Arc<Timeline>;
1081 :
1082 779 : fn deref(&self) -> &Self::Target {
1083 779 : &self.tli
1084 779 : }
1085 : }
1086 :
1087 : impl ManagerTimeline {
1088 0 : pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
1089 0 : &self.tli.timeline_dir
1090 0 : }
1091 :
1092 : /// Manager requests this state on startup.
1093 5 : pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
1094 5 : let shared_state = self.read_shared_state().await;
1095 5 : let is_offloaded = matches!(
1096 5 : shared_state.sk.state().eviction_state,
1097 : EvictionState::Offloaded(_)
1098 : );
1099 5 : let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
1100 5 :
1101 5 : (is_offloaded, partial_backup_uploaded)
1102 5 : }
1103 :
1104 : /// Try to switch state Present->Offloaded.
1105 0 : pub(crate) async fn switch_to_offloaded(
1106 0 : &self,
1107 0 : partial: &PartialRemoteSegment,
1108 0 : ) -> anyhow::Result<()> {
1109 0 : let mut shared = self.write_shared_state().await;
1110 :
1111 : // updating control file
1112 0 : let mut pstate = shared.sk.state_mut().start_change();
1113 :
1114 0 : if !matches!(pstate.eviction_state, EvictionState::Present) {
1115 0 : bail!(
1116 0 : "cannot switch to offloaded state, current state is {:?}",
1117 0 : pstate.eviction_state
1118 0 : );
1119 0 : }
1120 0 :
1121 0 : if partial.flush_lsn != shared.sk.flush_lsn() {
1122 0 : bail!(
1123 0 : "flush_lsn mismatch in partial backup, expected {}, got {}",
1124 0 : shared.sk.flush_lsn(),
1125 0 : partial.flush_lsn
1126 0 : );
1127 0 : }
1128 0 :
1129 0 : if partial.commit_lsn != pstate.commit_lsn {
1130 0 : bail!(
1131 0 : "commit_lsn mismatch in partial backup, expected {}, got {}",
1132 0 : pstate.commit_lsn,
1133 0 : partial.commit_lsn
1134 0 : );
1135 0 : }
1136 0 :
1137 0 : if partial.term != shared.sk.last_log_term() {
1138 0 : bail!(
1139 0 : "term mismatch in partial backup, expected {}, got {}",
1140 0 : shared.sk.last_log_term(),
1141 0 : partial.term
1142 0 : );
1143 0 : }
1144 0 :
1145 0 : pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
1146 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1147 : // control file is now switched to Offloaded state
1148 :
1149 : // now we can switch shared.sk to Offloaded, shouldn't fail
1150 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1151 0 : let cfile_state = prev_sk.take_state();
1152 0 : shared.sk = StateSK::Offloaded(Box::new(cfile_state));
1153 0 :
1154 0 : Ok(())
1155 0 : }
1156 :
1157 : /// Try to switch state Offloaded->Present.
1158 0 : pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
1159 0 : let mut shared = self.write_shared_state().await;
1160 :
1161 : // trying to restore WAL storage
1162 0 : let wal_store = wal_storage::PhysicalStorage::new(
1163 0 : &self.ttid,
1164 0 : &self.timeline_dir,
1165 0 : shared.sk.state(),
1166 0 : self.conf.no_sync,
1167 0 : )?;
1168 :
1169 : // updating control file
1170 0 : let mut pstate = shared.sk.state_mut().start_change();
1171 :
1172 0 : if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
1173 0 : bail!(
1174 0 : "cannot switch to present state, current state is {:?}",
1175 0 : pstate.eviction_state
1176 0 : );
1177 0 : }
1178 0 :
1179 0 : if wal_store.flush_lsn() != shared.sk.flush_lsn() {
1180 0 : bail!(
1181 0 : "flush_lsn mismatch in restored WAL, expected {}, got {}",
1182 0 : shared.sk.flush_lsn(),
1183 0 : wal_store.flush_lsn()
1184 0 : );
1185 0 : }
1186 0 :
1187 0 : pstate.eviction_state = EvictionState::Present;
1188 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1189 :
1190 : // now we can switch shared.sk to Present, shouldn't fail
1191 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1192 0 : let cfile_state = prev_sk.take_state();
1193 0 : shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
1194 :
1195 0 : Ok(())
1196 0 : }
1197 :
1198 : /// Update current manager state, useful for debugging manager deadlocks.
1199 414 : pub(crate) fn set_status(&self, status: timeline_manager::Status) {
1200 414 : self.mgr_status.store(status, Ordering::Relaxed);
1201 414 : }
1202 : }
1203 :
1204 : /// Deletes the directory and its contents. Returns false if the directory does not exist.
1205 0 : pub async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
1206 0 : match fs::remove_dir_all(path).await {
1207 0 : Ok(_) => Ok(true),
1208 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
1209 0 : Err(e) => Err(e.into()),
1210 : }
1211 0 : }
1212 :
1213 : /// Get a path to the tenant directory. If you just need to get a timeline directory,
1214 : /// use WalResidentTimeline::get_timeline_dir instead.
1215 10 : pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
1216 10 : conf.workdir.join(tenant_id.to_string())
1217 10 : }
1218 :
1219 : /// Get a path to the timeline directory. If you need to read WAL files from disk,
1220 : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
1221 : /// timeline eviction status and WAL files might not be present on disk.
1222 10 : pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
1223 10 : get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
1224 10 : }