Line data Source code
1 : //! This module implements Timeline lifecycle management and contains all the code
2 : //! needed to glue together SafeKeeper and all other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use camino::Utf8PathBuf;
6 : use postgres_ffi::XLogSegNo;
7 : use serde::{Deserialize, Serialize};
8 : use tokio::fs;
9 :
10 : use std::cmp::max;
11 : use std::sync::Arc;
12 : use std::time::Duration;
13 : use tokio::sync::{Mutex, MutexGuard};
14 : use tokio::{
15 : sync::{mpsc::Sender, watch},
16 : time::Instant,
17 : };
18 : use tracing::*;
19 : use utils::http::error::ApiError;
20 : use utils::{
21 : id::{NodeId, TenantTimelineId},
22 : lsn::Lsn,
23 : };
24 :
25 : use storage_broker::proto::SafekeeperTimelineInfo;
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 :
28 : use crate::receive_wal::WalReceivers;
29 : use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
30 : use crate::safekeeper::{
31 : AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
32 : INVALID_TERM,
33 : };
34 : use crate::send_wal::WalSenders;
35 : use crate::state::{TimelineMemState, TimelinePersistentState};
36 : use crate::wal_backup::{self};
37 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
38 :
39 : use crate::metrics::FullTimelineInfo;
40 : use crate::wal_storage::Storage as wal_storage_iface;
41 : use crate::{debug_dump, wal_backup_partial, wal_storage};
42 : use crate::{GlobalTimelines, SafeKeeperConf};
43 :
44 : /// Things a safekeeper should know about timeline state on its peers.
45 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
46 : pub struct PeerInfo {
47 : pub sk_id: NodeId,
48 : pub term: Term,
49 : /// Term of the last entry.
50 : pub last_log_term: Term,
51 : /// LSN of the last record.
52 : pub flush_lsn: Lsn,
53 : pub commit_lsn: Lsn,
54 : /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
55 : /// sk since backup_lsn.
56 : pub local_start_lsn: Lsn,
57 : /// When info was received. Serde annotations are not very useful but make
58 : /// the code compile -- we don't rely on this field externally.
59 : #[serde(skip)]
60 : #[serde(default = "Instant::now")]
61 : ts: Instant,
62 : pub pg_connstr: String,
63 : pub http_connstr: String,
64 : }
65 :
66 : impl PeerInfo {
67 0 : fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
68 0 : PeerInfo {
69 0 : sk_id: NodeId(sk_info.safekeeper_id),
70 0 : term: sk_info.term,
71 0 : last_log_term: sk_info.last_log_term,
72 0 : flush_lsn: Lsn(sk_info.flush_lsn),
73 0 : commit_lsn: Lsn(sk_info.commit_lsn),
74 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
75 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
76 0 : http_connstr: sk_info.http_connstr.clone(),
77 0 : ts,
78 0 : }
79 0 : }
80 : }
81 :
82 : // A vector-based node id -> peer state map with only the limited functionality
83 : // we need.
84 : #[derive(Debug, Clone, Default)]
85 : pub struct PeersInfo(pub Vec<PeerInfo>);
86 :
87 : impl PeersInfo {
88 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
89 0 : self.0.iter_mut().find(|p| p.sk_id == id)
90 0 : }
91 :
92 0 : fn upsert(&mut self, p: &PeerInfo) {
93 0 : match self.get(p.sk_id) {
94 0 : Some(rp) => *rp = p.clone(),
95 0 : None => self.0.push(p.clone()),
96 : }
97 0 : }
98 : }
99 :
100 : /// Shared state associated with a database instance (timeline).
101 : pub struct SharedState {
102 : /// Safekeeper object
103 : sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
104 : /// In-memory list containing the state of peers, as sent in their latest messages.
105 : peers_info: PeersInfo,
106 : /// True when the WAL backup launcher oversees the timeline, making sure WAL is
107 : /// offloaded; this allows bothering the launcher less often.
108 : wal_backup_active: bool,
109 : /// True whenever there is at least some pending activity on the timeline: a live
110 : /// compute connection, a pageserver that is not caught up yet (it must have the
111 : /// latest WAL for a new compute start), or an unfinished WAL backup. In practice it
112 : /// means the safekeeper broadcasts info about the timeline to peers and old WAL is
113 : /// trimmed.
114 : ///
115 : /// TODO: it might be better to remove the timeline from GlobalTimelines completely
116 : /// when it is inactive, instead of having this flag.
117 : active: bool,
118 : last_removed_segno: XLogSegNo,
119 : }
120 :
121 : impl SharedState {
122 : /// Initialize fresh timeline state without persisting anything to disk.
123 0 : fn create_new(
124 0 : conf: &SafeKeeperConf,
125 0 : ttid: &TenantTimelineId,
126 0 : state: TimelinePersistentState,
127 0 : ) -> Result<Self> {
128 0 : if state.server.wal_seg_size == 0 {
129 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
130 0 : }
131 0 :
132 0 : if state.server.pg_version == UNKNOWN_SERVER_VERSION {
133 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
134 0 : }
135 0 :
136 0 : if state.commit_lsn < state.local_start_lsn {
137 0 : bail!(
138 0 : "commit_lsn {} is higher than local_start_lsn {}",
139 0 : state.commit_lsn,
140 0 : state.local_start_lsn
141 0 : );
142 0 : }
143 0 :
144 0 : // We don't want to write anything to disk, because an existing timeline may already be there.
145 0 : // These functions should not change anything on disk.
146 0 : let timeline_dir = conf.timeline_dir(ttid);
147 0 : let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
148 0 : let wal_store =
149 0 : wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
150 0 : let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
151 :
152 0 : Ok(Self {
153 0 : sk,
154 0 : peers_info: PeersInfo(vec![]),
155 0 : wal_backup_active: false,
156 0 : active: false,
157 0 : last_removed_segno: 0,
158 0 : })
159 0 : }
160 :
161 : /// Restore SharedState from the control file. If the file doesn't exist, bails out.
162 0 : fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
163 0 : let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
164 0 : if control_store.server.wal_seg_size == 0 {
165 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
166 0 : }
167 :
168 0 : let wal_store =
169 0 : wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
170 :
171 : Ok(Self {
172 0 : sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
173 0 : peers_info: PeersInfo(vec![]),
174 : wal_backup_active: false,
175 : active: false,
176 : last_removed_segno: 0,
177 : })
178 0 : }
179 :
180 0 : fn is_active(&self, num_computes: usize) -> bool {
181 0 : self.is_wal_backup_required(num_computes)
182 : // FIXME: add tracking of relevant pageservers and check them here individually,
183 : // otherwise migration won't work (we suspend too early).
184 0 : || self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn
185 0 : }
186 :
187 : /// Mark the timeline active/inactive and return whether s3 offloading requires a
188 : /// start/stop action. If the timeline is deactivated, the control file is persisted,
189 : /// as the maintenance task does that only for active timelines.
190 0 : async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool {
191 0 : let is_active = self.is_active(num_computes);
192 0 : if self.active != is_active {
193 0 : info!(
194 0 : "timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
195 : ttid,
196 : is_active,
197 : self.sk.state.inmem.remote_consistent_lsn,
198 : self.sk.state.inmem.commit_lsn
199 : );
200 0 : if !is_active {
201 0 : if let Err(e) = self.sk.state.flush().await {
202 0 : warn!("control file save in update_status failed: {:?}", e);
203 0 : }
204 0 : }
205 0 : }
206 0 : self.active = is_active;
207 0 : self.is_wal_backup_action_pending(num_computes)
208 0 : }
209 :
210 : /// Should we run s3 offloading in the current state?
211 0 : fn is_wal_backup_required(&self, num_computes: usize) -> bool {
212 0 : let seg_size = self.get_wal_seg_size();
213 0 : num_computes > 0 ||
214 : // Currently only the whole segment is offloaded, so compare segment numbers.
215 0 : (self.sk.state.inmem.commit_lsn.segment_number(seg_size) >
216 0 : self.sk.state.inmem.backup_lsn.segment_number(seg_size))
217 0 : }
218 :
219 : /// Is the current state of s3 offloading different from what it ought to be?
220 0 : fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
221 0 : let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
222 0 : if res {
223 0 : let action_pending = if self.is_wal_backup_required(num_computes) {
224 0 : "start"
225 : } else {
226 0 : "stop"
227 : };
228 0 : trace!(
229 0 : "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
230 0 : self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn
231 : );
232 0 : }
233 0 : res
234 0 : }
235 :
236 : /// Returns whether s3 offloading is required and sets current status as
237 : /// matching.
238 0 : fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
239 0 : self.wal_backup_active = self.is_wal_backup_required(num_computes);
240 0 : self.wal_backup_active
241 0 : }
242 :
243 0 : fn get_wal_seg_size(&self) -> usize {
244 0 : self.sk.state.server.wal_seg_size as usize
245 0 : }
246 :
247 0 : fn get_safekeeper_info(
248 0 : &self,
249 0 : ttid: &TenantTimelineId,
250 0 : conf: &SafeKeeperConf,
251 0 : standby_apply_lsn: Lsn,
252 0 : ) -> SafekeeperTimelineInfo {
253 0 : SafekeeperTimelineInfo {
254 0 : safekeeper_id: conf.my_id.0,
255 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
256 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
257 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
258 0 : }),
259 0 : term: self.sk.state.acceptor_state.term,
260 0 : last_log_term: self.sk.get_epoch(),
261 0 : flush_lsn: self.sk.flush_lsn().0,
262 0 : // note: this value is not flushed to control file yet and can be lost
263 0 : commit_lsn: self.sk.state.inmem.commit_lsn.0,
264 0 : remote_consistent_lsn: self.sk.state.inmem.remote_consistent_lsn.0,
265 0 : peer_horizon_lsn: self.sk.state.inmem.peer_horizon_lsn.0,
266 0 : safekeeper_connstr: conf
267 0 : .advertise_pg_addr
268 0 : .to_owned()
269 0 : .unwrap_or(conf.listen_pg_addr.clone()),
270 0 : http_connstr: conf.listen_http_addr.to_owned(),
271 0 : backup_lsn: self.sk.state.inmem.backup_lsn.0,
272 0 : local_start_lsn: self.sk.state.local_start_lsn.0,
273 0 : availability_zone: conf.availability_zone.clone(),
274 0 : standby_horizon: standby_apply_lsn.0,
275 0 : }
276 0 : }
277 :
278 : /// Get our latest view of the status of alive peers on the timeline.
279 : /// We pass our own info through the broker as well, so when we don't have a connection
280 : /// to the broker the returned vec is empty.
281 0 : fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
282 0 : let now = Instant::now();
283 0 : self.peers_info
284 0 : .0
285 0 : .iter()
286 0 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
287 0 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
288 0 : .cloned()
289 0 : .collect()
290 0 : }
291 :
292 : /// Get the oldest segno we still need to keep. We hold WAL until it is consumed
293 : /// by all of: 1) the pageserver (remote_consistent_lsn), 2) peers, 3) s3
294 : /// offloading.
295 : /// While it is safe to use in-memory values for determining the horizon,
296 : /// we use the persistent ones to make the set of normally possible states less surprising.
297 0 : fn get_horizon_segno(
298 0 : &self,
299 0 : wal_backup_enabled: bool,
300 0 : extra_horizon_lsn: Option<Lsn>,
301 0 : ) -> XLogSegNo {
302 0 : let state = &self.sk.state;
303 0 :
304 0 : use std::cmp::min;
305 0 : let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
306 0 : if wal_backup_enabled {
307 0 : horizon_lsn = min(horizon_lsn, state.backup_lsn);
308 0 : }
309 0 : if let Some(extra_horizon_lsn) = extra_horizon_lsn {
310 0 : horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
311 0 : }
312 0 : horizon_lsn.segment_number(state.server.wal_seg_size as usize)
313 0 : }
314 : }
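// A worked example (illustrative sketch, not part of the original source) of how
// `get_horizon_segno` picks the removal horizon. Assume wal_seg_size = 16 MiB,
// wal_backup_enabled = true, no extra horizon, and the following persistent state:
//
//   remote_consistent_lsn = 0/3000000   (inside segment 3)
//   peer_horizon_lsn      = 0/2000000   (start of segment 2)
//   backup_lsn            = 0/2800000   (inside segment 2)
//
// horizon_lsn = min(0/3000000, 0/2000000, 0/2800000) = 0/2000000, whose segment
// number is 2, so `remove_old_wal` below may delete segments strictly older than
// segment 2.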
315 :
316 0 : #[derive(Debug, thiserror::Error)]
317 : pub enum TimelineError {
318 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
319 : Cancelled(TenantTimelineId),
320 : #[error("Timeline {0} was not found in global map")]
321 : NotFound(TenantTimelineId),
322 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
323 : Invalid(TenantTimelineId),
324 : #[error("Timeline {0} is already exists")]
325 : AlreadyExists(TenantTimelineId),
326 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
327 : UninitializedWalSegSize(TenantTimelineId),
328 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
329 : UninitialinzedPgVersion(TenantTimelineId),
330 : }
331 :
332 : // Convert to HTTP API error.
333 : impl From<TimelineError> for ApiError {
334 0 : fn from(te: TimelineError) -> ApiError {
335 0 : match te {
336 0 : TimelineError::NotFound(ttid) => {
337 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
338 : }
339 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
340 : }
341 0 : }
342 : }
343 :
344 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
345 : /// It also holds SharedState and provides mutually exclusive access to it.
346 : pub struct Timeline {
347 : pub ttid: TenantTimelineId,
348 :
349 : /// Sending here asks for the WAL backup launcher's attention (start/stop
350 : /// offloading). Sending the ttid instead of a concrete command allows sending
351 : /// without holding the timeline lock.
352 : pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
353 :
354 : /// Used to broadcast commit_lsn updates to all background jobs.
355 : commit_lsn_watch_tx: watch::Sender<Lsn>,
356 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
357 :
358 : /// Broadcasts (current term, flush_lsn) updates; the walsender is interested in
359 : /// them when sending in recovery mode (to the walproposer or peers). Note: this
360 : /// is just a notification, WAL reading should always be done with the lock held,
361 : /// as the term can change otherwise.
362 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
363 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
364 :
365 : /// Safekeeper and other state that should remain consistent and
366 : /// synchronized with the disk. This is a tokio mutex, as we write WAL to disk
367 : /// while holding it, ensuring that consensus checks are in order.
368 : mutex: Mutex<SharedState>,
369 : walsenders: Arc<WalSenders>,
370 : walreceivers: Arc<WalReceivers>,
371 :
372 : /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
373 : cancellation_tx: watch::Sender<bool>,
374 :
375 : /// Timeline should not be used after cancellation. Background tasks should
376 : /// monitor this channel and stop eventually after receiving `true` from this channel.
377 : cancellation_rx: watch::Receiver<bool>,
378 :
379 : /// Directory where timeline state is stored.
380 : pub timeline_dir: Utf8PathBuf,
381 :
382 : /// Whether to keep WAL on disk for active replication connections.
383 : /// Especially useful for sharding, when different shards process WAL
384 : /// at different speeds.
385 : // TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
386 : walsenders_keep_horizon: bool,
387 : }
388 :
389 : impl Timeline {
390 : /// Load existing timeline from disk.
391 0 : pub fn load_timeline(
392 0 : conf: &SafeKeeperConf,
393 0 : ttid: TenantTimelineId,
394 0 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
395 0 : ) -> Result<Timeline> {
396 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
397 :
398 0 : let shared_state = SharedState::restore(conf, &ttid)?;
399 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
400 0 : watch::channel(shared_state.sk.state.commit_lsn);
401 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
402 0 : shared_state.sk.get_term(),
403 0 : shared_state.sk.flush_lsn(),
404 0 : )));
405 0 : let (cancellation_tx, cancellation_rx) = watch::channel(false);
406 0 :
407 0 : let walreceivers = WalReceivers::new();
408 0 : Ok(Timeline {
409 0 : ttid,
410 0 : wal_backup_launcher_tx,
411 0 : commit_lsn_watch_tx,
412 0 : commit_lsn_watch_rx,
413 0 : term_flush_lsn_watch_tx,
414 0 : term_flush_lsn_watch_rx,
415 0 : mutex: Mutex::new(shared_state),
416 0 : walsenders: WalSenders::new(walreceivers.clone()),
417 0 : walreceivers,
418 0 : cancellation_rx,
419 0 : cancellation_tx,
420 0 : timeline_dir: conf.timeline_dir(&ttid),
421 0 : walsenders_keep_horizon: conf.walsenders_keep_horizon,
422 0 : })
423 0 : }
424 :
425 : /// Create a new timeline, which is not yet persisted to disk.
426 0 : pub fn create_empty(
427 0 : conf: &SafeKeeperConf,
428 0 : ttid: TenantTimelineId,
429 0 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
430 0 : server_info: ServerInfo,
431 0 : commit_lsn: Lsn,
432 0 : local_start_lsn: Lsn,
433 0 : ) -> Result<Timeline> {
434 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
435 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
436 0 : watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
437 0 : let (cancellation_tx, cancellation_rx) = watch::channel(false);
438 0 : let state =
439 0 : TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
440 0 :
441 0 : let walreceivers = WalReceivers::new();
442 0 : Ok(Timeline {
443 0 : ttid,
444 0 : wal_backup_launcher_tx,
445 0 : commit_lsn_watch_tx,
446 0 : commit_lsn_watch_rx,
447 0 : term_flush_lsn_watch_tx,
448 0 : term_flush_lsn_watch_rx,
449 0 : mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
450 0 : walsenders: WalSenders::new(walreceivers.clone()),
451 0 : walreceivers,
452 0 : cancellation_rx,
453 0 : cancellation_tx,
454 0 : timeline_dir: conf.timeline_dir(&ttid),
455 0 : walsenders_keep_horizon: conf.walsenders_keep_horizon,
456 : })
457 0 : }
458 :
459 : /// Initialize a fresh timeline on disk and start background tasks. If init
460 : /// fails, the timeline is cancelled and cannot be used anymore.
461 : ///
462 : /// Init is transactional, so if it fails, created files will be deleted,
463 : /// and state on disk should remain unchanged.
464 0 : pub async fn init_new(
465 0 : self: &Arc<Timeline>,
466 0 : shared_state: &mut MutexGuard<'_, SharedState>,
467 0 : conf: &SafeKeeperConf,
468 0 : ) -> Result<()> {
469 0 : match fs::metadata(&self.timeline_dir).await {
470 : Ok(_) => {
471 : // Timeline directory exists on disk, we should leave state unchanged
472 : // and return error.
473 0 : bail!(TimelineError::Invalid(self.ttid));
474 : }
475 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
476 0 : Err(e) => {
477 0 : return Err(e.into());
478 : }
479 : }
480 :
481 : // Create timeline directory.
482 0 : fs::create_dir_all(&self.timeline_dir).await?;
483 :
484 : // Write timeline to disk and start background tasks.
485 0 : if let Err(e) = shared_state.sk.state.flush().await {
486 : // Bootstrap failed, cancel timeline and remove timeline directory.
487 0 : self.cancel(shared_state);
488 :
489 0 : if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
490 0 : warn!(
491 0 : "failed to remove timeline {} directory after bootstrap failure: {}",
492 0 : self.ttid, fs_err
493 : );
494 0 : }
495 :
496 0 : return Err(e);
497 0 : }
498 0 : self.bootstrap(conf);
499 0 : Ok(())
500 0 : }
501 :
502 : /// Bootstrap a new or existing timeline by starting its background tasks.
503 0 : pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
504 0 : // Start recovery task which always runs on the timeline.
505 0 : if conf.peer_recovery_enabled {
506 0 : tokio::spawn(recovery_main(self.clone(), conf.clone()));
507 0 : }
508 0 : if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
509 0 : tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
510 0 : }
511 0 : }
512 :
513 : /// Delete the timeline from disk completely, by removing the timeline directory.
514 : /// Background timeline activities will stop eventually.
515 : ///
516 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but the
517 : /// deletion API endpoint is retriable.
518 0 : pub async fn delete(
519 0 : &self,
520 0 : shared_state: &mut MutexGuard<'_, SharedState>,
521 0 : only_local: bool,
522 0 : ) -> Result<(bool, bool)> {
523 0 : let was_active = shared_state.active;
524 0 : self.cancel(shared_state);
525 0 :
526 0 : // TODO: It would be better to wait for s3 offloader termination before
527 0 : // removing data from s3. Though, since s3 doesn't have transactions, it
528 0 : // still wouldn't guarantee the absence of data after removal.
529 0 : let conf = GlobalTimelines::get_global_config();
530 0 : if !only_local && conf.is_wal_backup_enabled() {
531 : // Note: we concurrently delete remote storage data from multiple
532 : // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
533 : // do some retries anyway.
534 0 : wal_backup::delete_timeline(&self.ttid).await?;
535 0 : }
536 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
537 0 : Ok((dir_existed, was_active))
538 0 : }
539 :
540 : /// Cancel the timeline to prevent further usage. Background tasks will stop
541 : /// eventually after receiving the cancellation signal.
542 : ///
543 : /// Note that we can't notify the backup launcher here while holding the
544 : /// shared_state lock, as this is a potential deadlock: the caller is
545 : /// responsible for that. Generally we should probably make WAL backup tasks
546 : /// shut down on their own, checking once in a while whether it is
547 : /// time.
548 0 : fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
549 0 : info!("timeline {} is cancelled", self.ttid);
550 0 : let _ = self.cancellation_tx.send(true);
551 0 : // Close associated FDs. Nobody will be able to touch timeline data once
552 0 : // it is cancelled, so WAL storage won't be opened again.
553 0 : shared_state.sk.wal_store.close();
554 0 : }
555 :
556 : /// Returns whether the timeline is cancelled.
557 0 : pub fn is_cancelled(&self) -> bool {
558 0 : *self.cancellation_rx.borrow()
559 0 : }
560 :
561 : /// Returns a watch channel which gets a value when the timeline is cancelled. It is
562 : /// guaranteed that the channel currently holds a non-cancelled value (errors otherwise).
563 0 : pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
564 0 : let rx = self.cancellation_rx.clone();
565 0 : if *rx.borrow() {
566 0 : bail!(TimelineError::Cancelled(self.ttid));
567 0 : }
568 0 : Ok(rx)
569 0 : }
570 :
571 : /// Take a mutually exclusive write lock on the timeline's shared_state.
572 0 : pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
573 0 : self.mutex.lock().await
574 0 : }
575 :
576 0 : async fn update_status(&self, shared_state: &mut SharedState) -> bool {
577 0 : shared_state
578 0 : .update_status(self.walreceivers.get_num(), self.ttid)
579 0 : .await
580 0 : }
581 :
582 : /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
583 0 : pub async fn update_status_notify(&self) -> Result<()> {
584 0 : if self.is_cancelled() {
585 0 : bail!(TimelineError::Cancelled(self.ttid));
586 0 : }
587 0 : let is_wal_backup_action_pending: bool = {
588 0 : let mut shared_state = self.write_shared_state().await;
589 0 : self.update_status(&mut shared_state).await
590 : };
591 0 : if is_wal_backup_action_pending {
592 : // Can fail only if channel to a static thread got closed, which is not normal at all.
593 0 : self.wal_backup_launcher_tx.send(self.ttid).await?;
594 0 : }
595 0 : Ok(())
596 0 : }
597 :
598 : /// Returns true if the walsender should stop sending WAL to the pageserver. We
599 : /// terminate it if remote_consistent_lsn has reached commit_lsn and there are no
600 : /// computes. While there might already be nothing to stream, we learn about
601 : /// remote_consistent_lsn updates through replication feedback, and we want
602 : /// to stop pushing to the broker if the pageserver is fully caught up.
603 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
604 0 : if self.is_cancelled() {
605 0 : return true;
606 0 : }
607 0 : let shared_state = self.write_shared_state().await;
608 0 : if self.walreceivers.get_num() == 0 {
609 0 : return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
610 0 : reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
611 0 : }
612 0 : false
613 0 : }
614 :
615 : /// Ensure that the current term is t, erroring otherwise, and lock the state.
616 0 : pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
617 0 : let ss = self.write_shared_state().await;
618 0 : if ss.sk.state.acceptor_state.term != t {
619 0 : bail!(
620 0 : "failed to acquire term {}, current term {}",
621 0 : t,
622 0 : ss.sk.state.acceptor_state.term
623 0 : );
624 0 : }
625 0 : Ok(ss)
626 0 : }
627 :
628 : /// Returns whether s3 offloading is required and sets current status as
629 : /// matching it.
630 0 : pub async fn wal_backup_attend(&self) -> bool {
631 0 : if self.is_cancelled() {
632 0 : return false;
633 0 : }
634 0 :
635 0 : self.write_shared_state()
636 0 : .await
637 0 : .wal_backup_attend(self.walreceivers.get_num())
638 0 : }
639 :
640 : /// Returns commit_lsn watch channel.
641 0 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
642 0 : self.commit_lsn_watch_rx.clone()
643 0 : }
644 :
645 : /// Returns term_flush_lsn watch channel.
646 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
647 0 : self.term_flush_lsn_watch_rx.clone()
648 0 : }
649 :
650 : /// Pass an arrived message to the safekeeper.
651 0 : pub async fn process_msg(
652 0 : &self,
653 0 : msg: &ProposerAcceptorMessage,
654 0 : ) -> Result<Option<AcceptorProposerMessage>> {
655 0 : if self.is_cancelled() {
656 0 : bail!(TimelineError::Cancelled(self.ttid));
657 0 : }
658 :
659 : let mut rmsg: Option<AcceptorProposerMessage>;
660 : let commit_lsn: Lsn;
661 : let term_flush_lsn: TermLsn;
662 : {
663 0 : let mut shared_state = self.write_shared_state().await;
664 0 : rmsg = shared_state.sk.process_msg(msg).await?;
665 :
666 : // if this is AppendResponse, fill in proper hot standby feedback.
667 0 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
668 0 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
669 0 : }
670 :
671 0 : commit_lsn = shared_state.sk.state.inmem.commit_lsn;
672 0 : term_flush_lsn =
673 0 : TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
674 0 : }
675 0 : self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
676 0 : self.commit_lsn_watch_tx.send(commit_lsn)?;
677 0 : Ok(rmsg)
678 0 : }
679 :
680 : /// Returns wal_seg_size.
681 0 : pub async fn get_wal_seg_size(&self) -> usize {
682 0 : self.write_shared_state().await.get_wal_seg_size()
683 0 : }
684 :
685 : /// Returns true only if the timeline is loaded and active.
686 0 : pub async fn is_active(&self) -> bool {
687 0 : if self.is_cancelled() {
688 0 : return false;
689 0 : }
690 0 :
691 0 : self.write_shared_state().await.active
692 0 : }
693 :
694 : /// Returns state of the timeline.
695 0 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
696 0 : let state = self.write_shared_state().await;
697 0 : (state.sk.state.inmem.clone(), state.sk.state.clone())
698 0 : }
699 :
700 : /// Returns latest backup_lsn.
701 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
702 0 : self.write_shared_state().await.sk.state.inmem.backup_lsn
703 0 : }
704 :
705 : /// Sets backup_lsn to the given value.
706 0 : pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
707 0 : if self.is_cancelled() {
708 0 : bail!(TimelineError::Cancelled(self.ttid));
709 0 : }
710 :
711 0 : let mut state = self.write_shared_state().await;
712 0 : state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
713 0 : // we should check whether to shut down offloader, but this will be done
714 0 : // soon by peer communication anyway.
715 0 : Ok(())
716 0 : }
717 :
718 : /// Get safekeeper info for broadcasting to broker and other peers.
719 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
720 0 : let shared_state = self.write_shared_state().await;
721 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
722 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
723 0 : }
724 :
725 : /// Update timeline state with peer safekeeper data.
726 0 : pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> {
727 : let is_wal_backup_action_pending: bool;
728 : let commit_lsn: Lsn;
729 : {
730 0 : let mut shared_state = self.write_shared_state().await;
731 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
732 0 : let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
733 0 : shared_state.peers_info.upsert(&peer_info);
734 0 : is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
735 0 : commit_lsn = shared_state.sk.state.inmem.commit_lsn;
736 0 : }
737 0 : self.commit_lsn_watch_tx.send(commit_lsn)?;
738 : // Wake up wal backup launcher, if it is time to stop the offloading.
739 0 : if is_wal_backup_action_pending {
740 0 : self.wal_backup_launcher_tx.send(self.ttid).await?;
741 0 : }
742 0 : Ok(())
743 0 : }
744 :
745 : /// Update the in-memory remote consistent lsn.
746 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
747 0 : let mut shared_state = self.write_shared_state().await;
748 0 : shared_state.sk.state.inmem.remote_consistent_lsn =
749 0 : max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
750 0 : }
751 :
752 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
753 0 : let shared_state = self.write_shared_state().await;
754 0 : shared_state.get_peers(conf.heartbeat_timeout)
755 0 : }
756 :
757 : /// Should we start fetching WAL from a peer safekeeper, and if yes, from
758 : /// which? The answer is yes (i.e. .donors is not empty) if 1) there is something
759 : /// to fetch, and we can do that without running elections; 2) there is no
760 : /// actively streaming compute, as we don't want to compete with it.
761 : ///
762 : /// If donor(s) are chosen, their term is guaranteed to be equal to their
763 : /// last_log_term, so we are sure such a leader was ever elected.
764 : ///
765 : /// All possible donors are returned so that we could keep the connection to the
766 : /// current one if it is good, even if it slightly lags behind.
767 : ///
768 : /// Note that the term conditions above might not be met, yet safekeepers are
769 : /// still not aligned on the last flush_lsn. Generally, in this case, until
770 : /// elections are run it is not possible to say which safekeeper should
771 : /// recover from which one -- the history which would be committed differs
772 : /// depending on the assembled quorum (e.g. the classic picture 8 from the Raft paper).
773 : /// Thus we don't try to predict it here.
774 0 : pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
775 0 : let ss = self.write_shared_state().await;
776 0 : let term = ss.sk.state.acceptor_state.term;
777 0 : let last_log_term = ss.sk.get_epoch();
778 0 : let flush_lsn = ss.sk.flush_lsn();
779 0 : // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
780 0 : let mut peers = ss.get_peers(heartbeat_timeout);
781 0 : // Sort by <last log term, lsn> pairs.
782 0 : peers.sort_by(|p1, p2| {
783 0 : let tl1 = TermLsn {
784 0 : term: p1.last_log_term,
785 0 : lsn: p1.flush_lsn,
786 0 : };
787 0 : let tl2 = TermLsn {
788 0 : term: p2.last_log_term,
789 0 : lsn: p2.flush_lsn,
790 0 : };
791 0 : tl2.cmp(&tl1) // desc
792 0 : });
793 0 : let num_streaming_computes = self.walreceivers.get_num_streaming();
794 0 : let donors = if num_streaming_computes > 0 {
795 0 : vec![] // If there is a streaming compute, don't try to recover to not intervene.
796 : } else {
797 0 : peers
798 0 : .iter()
799 0 : .filter_map(|candidate| {
800 0 : // Are we interested in this candidate?
801 0 : let candidate_tl = TermLsn {
802 0 : term: candidate.last_log_term,
803 0 : lsn: candidate.flush_lsn,
804 0 : };
805 0 : let my_tl = TermLsn {
806 0 : term: last_log_term,
807 0 : lsn: flush_lsn,
808 0 : };
809 0 : if my_tl < candidate_tl {
810 : // Yes, we are interested. Can we pull from it without
811 : // (re)running elections? It is possible if 1) his term
812 : // is equal to his last_log_term so we could act on
813 : // behalf of leader of this term (we must be sure he was
814 : // ever elected) and 2) our term is not higher, or we'll refuse data.
815 0 : if candidate.term == candidate.last_log_term && candidate.term >= term {
816 0 : Some(Donor::from(candidate))
817 : } else {
818 0 : None
819 : }
820 : } else {
821 0 : None
822 : }
823 0 : })
824 0 : .collect()
825 : };
826 0 : RecoveryNeededInfo {
827 0 : term,
828 0 : last_log_term,
829 0 : flush_lsn,
830 0 : peers,
831 0 : num_streaming_computes,
832 0 : donors,
833 0 : }
834 0 : }
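// An illustrative sketch of the donor selection above (hypothetical values, not
// from the original source). Suppose our state is term = 5, last_log_term = 5,
// flush_lsn = 0/4000000, and a peer reports term = 5, last_log_term = 5,
// flush_lsn = 0/5000000. The peer's (last_log_term, flush_lsn) = (5, 0/5000000) is
// ahead of ours (5, 0/4000000), its term equals its last_log_term, and its term is
// not below ours, so it becomes a donor. If the same peer instead reported
// term = 6 with last_log_term = 5, we could not be sure a leader of term 6 was
// ever elected, so it would be skipped.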
835 :
836 0 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
837 0 : &self.walsenders
838 0 : }
839 :
840 0 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
841 0 : &self.walreceivers
842 0 : }
843 :
844 : /// Returns flush_lsn.
845 0 : pub async fn get_flush_lsn(&self) -> Lsn {
846 0 : self.write_shared_state().await.sk.wal_store.flush_lsn()
847 0 : }
848 :
849 : /// Delete WAL segments from disk that are no longer needed. This is determined
850 : /// based on the pageserver's remote_consistent_lsn and the local backup_lsn/peer_horizon_lsn.
851 0 : pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
852 0 : if self.is_cancelled() {
853 0 : bail!(TimelineError::Cancelled(self.ttid));
854 0 : }
855 :
856 : // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
857 : // This gives better read speed for pageservers that are lagging behind,
858 : // at the cost of keeping more WAL on disk.
859 0 : let replication_horizon_lsn = if self.walsenders_keep_horizon {
860 0 : self.walsenders.laggard_lsn()
861 : } else {
862 0 : None
863 : };
864 :
865 : let horizon_segno: XLogSegNo;
866 0 : let remover = {
867 0 : let shared_state = self.write_shared_state().await;
868 0 : horizon_segno =
869 0 : shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
870 0 : if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
871 0 : return Ok(()); // nothing to do
872 0 : }
873 0 :
874 0 : // release the lock before removing
875 0 : shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
876 0 : };
877 0 :
878 0 : // delete old WAL files
879 0 : remover.await?;
880 :
881 : // update last_removed_segno
882 0 : let mut shared_state = self.write_shared_state().await;
883 0 : shared_state.last_removed_segno = horizon_segno;
884 0 : Ok(())
885 0 : }
886 :
887 : /// Persist the control file if there is something to save and enough time has
888 : /// passed since the last save. This helps to keep remote_consistent_lsn up
889 : /// to date so that a storage node restart doesn't cause many pageserver ->
890 : /// safekeeper reconnections.
891 0 : pub async fn maybe_persist_control_file(&self) -> Result<()> {
892 0 : self.write_shared_state()
893 0 : .await
894 : .sk
895 0 : .maybe_persist_inmem_control_file()
896 0 : .await
897 0 : }
898 :
899 : /// Gather timeline data for metrics. If the timeline is not active, returns
900 : /// None, as we do not collect metrics for inactive timelines.
901 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
902 0 : if self.is_cancelled() {
903 0 : return None;
904 0 : }
905 0 :
906 0 : let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
907 0 : let state = self.write_shared_state().await;
908 0 : if state.active {
909 0 : Some(FullTimelineInfo {
910 0 : ttid: self.ttid,
911 0 : ps_feedback_count,
912 0 : last_ps_feedback,
913 0 : wal_backup_active: state.wal_backup_active,
914 0 : timeline_is_active: state.active,
915 0 : num_computes: self.walreceivers.get_num() as u32,
916 0 : last_removed_segno: state.last_removed_segno,
917 0 : epoch_start_lsn: state.sk.epoch_start_lsn,
918 0 : mem_state: state.sk.state.inmem.clone(),
919 0 : persisted_state: state.sk.state.clone(),
920 0 : flush_lsn: state.sk.wal_store.flush_lsn(),
921 0 : wal_storage: state.sk.wal_store.get_metrics(),
922 0 : })
923 : } else {
924 0 : None
925 : }
926 0 : }
927 :
928 : /// Returns in-memory timeline state to build a full debug dump.
929 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
930 0 : let state = self.write_shared_state().await;
931 :
932 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
933 0 : state.sk.wal_store.internal_state();
934 0 :
935 0 : debug_dump::Memory {
936 0 : is_cancelled: self.is_cancelled(),
937 0 : peers_info_len: state.peers_info.0.len(),
938 0 : walsenders: self.walsenders.get_all(),
939 0 : wal_backup_active: state.wal_backup_active,
940 0 : active: state.active,
941 0 : num_computes: self.walreceivers.get_num() as u32,
942 0 : last_removed_segno: state.last_removed_segno,
943 0 : epoch_start_lsn: state.sk.epoch_start_lsn,
944 0 : mem_state: state.sk.state.inmem.clone(),
945 0 : write_lsn,
946 0 : write_record_lsn,
947 0 : flush_lsn,
948 0 : file_open,
949 0 : }
950 0 : }
951 :
952 : /// Apply a function to the control file state and persist it.
953 0 : pub async fn map_control_file<T>(
954 0 : &self,
955 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
956 0 : ) -> Result<T> {
957 0 : let mut state = self.write_shared_state().await;
958 0 : let mut persistent_state = state.sk.state.start_change();
959 : // If f returns an error, we abort the change and don't persist anything.
960 0 : let res = f(&mut persistent_state)?;
961 : // If persisting fails, we abort the change and return the error.
962 0 : state.sk.state.finish_change(&persistent_state).await?;
963 0 : Ok(res)
964 0 : }
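// Usage sketch for map_control_file (hypothetical caller; the mutated field is
// only an example):
//
//   tli.map_control_file(|state| {
//       state.commit_lsn = new_commit_lsn; // mutate the copied persistent state
//       Ok(())
//   })
//   .await?;
//
// If the closure returns an error, or persisting the change fails, the on-disk
// control file is left unchanged and the error is propagated to the caller.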
965 : }
966 :
967 : /// Deletes a directory and its contents. Returns false if the directory does not exist.
968 0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
969 0 : match fs::remove_dir_all(path).await {
970 0 : Ok(_) => Ok(true),
971 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
972 0 : Err(e) => Err(e.into()),
973 : }
974 0 : }
|