//! This module implements Timeline lifecycle management and has all necessary code
//! to glue together SafeKeeper and all other background services.

use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::fs;

use serde_with::DisplayFromStr;
use std::cmp::max;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
    sync::{mpsc::Sender, watch},
    time::Instant,
};
use tracing::*;
use utils::http::error::ApiError;
use utils::{
    id::{NodeId, TenantTimelineId},
    lsn::Lsn,
};

use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;

use crate::receive_wal::WalReceivers;
use crate::recovery::recovery_main;
use crate::safekeeper::{
    AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
    SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
};
use crate::send_wal::WalSenders;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

use crate::metrics::FullTimelineInfo;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
use crate::{debug_dump, wal_storage};

/// Things safekeeper should know about timeline state on peers.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    pub sk_id: NodeId,
    /// Term of the last entry.
    _last_log_term: Term,
    /// LSN of the last record.
    #[serde_as(as = "DisplayFromStr")]
    _flush_lsn: Lsn,
    #[serde_as(as = "DisplayFromStr")]
    pub commit_lsn: Lsn,
    /// The LSN since which this safekeeper has WAL. TODO: remove this once we
    /// fill new sk since backup_lsn.
    #[serde_as(as = "DisplayFromStr")]
    pub local_start_lsn: Lsn,
    /// When the info was received. The serde annotations are not very useful
    /// but make the code compile -- we don't rely on this field externally.
    #[serde(skip)]
    #[serde(default = "Instant::now")]
    ts: Instant,
}

impl PeerInfo {
    fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
        PeerInfo {
            sk_id: NodeId(sk_info.safekeeper_id),
            _last_log_term: sk_info.last_log_term,
            _flush_lsn: Lsn(sk_info.flush_lsn),
            commit_lsn: Lsn(sk_info.commit_lsn),
            local_start_lsn: Lsn(sk_info.local_start_lsn),
            ts,
        }
    }
}

// Vector-based node id -> peer state map with the very limited functionality
// we need.
#[derive(Debug, Clone, Default)]
pub struct PeersInfo(pub Vec<PeerInfo>);

impl PeersInfo {
    fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
        self.0.iter_mut().find(|p| p.sk_id == id)
    }

    fn upsert(&mut self, p: &PeerInfo) {
        match self.get(p.sk_id) {
            Some(rp) => *rp = p.clone(),
            None => self.0.push(p.clone()),
        }
    }
}
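
// A minimal sketch of the intended `upsert` semantics (illustrative only; the
// `info_from_node_1` values are made up):
//
//     let mut peers = PeersInfo::default();
//     peers.upsert(&info_from_node_1);        // first message: entry is pushed
//     peers.upsert(&newer_info_from_node_1);  // same sk_id: entry is overwritten
//     assert_eq!(peers.0.len(), 1);           // one entry per node id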

/// Shared state associated with a database instance.
pub struct SharedState {
    /// Safekeeper object
    sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
    /// In-memory list containing the state of peers sent in their latest messages.
    peers_info: PeersInfo,
    /// True when the WAL backup launcher oversees the timeline, making sure WAL
    /// is offloaded; this allows bothering the launcher less.
    wal_backup_active: bool,
    /// True whenever there is at least some pending activity on the timeline: a
    /// live compute connection, a pageserver that is not caught up (it must have
    /// the latest WAL for new compute start), or WAL backup that is not finished.
    /// Practically it means safekeepers broadcast info about the timeline to
    /// peers and old WAL is trimmed.
    ///
    /// TODO: it might be better to remove the tli completely from GlobalTimelines
    /// when the tli is inactive instead of having this flag.
    active: bool,
    last_removed_segno: XLogSegNo,
}

impl SharedState {
    /// Initialize fresh timeline state without persisting anything to disk.
    fn create_new(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
        state: SafeKeeperState,
    ) -> Result<Self> {
        if state.server.wal_seg_size == 0 {
            bail!(TimelineError::UninitializedWalSegSize(*ttid));
        }

        if state.server.pg_version == UNKNOWN_SERVER_VERSION {
            bail!(TimelineError::UninitializedPgVersion(*ttid));
        }

        if state.commit_lsn < state.local_start_lsn {
            bail!(
                "commit_lsn {} is lower than local_start_lsn {}",
                state.commit_lsn,
                state.local_start_lsn
            );
        }

        // We don't want to write anything to disk, because we may have an existing
        // timeline there. These functions should not change anything on disk.
        let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
        let wal_store =
            wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
        let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;

        Ok(Self {
            sk,
            peers_info: PeersInfo(vec![]),
            wal_backup_active: false,
            active: false,
            last_removed_segno: 0,
        })
    }

    /// Restore SharedState from control file. If file doesn't exist, bails out.
    fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
        let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
        if control_store.server.wal_seg_size == 0 {
            bail!(TimelineError::UninitializedWalSegSize(*ttid));
        }

        let wal_store =
            wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;

        Ok(Self {
            sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
            peers_info: PeersInfo(vec![]),
            wal_backup_active: false,
            active: false,
            last_removed_segno: 0,
        })
    }

    fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
        self.is_wal_backup_required(num_computes)
            // FIXME: add tracking of relevant pageservers and check them here individually,
            // otherwise migration won't work (we suspend too early).
            || remote_consistent_lsn < self.sk.inmem.commit_lsn
    }
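
    // Illustration with hypothetical LSNs: with zero computes and backup caught
    // up, a timeline whose commit_lsn is 0/2000000 stays active while the
    // pageserver still reports remote_consistent_lsn 0/1000000, and becomes
    // inactive once the pageserver catches up to commit_lsn.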

    /// Mark timeline active/inactive and return whether s3 offloading requires
    /// start/stop action.
    fn update_status(
        &mut self,
        num_computes: usize,
        remote_consistent_lsn: Lsn,
        ttid: TenantTimelineId,
    ) -> bool {
        let is_active = self.is_active(num_computes, remote_consistent_lsn);
        if self.active != is_active {
            info!("timeline {} active={} now", ttid, is_active);
        }
        self.active = is_active;
        self.is_wal_backup_action_pending(num_computes)
    }

    /// Should we run s3 offloading in the current state?
    fn is_wal_backup_required(&self, num_computes: usize) -> bool {
        let seg_size = self.get_wal_seg_size();
        num_computes > 0 ||
            // Currently only whole segments are offloaded, so compare segment numbers.
            (self.sk.inmem.commit_lsn.segment_number(seg_size) >
             self.sk.inmem.backup_lsn.segment_number(seg_size))
    }
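
    // Illustration with hypothetical values: with wal_seg_size = 16 MiB,
    // commit_lsn = 0/3000000 lies in segment 3 while backup_lsn = 0/2000000
    // lies in segment 2, so offloading is required even with zero computes.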

    /// Is the current state of s3 offloading different from what it ought to be?
    fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
        let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
        if res {
            let action_pending = if self.is_wal_backup_required(num_computes) {
                "start"
            } else {
                "stop"
            };
            trace!(
                "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
                self.sk.state.timeline_id, action_pending, num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
            );
        }
        res
    }

    /// Returns whether s3 offloading is required and sets current status as
    /// matching.
    fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
        self.wal_backup_active = self.is_wal_backup_required(num_computes);
        self.wal_backup_active
    }

    fn get_wal_seg_size(&self) -> usize {
        self.sk.state.server.wal_seg_size as usize
    }

    fn get_safekeeper_info(
        &self,
        ttid: &TenantTimelineId,
        conf: &SafeKeeperConf,
        remote_consistent_lsn: Lsn,
    ) -> SafekeeperTimelineInfo {
        SafekeeperTimelineInfo {
            safekeeper_id: conf.my_id.0,
            tenant_timeline_id: Some(ProtoTenantTimelineId {
                tenant_id: ttid.tenant_id.as_ref().to_owned(),
                timeline_id: ttid.timeline_id.as_ref().to_owned(),
            }),
            term: self.sk.state.acceptor_state.term,
            last_log_term: self.sk.get_epoch(),
            flush_lsn: self.sk.flush_lsn().0,
            // note: this value is not flushed to control file yet and can be lost
            commit_lsn: self.sk.inmem.commit_lsn.0,
            remote_consistent_lsn: remote_consistent_lsn.0,
            peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
            safekeeper_connstr: conf
                .advertise_pg_addr
                .to_owned()
                .unwrap_or(conf.listen_pg_addr.clone()),
            http_connstr: conf.listen_http_addr.to_owned(),
            backup_lsn: self.sk.inmem.backup_lsn.0,
            local_start_lsn: self.sk.state.local_start_lsn.0,
            availability_zone: conf.availability_zone.clone(),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum TimelineError {
    #[error("Timeline {0} was cancelled and cannot be used anymore")]
    Cancelled(TenantTimelineId),
    #[error("Timeline {0} was not found in global map")]
    NotFound(TenantTimelineId),
    #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
    Invalid(TenantTimelineId),
    #[error("Timeline {0} already exists")]
    AlreadyExists(TenantTimelineId),
    #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
    UninitializedWalSegSize(TenantTimelineId),
    #[error("Timeline {0} is not initialized, pg_version is unknown")]
    UninitializedPgVersion(TenantTimelineId),
}

// Convert to HTTP API error.
impl From<TimelineError> for ApiError {
    fn from(te: TimelineError) -> ApiError {
        match te {
            TimelineError::NotFound(ttid) => {
                ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
            }
            _ => ApiError::InternalServerError(anyhow!("{}", te)),
        }
    }
}

/// Timeline struct manages the lifecycle (creation, deletion, restore) of a
/// safekeeper timeline. It also holds the SharedState and provides mutually
/// exclusive access to it.
pub struct Timeline {
    pub ttid: TenantTimelineId,

    /// Sending here asks for the wal backup launcher's attention (start/stop
    /// offloading). Sending the ttid instead of a concrete command allows
    /// sending without holding the timeline lock.
    pub wal_backup_launcher_tx: Sender<TenantTimelineId>,

    /// Used to broadcast commit_lsn updates to all background jobs.
    commit_lsn_watch_tx: watch::Sender<Lsn>,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,

    /// Broadcasts (current term, flush_lsn) updates; the walsender is interested
    /// in them when sending in recovery mode (to walproposer or peers). Note:
    /// this is just a notification, WAL reading should always be done with the
    /// lock held as the term can change otherwise.
    term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
    term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,

    /// Safekeeper and other state that should remain consistent and
    /// synchronized with the disk. This is a tokio mutex as we write WAL to
    /// disk while holding it, ensuring that consensus checks are in order.
    mutex: Mutex<SharedState>,
    walsenders: Arc<WalSenders>,
    walreceivers: Arc<WalReceivers>,

    /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
    cancellation_tx: watch::Sender<bool>,

    /// Timeline should not be used after cancellation. Background tasks should
    /// monitor this channel and stop eventually after receiving `true` from this channel.
    cancellation_rx: watch::Receiver<bool>,

    /// Directory where timeline state is stored.
    pub timeline_dir: Utf8PathBuf,
}
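
// The methods below follow a common pattern (sketch): read or mutate state
// under write_shared_state(), copy out what is needed, drop the guard, and
// only then notify watchers or the launcher, so no channel send happens while
// the shared_state lock is held:
//
//     let commit_lsn = { tli.write_shared_state().await.sk.inmem.commit_lsn };
//     tli.commit_lsn_watch_tx.send(commit_lsn)?;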

impl Timeline {
    /// Load existing timeline from disk.
    pub fn load_timeline(
        conf: &SafeKeeperConf,
        ttid: TenantTimelineId,
        wal_backup_launcher_tx: Sender<TenantTimelineId>,
    ) -> Result<Timeline> {
        let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();

        let shared_state = SharedState::restore(conf, &ttid)?;
        let rcl = shared_state.sk.state.remote_consistent_lsn;
        let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
            watch::channel(shared_state.sk.state.commit_lsn);
        let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
            shared_state.sk.get_term(),
            shared_state.sk.flush_lsn(),
        )));
        let (cancellation_tx, cancellation_rx) = watch::channel(false);

        Ok(Timeline {
            ttid,
            wal_backup_launcher_tx,
            commit_lsn_watch_tx,
            commit_lsn_watch_rx,
            term_flush_lsn_watch_tx,
            term_flush_lsn_watch_rx,
            mutex: Mutex::new(shared_state),
            walsenders: WalSenders::new(rcl),
            walreceivers: WalReceivers::new(),
            cancellation_rx,
            cancellation_tx,
            timeline_dir: conf.timeline_dir(&ttid),
        })
    }

    /// Create a new timeline, which is not yet persisted to disk.
    pub fn create_empty(
        conf: &SafeKeeperConf,
        ttid: TenantTimelineId,
        wal_backup_launcher_tx: Sender<TenantTimelineId>,
        server_info: ServerInfo,
        commit_lsn: Lsn,
        local_start_lsn: Lsn,
    ) -> Result<Timeline> {
        let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
        let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
            watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
        let (cancellation_tx, cancellation_rx) = watch::channel(false);
        let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);

        Ok(Timeline {
            ttid,
            wal_backup_launcher_tx,
            commit_lsn_watch_tx,
            commit_lsn_watch_rx,
            term_flush_lsn_watch_tx,
            term_flush_lsn_watch_rx,
            mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
            walsenders: WalSenders::new(Lsn(0)),
            walreceivers: WalReceivers::new(),
            cancellation_rx,
            cancellation_tx,
            timeline_dir: conf.timeline_dir(&ttid),
        })
    }

    /// Initialize fresh timeline on disk and start background tasks. If init
    /// fails, timeline is cancelled and cannot be used anymore.
    ///
    /// Init is transactional, so if it fails, created files will be deleted,
    /// and state on disk should remain unchanged.
    pub async fn init_new(
        self: &Arc<Timeline>,
        shared_state: &mut MutexGuard<'_, SharedState>,
        conf: &SafeKeeperConf,
    ) -> Result<()> {
        match fs::metadata(&self.timeline_dir).await {
            Ok(_) => {
                // Timeline directory exists on disk, we should leave state unchanged
                // and return error.
                bail!(TimelineError::Invalid(self.ttid));
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                return Err(e.into());
            }
        }

        // Create timeline directory.
        fs::create_dir_all(&self.timeline_dir).await?;

        // Write timeline to disk and start background tasks.
        if let Err(e) = shared_state.sk.persist().await {
            // Bootstrap failed, cancel timeline and remove timeline directory.
            self.cancel(shared_state);

            if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
                warn!(
                    "failed to remove timeline {} directory after bootstrap failure: {}",
                    self.ttid, fs_err
                );
            }

            return Err(e);
        }
        self.bootstrap(conf);
        Ok(())
    }

    /// Bootstrap new or existing timeline, starting background tasks.
    pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
        // Start the recovery task, which always runs on the timeline.
        tokio::spawn(recovery_main(self.clone(), conf.clone()));
    }

    /// Delete timeline from disk completely, by removing timeline directory.
    /// Background timeline activities will stop eventually.
    pub async fn delete_from_disk(
        &self,
        shared_state: &mut MutexGuard<'_, SharedState>,
    ) -> Result<(bool, bool)> {
        let was_active = shared_state.active;
        self.cancel(shared_state);
        let dir_existed = delete_dir(&self.timeline_dir).await?;
        Ok((dir_existed, was_active))
    }

    /// Cancel the timeline to prevent further usage. Background tasks will stop
    /// eventually after receiving the cancellation signal.
    ///
    /// Note that we can't notify the backup launcher here while holding the
    /// shared_state lock, as that is a potential deadlock: the caller is
    /// responsible for doing so. Generally we should probably make WAL backup
    /// tasks shut down on their own, checking once in a while whether it is
    /// time.
    fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
        info!("timeline {} is cancelled", self.ttid);
        let _ = self.cancellation_tx.send(true);
        // Close associated FDs. Nobody will be able to touch timeline data once
        // it is cancelled, so WAL storage won't be opened again.
        shared_state.sk.wal_store.close();
    }

    /// Returns whether the timeline is cancelled.
    pub fn is_cancelled(&self) -> bool {
        *self.cancellation_rx.borrow()
    }

    /// Returns a watch channel which gets a value when the timeline is
    /// cancelled. It is guaranteed that the cancelled value has not been
    /// observed yet (errors otherwise).
    pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
        let rx = self.cancellation_rx.clone();
        if *rx.borrow() {
            bail!(TimelineError::Cancelled(self.ttid));
        }
        Ok(rx)
    }
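
    // Sketch of how a background task might use the receiver (illustrative;
    // `do_work` is a hypothetical future):
    //
    //     let mut cancellation_rx = tli.get_cancellation_rx()?;
    //     tokio::select! {
    //         _ = cancellation_rx.changed() => { /* shut down */ }
    //         res = do_work() => { /* handle the result */ }
    //     }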

    /// Take a mutually exclusive write lock on the timeline shared_state.
    pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
        self.mutex.lock().await
    }

    fn update_status(&self, shared_state: &mut SharedState) -> bool {
        shared_state.update_status(
            self.walreceivers.get_num(),
            self.get_walsenders().get_remote_consistent_lsn(),
            self.ttid,
        )
    }

    /// Update timeline status and kick the wal backup launcher to stop/start offloading if needed.
    pub async fn update_status_notify(&self) -> Result<()> {
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }
        let is_wal_backup_action_pending: bool = {
            let mut shared_state = self.write_shared_state().await;
            self.update_status(&mut shared_state)
        };
        if is_wal_backup_action_pending {
            // Can fail only if the channel to a static thread got closed, which is not normal at all.
            self.wal_backup_launcher_tx.send(self.ttid).await?;
        }
        Ok(())
    }

    /// Returns true if the walsender should stop sending WAL to the pageserver.
    /// We terminate it if remote_consistent_lsn reached commit_lsn and there are
    /// no computes. While there might be nothing to stream already, we learn
    /// about remote_consistent_lsn updates through replication feedback, and we
    /// want to stop pushing to the broker if the pageserver is fully caught up.
    pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
        if self.is_cancelled() {
            return true;
        }
        let shared_state = self.write_shared_state().await;
        if self.walreceivers.get_num() == 0 {
            return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
                reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
        }
        false
    }
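
    // Example with hypothetical LSNs: with zero attached computes, a walsender
    // that reported remote_consistent_lsn = 0/16B9A38 is told to stop once
    // commit_lsn is also 0/16B9A38 -- the pageserver has caught up and there is
    // nothing left to push.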

    /// Ensure that the current term is t, erroring otherwise, and lock the state.
    pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
        let ss = self.write_shared_state().await;
        if ss.sk.state.acceptor_state.term != t {
            bail!(
                "failed to acquire term {}, current term {}",
                t,
                ss.sk.state.acceptor_state.term
            );
        }
        Ok(ss)
    }

    /// Returns whether s3 offloading is required and sets current status as
    /// matching it.
    pub async fn wal_backup_attend(&self) -> bool {
        if self.is_cancelled() {
            return false;
        }

        self.write_shared_state()
            .await
            .wal_backup_attend(self.walreceivers.get_num())
    }

    /// Returns commit_lsn watch channel.
    pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
        self.commit_lsn_watch_rx.clone()
    }

    /// Returns term_flush_lsn watch channel.
    pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
        self.term_flush_lsn_watch_rx.clone()
    }

    /// Pass arrived message to the safekeeper.
    pub async fn process_msg(
        &self,
        msg: &ProposerAcceptorMessage,
    ) -> Result<Option<AcceptorProposerMessage>> {
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }

        let mut rmsg: Option<AcceptorProposerMessage>;
        let commit_lsn: Lsn;
        let term_flush_lsn: TermLsn;
        {
            let mut shared_state = self.write_shared_state().await;
            rmsg = shared_state.sk.process_msg(msg).await?;

            // If this is AppendResponse, fill in proper pageserver and hot
            // standby feedback.
            if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
                let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
                resp.hs_feedback = hs_feedback;
                resp.pageserver_feedback = ps_feedback;
            }

            commit_lsn = shared_state.sk.inmem.commit_lsn;
            term_flush_lsn =
                TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
        }
        self.commit_lsn_watch_tx.send(commit_lsn)?;
        self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
        Ok(rmsg)
    }
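
    // Sketch of a consumer of the watch channels updated above (illustrative):
    //
    //     let mut rx = tli.get_commit_lsn_watch_rx();
    //     while rx.changed().await.is_ok() {
    //         let commit_lsn = *rx.borrow();
    //         // react to the advanced commit_lsn ...
    //     }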

    /// Returns wal_seg_size.
    pub async fn get_wal_seg_size(&self) -> usize {
        self.write_shared_state().await.get_wal_seg_size()
    }

    /// Returns true only if the timeline is loaded and active.
    pub async fn is_active(&self) -> bool {
        if self.is_cancelled() {
            return false;
        }

        self.write_shared_state().await.active
    }

    /// Returns state of the timeline.
    pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
        let state = self.write_shared_state().await;
        (state.sk.inmem.clone(), state.sk.state.clone())
    }

    /// Returns latest backup_lsn.
    pub async fn get_wal_backup_lsn(&self) -> Lsn {
        self.write_shared_state().await.sk.inmem.backup_lsn
    }

    /// Sets backup_lsn to the given value.
    pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }

        let mut state = self.write_shared_state().await;
        state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
        // We should check whether to shut down the offloader, but this will be
        // done soon by peer communication anyway.
        Ok(())
    }

    /// Get safekeeper info for broadcasting to the broker and other peers.
    pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
        let shared_state = self.write_shared_state().await;
        shared_state.get_safekeeper_info(
            &self.ttid,
            conf,
            self.walsenders.get_remote_consistent_lsn(),
        )
    }

    /// Update timeline state with peer safekeeper data.
    pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
        // Update local remote_consistent_lsn in memory (in .walsenders) and in
        // sk_info to pass it down to the control file.
        sk_info.remote_consistent_lsn = self
            .walsenders
            .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
            .0;
        let is_wal_backup_action_pending: bool;
        let commit_lsn: Lsn;
        {
            let mut shared_state = self.write_shared_state().await;
            shared_state.sk.record_safekeeper_info(&sk_info).await?;
            let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
            shared_state.peers_info.upsert(&peer_info);
            is_wal_backup_action_pending = self.update_status(&mut shared_state);
            commit_lsn = shared_state.sk.inmem.commit_lsn;
        }
        self.commit_lsn_watch_tx.send(commit_lsn)?;
        // Wake up the wal backup launcher, if it is time to stop the offloading.
        if is_wal_backup_action_pending {
            self.wal_backup_launcher_tx.send(self.ttid).await?;
        }
        Ok(())
    }

    /// Get our latest view of the alive peers' status on the timeline.
    /// We pass our own info through the broker as well, so when we don't have a
    /// connection to the broker the returned vec is empty.
    pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
        let shared_state = self.write_shared_state().await;
        let now = Instant::now();
        shared_state
            .peers_info
            .0
            .iter()
            // Regard a peer as absent if we haven't heard from it within heartbeat_timeout.
            .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
            .cloned()
            .collect()
    }

    pub fn get_walsenders(&self) -> &Arc<WalSenders> {
        &self.walsenders
    }

    pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
        &self.walreceivers
    }

    /// Returns flush_lsn.
    pub async fn get_flush_lsn(&self) -> Lsn {
        self.write_shared_state().await.sk.wal_store.flush_lsn()
    }

    /// Delete WAL segments from disk that are no longer needed. This is determined
    /// based on the pageserver's remote_consistent_lsn and the local backup_lsn/peer_lsn.
    pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
        if self.is_cancelled() {
            bail!(TimelineError::Cancelled(self.ttid));
        }

        let horizon_segno: XLogSegNo;
        let remover = {
            let shared_state = self.write_shared_state().await;
            horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
            if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
                return Ok(()); // nothing to do
            }

            // Release the lock before removing.
            shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
        };

        // Delete old WAL files.
        remover.await?;

        // Update last_removed_segno.
        let mut shared_state = self.write_shared_state().await;
        shared_state.last_removed_segno = horizon_segno;
        Ok(())
    }
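
    // Note on the pattern above: the remover future is constructed under the
    // lock but awaited only after the guard is dropped, so slow file deletion
    // never blocks writers that need write_shared_state().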

    /// Persist the control file if there is something to save and enough time
    /// has passed since the last save. This helps to keep remote_consistent_lsn
    /// up to date so that a storage node restart doesn't cause many
    /// pageserver -> safekeeper reconnections.
    pub async fn maybe_persist_control_file(&self) -> Result<()> {
        let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
        self.write_shared_state()
            .await
            .sk
            .maybe_persist_control_file(remote_consistent_lsn)
            .await
    }

    /// Gather timeline data for metrics. If the timeline is not active, returns
    /// None; we do not collect metrics for inactive timelines.
    pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
        if self.is_cancelled() {
            return None;
        }

        let ps_feedback = self.walsenders.get_ps_feedback();
        let state = self.write_shared_state().await;
        if state.active {
            Some(FullTimelineInfo {
                ttid: self.ttid,
                ps_feedback,
                wal_backup_active: state.wal_backup_active,
                timeline_is_active: state.active,
                num_computes: self.walreceivers.get_num() as u32,
                last_removed_segno: state.last_removed_segno,
                epoch_start_lsn: state.sk.epoch_start_lsn,
                mem_state: state.sk.inmem.clone(),
                persisted_state: state.sk.state.clone(),
                flush_lsn: state.sk.wal_store.flush_lsn(),
                remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
                wal_storage: state.sk.wal_store.get_metrics(),
            })
        } else {
            None
        }
    }

    /// Returns in-memory timeline state to build a full debug dump.
    pub async fn memory_dump(&self) -> debug_dump::Memory {
        let state = self.write_shared_state().await;

        let (write_lsn, write_record_lsn, flush_lsn, file_open) =
            state.sk.wal_store.internal_state();

        debug_dump::Memory {
            is_cancelled: self.is_cancelled(),
            peers_info_len: state.peers_info.0.len(),
            walsenders: self.walsenders.get_all(),
            wal_backup_active: state.wal_backup_active,
            active: state.active,
            num_computes: self.walreceivers.get_num() as u32,
            last_removed_segno: state.last_removed_segno,
            epoch_start_lsn: state.sk.epoch_start_lsn,
            mem_state: state.sk.inmem.clone(),
            write_lsn,
            write_record_lsn,
            flush_lsn,
            file_open,
        }
    }
}

/// Deletes a directory and its contents. Returns false if the directory does not exist.
async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
    match fs::remove_dir_all(path).await {
        Ok(_) => Ok(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e.into()),
    }
}
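
// Note: delete_dir is idempotent -- a second call returns Ok(false) instead of
// erroring, which makes timeline deletion safe to retry.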