TLA Line data Source code
1 : //! This module implements Timeline lifecycle management and has all necessary code
2 : //! to glue together SafeKeeper and all other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use camino::Utf8PathBuf;
6 : use postgres_ffi::XLogSegNo;
7 : use serde::{Deserialize, Serialize};
8 : use tokio::fs;
9 :
10 : use std::cmp::max;
11 : use std::sync::Arc;
12 : use std::time::Duration;
13 : use tokio::sync::{Mutex, MutexGuard};
14 : use tokio::{
15 : sync::{mpsc::Sender, watch},
16 : time::Instant,
17 : };
18 : use tracing::*;
19 : use utils::http::error::ApiError;
20 : use utils::{
21 : id::{NodeId, TenantTimelineId},
22 : lsn::Lsn,
23 : };
24 :
25 : use storage_broker::proto::SafekeeperTimelineInfo;
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 :
28 : use crate::receive_wal::WalReceivers;
29 : use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
30 : use crate::safekeeper::{
31 : AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
32 : SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
33 : };
34 : use crate::send_wal::WalSenders;
35 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
36 :
37 : use crate::metrics::FullTimelineInfo;
38 : use crate::wal_storage::Storage as wal_storage_iface;
39 : use crate::SafeKeeperConf;
40 : use crate::{debug_dump, wal_storage};
41 :
42 : /// Things safekeeper should know about timeline state on peers.
43 CBC 13295 : #[derive(Debug, Clone, Serialize, Deserialize)]
44 : pub struct PeerInfo {
45 : pub sk_id: NodeId,
46 : pub term: Term,
47 : /// Term of the last entry.
48 : pub last_log_term: Term,
49 : /// LSN of the last record.
50 : pub flush_lsn: Lsn,
51 : pub commit_lsn: Lsn,
52 : /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
53 : /// sk since backup_lsn.
54 : pub local_start_lsn: Lsn,
55 : /// When info was received. Serde annotations are not very useful but make
56 : /// the code compile -- we don't rely on this field externally.
57 : #[serde(skip)]
58 : #[serde(default = "Instant::now")]
59 : ts: Instant,
60 : pub pg_connstr: String,
61 : pub http_connstr: String,
62 : }
63 :
64 : impl PeerInfo {
65 11960 : fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
66 11960 : PeerInfo {
67 11960 : sk_id: NodeId(sk_info.safekeeper_id),
68 11960 : term: sk_info.term,
69 11960 : last_log_term: sk_info.last_log_term,
70 11960 : flush_lsn: Lsn(sk_info.flush_lsn),
71 11960 : commit_lsn: Lsn(sk_info.commit_lsn),
72 11960 : local_start_lsn: Lsn(sk_info.local_start_lsn),
73 11960 : pg_connstr: sk_info.safekeeper_connstr.clone(),
74 11960 : http_connstr: sk_info.http_connstr.clone(),
75 11960 : ts,
76 11960 : }
77 11960 : }
78 : }
79 :
80 : // vector-based node id -> peer state map with very limited functionality we
81 : // need.
82 UBC 0 : #[derive(Debug, Clone, Default)]
83 : pub struct PeersInfo(pub Vec<PeerInfo>);
84 :
85 : impl PeersInfo {
86 CBC 11960 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
87 16033 : self.0.iter_mut().find(|p| p.sk_id == id)
88 11960 : }
89 :
90 11960 : fn upsert(&mut self, p: &PeerInfo) {
91 11960 : match self.get(p.sk_id) {
92 11148 : Some(rp) => *rp = p.clone(),
93 812 : None => self.0.push(p.clone()),
94 : }
95 11960 : }
96 : }
97 :
98 : /// Shared state associated with database instance
99 : pub struct SharedState {
100 : /// Safekeeper object
101 : sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
102 : /// In memory list containing state of peers sent in latest messages from them.
103 : peers_info: PeersInfo,
104 : /// True when WAL backup launcher oversees the timeline, making sure WAL is
105 : /// offloaded, allows to bother launcher less.
106 : wal_backup_active: bool,
107 : /// True whenever there is at least some pending activity on timeline: live
108 : /// compute connection, pageserver is not caughtup (it must have latest WAL
109 : /// for new compute start) or WAL backuping is not finished. Practically it
110 : /// means safekeepers broadcast info to peers about the timeline, old WAL is
111 : /// trimmed.
112 : ///
113 : /// TODO: it might be better to remove tli completely from GlobalTimelines
114 : /// when tli is inactive instead of having this flag.
115 : active: bool,
116 : last_removed_segno: XLogSegNo,
117 : }
118 :
119 : impl SharedState {
120 : /// Initialize fresh timeline state without persisting anything to disk.
121 449 : fn create_new(
122 449 : conf: &SafeKeeperConf,
123 449 : ttid: &TenantTimelineId,
124 449 : state: SafeKeeperState,
125 449 : ) -> Result<Self> {
126 449 : if state.server.wal_seg_size == 0 {
127 UBC 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
128 CBC 449 : }
129 449 :
130 449 : if state.server.pg_version == UNKNOWN_SERVER_VERSION {
131 UBC 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
132 CBC 449 : }
133 449 :
134 449 : if state.commit_lsn < state.local_start_lsn {
135 UBC 0 : bail!(
136 0 : "commit_lsn {} is higher than local_start_lsn {}",
137 0 : state.commit_lsn,
138 0 : state.local_start_lsn
139 0 : );
140 CBC 449 : }
141 449 :
142 449 : // We don't want to write anything to disk, because we may have existing timeline there.
143 449 : // These functions should not change anything on disk.
144 449 : let timeline_dir = conf.timeline_dir(ttid);
145 449 : let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
146 449 : let wal_store =
147 449 : wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
148 449 : let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
149 :
150 449 : Ok(Self {
151 449 : sk,
152 449 : peers_info: PeersInfo(vec![]),
153 449 : wal_backup_active: false,
154 449 : active: false,
155 449 : last_removed_segno: 0,
156 449 : })
157 449 : }
158 :
159 : /// Restore SharedState from control file. If file doesn't exist, bails out.
160 133 : fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
161 133 : let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
162 133 : if control_store.server.wal_seg_size == 0 {
163 UBC 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
164 CBC 133 : }
165 :
166 133 : let wal_store =
167 133 : wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
168 :
169 : Ok(Self {
170 133 : sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
171 133 : peers_info: PeersInfo(vec![]),
172 : wal_backup_active: false,
173 : active: false,
174 : last_removed_segno: 0,
175 : })
176 133 : }
177 :
178 15911 : fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
179 15911 : self.is_wal_backup_required(num_computes)
180 : // FIXME: add tracking of relevant pageservers and check them here individually,
181 : // otherwise migration won't work (we suspend too early).
182 2365 : || remote_consistent_lsn < self.sk.inmem.commit_lsn
183 15911 : }
184 :
185 : /// Mark timeline active/inactive and return whether s3 offloading requires
186 : /// start/stop action. If timeline is deactivated, control file is persisted
187 : /// as maintenance task does that only for active timelines.
188 15911 : async fn update_status(
189 15911 : &mut self,
190 15911 : num_computes: usize,
191 15911 : remote_consistent_lsn: Lsn,
192 15911 : ttid: TenantTimelineId,
193 15911 : ) -> bool {
194 15911 : let is_active = self.is_active(num_computes, remote_consistent_lsn);
195 15911 : if self.active != is_active {
196 1460 : info!(
197 1460 : "timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
198 1460 : ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn
199 1460 : );
200 1460 : if !is_active {
201 1403 : if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await {
202 UBC 0 : warn!("control file save in update_status failed: {:?}", e);
203 CBC 469 : }
204 991 : }
205 14451 : }
206 15911 : self.active = is_active;
207 15911 : self.is_wal_backup_action_pending(num_computes)
208 15911 : }
209 :
210 : /// Should we run s3 offloading in current state?
211 44927 : fn is_wal_backup_required(&self, num_computes: usize) -> bool {
212 44927 : let seg_size = self.get_wal_seg_size();
213 44927 : num_computes > 0 ||
214 : // Currently only the whole segment is offloaded, so compare segment numbers.
215 10047 : (self.sk.inmem.commit_lsn.segment_number(seg_size) >
216 10047 : self.sk.inmem.backup_lsn.segment_number(seg_size))
217 44927 : }
218 :
219 : /// Is current state of s3 offloading is not what it ought to be?
220 15911 : fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
221 15911 : let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
222 15911 : if res {
223 12959 : let action_pending = if self.is_wal_backup_required(num_computes) {
224 12899 : "start"
225 : } else {
226 60 : "stop"
227 : };
228 12959 : trace!(
229 UBC 0 : "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
230 0 : self.sk.state.timeline_id, action_pending, num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
231 0 : );
232 CBC 2952 : }
233 15911 : res
234 15911 : }
235 :
236 : /// Returns whether s3 offloading is required and sets current status as
237 : /// matching.
238 146 : fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
239 146 : self.wal_backup_active = self.is_wal_backup_required(num_computes);
240 146 : self.wal_backup_active
241 146 : }
242 :
243 44939 : fn get_wal_seg_size(&self) -> usize {
244 44939 : self.sk.state.server.wal_seg_size as usize
245 44939 : }
246 :
247 8661 : fn get_safekeeper_info(
248 8661 : &self,
249 8661 : ttid: &TenantTimelineId,
250 8661 : conf: &SafeKeeperConf,
251 8661 : remote_consistent_lsn: Lsn,
252 8661 : ) -> SafekeeperTimelineInfo {
253 8661 : SafekeeperTimelineInfo {
254 8661 : safekeeper_id: conf.my_id.0,
255 8661 : tenant_timeline_id: Some(ProtoTenantTimelineId {
256 8661 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
257 8661 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
258 8661 : }),
259 8661 : term: self.sk.state.acceptor_state.term,
260 8661 : last_log_term: self.sk.get_epoch(),
261 8661 : flush_lsn: self.sk.flush_lsn().0,
262 8661 : // note: this value is not flushed to control file yet and can be lost
263 8661 : commit_lsn: self.sk.inmem.commit_lsn.0,
264 8661 : remote_consistent_lsn: remote_consistent_lsn.0,
265 8661 : peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
266 8661 : safekeeper_connstr: conf
267 8661 : .advertise_pg_addr
268 8661 : .to_owned()
269 8661 : .unwrap_or(conf.listen_pg_addr.clone()),
270 8661 : http_connstr: conf.listen_http_addr.to_owned(),
271 8661 : backup_lsn: self.sk.inmem.backup_lsn.0,
272 8661 : local_start_lsn: self.sk.state.local_start_lsn.0,
273 8661 : availability_zone: conf.availability_zone.clone(),
274 8661 : }
275 8661 : }
276 :
277 : /// Get our latest view of alive peers status on the timeline.
278 : /// We pass our own info through the broker as well, so when we don't have connection
279 : /// to the broker returned vec is empty.
280 541 : fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
281 541 : let now = Instant::now();
282 541 : self.peers_info
283 541 : .0
284 541 : .iter()
285 541 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
286 1373 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
287 541 : .cloned()
288 541 : .collect()
289 541 : }
290 : }
291 :
292 1 : #[derive(Debug, thiserror::Error)]
293 : pub enum TimelineError {
294 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
295 : Cancelled(TenantTimelineId),
296 : #[error("Timeline {0} was not found in global map")]
297 : NotFound(TenantTimelineId),
298 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
299 : Invalid(TenantTimelineId),
300 : #[error("Timeline {0} is already exists")]
301 : AlreadyExists(TenantTimelineId),
302 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
303 : UninitializedWalSegSize(TenantTimelineId),
304 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
305 : UninitialinzedPgVersion(TenantTimelineId),
306 : }
307 :
308 : // Convert to HTTP API error.
309 : impl From<TimelineError> for ApiError {
310 UBC 0 : fn from(te: TimelineError) -> ApiError {
311 0 : match te {
312 0 : TimelineError::NotFound(ttid) => {
313 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
314 : }
315 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
316 : }
317 0 : }
318 : }
319 :
320 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
321 : /// It also holds SharedState and provides mutually exclusive access to it.
322 : pub struct Timeline {
323 : pub ttid: TenantTimelineId,
324 :
325 : /// Sending here asks for wal backup launcher attention (start/stop
326 : /// offloading). Sending ttid instead of concrete command allows to do
327 : /// sending without timeline lock.
328 : pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
329 :
330 : /// Used to broadcast commit_lsn updates to all background jobs.
331 : commit_lsn_watch_tx: watch::Sender<Lsn>,
332 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
333 :
334 : /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
335 : /// them when sending in recovery mode (to walproposer or peers). Note: this
336 : /// is just a notification, WAL reading should always done with lock held as
337 : /// term can change otherwise.
338 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
339 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
340 :
341 : /// Safekeeper and other state, that should remain consistent and
342 : /// synchronized with the disk. This is tokio mutex as we write WAL to disk
343 : /// while holding it, ensuring that consensus checks are in order.
344 : mutex: Mutex<SharedState>,
345 : walsenders: Arc<WalSenders>,
346 : walreceivers: Arc<WalReceivers>,
347 :
348 : /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
349 : cancellation_tx: watch::Sender<bool>,
350 :
351 : /// Timeline should not be used after cancellation. Background tasks should
352 : /// monitor this channel and stop eventually after receiving `true` from this channel.
353 : cancellation_rx: watch::Receiver<bool>,
354 :
355 : /// Directory where timeline state is stored.
356 : pub timeline_dir: Utf8PathBuf,
357 : }
358 :
359 : impl Timeline {
360 : /// Load existing timeline from disk.
361 CBC 133 : pub fn load_timeline(
362 133 : conf: &SafeKeeperConf,
363 133 : ttid: TenantTimelineId,
364 133 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
365 133 : ) -> Result<Timeline> {
366 133 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
367 :
368 133 : let shared_state = SharedState::restore(conf, &ttid)?;
369 133 : let rcl = shared_state.sk.state.remote_consistent_lsn;
370 133 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
371 133 : watch::channel(shared_state.sk.state.commit_lsn);
372 133 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
373 133 : shared_state.sk.get_term(),
374 133 : shared_state.sk.flush_lsn(),
375 133 : )));
376 133 : let (cancellation_tx, cancellation_rx) = watch::channel(false);
377 133 :
378 133 : Ok(Timeline {
379 133 : ttid,
380 133 : wal_backup_launcher_tx,
381 133 : commit_lsn_watch_tx,
382 133 : commit_lsn_watch_rx,
383 133 : term_flush_lsn_watch_tx,
384 133 : term_flush_lsn_watch_rx,
385 133 : mutex: Mutex::new(shared_state),
386 133 : walsenders: WalSenders::new(rcl),
387 133 : walreceivers: WalReceivers::new(),
388 133 : cancellation_rx,
389 133 : cancellation_tx,
390 133 : timeline_dir: conf.timeline_dir(&ttid),
391 133 : })
392 133 : }
393 :
394 : /// Create a new timeline, which is not yet persisted to disk.
395 449 : pub fn create_empty(
396 449 : conf: &SafeKeeperConf,
397 449 : ttid: TenantTimelineId,
398 449 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
399 449 : server_info: ServerInfo,
400 449 : commit_lsn: Lsn,
401 449 : local_start_lsn: Lsn,
402 449 : ) -> Result<Timeline> {
403 449 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
404 449 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
405 449 : watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
406 449 : let (cancellation_tx, cancellation_rx) = watch::channel(false);
407 449 : let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
408 449 :
409 449 : Ok(Timeline {
410 449 : ttid,
411 449 : wal_backup_launcher_tx,
412 449 : commit_lsn_watch_tx,
413 449 : commit_lsn_watch_rx,
414 449 : term_flush_lsn_watch_tx,
415 449 : term_flush_lsn_watch_rx,
416 449 : mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
417 449 : walsenders: WalSenders::new(Lsn(0)),
418 449 : walreceivers: WalReceivers::new(),
419 449 : cancellation_rx,
420 449 : cancellation_tx,
421 449 : timeline_dir: conf.timeline_dir(&ttid),
422 : })
423 449 : }
424 :
425 : /// Initialize fresh timeline on disk and start background tasks. If init
426 : /// fails, timeline is cancelled and cannot be used anymore.
427 : ///
428 : /// Init is transactional, so if it fails, created files will be deleted,
429 : /// and state on disk should remain unchanged.
430 449 : pub async fn init_new(
431 449 : self: &Arc<Timeline>,
432 449 : shared_state: &mut MutexGuard<'_, SharedState>,
433 449 : conf: &SafeKeeperConf,
434 449 : ) -> Result<()> {
435 526 : match fs::metadata(&self.timeline_dir).await {
436 : Ok(_) => {
437 : // Timeline directory exists on disk, we should leave state unchanged
438 : // and return error.
439 UBC 0 : bail!(TimelineError::Invalid(self.ttid));
440 : }
441 CBC 449 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
442 UBC 0 : Err(e) => {
443 0 : return Err(e.into());
444 : }
445 : }
446 :
447 : // Create timeline directory.
448 CBC 528 : fs::create_dir_all(&self.timeline_dir).await?;
449 :
450 : // Write timeline to disk and start background tasks.
451 1442 : if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await {
452 : // Bootstrap failed, cancel timeline and remove timeline directory.
453 UBC 0 : self.cancel(shared_state);
454 :
455 0 : if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
456 0 : warn!(
457 0 : "failed to remove timeline {} directory after bootstrap failure: {}",
458 0 : self.ttid, fs_err
459 0 : );
460 0 : }
461 :
462 0 : return Err(e);
463 CBC 449 : }
464 449 : self.bootstrap(conf);
465 449 : Ok(())
466 449 : }
467 :
468 : /// Bootstrap new or existing timeline starting background stasks.
469 582 : pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
470 582 : // Start recovery task which always runs on the timeline.
471 582 : if conf.peer_recovery_enabled {
472 1 : tokio::spawn(recovery_main(self.clone(), conf.clone()));
473 581 : }
474 582 : }
475 :
476 : /// Delete timeline from disk completely, by removing timeline directory. Background
477 : /// timeline activities will stop eventually.
478 25 : pub async fn delete_from_disk(
479 25 : &self,
480 25 : shared_state: &mut MutexGuard<'_, SharedState>,
481 25 : ) -> Result<(bool, bool)> {
482 25 : let was_active = shared_state.active;
483 25 : self.cancel(shared_state);
484 25 : let dir_existed = delete_dir(&self.timeline_dir).await?;
485 25 : Ok((dir_existed, was_active))
486 25 : }
487 :
488 : /// Cancel timeline to prevent further usage. Background tasks will stop
489 : /// eventually after receiving cancellation signal.
490 : ///
491 : /// Note that we can't notify backup launcher here while holding
492 : /// shared_state lock, as this is a potential deadlock: caller is
493 : /// responsible for that. Generally we should probably make WAL backup tasks
494 : /// to shut down on their own, checking once in a while whether it is the
495 : /// time.
496 25 : fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
497 25 : info!("timeline {} is cancelled", self.ttid);
498 25 : let _ = self.cancellation_tx.send(true);
499 25 : // Close associated FDs. Nobody will be able to touch timeline data once
500 25 : // it is cancelled, so WAL storage won't be opened again.
501 25 : shared_state.sk.wal_store.close();
502 25 : }
503 :
504 : /// Returns if timeline is cancelled.
505 3320336 : pub fn is_cancelled(&self) -> bool {
506 3320336 : *self.cancellation_rx.borrow()
507 3320336 : }
508 :
509 : /// Returns watch channel which gets value when timeline is cancelled. It is
510 : /// guaranteed to have not cancelled value observed (errors otherwise).
511 1 : pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
512 1 : let rx = self.cancellation_rx.clone();
513 1 : if *rx.borrow() {
514 UBC 0 : bail!(TimelineError::Cancelled(self.ttid));
515 CBC 1 : }
516 1 : Ok(rx)
517 1 : }
518 :
519 : /// Take a writing mutual exclusive lock on timeline shared_state.
520 3323988 : pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
521 3323988 : self.mutex.lock().await
522 3323988 : }
523 :
524 15911 : async fn update_status(&self, shared_state: &mut SharedState) -> bool {
525 15911 : shared_state
526 15911 : .update_status(
527 15911 : self.walreceivers.get_num(),
528 15911 : self.get_walsenders().get_remote_consistent_lsn(),
529 15911 : self.ttid,
530 15911 : )
531 1403 : .await
532 15911 : }
533 :
534 : /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
535 3951 : pub async fn update_status_notify(&self) -> Result<()> {
536 3951 : if self.is_cancelled() {
537 UBC 0 : bail!(TimelineError::Cancelled(self.ttid));
538 CBC 3951 : }
539 3951 : let is_wal_backup_action_pending: bool = {
540 3951 : let mut shared_state = self.write_shared_state().await;
541 3951 : self.update_status(&mut shared_state).await
542 : };
543 3951 : if is_wal_backup_action_pending {
544 : // Can fail only if channel to a static thread got closed, which is not normal at all.
545 2404 : self.wal_backup_launcher_tx.send(self.ttid).await?;
546 1547 : }
547 3951 : Ok(())
548 3951 : }
549 :
550 : /// Returns true if walsender should stop sending WAL to pageserver. We
551 : /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
552 : /// computes. While there might be nothing to stream already, we learn about
553 : /// remote_consistent_lsn update through replication feedback, and we want
554 : /// to stop pushing to the broker if pageserver is fully caughtup.
555 3461 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
556 3461 : if self.is_cancelled() {
557 1 : return true;
558 3460 : }
559 3460 : let shared_state = self.write_shared_state().await;
560 3460 : if self.walreceivers.get_num() == 0 {
561 712 : return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
562 712 : reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
563 2748 : }
564 2748 : false
565 3461 : }
566 :
567 : /// Ensure taht current term is t, erroring otherwise, and lock the state.
568 1905 : pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
569 1905 : let ss = self.write_shared_state().await;
570 1905 : if ss.sk.state.acceptor_state.term != t {
571 1 : bail!(
572 1 : "failed to acquire term {}, current term {}",
573 1 : t,
574 1 : ss.sk.state.acceptor_state.term
575 1 : );
576 1904 : }
577 1904 : Ok(ss)
578 1905 : }
579 :
580 : /// Returns whether s3 offloading is required and sets current status as
581 : /// matching it.
582 146 : pub async fn wal_backup_attend(&self) -> bool {
583 146 : if self.is_cancelled() {
584 UBC 0 : return false;
585 CBC 146 : }
586 146 :
587 146 : self.write_shared_state()
588 38 : .await
589 146 : .wal_backup_attend(self.walreceivers.get_num())
590 146 : }
591 :
592 : /// Returns commit_lsn watch channel.
593 730 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
594 730 : self.commit_lsn_watch_rx.clone()
595 730 : }
596 :
597 : /// Returns term_flush_lsn watch channel.
598 16 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
599 16 : self.term_flush_lsn_watch_rx.clone()
600 16 : }
601 :
602 : /// Pass arrived message to the safekeeper.
603 3274394 : pub async fn process_msg(
604 3274394 : &self,
605 3274394 : msg: &ProposerAcceptorMessage,
606 3274394 : ) -> Result<Option<AcceptorProposerMessage>> {
607 3274394 : if self.is_cancelled() {
608 UBC 0 : bail!(TimelineError::Cancelled(self.ttid));
609 CBC 3274394 : }
610 :
611 : let mut rmsg: Option<AcceptorProposerMessage>;
612 : let commit_lsn: Lsn;
613 : let term_flush_lsn: TermLsn;
614 : {
615 3274394 : let mut shared_state = self.write_shared_state().await;
616 3985426 : rmsg = shared_state.sk.process_msg(msg).await?;
617 :
618 : // if this is AppendResponse, fill in proper pageserver and hot
619 : // standby feedback.
620 3274393 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
621 1294232 : let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
622 1294232 : resp.hs_feedback = hs_feedback;
623 1294232 : resp.pageserver_feedback = ps_feedback;
624 1980161 : }
625 :
626 3274393 : commit_lsn = shared_state.sk.inmem.commit_lsn;
627 3274393 : term_flush_lsn =
628 3274393 : TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
629 3274393 : }
630 3274393 : self.commit_lsn_watch_tx.send(commit_lsn)?;
631 3274393 : self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
632 3274393 : Ok(rmsg)
633 3274393 : }
634 :
635 : /// Returns wal_seg_size.
636 12 : pub async fn get_wal_seg_size(&self) -> usize {
637 12 : self.write_shared_state().await.get_wal_seg_size()
638 12 : }
639 :
640 : /// Returns true only if the timeline is loaded and active.
641 11121 : pub async fn is_active(&self) -> bool {
642 11121 : if self.is_cancelled() {
643 UBC 0 : return false;
644 CBC 11121 : }
645 11121 :
646 11121 : self.write_shared_state().await.active
647 11121 : }
648 :
649 : /// Returns state of the timeline.
650 2736 : pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
651 2736 : let state = self.write_shared_state().await;
652 2736 : (state.sk.inmem.clone(), state.sk.state.clone())
653 2736 : }
654 :
655 : /// Returns latest backup_lsn.
656 312 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
657 312 : self.write_shared_state().await.sk.inmem.backup_lsn
658 312 : }
659 :
660 : /// Sets backup_lsn to the given value.
661 13 : pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
662 13 : if self.is_cancelled() {
663 UBC 0 : bail!(TimelineError::Cancelled(self.ttid));
664 CBC 13 : }
665 :
666 13 : let mut state = self.write_shared_state().await;
667 13 : state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
668 13 : // we should check whether to shut down offloader, but this will be done
669 13 : // soon by peer communication anyway.
670 13 : Ok(())
671 13 : }
672 :
673 : /// Get safekeeper info for broadcasting to broker and other peers.
674 8661 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
675 8661 : let shared_state = self.write_shared_state().await;
676 8661 : shared_state.get_safekeeper_info(
677 8661 : &self.ttid,
678 8661 : conf,
679 8661 : self.walsenders.get_remote_consistent_lsn(),
680 8661 : )
681 8661 : }
682 :
683 : /// Update timeline state with peer safekeeper data.
684 11960 : pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
685 11960 : // Update local remote_consistent_lsn in memory (in .walsenders) and in
686 11960 : // sk_info to pass it down to control file.
687 11960 : sk_info.remote_consistent_lsn = self
688 11960 : .walsenders
689 11960 : .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
690 11960 : .0;
691 : let is_wal_backup_action_pending: bool;
692 : let commit_lsn: Lsn;
693 : {
694 11960 : let mut shared_state = self.write_shared_state().await;
695 11960 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
696 11960 : let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
697 11960 : shared_state.peers_info.upsert(&peer_info);
698 11960 : is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
699 11960 : commit_lsn = shared_state.sk.inmem.commit_lsn;
700 11960 : }
701 11960 : self.commit_lsn_watch_tx.send(commit_lsn)?;
702 : // Wake up wal backup launcher, if it is time to stop the offloading.
703 11960 : if is_wal_backup_action_pending {
704 10555 : self.wal_backup_launcher_tx.send(self.ttid).await?;
705 1405 : }
706 11960 : Ok(())
707 11960 : }
708 :
709 538 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
710 538 : let shared_state = self.write_shared_state().await;
711 538 : shared_state.get_peers(conf.heartbeat_timeout)
712 538 : }
713 :
714 : /// Should we start fetching WAL from a peer safekeeper, and if yes, from
715 : /// which? Answer is yes, i.e. .donors is not empty if 1) there is something
716 : /// to fetch, and we can do that without running elections; 2) there is no
717 : /// actively streaming compute, as we don't want to compete with it.
718 : ///
719 : /// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
720 : /// to its last_log_term so we are sure such a leader ever had been elected.
721 : ///
722 : /// All possible donors are returned so that we could keep connection to the
723 : /// current one if it is good even if it slightly lags behind.
724 : ///
725 : /// Note that term conditions above might be not met, but safekeepers are
726 : /// still not aligned on last flush_lsn. Generally in this case until
727 : /// elections are run it is not possible to say which safekeeper should
728 : /// recover from which one -- history which would be committed is different
729 : /// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
730 : /// Thus we don't try to predict it here.
731 3 : pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
732 3 : let ss = self.write_shared_state().await;
733 3 : let term = ss.sk.state.acceptor_state.term;
734 3 : let last_log_term = ss.sk.get_epoch();
735 3 : let flush_lsn = ss.sk.flush_lsn();
736 3 : // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
737 3 : let mut peers = ss.get_peers(heartbeat_timeout);
738 3 : // Sort by <last log term, lsn> pairs.
739 5 : peers.sort_by(|p1, p2| {
740 5 : let tl1 = TermLsn {
741 5 : term: p1.last_log_term,
742 5 : lsn: p1.flush_lsn,
743 5 : };
744 5 : let tl2 = TermLsn {
745 5 : term: p2.last_log_term,
746 5 : lsn: p2.flush_lsn,
747 5 : };
748 5 : tl2.cmp(&tl1) // desc
749 5 : });
750 3 : let num_streaming_computes = self.walreceivers.get_num_streaming();
751 3 : let donors = if num_streaming_computes > 0 {
752 UBC 0 : vec![] // If there is a streaming compute, don't try to recover to not intervene.
753 : } else {
754 CBC 3 : peers
755 3 : .iter()
756 6 : .filter_map(|candidate| {
757 6 : // Are we interested in this candidate?
758 6 : let candidate_tl = TermLsn {
759 6 : term: candidate.last_log_term,
760 6 : lsn: candidate.flush_lsn,
761 6 : };
762 6 : let my_tl = TermLsn {
763 6 : term: last_log_term,
764 6 : lsn: flush_lsn,
765 6 : };
766 6 : if my_tl < candidate_tl {
767 : // Yes, we are interested. Can we pull from it without
768 : // (re)running elections? It is possible if 1) his term
769 : // is equal to his last_log_term so we could act on
770 : // behalf of leader of this term (we must be sure he was
771 : // ever elected) and 2) our term is not higher, or we'll refuse data.
772 2 : if candidate.term == candidate.last_log_term && candidate.term >= term {
773 2 : Some(Donor::from(candidate))
774 : } else {
775 UBC 0 : None
776 : }
777 : } else {
778 CBC 4 : None
779 : }
780 6 : })
781 3 : .collect()
782 : };
783 3 : RecoveryNeededInfo {
784 3 : term,
785 3 : last_log_term,
786 3 : flush_lsn,
787 3 : peers,
788 3 : num_streaming_computes,
789 3 : donors,
790 3 : }
791 3 : }
792 :
793 17190 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
794 17190 : &self.walsenders
795 17190 : }
796 :
797 1993 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
798 1993 : &self.walreceivers
799 1993 : }
800 :
801 : /// Returns flush_lsn.
802 525 : pub async fn get_flush_lsn(&self) -> Lsn {
803 525 : self.write_shared_state().await.sk.wal_store.flush_lsn()
804 525 : }
805 :
806 : /// Delete WAL segments from disk that are no longer needed. This is determined
807 : /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
808 1846 : pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
809 1846 : if self.is_cancelled() {
810 UBC 0 : bail!(TimelineError::Cancelled(self.ttid));
811 CBC 1846 : }
812 :
813 : let horizon_segno: XLogSegNo;
814 29 : let remover = {
815 1846 : let shared_state = self.write_shared_state().await;
816 1846 : horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
817 1846 : if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
818 1817 : return Ok(()); // nothing to do
819 29 : }
820 29 :
821 29 : // release the lock before removing
822 29 : shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
823 29 : };
824 29 :
825 29 : // delete old WAL files
826 43 : remover.await?;
827 :
828 : // update last_removed_segno
829 29 : let mut shared_state = self.write_shared_state().await;
830 29 : shared_state.last_removed_segno = horizon_segno;
831 29 : Ok(())
832 1846 : }
833 :
834 : /// Persist control file if there is something to save and enough time
835 : /// passed after the last save. This helps to keep remote_consistent_lsn up
836 : /// to date so that storage nodes restart doesn't cause many pageserver ->
837 : /// safekeeper reconnections.
838 1846 : pub async fn maybe_persist_control_file(&self) -> Result<()> {
839 1846 : let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
840 1846 : self.write_shared_state()
841 106 : .await
842 : .sk
843 1846 : .maybe_persist_inmem_control_file(remote_consistent_lsn)
844 6 : .await
845 1846 : }
846 :
847 : /// Gather timeline data for metrics. If the timeline is not active, returns
848 : /// None, we do not collect these.
849 51 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
850 51 : if self.is_cancelled() {
851 UBC 0 : return None;
852 CBC 51 : }
853 51 :
854 51 : let ps_feedback = self.walsenders.get_ps_feedback();
855 51 : let state = self.write_shared_state().await;
856 51 : if state.active {
857 51 : Some(FullTimelineInfo {
858 51 : ttid: self.ttid,
859 51 : ps_feedback,
860 51 : wal_backup_active: state.wal_backup_active,
861 51 : timeline_is_active: state.active,
862 51 : num_computes: self.walreceivers.get_num() as u32,
863 51 : last_removed_segno: state.last_removed_segno,
864 51 : epoch_start_lsn: state.sk.epoch_start_lsn,
865 51 : mem_state: state.sk.inmem.clone(),
866 51 : persisted_state: state.sk.state.clone(),
867 51 : flush_lsn: state.sk.wal_store.flush_lsn(),
868 51 : remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
869 51 : wal_storage: state.sk.wal_store.get_metrics(),
870 51 : })
871 : } else {
872 UBC 0 : None
873 : }
874 CBC 51 : }
875 :
876 : /// Returns in-memory timeline state to build a full debug dump.
877 5 : pub async fn memory_dump(&self) -> debug_dump::Memory {
878 5 : let state = self.write_shared_state().await;
879 :
880 5 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
881 5 : state.sk.wal_store.internal_state();
882 5 :
883 5 : debug_dump::Memory {
884 5 : is_cancelled: self.is_cancelled(),
885 5 : peers_info_len: state.peers_info.0.len(),
886 5 : walsenders: self.walsenders.get_all(),
887 5 : wal_backup_active: state.wal_backup_active,
888 5 : active: state.active,
889 5 : num_computes: self.walreceivers.get_num() as u32,
890 5 : last_removed_segno: state.last_removed_segno,
891 5 : epoch_start_lsn: state.sk.epoch_start_lsn,
892 5 : mem_state: state.sk.inmem.clone(),
893 5 : write_lsn,
894 5 : write_record_lsn,
895 5 : flush_lsn,
896 5 : file_open,
897 5 : }
898 5 : }
899 : }
900 :
901 : /// Deletes directory and it's contents. Returns false if directory does not exist.
902 25 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
903 25 : match fs::remove_dir_all(path).await {
904 11 : Ok(_) => Ok(true),
905 14 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
906 UBC 0 : Err(e) => Err(e.into()),
907 : }
908 CBC 25 : }
|