//! This module implements Timeline lifecycle management and has all the necessary
//! code to glue together the SafeKeeper and all other background services.

use std::cmp::max;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;

use anyhow::{Result, anyhow, bail};
use camino::{Utf8Path, Utf8PathBuf};
use http_utils::error::ApiError;
use remote_storage::RemotePath;
use safekeeper_api::Term;
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::{
    PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
use tokio::fs::{self};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, watch};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;

use crate::metrics::{
    FullTimelineInfo, MISC_OPERATION_SECONDS, WAL_STORAGE_LIMIT_ERRORS, WalStorageMetrics,
};

use crate::hadron::GLOBAL_DISK_LIMIT_EXCEEDED;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup;
use crate::wal_backup::{WalBackup, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};

fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
    PeerInfo {
        sk_id: NodeId(sk_info.safekeeper_id),
        term: sk_info.term,
        last_log_term: sk_info.last_log_term,
        flush_lsn: Lsn(sk_info.flush_lsn),
        commit_lsn: Lsn(sk_info.commit_lsn),
        local_start_lsn: Lsn(sk_info.local_start_lsn),
        pg_connstr: sk_info.safekeeper_connstr.clone(),
        http_connstr: sk_info.http_connstr.clone(),
        https_connstr: sk_info.https_connstr.clone(),
        ts,
    }
}

// Vector-based node id -> peer state map with only the very limited
// functionality we need.
#[derive(Debug, Clone, Default)]
pub struct PeersInfo(pub Vec<PeerInfo>);

impl PeersInfo {
    fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
        self.0.iter_mut().find(|p| p.sk_id == id)
    }

    fn upsert(&mut self, p: &PeerInfo) {
        match self.get(p.sk_id) {
            Some(rp) => *rp = p.clone(),
            None => self.0.push(p.clone()),
        }
    }
}
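
// Illustrative sketch (not part of the original source): `upsert` keeps at most
// one entry per `sk_id`, so repeated updates from the same peer never grow the vec.
#[allow(dead_code)]
fn example_refresh_peer(peers: &mut PeersInfo, update: &PeerInfo) {
    peers.upsert(update); // replaces the entry for update.sk_id, or appends it
    debug_assert_eq!(
        peers.0.iter().filter(|p| p.sk_id == update.sk_id).count(),
        1
    );
}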

pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;

/// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
/// automatically updates `watch::Sender` channels with state on drop.
pub struct WriteGuardSharedState<'a> {
    tli: Arc<Timeline>,
    guard: RwLockWriteGuard<'a, SharedState>,
}

impl<'a> WriteGuardSharedState<'a> {
    fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
        WriteGuardSharedState { tli, guard }
    }
}

impl Deref for WriteGuardSharedState<'_> {
    type Target = SharedState;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl DerefMut for WriteGuardSharedState<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}

impl Drop for WriteGuardSharedState<'_> {
    fn drop(&mut self) {
        let term_flush_lsn =
            TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
        let commit_lsn = self.guard.sk.state().inmem.commit_lsn;

        let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
            if *old != term_flush_lsn {
                *old = term_flush_lsn;
                true
            } else {
                false
            }
        });

        let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
            if *old != commit_lsn {
                *old = commit_lsn;
                true
            } else {
                false
            }
        });

        // send notification about shared state update
        self.tli.shared_state_version_tx.send_modify(|old| {
            *old += 1;
        });
    }
}
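
// Illustrative sketch (not part of the original source): how a background task
// can observe the commit_lsn updates that WriteGuardSharedState publishes on drop.
#[allow(dead_code)]
async fn example_watch_commit_lsn(tli: &Arc<Timeline>) {
    let mut rx = tli.get_commit_lsn_watch_rx();
    // `changed()` resolves each time a dropped write guard published a new value.
    while rx.changed().await.is_ok() {
        let commit_lsn = *rx.borrow();
        info!("commit_lsn advanced to {}", commit_lsn);
    }
}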

/// This structure is stored in shared state and represents the state of the timeline.
///
/// Usually it holds the SafeKeeper, but it also supports the offloaded timeline state.
/// In that case the SafeKeeper is not available (because WAL is not present on disk)
/// and all operations can be done only through the control file.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub enum StateSK {
    Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
    Offloaded(Box<TimelineState<control_file::FileStorage>>),
    // Not used directly; only required for moving between states.
    Empty,
}

impl StateSK {
    pub fn flush_lsn(&self) -> Lsn {
        match self {
            StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
            StateSK::Offloaded(state) => match state.eviction_state {
                EvictionState::Offloaded(flush_lsn) => flush_lsn,
                _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
            },
            StateSK::Empty => unreachable!(),
        }
    }

    /// Get a reference to the control file's timeline state.
    pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
        match self {
            StateSK::Loaded(sk) => &sk.state,
            StateSK::Offloaded(s) => s,
            StateSK::Empty => unreachable!(),
        }
    }

    pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
        match self {
            StateSK::Loaded(sk) => &mut sk.state,
            StateSK::Offloaded(s) => s,
            StateSK::Empty => unreachable!(),
        }
    }

    pub fn last_log_term(&self) -> Term {
        self.state()
            .acceptor_state
            .get_last_log_term(self.flush_lsn())
    }

    pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
        self.state_mut().term_bump(to).await
    }

    pub async fn membership_switch(
        &mut self,
        to: Configuration,
    ) -> Result<TimelineMembershipSwitchResponse> {
        let result = self.state_mut().membership_switch(to).await?;
        let flush_lsn = self.flush_lsn();
        let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);

        Ok(TimelineMembershipSwitchResponse {
            previous_conf: result.previous_conf,
            current_conf: result.current_conf,
            last_log_term,
            flush_lsn,
        })
    }

    /// Close open WAL files to release FDs.
    fn close_wal_store(&mut self) {
        if let StateSK::Loaded(sk) = self {
            sk.wal_store.close();
        }
    }

    /// Update timeline state with peer safekeeper data.
    pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
        // Update commit_lsn if the safekeeper is loaded.
        match self {
            StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
            StateSK::Offloaded(_) => {}
            StateSK::Empty => unreachable!(),
        }

        // Update everything else, including remote_consistent_lsn and backup_lsn.
        let mut sync_control_file = false;
        let state = self.state_mut();
        let wal_seg_size = state.server.wal_seg_size as u64;

        state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
        sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;

        state.inmem.remote_consistent_lsn = max(
            Lsn(sk_info.remote_consistent_lsn),
            state.inmem.remote_consistent_lsn,
        );
        sync_control_file |=
            state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;

        state.inmem.peer_horizon_lsn =
            max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
        sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;

        if sync_control_file {
            state.flush().await?;
        }
        Ok(())
    }

    /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
    pub fn term_start_lsn(&self) -> Lsn {
        match self {
            StateSK::Loaded(sk) => sk.term_start_lsn,
            StateSK::Offloaded(_) => Lsn(0),
            StateSK::Empty => unreachable!(),
        }
    }

    /// Used for metrics only.
    pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
        match self {
            StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
            StateSK::Offloaded(_) => WalStorageMetrics::default(),
            StateSK::Empty => unreachable!(),
        }
    }

    /// Returns WAL storage internal LSNs for the debug dump.
    pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
        match self {
            StateSK::Loaded(sk) => sk.wal_store.internal_state(),
            StateSK::Offloaded(_) => {
                let flush_lsn = self.flush_lsn();
                (flush_lsn, flush_lsn, flush_lsn, false)
            }
            StateSK::Empty => unreachable!(),
        }
    }

    /// Access to the SafeKeeper object. Panics if the timeline is offloaded;
    /// it is safe to use from WalResidentTimeline.
    pub fn safekeeper(
        &mut self,
    ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
        match self {
            StateSK::Loaded(sk) => sk,
            StateSK::Offloaded(_) => {
                panic!("safekeeper is offloaded, cannot be used")
            }
            StateSK::Empty => unreachable!(),
        }
    }

    /// Moves the control file's state structure out of the enum. Used to switch states.
    fn take_state(self) -> TimelineState<control_file::FileStorage> {
        match self {
            StateSK::Loaded(sk) => sk.state,
            StateSK::Offloaded(state) => *state,
            StateSK::Empty => unreachable!(),
        }
    }
}
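
// Illustrative sketch (not part of the original source): `flush_lsn` is
// answerable in both states (from wal_store when Loaded, from the control file
// when Offloaded), while `safekeeper()` would panic on an offloaded timeline.
#[allow(dead_code)]
fn example_report_flush_lsn(sk: &StateSK) -> Lsn {
    sk.flush_lsn()
}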

/// Shared state associated with the database instance.
pub struct SharedState {
    /// Safekeeper object
    pub(crate) sk: StateSK,
    /// In-memory list containing the state of peers sent in their latest messages.
    pub(crate) peers_info: PeersInfo,
    // A true value hinders old WAL removal; this is used by snapshotting. We
    // could make it a counter, but there is no need to.
    pub(crate) wal_removal_on_hold: bool,
}

impl SharedState {
    /// Creates a new SharedState.
    pub fn new(sk: StateSK) -> Self {
        Self {
            sk,
            peers_info: PeersInfo(vec![]),
            wal_removal_on_hold: false,
        }
    }

    /// Restore SharedState from the control file. If the file doesn't exist, bails out.
    pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
        let timeline_dir = get_timeline_dir(conf, ttid);
        let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
        if control_store.server.wal_seg_size == 0 {
            bail!(TimelineError::UninitializedWalSegSize(*ttid));
        }

        let sk = match control_store.eviction_state {
            EvictionState::Present => {
                let wal_store = wal_storage::PhysicalStorage::new(
                    ttid,
                    &timeline_dir,
                    &control_store,
                    conf.no_sync,
                )?;
                StateSK::Loaded(SafeKeeper::new(
                    TimelineState::new(control_store),
                    wal_store,
                    conf.my_id,
                )?)
            }
            EvictionState::Offloaded(_) => {
                StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
            }
        };

        Ok(Self::new(sk))
    }

    pub(crate) fn get_wal_seg_size(&self) -> usize {
        self.sk.state().server.wal_seg_size as usize
    }

    fn get_safekeeper_info(
        &self,
        ttid: &TenantTimelineId,
        conf: &SafeKeeperConf,
        standby_apply_lsn: Lsn,
    ) -> SafekeeperTimelineInfo {
        SafekeeperTimelineInfo {
            safekeeper_id: conf.my_id.0,
            tenant_timeline_id: Some(ProtoTenantTimelineId {
                tenant_id: ttid.tenant_id.as_ref().to_owned(),
                timeline_id: ttid.timeline_id.as_ref().to_owned(),
            }),
            term: self.sk.state().acceptor_state.term,
            last_log_term: self.sk.last_log_term(),
            flush_lsn: self.sk.flush_lsn().0,
            // Note: this value is not flushed to the control file yet and can be lost.
            commit_lsn: self.sk.state().inmem.commit_lsn.0,
            remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
            peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
            safekeeper_connstr: conf
                .advertise_pg_addr
                .to_owned()
                .unwrap_or(conf.listen_pg_addr.clone()),
            http_connstr: conf.listen_http_addr.to_owned(),
            https_connstr: conf.listen_https_addr.to_owned(),
            backup_lsn: self.sk.state().inmem.backup_lsn.0,
            local_start_lsn: self.sk.state().local_start_lsn.0,
            availability_zone: conf.availability_zone.clone(),
            standby_horizon: standby_apply_lsn.0,
        }
    }

    /// Get our latest view of the status of alive peers on the timeline.
    /// We pass our own info through the broker as well, so when we have no
    /// connection to the broker the returned vec is empty.
    pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
        let now = Instant::now();
        self.peers_info
            .0
            .iter()
            // Regard a peer as absent if we haven't heard from it within heartbeat_timeout.
            .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
            .cloned()
            .collect()
    }
}

#[derive(Debug, thiserror::Error)]
pub enum TimelineError {
    #[error("Timeline {0} was cancelled and cannot be used anymore")]
    Cancelled(TenantTimelineId),
    #[error("Timeline {0} was not found in global map")]
    NotFound(TenantTimelineId),
    #[error("Timeline {0} has been deleted")]
    Deleted(TenantTimelineId),
    #[error("Timeline {0} creation is in progress")]
    CreationInProgress(TenantTimelineId),
    #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
    Invalid(TenantTimelineId),
    #[error("Timeline {0} already exists")]
    AlreadyExists(TenantTimelineId),
    #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
    UninitializedWalSegSize(TenantTimelineId),
    #[error("Timeline {0} is not initialized, pg_version is unknown")]
    UninitialinzedPgVersion(TenantTimelineId),
}

// Convert to an HTTP API error.
impl From<TimelineError> for ApiError {
    fn from(te: TimelineError) -> ApiError {
        match te {
            TimelineError::NotFound(ttid) => {
                ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
            }
            TimelineError::Deleted(ttid) => {
                ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into())
            }
            _ => ApiError::InternalServerError(anyhow!("{}", te)),
        }
    }
}
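
// Illustrative sketch (not part of the original source): thanks to the From impl
// above, HTTP handlers can turn lookup failures into the right status code
// (404 for NotFound/Deleted, 500 for everything else).
#[allow(dead_code)]
fn example_api_error(ttid: TenantTimelineId) -> ApiError {
    ApiError::from(TimelineError::NotFound(ttid))
}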

/// We run remote deletion in a background task; this is how it sends its results back.
type RemoteDeletionReceiver = tokio::sync::watch::Receiver<Option<anyhow::Result<()>>>;

/// The Timeline struct manages the lifecycle (creation, deletion, restore) of a
/// safekeeper timeline. It also holds the SharedState and provides mutually
/// exclusive access to it.
pub struct Timeline {
    pub ttid: TenantTimelineId,
    pub remote_path: RemotePath,

    /// Used to broadcast commit_lsn updates to all background jobs.
    commit_lsn_watch_tx: watch::Sender<Lsn>,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,

    /// Broadcasts (current term, flush_lsn) updates; the walsender is interested in
    /// them when sending in recovery mode (to walproposer or peers). Note: this
    /// is just a notification; WAL reading should always be done with the lock
    /// held, as the term can change otherwise.
    term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
    term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,

    /// Broadcasts shared state updates.
    shared_state_version_tx: watch::Sender<usize>,
    shared_state_version_rx: watch::Receiver<usize>,

    /// Safekeeper and other state that should remain consistent and
    /// synchronized with the disk. This is a tokio mutex, as we write WAL to disk
    /// while holding it, ensuring that consensus checks are in order.
    mutex: RwLock<SharedState>,
    walsenders: Arc<WalSenders>,
    walreceivers: Arc<WalReceivers>,
    timeline_dir: Utf8PathBuf,
    manager_ctl: ManagerCtl,
    conf: Arc<SafeKeeperConf>,

    pub(crate) wal_backup: Arc<WalBackup>,

    remote_deletion: std::sync::Mutex<Option<RemoteDeletionReceiver>>,

    /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
    /// this gate, you must respect [`Timeline::cancel`].
    pub(crate) gate: Gate,

    /// Delete/cancel will trigger this; background tasks should drop out as soon as it fires.
    pub(crate) cancel: CancellationToken,

    // timeline_manager controlled state
    pub(crate) broker_active: AtomicBool,
    pub(crate) wal_backup_active: AtomicBool,
    pub(crate) last_removed_segno: AtomicU64,
    pub(crate) mgr_status: AtomicStatus,
}

impl Timeline {
    /// Constructs a new timeline.
    pub fn new(
        ttid: TenantTimelineId,
        timeline_dir: &Utf8Path,
        remote_path: &RemotePath,
        shared_state: SharedState,
        conf: Arc<SafeKeeperConf>,
        wal_backup: Arc<WalBackup>,
    ) -> Arc<Self> {
        let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
            watch::channel(shared_state.sk.state().commit_lsn);
        let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
            shared_state.sk.last_log_term(),
            shared_state.sk.flush_lsn(),
        )));
        let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);

        let walreceivers = WalReceivers::new();

        Arc::new(Self {
            ttid,
            remote_path: remote_path.to_owned(),
            timeline_dir: timeline_dir.to_owned(),
            commit_lsn_watch_tx,
            commit_lsn_watch_rx,
            term_flush_lsn_watch_tx,
            term_flush_lsn_watch_rx,
            shared_state_version_tx,
            shared_state_version_rx,
            mutex: RwLock::new(shared_state),
            walsenders: WalSenders::new(walreceivers.clone()),
            walreceivers,
            gate: Default::default(),
            cancel: CancellationToken::default(),
            remote_deletion: std::sync::Mutex::new(None),
            manager_ctl: ManagerCtl::new(),
            conf,
            broker_active: AtomicBool::new(false),
            wal_backup_active: AtomicBool::new(false),
            last_removed_segno: AtomicU64::new(0),
            mgr_status: AtomicStatus::new(),
            wal_backup,
        })
    }

    /// Load an existing timeline from disk.
    pub fn load_timeline(
        conf: Arc<SafeKeeperConf>,
        ttid: TenantTimelineId,
        wal_backup: Arc<WalBackup>,
    ) -> Result<Arc<Timeline>> {
        let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();

        let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
        let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
        let remote_path = remote_timeline_path(&ttid)?;

        Ok(Timeline::new(
            ttid,
            &timeline_dir,
            &remote_path,
            shared_state,
            conf,
            wal_backup,
        ))
    }

    /// Bootstrap a new or existing timeline, starting background tasks.
    pub fn bootstrap(
        self: &Arc<Timeline>,
        _shared_state: &mut WriteGuardSharedState<'_>,
        conf: &SafeKeeperConf,
        broker_active_set: Arc<TimelinesSet>,
        partial_backup_rate_limiter: RateLimiter,
        wal_backup: Arc<WalBackup>,
    ) {
        let (tx, rx) = self.manager_ctl.bootstrap_manager();

        let Ok(gate_guard) = self.gate.enter() else {
            // Init raced with shutdown
            return;
        };

        // Start the manager task, which will monitor timeline state and update
        // background tasks.
        tokio::spawn({
            let this = self.clone();
            let conf = conf.clone();
            async move {
                let _gate_guard = gate_guard;
                timeline_manager::main_task(
                    ManagerTimeline { tli: this },
                    conf,
                    broker_active_set,
                    tx,
                    rx,
                    partial_backup_rate_limiter,
                    wal_backup,
                )
                .await
            }
        });
    }

    /// Cancel the timeline, requesting background activity to stop. Closing
    /// the `self.gate` waits for that.
    pub fn cancel(&self) {
        info!("timeline {} shutting down", self.ttid);
        self.cancel.cancel();
    }

    /// Background timeline activities (which hold Timeline::gate) will no
    /// longer run once this function completes. `Self::cancel` must already
    /// have been called.
    pub async fn close(&self) {
        assert!(self.cancel.is_cancelled());

        // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
        // to read deleted files.
        self.gate.close().await;
    }

    /// Delete the timeline from disk completely, by removing the timeline directory.
    ///
    /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but the
    /// deletion API endpoint is retriable.
    ///
    /// The timeline must be in a shut-down state (i.e. call [`Self::close`] first).
    pub async fn delete(
        &self,
        shared_state: &mut WriteGuardSharedState<'_>,
        only_local: bool,
    ) -> Result<bool> {
        // Assert that [`Self::close`] was already called.
        assert!(self.cancel.is_cancelled());
        assert!(self.gate.close_complete());

        info!("deleting timeline {} from disk", self.ttid);

        // Close associated FDs. Nobody will be able to touch timeline data once
        // it is cancelled, so WAL storage won't be opened again.
        shared_state.sk.close_wal_store();

        if !only_local {
            self.remote_delete().await?;
        }

        let dir_existed = delete_dir(&self.timeline_dir).await?;
        Ok(dir_existed)
    }
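
    // Illustrative sketch (not part of the original source): the shutdown and
    // deletion ordering that the assertions in `delete` above require.
    #[allow(dead_code)]
    async fn example_shutdown_and_delete(self: &Arc<Self>, only_local: bool) -> Result<bool> {
        self.cancel(); // 1. ask background tasks to stop
        self.close().await; // 2. wait for all gate holders to drain
        let mut shared = self.write_shared_state().await;
        self.delete(&mut shared, only_local).await // 3. deletion is now safe
    }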

    /// Delete timeline content from remote storage. If the returned future is dropped,
    /// deletion will continue in the background.
    ///
    /// This function ordinarily spawns a task and stashes a result receiver into [`Self::remote_deletion`]. If
    /// deletion is already happening, it may simply wait for an existing task's result.
    ///
    /// Note: we concurrently delete remote storage data from multiple
    /// safekeepers. That's ok; s3 replies 200 if an object doesn't exist, and we
    /// do some retries anyway.
    async fn remote_delete(&self) -> Result<()> {
        // We will start a background task to do the deletion, so that it proceeds even if our
        // API request is dropped. Future requests will see the existing deletion task and wait
        // for it to complete.
        let mut result_rx = {
            let mut remote_deletion_state = self.remote_deletion.lock().unwrap();
            let result_rx = if let Some(result_rx) = remote_deletion_state.as_ref() {
                if let Some(result) = result_rx.borrow().as_ref() {
                    if let Err(e) = result {
                        // A previous remote deletion failed: we will start a new one
                        tracing::error!("remote deletion failed, will retry ({e})");
                        None
                    } else {
                        // A previous remote deletion call already succeeded
                        return Ok(());
                    }
                } else {
                    // Remote deletion is still in flight
                    Some(result_rx.clone())
                }
            } else {
                // Remote deletion was not attempted yet, start it now.
                None
            };

            match result_rx {
                Some(result_rx) => result_rx,
                None => self.start_remote_delete(&mut remote_deletion_state),
            }
        };

        // Wait for a result
        let Ok(result) = result_rx.wait_for(|v| v.is_some()).await else {
            // Unexpected: the sender should always send a result before dropping the channel, even on error
            return Err(anyhow::anyhow!(
                "remote deletion task future was dropped without sending a result"
            ));
        };

        result
            .as_ref()
            .expect("We did a wait_for on this being Some above")
            .as_ref()
            .map(|_| ())
            .map_err(|e| anyhow::anyhow!("remote deletion failed: {e}"))
    }

    /// Spawn a background task to do remote deletion; return a receiver for its outcome.
    fn start_remote_delete(
        &self,
        guard: &mut std::sync::MutexGuard<Option<RemoteDeletionReceiver>>,
    ) -> RemoteDeletionReceiver {
        tracing::info!("starting remote deletion");
        let storage = self.wal_backup.get_storage().clone();
        let (result_tx, result_rx) = tokio::sync::watch::channel(None);
        let ttid = self.ttid;
        tokio::task::spawn(
            async move {
                let r = if let Some(storage) = storage {
                    wal_backup::delete_timeline(&storage, &ttid).await
                } else {
                    tracing::info!(
                        "skipping remote deletion because no remote storage is configured; this effectively leaks the objects in remote storage"
                    );
                    Ok(())
                };

                if let Err(e) = &r {
                    // Log the error here in case nobody ever listens for our result (e.g. dropped API request)
                    tracing::error!("remote deletion failed: {e}");
                }

                // Ignore send results: it's legal for the Timeline to give up waiting for us.
                let _ = result_tx.send(Some(r));
            }
            .instrument(info_span!("remote_delete", timeline = %self.ttid)),
        );

        **guard = Some(result_rx.clone());

        result_rx
    }

    /// Returns whether the timeline is cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancel.is_cancelled()
    }

    /// Take an exclusive write lock on the timeline's shared_state.
    pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
        WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
    }

    pub async fn read_shared_state(&self) -> ReadGuardSharedState {
        self.mutex.read().await
    }

    /// Returns the commit_lsn watch channel.
    pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
        self.commit_lsn_watch_rx.clone()
    }

    /// Returns the term_flush_lsn watch channel.
    pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
        self.term_flush_lsn_watch_rx.clone()
    }

    /// Returns the watch channel for the SharedState update version.
    pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
        self.shared_state_version_rx.clone()
    }

    /// Returns wal_seg_size.
    pub async fn get_wal_seg_size(&self) -> usize {
        self.read_shared_state().await.get_wal_seg_size()
    }

    /// Returns the state of the timeline.
    pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
        let state = self.read_shared_state().await;
        (
            state.sk.state().inmem.clone(),
            TimelinePersistentState::clone(state.sk.state()),
        )
    }

    /// Returns the latest backup_lsn.
    pub async fn get_wal_backup_lsn(&self) -> Lsn {
        self.read_shared_state().await.sk.state().inmem.backup_lsn
    }

    /// Sets backup_lsn to the given value.
    pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }

        let mut state = self.write_shared_state().await;
        state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
        // We should check whether to shut down the offloader, but this will be done
        // soon by peer communication anyway.
        Ok(())
    }

    /// Get safekeeper info for broadcasting to the broker and other peers.
    pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
        let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
        let shared_state = self.read_shared_state().await;
        shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
    }

    /// Update timeline state with peer safekeeper data.
    pub async fn record_safekeeper_info(
        self: &Arc<Self>,
        sk_info: SafekeeperTimelineInfo,
    ) -> Result<()> {
        {
            let mut shared_state = self.write_shared_state().await;
            shared_state.sk.record_safekeeper_info(&sk_info).await?;
            let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
            shared_state.peers_info.upsert(&peer_info);
        }
        Ok(())
    }

    pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
        let shared_state = self.read_shared_state().await;
        shared_state.get_peers(conf.heartbeat_timeout)
    }

    pub fn get_walsenders(&self) -> &Arc<WalSenders> {
        &self.walsenders
    }

    pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
        &self.walreceivers
    }

    /// Returns flush_lsn.
    pub async fn get_flush_lsn(&self) -> Lsn {
        self.read_shared_state().await.sk.flush_lsn()
    }

    /// Gather timeline data for metrics.
    pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
        if self.is_cancelled() {
            return None;
        }

        let WalSendersTimelineMetricValues {
            ps_feedback_counter,
            ps_corruption_detected,
            last_ps_feedback,
            interpreted_wal_reader_tasks,
        } = self.walsenders.info_for_metrics();

        let state = self.read_shared_state().await;
        Some(FullTimelineInfo {
            ttid: self.ttid,
            ps_feedback_count: ps_feedback_counter,
            ps_corruption_detected,
            last_ps_feedback,
            wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
            timeline_is_active: self.broker_active.load(Ordering::Relaxed),
            num_computes: self.walreceivers.get_num() as u32,
            last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
            interpreted_wal_reader_tasks,
            epoch_start_lsn: state.sk.term_start_lsn(),
            mem_state: state.sk.state().inmem.clone(),
            persisted_state: TimelinePersistentState::clone(state.sk.state()),
            flush_lsn: state.sk.flush_lsn(),
            wal_storage: state.sk.wal_storage_metrics(),
        })
    }

    /// Returns in-memory timeline state to build a full debug dump.
    pub async fn memory_dump(&self) -> debug_dump::Memory {
        let state = self.read_shared_state().await;

        let (write_lsn, write_record_lsn, flush_lsn, file_open) =
            state.sk.wal_storage_internal_state();

        debug_dump::Memory {
            is_cancelled: self.is_cancelled(),
            peers_info_len: state.peers_info.0.len(),
            walsenders: self.walsenders.get_all_public(),
            wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
            active: self.broker_active.load(Ordering::Relaxed),
            num_computes: self.walreceivers.get_num() as u32,
            last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
            epoch_start_lsn: state.sk.term_start_lsn(),
            mem_state: state.sk.state().inmem.clone(),
            mgr_status: self.mgr_status.get(),
            write_lsn,
            write_record_lsn,
            flush_lsn,
            file_open,
        }
    }

    /// Apply a function to the control file state and persist it.
    pub async fn map_control_file<T>(
        self: &Arc<Self>,
        f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
    ) -> Result<T> {
        let mut state = self.write_shared_state().await;
        let mut persistent_state = state.sk.state_mut().start_change();
        // If f returns an error, we abort the change and don't persist anything.
        let res = f(&mut persistent_state)?;
        // If persisting fails, we abort the change and return an error.
        state
            .sk
            .state_mut()
            .finish_change(&persistent_state)
            .await?;
        Ok(res)
    }
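
    // Illustrative sketch (not part of the original source): using
    // `map_control_file` to atomically mutate and persist the control file.
    // The field chosen here is just an example; if the closure or the flush
    // fails, nothing is persisted.
    #[allow(dead_code)]
    async fn example_persist_backup_lsn(self: &Arc<Self>, new_backup_lsn: Lsn) -> Result<()> {
        self.map_control_file(|cf| {
            cf.backup_lsn = max(cf.backup_lsn, new_backup_lsn);
            Ok(())
        })
        .await
    }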

    pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
        let mut state = self.write_shared_state().await;
        state.sk.term_bump(to).await
    }

    pub async fn membership_switch(
        self: &Arc<Self>,
        to: Configuration,
    ) -> Result<TimelineMembershipSwitchResponse> {
        let mut state = self.write_shared_state().await;
        // Ensure we don't race with exclude/delete requests by checking the cancellation
        // token under the write_shared_state lock.
        // Exclude/delete cancel the timeline under the shared state lock,
        // so the timeline cannot be deleted in the middle of the membership switch.
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }
        state.sk.membership_switch(to).await
    }

    /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`].
    async fn do_wal_residence_guard(
        self: &Arc<Self>,
        block: bool,
    ) -> Result<Option<WalResidentTimeline>> {
        let op_label = if block {
            "wal_residence_guard"
        } else {
            "try_wal_residence_guard"
        };

        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }

        debug!("requesting WalResidentTimeline guard");
        let started_at = Instant::now();
        let status_before = self.mgr_status.get();

        // Wait 30 seconds for the guard to be acquired. It can time out if someone is
        // holding the lock (e.g. during `SafeKeeper::process_msg()`) or the manager task
        // is stuck.
        let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
            if block {
                self.manager_ctl.wal_residence_guard().await.map(Some)
            } else {
                self.manager_ctl.try_wal_residence_guard().await
            }
        })
        .await;

        let guard = match res {
            Ok(Ok(guard)) => {
                let finished_at = Instant::now();
                let elapsed = finished_at - started_at;
                MISC_OPERATION_SECONDS
                    .with_label_values(&[op_label])
                    .observe(elapsed.as_secs_f64());

                guard
            }
            Ok(Err(e)) => {
                warn!(
                    "error acquiring in {op_label}, statuses {:?} => {:?}",
                    status_before,
                    self.mgr_status.get()
                );
                return Err(e);
            }
            Err(_) => {
                warn!(
                    "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
                    status_before,
                    self.mgr_status.get()
                );
                anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
            }
        };

        Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
    }

    /// Get the timeline guard for reading/writing WAL files.
    /// If WAL files are not present on disk (evicted), they will be automatically
    /// downloaded from remote storage. This is done in the manager task, which is
    /// responsible for issuing all guards.
    ///
    /// NB: don't use this function from timeline_manager, it will deadlock.
    /// NB: don't use this function while holding the shared_state lock.
    pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
        self.do_wal_residence_guard(true)
            .await
            .map(|m| m.expect("Always get Some in block=true mode"))
    }

    /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
    /// else return None.
    pub(crate) async fn try_wal_residence_guard(
        self: &Arc<Self>,
    ) -> Result<Option<WalResidentTimeline>> {
        self.do_wal_residence_guard(false).await
    }

    pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
        self.manager_ctl.backup_partial_reset().await
    }
}
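
// Illustrative sketch (not part of the original source): acquiring a residence
// guard and opening a WAL reader through it. The guard keeps WAL files resident
// on disk (triggering a download if they were evicted) for as long as it lives.
#[allow(dead_code)]
async fn example_read_wal(tli: &Arc<Timeline>, start_lsn: Lsn) -> Result<()> {
    let resident: WalResidentTimeline = tli.wal_residence_guard().await?;
    let _reader = resident.get_walreader(start_lsn).await?;
    // ... stream WAL records through the reader; dropping `resident` releases
    // the guard and lets the manager evict the timeline again.
    Ok(())
}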

/// This is a guard that allows reading/writing the on-disk timeline state.
/// All tasks that are trying to read/write WAL from disk should use this guard.
pub struct WalResidentTimeline {
    pub tli: Arc<Timeline>,
    _guard: ResidenceGuard,
}

impl WalResidentTimeline {
    pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
        WalResidentTimeline { tli, _guard }
    }
}

impl Deref for WalResidentTimeline {
    type Target = Arc<Timeline>;

    fn deref(&self) -> &Self::Target {
        &self.tli
    }
}

impl WalResidentTimeline {
    /// Returns true if the walsender should stop sending WAL to the pageserver. We
    /// terminate it if remote_consistent_lsn reached commit_lsn and there are no
    /// computes. While there might be nothing to stream already, we learn about
    /// remote_consistent_lsn updates through replication feedback, and we want
    /// to stop pushing to the broker if the pageserver is fully caught up.
    pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
        if self.is_cancelled() {
            return true;
        }
        let shared_state = self.read_shared_state().await;
        if self.walreceivers.get_num() == 0 {
            return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
                reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
        }
        false
    }

    /// Ensure that the current term is `t`, erroring otherwise, and lock the state.
    pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
        let ss = self.read_shared_state().await;
        if ss.sk.state().acceptor_state.term != t {
            bail!(
                "failed to acquire term {}, current term {}",
                t,
                ss.sk.state().acceptor_state.term
            );
        }
        Ok(ss)
    }

    // BEGIN HADRON
    // Check if disk usage by WAL segment files for this timeline exceeds the configured limit.
    fn hadron_check_disk_usage(
        &self,
        shared_state_locked: &mut WriteGuardSharedState<'_>,
    ) -> Result<()> {
        // The disk usage is calculated based on the number of segments between `last_removed_segno`
        // and the current flush LSN segment number. `last_removed_segno` is advanced after
        // unneeded WAL files are physically removed from disk (see `update_wal_removal_end()`
        // in `timeline_manager.rs`).
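        // Worked example (illustrative, not part of the original source): with
        // the default 16 MiB segment size, current_segno = 110 and
        // last_removed_segno = 100 give 10 * 16 MiB = 160 MiB of tracked usage.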
        let max_timeline_disk_usage_bytes = self.conf.max_timeline_disk_usage_bytes;
        if max_timeline_disk_usage_bytes > 0 {
            let last_removed_segno = self.last_removed_segno.load(Ordering::Relaxed);
            let flush_lsn = shared_state_locked.sk.flush_lsn();
            let wal_seg_size = shared_state_locked.sk.state().server.wal_seg_size as u64;
            let current_segno = flush_lsn.segment_number(wal_seg_size as usize);

            let segno_count = current_segno - last_removed_segno;
            let disk_usage_bytes = segno_count * wal_seg_size;

            if disk_usage_bytes > max_timeline_disk_usage_bytes {
                WAL_STORAGE_LIMIT_ERRORS.inc();
                bail!(
                    "WAL storage utilization exceeds configured limit of {} bytes: current disk usage: {} bytes",
                    max_timeline_disk_usage_bytes,
                    disk_usage_bytes
                );
            }
        }

        if GLOBAL_DISK_LIMIT_EXCEEDED.load(Ordering::Relaxed) {
            bail!("Global disk usage exceeded limit");
        }

        Ok(())
    }
    // END HADRON

    /// Pass an arrived message to the safekeeper.
    pub async fn process_msg(
        &self,
        msg: &ProposerAcceptorMessage,
    ) -> Result<Option<AcceptorProposerMessage>> {
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }

        let mut rmsg: Option<AcceptorProposerMessage>;
        {
            let mut shared_state = self.write_shared_state().await;
            // BEGIN HADRON
            // An error from `hadron_check_disk_usage()` fails `process_msg()`; it
            // propagates upward and terminates the entire WalAcceptor. This causes postgres to
            // disconnect from the safekeeper and re-establish another connection. Postgres will
            // keep retrying safekeeper connections every second until it can successfully
            // propose WAL to the SK again.
            self.hadron_check_disk_usage(&mut shared_state)?;
            // END HADRON
            rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;

            // If this is an AppendResponse, fill in proper hot standby feedback.
            if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
                resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
            }
        }
        Ok(rmsg)
    }

    pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
        let (_, persisted_state) = self.get_state().await;

        WalReader::new(
            &self.ttid,
            self.timeline_dir.clone(),
            &persisted_state,
            start_lsn,
            self.wal_backup.clone(),
        )
    }

    pub fn get_timeline_dir(&self) -> Utf8PathBuf {
        self.timeline_dir.clone()
    }

    /// Update the in-memory remote consistent lsn.
    pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
        let mut shared_state = self.write_shared_state().await;
        shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
            shared_state.sk.state().inmem.remote_consistent_lsn,
            candidate,
        );
    }
}
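
// Illustrative sketch (not part of the original source): the message pump a WAL
// receiver might run on top of `process_msg`. Transport is omitted; on error the
// caller is expected to drop the connection and let the walproposer retry.
#[allow(dead_code)]
async fn example_message_pump(
    tli: &WalResidentTimeline,
    msg: &ProposerAcceptorMessage,
) -> Result<()> {
    if let Some(reply) = tli.process_msg(msg).await? {
        // An AppendResponse reply already carries hot standby feedback; send it
        // back to the walproposer here.
        let _ = reply;
    }
    Ok(())
}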

/// This struct contains methods that are used by the timeline manager task.
pub(crate) struct ManagerTimeline {
    pub(crate) tli: Arc<Timeline>,
}

impl Deref for ManagerTimeline {
    type Target = Arc<Timeline>;

    fn deref(&self) -> &Self::Target {
        &self.tli
    }
}

impl ManagerTimeline {
    pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
        &self.tli.timeline_dir
    }

    /// The manager requests this state on startup.
    pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
        let shared_state = self.read_shared_state().await;
        let is_offloaded = matches!(
            shared_state.sk.state().eviction_state,
            EvictionState::Offloaded(_)
        );
        let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();

        (is_offloaded, partial_backup_uploaded)
    }

    /// Try to switch state Present->Offloaded.
    pub(crate) async fn switch_to_offloaded(
        &self,
        partial: &PartialRemoteSegment,
    ) -> anyhow::Result<()> {
        let mut shared = self.write_shared_state().await;

        // Update the control file.
        let mut pstate = shared.sk.state_mut().start_change();

        if !matches!(pstate.eviction_state, EvictionState::Present) {
            bail!(
                "cannot switch to offloaded state, current state is {:?}",
                pstate.eviction_state
            );
        }

        if partial.flush_lsn != shared.sk.flush_lsn() {
            bail!(
                "flush_lsn mismatch in partial backup, expected {}, got {}",
                shared.sk.flush_lsn(),
                partial.flush_lsn
            );
        }

        if partial.commit_lsn != pstate.commit_lsn {
            bail!(
                "commit_lsn mismatch in partial backup, expected {}, got {}",
                pstate.commit_lsn,
                partial.commit_lsn
            );
        }

        if partial.term != shared.sk.last_log_term() {
            bail!(
                "term mismatch in partial backup, expected {}, got {}",
                shared.sk.last_log_term(),
                partial.term
            );
        }

        pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
        shared.sk.state_mut().finish_change(&pstate).await?;
        // The control file is now switched to the Offloaded state.

        // Now we can switch shared.sk to Offloaded; this shouldn't fail.
        let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
        let cfile_state = prev_sk.take_state();
        shared.sk = StateSK::Offloaded(Box::new(cfile_state));

        Ok(())
    }

    /// Try to switch state Offloaded->Present.
    pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
        let mut shared = self.write_shared_state().await;

        // Try to restore WAL storage.
        let wal_store = wal_storage::PhysicalStorage::new(
            &self.ttid,
            &self.timeline_dir,
            shared.sk.state(),
            self.conf.no_sync,
        )?;

        // Update the control file.
        let mut pstate = shared.sk.state_mut().start_change();

        if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
            bail!(
                "cannot switch to present state, current state is {:?}",
                pstate.eviction_state
            );
        }

        if wal_store.flush_lsn() != shared.sk.flush_lsn() {
            bail!(
                "flush_lsn mismatch in restored WAL, expected {}, got {}",
                shared.sk.flush_lsn(),
                wal_store.flush_lsn()
            );
        }

        pstate.eviction_state = EvictionState::Present;
        shared.sk.state_mut().finish_change(&pstate).await?;

        // Now we can switch shared.sk to Present; this shouldn't fail.
        let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
        let cfile_state = prev_sk.take_state();
        shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);

        Ok(())
    }

    /// Update the current manager state; useful for debugging manager deadlocks.
    pub(crate) fn set_status(&self, status: timeline_manager::Status) {
        self.mgr_status.store(status, Ordering::Relaxed);
    }
}

/// Deletes the directory and its contents. Returns false if the directory does not exist.
pub async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
    match fs::remove_dir_all(path).await {
        Ok(_) => Ok(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e.into()),
    }
}

/// Get the path to the tenant directory. If you just need to get a timeline directory,
/// use WalResidentTimeline::get_timeline_dir instead.
pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
    conf.workdir.join(tenant_id.to_string())
}

/// Get the path to the timeline directory. If you need to read WAL files from disk,
/// use WalResidentTimeline::get_timeline_dir instead. This function does not check
/// the timeline eviction status, and WAL files might not be present on disk.
pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
    get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
}
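
// Illustrative sketch (not part of the original source): the resulting on-disk
// layout, assuming workdir is "/data/safekeeper":
//   get_tenant_dir(conf, &tenant_id) == "/data/safekeeper/<tenant_id>"
//   get_timeline_dir(conf, &ttid)    == "/data/safekeeper/<tenant_id>/<timeline_id>"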