Line data Source code
1 : //! This module implements Timeline lifecycle management and has all necessary code
2 : //! to glue together SafeKeeper and all other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use camino::Utf8PathBuf;
6 : use serde::{Deserialize, Serialize};
7 : use tokio::fs::{self};
8 : use tokio_util::sync::CancellationToken;
9 : use utils::id::TenantId;
10 :
11 : use std::cmp::max;
12 : use std::ops::{Deref, DerefMut};
13 : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14 : use std::sync::Arc;
15 : use std::time::Duration;
16 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
17 : use tokio::{sync::watch, time::Instant};
18 : use tracing::*;
19 : use utils::http::error::ApiError;
20 : use utils::{
21 : id::{NodeId, TenantTimelineId},
22 : lsn::Lsn,
23 : };
24 :
25 : use storage_broker::proto::SafekeeperTimelineInfo;
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 :
28 : use crate::receive_wal::WalReceivers;
29 : use crate::safekeeper::{
30 : AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
31 : INVALID_TERM,
32 : };
33 : use crate::send_wal::WalSenders;
34 : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
35 : use crate::timeline_guard::ResidenceGuard;
36 : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
37 : use crate::timelines_set::TimelinesSet;
38 : use crate::wal_backup::{self};
39 : use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
40 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
41 :
42 : use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
43 : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
44 : use crate::{debug_dump, timeline_manager, wal_storage};
45 : use crate::{GlobalTimelines, SafeKeeperConf};
46 :
/// Things safekeeper should know about timeline state on peers.
///
/// Instances are built from broker messages via `PeerInfo::from_sk_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    /// Id of the peer safekeeper node.
    pub sk_id: NodeId,
    /// Term reported by the peer.
    pub term: Term,
    /// Term of the last entry.
    pub last_log_term: Term,
    /// LSN of the last record.
    pub flush_lsn: Lsn,
    /// Commit LSN reported by the peer.
    pub commit_lsn: Lsn,
    /// Since which LSN safekeeper has WAL.
    pub local_start_lsn: Lsn,
    /// When info was received. Serde annotations are not very useful but make
    /// the code compile -- we don't rely on this field externally.
    #[serde(skip)]
    #[serde(default = "Instant::now")]
    ts: Instant,
    /// Copied from `safekeeper_connstr` of the broker message.
    pub pg_connstr: String,
    /// Copied from `http_connstr` of the broker message.
    pub http_connstr: String,
}
67 :
68 : impl PeerInfo {
69 0 : fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
70 0 : PeerInfo {
71 0 : sk_id: NodeId(sk_info.safekeeper_id),
72 0 : term: sk_info.term,
73 0 : last_log_term: sk_info.last_log_term,
74 0 : flush_lsn: Lsn(sk_info.flush_lsn),
75 0 : commit_lsn: Lsn(sk_info.commit_lsn),
76 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
77 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
78 0 : http_connstr: sk_info.http_connstr.clone(),
79 0 : ts,
80 0 : }
81 0 : }
82 : }
83 :
84 : // vector-based node id -> peer state map with very limited functionality we
85 : // need.
86 : #[derive(Debug, Clone, Default)]
87 : pub struct PeersInfo(pub Vec<PeerInfo>);
88 :
89 : impl PeersInfo {
90 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
91 0 : self.0.iter_mut().find(|p| p.sk_id == id)
92 0 : }
93 :
94 0 : fn upsert(&mut self, p: &PeerInfo) {
95 0 : match self.get(p.sk_id) {
96 0 : Some(rp) => *rp = p.clone(),
97 0 : None => self.0.push(p.clone()),
98 : }
99 0 : }
100 : }
101 :
/// Read guard over a timeline's `SharedState`, as handed out by
/// `Timeline::read_shared_state`.
pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
103 :
/// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
/// automatically updates `watch::Sender` channels with state on drop.
pub struct WriteGuardSharedState<'a> {
    /// Timeline whose watch channels are updated when the guard is dropped.
    tli: Arc<Timeline>,
    /// The underlying write lock guard.
    guard: RwLockWriteGuard<'a, SharedState>,
    /// When true, dropping the guard does not bump the shared state version
    /// (the commit_lsn and term/flush_lsn channels are still updated).
    skip_update: bool,
}
111 :
112 : impl<'a> WriteGuardSharedState<'a> {
113 0 : fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
114 0 : WriteGuardSharedState {
115 0 : tli,
116 0 : guard,
117 0 : skip_update: false,
118 0 : }
119 0 : }
120 : }
121 :
122 : impl<'a> Deref for WriteGuardSharedState<'a> {
123 : type Target = SharedState;
124 :
125 0 : fn deref(&self) -> &Self::Target {
126 0 : &self.guard
127 0 : }
128 : }
129 :
130 : impl<'a> DerefMut for WriteGuardSharedState<'a> {
131 0 : fn deref_mut(&mut self) -> &mut Self::Target {
132 0 : &mut self.guard
133 0 : }
134 : }
135 :
136 : impl<'a> Drop for WriteGuardSharedState<'a> {
137 0 : fn drop(&mut self) {
138 0 : let term_flush_lsn =
139 0 : TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
140 0 : let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
141 0 :
142 0 : let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
143 0 : if *old != term_flush_lsn {
144 0 : *old = term_flush_lsn;
145 0 : true
146 : } else {
147 0 : false
148 : }
149 0 : });
150 0 :
151 0 : let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
152 0 : if *old != commit_lsn {
153 0 : *old = commit_lsn;
154 0 : true
155 : } else {
156 0 : false
157 : }
158 0 : });
159 0 :
160 0 : if !self.skip_update {
161 0 : // send notification about shared state update
162 0 : self.tli.shared_state_version_tx.send_modify(|old| {
163 0 : *old += 1;
164 0 : });
165 0 : }
166 0 : }
167 : }
168 :
/// This structure is stored in shared state and represents the state of the timeline.
/// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
/// case, SafeKeeper is not available (because WAL is not present on disk) and all
/// operations can be done only with control file.
pub enum StateSK {
    /// Full safekeeper with both control file and WAL storage.
    Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
    /// Only the control file state is available.
    Offloaded(Box<TimelineState<control_file::FileStorage>>),
    /// Not used, required for moving between states. Accessors in this file
    /// treat this variant as unreachable.
    Empty,
}
179 :
180 : impl StateSK {
181 0 : pub fn flush_lsn(&self) -> Lsn {
182 0 : match self {
183 0 : StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
184 0 : StateSK::Offloaded(state) => match state.eviction_state {
185 0 : EvictionState::Offloaded(flush_lsn) => flush_lsn,
186 0 : _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
187 : },
188 0 : StateSK::Empty => unreachable!(),
189 : }
190 0 : }
191 :
192 : /// Get a reference to the control file's timeline state.
193 0 : pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
194 0 : match self {
195 0 : StateSK::Loaded(sk) => &sk.state,
196 0 : StateSK::Offloaded(ref s) => s,
197 0 : StateSK::Empty => unreachable!(),
198 : }
199 0 : }
200 :
201 0 : pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
202 0 : match self {
203 0 : StateSK::Loaded(sk) => &mut sk.state,
204 0 : StateSK::Offloaded(ref mut s) => s,
205 0 : StateSK::Empty => unreachable!(),
206 : }
207 0 : }
208 :
209 0 : pub fn last_log_term(&self) -> Term {
210 0 : self.state()
211 0 : .acceptor_state
212 0 : .get_last_log_term(self.flush_lsn())
213 0 : }
214 :
215 : /// Close open WAL files to release FDs.
216 0 : fn close_wal_store(&mut self) {
217 0 : if let StateSK::Loaded(sk) = self {
218 0 : sk.wal_store.close();
219 0 : }
220 0 : }
221 :
222 : /// Update timeline state with peer safekeeper data.
223 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
224 0 : // update commit_lsn if safekeeper is loaded
225 0 : match self {
226 0 : StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
227 0 : StateSK::Offloaded(_) => {}
228 0 : StateSK::Empty => unreachable!(),
229 : }
230 :
231 : // update everything else, including remote_consistent_lsn and backup_lsn
232 0 : let mut sync_control_file = false;
233 0 : let state = self.state_mut();
234 0 : let wal_seg_size = state.server.wal_seg_size as u64;
235 0 :
236 0 : state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
237 0 : sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
238 0 :
239 0 : state.inmem.remote_consistent_lsn = max(
240 0 : Lsn(sk_info.remote_consistent_lsn),
241 0 : state.inmem.remote_consistent_lsn,
242 0 : );
243 0 : sync_control_file |=
244 0 : state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
245 0 :
246 0 : state.inmem.peer_horizon_lsn =
247 0 : max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
248 0 : sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
249 0 :
250 0 : if sync_control_file {
251 0 : state.flush().await?;
252 0 : }
253 0 : Ok(())
254 0 : }
255 :
256 : /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
257 0 : pub fn term_start_lsn(&self) -> Lsn {
258 0 : match self {
259 0 : StateSK::Loaded(sk) => sk.term_start_lsn,
260 0 : StateSK::Offloaded(_) => Lsn(0),
261 0 : StateSK::Empty => unreachable!(),
262 : }
263 0 : }
264 :
265 : /// Used for metrics only.
266 0 : pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
267 0 : match self {
268 0 : StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
269 0 : StateSK::Offloaded(_) => WalStorageMetrics::default(),
270 0 : StateSK::Empty => unreachable!(),
271 : }
272 0 : }
273 :
274 : /// Returns WAL storage internal LSNs for debug dump.
275 0 : pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
276 0 : match self {
277 0 : StateSK::Loaded(sk) => sk.wal_store.internal_state(),
278 : StateSK::Offloaded(_) => {
279 0 : let flush_lsn = self.flush_lsn();
280 0 : (flush_lsn, flush_lsn, flush_lsn, false)
281 : }
282 0 : StateSK::Empty => unreachable!(),
283 : }
284 0 : }
285 :
286 : /// Access to SafeKeeper object. Panics if offloaded, should be good to use from WalResidentTimeline.
287 0 : pub fn safekeeper(
288 0 : &mut self,
289 0 : ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
290 0 : match self {
291 0 : StateSK::Loaded(sk) => sk,
292 : StateSK::Offloaded(_) => {
293 0 : panic!("safekeeper is offloaded, cannot be used")
294 : }
295 0 : StateSK::Empty => unreachable!(),
296 : }
297 0 : }
298 :
299 : /// Moves control file's state structure out of the enum. Used to switch states.
300 0 : fn take_state(self) -> TimelineState<control_file::FileStorage> {
301 0 : match self {
302 0 : StateSK::Loaded(sk) => sk.state,
303 0 : StateSK::Offloaded(state) => *state,
304 0 : StateSK::Empty => unreachable!(),
305 : }
306 0 : }
307 : }
308 :
/// Shared state associated with database instance
pub struct SharedState {
    /// Safekeeper object
    pub(crate) sk: StateSK,
    /// In memory list containing state of peers sent in latest messages from them.
    pub(crate) peers_info: PeersInfo,
    /// True value hinders old WAL removal; this is used by snapshotting. We
    /// could make it a counter, but there is no need to.
    pub(crate) wal_removal_on_hold: bool,
}
319 :
320 : impl SharedState {
321 : /// Initialize fresh timeline state without persisting anything to disk.
322 0 : fn create_new(
323 0 : conf: &SafeKeeperConf,
324 0 : ttid: &TenantTimelineId,
325 0 : state: TimelinePersistentState,
326 0 : ) -> Result<Self> {
327 0 : if state.server.wal_seg_size == 0 {
328 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
329 0 : }
330 0 :
331 0 : if state.server.pg_version == UNKNOWN_SERVER_VERSION {
332 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
333 0 : }
334 0 :
335 0 : if state.commit_lsn < state.local_start_lsn {
336 0 : bail!(
337 0 : "commit_lsn {} is higher than local_start_lsn {}",
338 0 : state.commit_lsn,
339 0 : state.local_start_lsn
340 0 : );
341 0 : }
342 0 :
343 0 : // We don't want to write anything to disk, because we may have existing timeline there.
344 0 : // These functions should not change anything on disk.
345 0 : let timeline_dir = get_timeline_dir(conf, ttid);
346 0 : let control_store =
347 0 : control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
348 0 : let wal_store =
349 0 : wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
350 0 : let sk = SafeKeeper::new(TimelineState::new(control_store), wal_store, conf.my_id)?;
351 :
352 0 : Ok(Self {
353 0 : sk: StateSK::Loaded(sk),
354 0 : peers_info: PeersInfo(vec![]),
355 0 : wal_removal_on_hold: false,
356 0 : })
357 0 : }
358 :
359 : /// Restore SharedState from control file. If file doesn't exist, bails out.
360 0 : fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
361 0 : let timeline_dir = get_timeline_dir(conf, ttid);
362 0 : let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
363 0 : if control_store.server.wal_seg_size == 0 {
364 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
365 0 : }
366 :
367 0 : let sk = match control_store.eviction_state {
368 : EvictionState::Present => {
369 0 : let wal_store =
370 0 : wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
371 0 : StateSK::Loaded(SafeKeeper::new(
372 0 : TimelineState::new(control_store),
373 0 : wal_store,
374 0 : conf.my_id,
375 0 : )?)
376 : }
377 : EvictionState::Offloaded(_) => {
378 0 : StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
379 : }
380 : };
381 :
382 0 : Ok(Self {
383 0 : sk,
384 0 : peers_info: PeersInfo(vec![]),
385 0 : wal_removal_on_hold: false,
386 0 : })
387 0 : }
388 :
389 0 : pub(crate) fn get_wal_seg_size(&self) -> usize {
390 0 : self.sk.state().server.wal_seg_size as usize
391 0 : }
392 :
393 0 : fn get_safekeeper_info(
394 0 : &self,
395 0 : ttid: &TenantTimelineId,
396 0 : conf: &SafeKeeperConf,
397 0 : standby_apply_lsn: Lsn,
398 0 : ) -> SafekeeperTimelineInfo {
399 0 : SafekeeperTimelineInfo {
400 0 : safekeeper_id: conf.my_id.0,
401 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
402 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
403 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
404 0 : }),
405 0 : term: self.sk.state().acceptor_state.term,
406 0 : last_log_term: self.sk.last_log_term(),
407 0 : flush_lsn: self.sk.flush_lsn().0,
408 0 : // note: this value is not flushed to control file yet and can be lost
409 0 : commit_lsn: self.sk.state().inmem.commit_lsn.0,
410 0 : remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
411 0 : peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
412 0 : safekeeper_connstr: conf
413 0 : .advertise_pg_addr
414 0 : .to_owned()
415 0 : .unwrap_or(conf.listen_pg_addr.clone()),
416 0 : http_connstr: conf.listen_http_addr.to_owned(),
417 0 : backup_lsn: self.sk.state().inmem.backup_lsn.0,
418 0 : local_start_lsn: self.sk.state().local_start_lsn.0,
419 0 : availability_zone: conf.availability_zone.clone(),
420 0 : standby_horizon: standby_apply_lsn.0,
421 0 : }
422 0 : }
423 :
424 : /// Get our latest view of alive peers status on the timeline.
425 : /// We pass our own info through the broker as well, so when we don't have connection
426 : /// to the broker returned vec is empty.
427 0 : pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
428 0 : let now = Instant::now();
429 0 : self.peers_info
430 0 : .0
431 0 : .iter()
432 0 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
433 0 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
434 0 : .cloned()
435 0 : .collect()
436 0 : }
437 : }
438 :
/// Errors specific to timeline lifecycle management. Each variant carries the
/// id of the timeline it refers to.
#[derive(Debug, thiserror::Error)]
pub enum TimelineError {
    /// The timeline was cancelled (e.g. by deletion or failed init).
    #[error("Timeline {0} was cancelled and cannot be used anymore")]
    Cancelled(TenantTimelineId),
    /// Converted to `ApiError::NotFound` for the HTTP API.
    #[error("Timeline {0} was not found in global map")]
    NotFound(TenantTimelineId),
    /// Also returned by `Timeline::init_new` when the timeline directory
    /// already exists on disk.
    #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
    Invalid(TenantTimelineId),
    #[error("Timeline {0} is already exists")]
    AlreadyExists(TenantTimelineId),
    /// `wal_seg_size` in the control file state is zero.
    #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
    UninitializedWalSegSize(TenantTimelineId),
    /// `pg_version` in the control file state equals `UNKNOWN_SERVER_VERSION`.
    // NOTE(review): the variant name is misspelt ("Uninitialinzed"); renaming
    // it would touch every user of the enum, so it is left as is here.
    #[error("Timeline {0} is not initialized, pg_version is unknown")]
    UninitialinzedPgVersion(TenantTimelineId),
}
454 :
455 : // Convert to HTTP API error.
456 : impl From<TimelineError> for ApiError {
457 0 : fn from(te: TimelineError) -> ApiError {
458 0 : match te {
459 0 : TimelineError::NotFound(ttid) => {
460 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
461 : }
462 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
463 : }
464 0 : }
465 : }
466 :
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
/// It also holds SharedState and provides synchronized (read/write locked) access to it.
pub struct Timeline {
    pub ttid: TenantTimelineId,

    /// Used to broadcast commit_lsn updates to all background jobs.
    commit_lsn_watch_tx: watch::Sender<Lsn>,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,

    /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
    /// them when sending in recovery mode (to walproposer or peers). Note: this
    /// is just a notification, WAL reading should always be done with lock held as
    /// term can change otherwise.
    term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
    term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,

    /// Broadcasts shared state updates: a version counter incremented when a
    /// write guard is dropped (unless the guard opted out).
    shared_state_version_tx: watch::Sender<usize>,
    shared_state_version_rx: watch::Receiver<usize>,

    /// Safekeeper and other state, that should remain consistent and
    /// synchronized with the disk. This is a tokio `RwLock` as we write WAL to
    /// disk while holding it, ensuring that consensus checks are in order.
    mutex: RwLock<SharedState>,
    walsenders: Arc<WalSenders>,
    walreceivers: Arc<WalReceivers>,
    /// Timeline directory on disk, as computed by `get_timeline_dir`.
    timeline_dir: Utf8PathBuf,
    /// Handle used to bootstrap the manager task and to request WAL residence
    /// guards from it.
    manager_ctl: ManagerCtl,

    /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
    pub(crate) cancel: CancellationToken,

    // timeline_manager controlled state
    pub(crate) broker_active: AtomicBool,
    pub(crate) wal_backup_active: AtomicBool,
    pub(crate) last_removed_segno: AtomicU64,
    pub(crate) mgr_status: AtomicStatus,
}
505 :
506 : impl Timeline {
507 : /// Load existing timeline from disk.
508 0 : pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
509 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
510 :
511 0 : let shared_state = SharedState::restore(conf, &ttid)?;
512 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
513 0 : watch::channel(shared_state.sk.state().commit_lsn);
514 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
515 0 : shared_state.sk.last_log_term(),
516 0 : shared_state.sk.flush_lsn(),
517 0 : )));
518 0 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
519 0 :
520 0 : let walreceivers = WalReceivers::new();
521 0 : Ok(Timeline {
522 0 : ttid,
523 0 : commit_lsn_watch_tx,
524 0 : commit_lsn_watch_rx,
525 0 : term_flush_lsn_watch_tx,
526 0 : term_flush_lsn_watch_rx,
527 0 : shared_state_version_tx,
528 0 : shared_state_version_rx,
529 0 : mutex: RwLock::new(shared_state),
530 0 : walsenders: WalSenders::new(walreceivers.clone()),
531 0 : walreceivers,
532 0 : cancel: CancellationToken::default(),
533 0 : timeline_dir: get_timeline_dir(conf, &ttid),
534 0 : manager_ctl: ManagerCtl::new(),
535 0 : broker_active: AtomicBool::new(false),
536 0 : wal_backup_active: AtomicBool::new(false),
537 0 : last_removed_segno: AtomicU64::new(0),
538 0 : mgr_status: AtomicStatus::new(),
539 0 : })
540 0 : }
541 :
542 : /// Create a new timeline, which is not yet persisted to disk.
543 0 : pub fn create_empty(
544 0 : conf: &SafeKeeperConf,
545 0 : ttid: TenantTimelineId,
546 0 : server_info: ServerInfo,
547 0 : commit_lsn: Lsn,
548 0 : local_start_lsn: Lsn,
549 0 : ) -> Result<Timeline> {
550 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
551 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
552 0 : watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
553 0 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
554 0 :
555 0 : let state =
556 0 : TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
557 0 :
558 0 : let walreceivers = WalReceivers::new();
559 0 : Ok(Timeline {
560 0 : ttid,
561 0 : commit_lsn_watch_tx,
562 0 : commit_lsn_watch_rx,
563 0 : term_flush_lsn_watch_tx,
564 0 : term_flush_lsn_watch_rx,
565 0 : shared_state_version_tx,
566 0 : shared_state_version_rx,
567 0 : mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
568 0 : walsenders: WalSenders::new(walreceivers.clone()),
569 0 : walreceivers,
570 0 : cancel: CancellationToken::default(),
571 0 : timeline_dir: get_timeline_dir(conf, &ttid),
572 0 : manager_ctl: ManagerCtl::new(),
573 0 : broker_active: AtomicBool::new(false),
574 0 : wal_backup_active: AtomicBool::new(false),
575 0 : last_removed_segno: AtomicU64::new(0),
576 0 : mgr_status: AtomicStatus::new(),
577 : })
578 0 : }
579 :
580 : /// Initialize fresh timeline on disk and start background tasks. If init
581 : /// fails, timeline is cancelled and cannot be used anymore.
582 : ///
583 : /// Init is transactional, so if it fails, created files will be deleted,
584 : /// and state on disk should remain unchanged.
585 0 : pub async fn init_new(
586 0 : self: &Arc<Timeline>,
587 0 : shared_state: &mut WriteGuardSharedState<'_>,
588 0 : conf: &SafeKeeperConf,
589 0 : broker_active_set: Arc<TimelinesSet>,
590 0 : partial_backup_rate_limiter: RateLimiter,
591 0 : ) -> Result<()> {
592 0 : match fs::metadata(&self.timeline_dir).await {
593 : Ok(_) => {
594 : // Timeline directory exists on disk, we should leave state unchanged
595 : // and return error.
596 0 : bail!(TimelineError::Invalid(self.ttid));
597 : }
598 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
599 0 : Err(e) => {
600 0 : return Err(e.into());
601 : }
602 : }
603 :
604 : // Create timeline directory.
605 0 : fs::create_dir_all(&self.timeline_dir).await?;
606 :
607 : // Write timeline to disk and start background tasks.
608 0 : if let Err(e) = shared_state.sk.state_mut().flush().await {
609 : // Bootstrap failed, cancel timeline and remove timeline directory.
610 0 : self.cancel(shared_state);
611 :
612 0 : if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
613 0 : warn!(
614 0 : "failed to remove timeline {} directory after bootstrap failure: {}",
615 0 : self.ttid, fs_err
616 : );
617 0 : }
618 :
619 0 : return Err(e);
620 0 : }
621 0 : self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
622 0 : Ok(())
623 0 : }
624 :
625 : /// Bootstrap new or existing timeline starting background tasks.
626 0 : pub fn bootstrap(
627 0 : self: &Arc<Timeline>,
628 0 : conf: &SafeKeeperConf,
629 0 : broker_active_set: Arc<TimelinesSet>,
630 0 : partial_backup_rate_limiter: RateLimiter,
631 0 : ) {
632 0 : let (tx, rx) = self.manager_ctl.bootstrap_manager();
633 0 :
634 0 : // Start manager task which will monitor timeline state and update
635 0 : // background tasks.
636 0 : tokio::spawn(timeline_manager::main_task(
637 0 : ManagerTimeline { tli: self.clone() },
638 0 : conf.clone(),
639 0 : broker_active_set,
640 0 : tx,
641 0 : rx,
642 0 : partial_backup_rate_limiter,
643 0 : ));
644 0 : }
645 :
646 : /// Delete timeline from disk completely, by removing timeline directory.
647 : /// Background timeline activities will stop eventually.
648 : ///
649 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
650 : /// deletion API endpoint is retriable.
651 0 : pub async fn delete(
652 0 : &self,
653 0 : shared_state: &mut WriteGuardSharedState<'_>,
654 0 : only_local: bool,
655 0 : ) -> Result<bool> {
656 0 : self.cancel(shared_state);
657 0 :
658 0 : // TODO: It's better to wait for s3 offloader termination before
659 0 : // removing data from s3. Though since s3 doesn't have transactions it
660 0 : // still wouldn't guarantee absense of data after removal.
661 0 : let conf = GlobalTimelines::get_global_config();
662 0 : if !only_local && conf.is_wal_backup_enabled() {
663 : // Note: we concurrently delete remote storage data from multiple
664 : // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
665 : // do some retries anyway.
666 0 : wal_backup::delete_timeline(&self.ttid).await?;
667 0 : }
668 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
669 0 : Ok(dir_existed)
670 0 : }
671 :
672 : /// Cancel timeline to prevent further usage. Background tasks will stop
673 : /// eventually after receiving cancellation signal.
674 0 : fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
675 0 : info!("timeline {} is cancelled", self.ttid);
676 0 : self.cancel.cancel();
677 0 : // Close associated FDs. Nobody will be able to touch timeline data once
678 0 : // it is cancelled, so WAL storage won't be opened again.
679 0 : shared_state.sk.close_wal_store();
680 0 : }
681 :
682 : /// Returns if timeline is cancelled.
683 0 : pub fn is_cancelled(&self) -> bool {
684 0 : self.cancel.is_cancelled()
685 0 : }
686 :
687 : /// Take a writing mutual exclusive lock on timeline shared_state.
688 0 : pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
689 0 : WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
690 0 : }
691 :
692 0 : pub async fn read_shared_state(&self) -> ReadGuardSharedState {
693 0 : self.mutex.read().await
694 0 : }
695 :
696 : /// Returns commit_lsn watch channel.
697 0 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
698 0 : self.commit_lsn_watch_rx.clone()
699 0 : }
700 :
701 : /// Returns term_flush_lsn watch channel.
702 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
703 0 : self.term_flush_lsn_watch_rx.clone()
704 0 : }
705 :
706 : /// Returns watch channel for SharedState update version.
707 0 : pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
708 0 : self.shared_state_version_rx.clone()
709 0 : }
710 :
711 : /// Returns wal_seg_size.
712 0 : pub async fn get_wal_seg_size(&self) -> usize {
713 0 : self.read_shared_state().await.get_wal_seg_size()
714 0 : }
715 :
716 : /// Returns state of the timeline.
717 0 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
718 0 : let state = self.read_shared_state().await;
719 0 : (
720 0 : state.sk.state().inmem.clone(),
721 0 : TimelinePersistentState::clone(state.sk.state()),
722 0 : )
723 0 : }
724 :
725 : /// Returns latest backup_lsn.
726 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
727 0 : self.read_shared_state().await.sk.state().inmem.backup_lsn
728 0 : }
729 :
730 : /// Sets backup_lsn to the given value.
731 0 : pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
732 0 : if self.is_cancelled() {
733 0 : bail!(TimelineError::Cancelled(self.ttid));
734 0 : }
735 :
736 0 : let mut state = self.write_shared_state().await;
737 0 : state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
738 0 : // we should check whether to shut down offloader, but this will be done
739 0 : // soon by peer communication anyway.
740 0 : Ok(())
741 0 : }
742 :
743 : /// Get safekeeper info for broadcasting to broker and other peers.
744 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
745 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
746 0 : let shared_state = self.read_shared_state().await;
747 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
748 0 : }
749 :
750 : /// Update timeline state with peer safekeeper data.
751 0 : pub async fn record_safekeeper_info(
752 0 : self: &Arc<Self>,
753 0 : sk_info: SafekeeperTimelineInfo,
754 0 : ) -> Result<()> {
755 : {
756 0 : let mut shared_state = self.write_shared_state().await;
757 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
758 0 : let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
759 0 : shared_state.peers_info.upsert(&peer_info);
760 0 : }
761 0 : Ok(())
762 0 : }
763 :
764 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
765 0 : let shared_state = self.read_shared_state().await;
766 0 : shared_state.get_peers(conf.heartbeat_timeout)
767 0 : }
768 :
769 0 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
770 0 : &self.walsenders
771 0 : }
772 :
773 0 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
774 0 : &self.walreceivers
775 0 : }
776 :
777 : /// Returns flush_lsn.
778 0 : pub async fn get_flush_lsn(&self) -> Lsn {
779 0 : self.read_shared_state().await.sk.flush_lsn()
780 0 : }
781 :
782 : /// Gather timeline data for metrics.
783 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
784 0 : if self.is_cancelled() {
785 0 : return None;
786 0 : }
787 0 :
788 0 : let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
789 0 : let state = self.read_shared_state().await;
790 0 : Some(FullTimelineInfo {
791 0 : ttid: self.ttid,
792 0 : ps_feedback_count,
793 0 : last_ps_feedback,
794 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
795 0 : timeline_is_active: self.broker_active.load(Ordering::Relaxed),
796 0 : num_computes: self.walreceivers.get_num() as u32,
797 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
798 0 : epoch_start_lsn: state.sk.term_start_lsn(),
799 0 : mem_state: state.sk.state().inmem.clone(),
800 0 : persisted_state: TimelinePersistentState::clone(state.sk.state()),
801 0 : flush_lsn: state.sk.flush_lsn(),
802 0 : wal_storage: state.sk.wal_storage_metrics(),
803 0 : })
804 0 : }
805 :
806 : /// Returns in-memory timeline state to build a full debug dump.
807 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
808 0 : let state = self.read_shared_state().await;
809 :
810 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
811 0 : state.sk.wal_storage_internal_state();
812 0 :
813 0 : debug_dump::Memory {
814 0 : is_cancelled: self.is_cancelled(),
815 0 : peers_info_len: state.peers_info.0.len(),
816 0 : walsenders: self.walsenders.get_all(),
817 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
818 0 : active: self.broker_active.load(Ordering::Relaxed),
819 0 : num_computes: self.walreceivers.get_num() as u32,
820 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
821 0 : epoch_start_lsn: state.sk.term_start_lsn(),
822 0 : mem_state: state.sk.state().inmem.clone(),
823 0 : mgr_status: self.mgr_status.get(),
824 0 : write_lsn,
825 0 : write_record_lsn,
826 0 : flush_lsn,
827 0 : file_open,
828 0 : }
829 0 : }
830 :
831 : /// Apply a function to the control file state and persist it.
832 0 : pub async fn map_control_file<T>(
833 0 : self: &Arc<Self>,
834 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
835 0 : ) -> Result<T> {
836 0 : let mut state = self.write_shared_state().await;
837 0 : let mut persistent_state = state.sk.state_mut().start_change();
838 : // If f returns error, we abort the change and don't persist anything.
839 0 : let res = f(&mut persistent_state)?;
840 : // If persisting fails, we abort the change and return error.
841 0 : state
842 0 : .sk
843 0 : .state_mut()
844 0 : .finish_change(&persistent_state)
845 0 : .await?;
846 0 : Ok(res)
847 0 : }
848 :
849 : /// Get the timeline guard for reading/writing WAL files.
850 : /// If WAL files are not present on disk (evicted), they will be automatically
851 : /// downloaded from remote storage. This is done in the manager task, which is
852 : /// responsible for issuing all guards.
853 : ///
854 : /// NB: don't use this function from timeline_manager, it will deadlock.
855 : /// NB: don't use this function while holding shared_state lock.
856 0 : pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
857 0 : if self.is_cancelled() {
858 0 : bail!(TimelineError::Cancelled(self.ttid));
859 0 : }
860 0 :
861 0 : debug!("requesting WalResidentTimeline guard");
862 0 : let started_at = Instant::now();
863 0 : let status_before = self.mgr_status.get();
864 :
865 : // Wait 30 seconds for the guard to be acquired. It can time out if someone is
866 : // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
867 : // is stuck.
868 0 : let res = tokio::time::timeout_at(
869 0 : started_at + Duration::from_secs(30),
870 0 : self.manager_ctl.wal_residence_guard(),
871 0 : )
872 0 : .await;
873 :
874 0 : let guard = match res {
875 0 : Ok(Ok(guard)) => {
876 0 : let finished_at = Instant::now();
877 0 : let elapsed = finished_at - started_at;
878 0 : MISC_OPERATION_SECONDS
879 0 : .with_label_values(&["wal_residence_guard"])
880 0 : .observe(elapsed.as_secs_f64());
881 0 :
882 0 : guard
883 : }
884 0 : Ok(Err(e)) => {
885 0 : warn!(
886 0 : "error while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
887 0 : status_before,
888 0 : self.mgr_status.get()
889 : );
890 0 : return Err(e);
891 : }
892 : Err(_) => {
893 0 : warn!(
894 0 : "timeout while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
895 0 : status_before,
896 0 : self.mgr_status.get()
897 : );
898 0 : anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
899 : }
900 : };
901 :
902 0 : Ok(WalResidentTimeline::new(self.clone(), guard))
903 0 : }
904 : }
905 :
906 : /// This is a guard that allows to read/write disk timeline state.
907 : /// All tasks that are trying to read/write WAL from disk should use this guard.
908 : pub struct WalResidentTimeline {
909 : pub tli: Arc<Timeline>,
910 : _guard: ResidenceGuard,
911 : }
912 :
913 : impl WalResidentTimeline {
914 0 : pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
915 0 : WalResidentTimeline { tli, _guard }
916 0 : }
917 : }
918 :
919 : impl Deref for WalResidentTimeline {
920 : type Target = Arc<Timeline>;
921 :
922 0 : fn deref(&self) -> &Self::Target {
923 0 : &self.tli
924 0 : }
925 : }
926 :
927 : impl WalResidentTimeline {
928 : /// Returns true if walsender should stop sending WAL to pageserver. We
929 : /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
930 : /// computes. While there might be nothing to stream already, we learn about
931 : /// remote_consistent_lsn update through replication feedback, and we want
932 : /// to stop pushing to the broker if pageserver is fully caughtup.
933 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
934 0 : if self.is_cancelled() {
935 0 : return true;
936 0 : }
937 0 : let shared_state = self.read_shared_state().await;
938 0 : if self.walreceivers.get_num() == 0 {
939 0 : return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
940 0 : reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
941 0 : }
942 0 : false
943 0 : }
944 :
945 : /// Ensure that current term is t, erroring otherwise, and lock the state.
946 0 : pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
947 0 : let ss = self.read_shared_state().await;
948 0 : if ss.sk.state().acceptor_state.term != t {
949 0 : bail!(
950 0 : "failed to acquire term {}, current term {}",
951 0 : t,
952 0 : ss.sk.state().acceptor_state.term
953 0 : );
954 0 : }
955 0 : Ok(ss)
956 0 : }
957 :
958 : /// Pass arrived message to the safekeeper.
959 0 : pub async fn process_msg(
960 0 : &self,
961 0 : msg: &ProposerAcceptorMessage,
962 0 : ) -> Result<Option<AcceptorProposerMessage>> {
963 0 : if self.is_cancelled() {
964 0 : bail!(TimelineError::Cancelled(self.ttid));
965 0 : }
966 :
967 : let mut rmsg: Option<AcceptorProposerMessage>;
968 : {
969 0 : let mut shared_state = self.write_shared_state().await;
970 0 : rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
971 :
972 : // if this is AppendResponse, fill in proper hot standby feedback.
973 0 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
974 0 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
975 0 : }
976 : }
977 0 : Ok(rmsg)
978 0 : }
979 :
980 0 : pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
981 0 : let (_, persisted_state) = self.get_state().await;
982 0 : let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
983 0 :
984 0 : WalReader::new(
985 0 : &self.ttid,
986 0 : self.timeline_dir.clone(),
987 0 : &persisted_state,
988 0 : start_lsn,
989 0 : enable_remote_read,
990 0 : )
991 0 : }
992 :
993 0 : pub fn get_timeline_dir(&self) -> Utf8PathBuf {
994 0 : self.timeline_dir.clone()
995 0 : }
996 :
997 : /// Update in memory remote consistent lsn.
998 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
999 0 : let mut shared_state = self.write_shared_state().await;
1000 0 : shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
1001 0 : shared_state.sk.state().inmem.remote_consistent_lsn,
1002 0 : candidate,
1003 0 : );
1004 0 : }
1005 : }
1006 :
1007 : /// This struct contains methods that are used by timeline manager task.
1008 : pub(crate) struct ManagerTimeline {
1009 : pub(crate) tli: Arc<Timeline>,
1010 : }
1011 :
1012 : impl Deref for ManagerTimeline {
1013 : type Target = Arc<Timeline>;
1014 :
1015 0 : fn deref(&self) -> &Self::Target {
1016 0 : &self.tli
1017 0 : }
1018 : }
1019 :
1020 : impl ManagerTimeline {
1021 0 : pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
1022 0 : &self.tli.timeline_dir
1023 0 : }
1024 :
1025 : /// Manager requests this state on startup.
1026 0 : pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
1027 0 : let shared_state = self.read_shared_state().await;
1028 0 : let is_offloaded = matches!(
1029 0 : shared_state.sk.state().eviction_state,
1030 : EvictionState::Offloaded(_)
1031 : );
1032 0 : let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
1033 0 :
1034 0 : (is_offloaded, partial_backup_uploaded)
1035 0 : }
1036 :
1037 : /// Try to switch state Present->Offloaded.
1038 0 : pub(crate) async fn switch_to_offloaded(
1039 0 : &self,
1040 0 : partial: &PartialRemoteSegment,
1041 0 : ) -> anyhow::Result<()> {
1042 0 : let mut shared = self.write_shared_state().await;
1043 :
1044 : // updating control file
1045 0 : let mut pstate = shared.sk.state_mut().start_change();
1046 :
1047 0 : if !matches!(pstate.eviction_state, EvictionState::Present) {
1048 0 : bail!(
1049 0 : "cannot switch to offloaded state, current state is {:?}",
1050 0 : pstate.eviction_state
1051 0 : );
1052 0 : }
1053 0 :
1054 0 : if partial.flush_lsn != shared.sk.flush_lsn() {
1055 0 : bail!(
1056 0 : "flush_lsn mismatch in partial backup, expected {}, got {}",
1057 0 : shared.sk.flush_lsn(),
1058 0 : partial.flush_lsn
1059 0 : );
1060 0 : }
1061 0 :
1062 0 : if partial.commit_lsn != pstate.commit_lsn {
1063 0 : bail!(
1064 0 : "commit_lsn mismatch in partial backup, expected {}, got {}",
1065 0 : pstate.commit_lsn,
1066 0 : partial.commit_lsn
1067 0 : );
1068 0 : }
1069 0 :
1070 0 : if partial.term != shared.sk.last_log_term() {
1071 0 : bail!(
1072 0 : "term mismatch in partial backup, expected {}, got {}",
1073 0 : shared.sk.last_log_term(),
1074 0 : partial.term
1075 0 : );
1076 0 : }
1077 0 :
1078 0 : pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
1079 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1080 : // control file is now switched to Offloaded state
1081 :
1082 : // now we can switch shared.sk to Offloaded, shouldn't fail
1083 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1084 0 : let cfile_state = prev_sk.take_state();
1085 0 : shared.sk = StateSK::Offloaded(Box::new(cfile_state));
1086 0 :
1087 0 : Ok(())
1088 0 : }
1089 :
1090 : /// Try to switch state Offloaded->Present.
1091 0 : pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
1092 0 : let conf = GlobalTimelines::get_global_config();
1093 0 : let mut shared = self.write_shared_state().await;
1094 :
1095 : // trying to restore WAL storage
1096 0 : let wal_store = wal_storage::PhysicalStorage::new(
1097 0 : &self.ttid,
1098 0 : self.timeline_dir.clone(),
1099 0 : &conf,
1100 0 : shared.sk.state(),
1101 0 : )?;
1102 :
1103 : // updating control file
1104 0 : let mut pstate = shared.sk.state_mut().start_change();
1105 :
1106 0 : if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
1107 0 : bail!(
1108 0 : "cannot switch to present state, current state is {:?}",
1109 0 : pstate.eviction_state
1110 0 : );
1111 0 : }
1112 0 :
1113 0 : if wal_store.flush_lsn() != shared.sk.flush_lsn() {
1114 0 : bail!(
1115 0 : "flush_lsn mismatch in restored WAL, expected {}, got {}",
1116 0 : shared.sk.flush_lsn(),
1117 0 : wal_store.flush_lsn()
1118 0 : );
1119 0 : }
1120 0 :
1121 0 : pstate.eviction_state = EvictionState::Present;
1122 0 : shared.sk.state_mut().finish_change(&pstate).await?;
1123 :
1124 : // now we can switch shared.sk to Present, shouldn't fail
1125 0 : let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
1126 0 : let cfile_state = prev_sk.take_state();
1127 0 : shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
1128 :
1129 0 : Ok(())
1130 0 : }
1131 :
1132 : /// Update current manager state, useful for debugging manager deadlocks.
1133 0 : pub(crate) fn set_status(&self, status: timeline_manager::Status) {
1134 0 : self.mgr_status.store(status, Ordering::Relaxed);
1135 0 : }
1136 : }
1137 :
1138 : /// Deletes directory and it's contents. Returns false if directory does not exist.
1139 0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
1140 0 : match fs::remove_dir_all(path).await {
1141 0 : Ok(_) => Ok(true),
1142 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
1143 0 : Err(e) => Err(e.into()),
1144 : }
1145 0 : }
1146 :
1147 : /// Get a path to the tenant directory. If you just need to get a timeline directory,
1148 : /// use WalResidentTimeline::get_timeline_dir instead.
1149 14 : pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
1150 14 : conf.workdir.join(tenant_id.to_string())
1151 14 : }
1152 :
1153 : /// Get a path to the timeline directory. If you need to read WAL files from disk,
1154 : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
1155 : /// timeline eviction status and WAL files might not be present on disk.
1156 14 : pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
1157 14 : get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
1158 14 : }
|