Line data Source code
1 : //! This module implements Timeline lifecycle management and contains all the
2 : //! code needed to glue together SafeKeeper and the other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use postgres_ffi::XLogSegNo;
6 : use serde::{Deserialize, Serialize};
7 : use serde_with::serde_as;
8 : use tokio::fs;
9 :
10 : use serde_with::DisplayFromStr;
11 : use std::cmp::max;
12 : use std::path::PathBuf;
13 : use std::sync::Arc;
14 : use tokio::sync::{Mutex, MutexGuard};
15 : use tokio::{
16 : sync::{mpsc::Sender, watch},
17 : time::Instant,
18 : };
19 : use tracing::*;
20 : use utils::http::error::ApiError;
21 : use utils::{
22 : id::{NodeId, TenantTimelineId},
23 : lsn::Lsn,
24 : };
25 :
26 : use storage_broker::proto::SafekeeperTimelineInfo;
27 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
28 :
29 : use crate::receive_wal::WalReceivers;
30 : use crate::recovery::recovery_main;
31 : use crate::safekeeper::{
32 : AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
33 : SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
34 : };
35 : use crate::send_wal::WalSenders;
36 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
37 :
38 : use crate::metrics::FullTimelineInfo;
39 : use crate::wal_storage::Storage as wal_storage_iface;
40 : use crate::SafeKeeperConf;
41 : use crate::{debug_dump, wal_storage};
42 :
43 : /// Things the safekeeper should know about timeline state on peers.
44 : #[serde_as]
45 10150 : #[derive(Debug, Clone, Serialize, Deserialize)]
46 : pub struct PeerInfo {
47 : pub sk_id: NodeId,
48 : /// Term of the last entry.
49 : _last_log_term: Term,
50 : /// LSN of the last record.
51 : #[serde_as(as = "DisplayFromStr")]
52 : _flush_lsn: Lsn,
53 : #[serde_as(as = "DisplayFromStr")]
54 : pub commit_lsn: Lsn,
55 : /// LSN since which the safekeeper has WAL. TODO: remove this once we fill
56 : /// new safekeepers starting from backup_lsn.
57 : #[serde_as(as = "DisplayFromStr")]
58 : pub local_start_lsn: Lsn,
59 : /// When info was received. Serde annotations are not very useful but make
60 : /// the code compile -- we don't rely on this field externally.
61 : #[serde(skip)]
62 : #[serde(default = "Instant::now")]
63 : ts: Instant,
64 : }
65 :
66 : impl PeerInfo {
67 9077 : fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
68 9077 : PeerInfo {
69 9077 : sk_id: NodeId(sk_info.safekeeper_id),
70 9077 : _last_log_term: sk_info.last_log_term,
71 9077 : _flush_lsn: Lsn(sk_info.flush_lsn),
72 9077 : commit_lsn: Lsn(sk_info.commit_lsn),
73 9077 : local_start_lsn: Lsn(sk_info.local_start_lsn),
74 9077 : ts,
75 9077 : }
76 9077 : }
77 : }
78 :
79 : // Vector-based node id -> peer state map with only the very limited
80 : // functionality we need.
81 0 : #[derive(Debug, Clone, Default)]
82 : pub struct PeersInfo(pub Vec<PeerInfo>);
83 :
84 : impl PeersInfo {
85 9077 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
86 12312 : self.0.iter_mut().find(|p| p.sk_id == id)
87 9077 : }
88 :
89 9077 : fn upsert(&mut self, p: &PeerInfo) {
90 9077 : match self.get(p.sk_id) {
91 8189 : Some(rp) => *rp = p.clone(),
92 888 : None => self.0.push(p.clone()),
93 : }
94 9077 : }
95 : }
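
// Illustrative sketch (comment only, using the types above with made-up
// values): `upsert` keeps at most one entry per node id, so repeated broker
// messages from the same safekeeper replace its previous snapshot instead of
// growing the vector.
//
//     let mut peers = PeersInfo::default();
//     let p = PeerInfo {
//         sk_id: NodeId(1),
//         _last_log_term: 5,
//         _flush_lsn: Lsn(0x2000),
//         commit_lsn: Lsn(0x1000),
//         local_start_lsn: Lsn(0),
//         ts: Instant::now(),
//     };
//     peers.upsert(&p);
//     peers.upsert(&PeerInfo { commit_lsn: Lsn(0x1800), ..p.clone() });
//     assert_eq!(peers.0.len(), 1); // replaced in place, not appended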
96 :
97 : /// Shared state associated with a database instance
98 : pub struct SharedState {
99 : /// Safekeeper object
100 : sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
101 : /// In-memory list containing the state of peers, as sent in their latest messages.
102 : peers_info: PeersInfo,
103 : /// True when the WAL backup launcher oversees the timeline, making sure WAL
104 : /// is offloaded; this lets us bother the launcher less.
105 : wal_backup_active: bool,
106 : /// True whenever there is at least some pending activity on the timeline: a
107 : /// live compute connection, a pageserver that is not caught up (it must have
108 : /// the latest WAL for a new compute to start), or an unfinished WAL backup.
109 : /// Practically it means safekeepers broadcast info about the timeline to
110 : /// peers and old WAL is trimmed.
111 : ///
112 : /// TODO: it might be better to remove tli completely from GlobalTimelines
113 : /// when tli is inactive instead of having this flag.
114 : active: bool,
115 : num_computes: u32,
116 : last_removed_segno: XLogSegNo,
117 : }
118 :
119 : impl SharedState {
120 : /// Initialize fresh timeline state without persisting anything to disk.
121 523 : fn create_new(
122 523 : conf: &SafeKeeperConf,
123 523 : ttid: &TenantTimelineId,
124 523 : state: SafeKeeperState,
125 523 : ) -> Result<Self> {
126 523 : if state.server.wal_seg_size == 0 {
127 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
128 523 : }
129 523 :
130 523 : if state.server.pg_version == UNKNOWN_SERVER_VERSION {
131 0 : bail!(TimelineError::UninitializedPgVersion(*ttid));
132 523 : }
133 523 :
134 523 : if state.commit_lsn < state.local_start_lsn {
135 0 : bail!(
136 0 : "commit_lsn {} is higher than local_start_lsn {}",
137 0 : state.commit_lsn,
138 0 : state.local_start_lsn
139 0 : );
140 523 : }
141 :
142 : // We don't want to write anything to disk, because we may have an existing timeline there.
143 : // These functions should not change anything on disk.
144 523 : let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
145 523 : let wal_store =
146 523 : wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
147 523 : let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
148 :
149 523 : Ok(Self {
150 523 : sk,
151 523 : peers_info: PeersInfo(vec![]),
152 523 : wal_backup_active: false,
153 523 : active: false,
154 523 : num_computes: 0,
155 523 : last_removed_segno: 0,
156 523 : })
157 523 : }
158 :
159 : /// Restore SharedState from the control file. If the file doesn't exist, bails out.
160 81 : fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
161 81 : let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
162 81 : if control_store.server.wal_seg_size == 0 {
163 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
164 81 : }
165 :
166 81 : let wal_store =
167 81 : wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
168 :
169 : Ok(Self {
170 81 : sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
171 81 : peers_info: PeersInfo(vec![]),
172 : wal_backup_active: false,
173 : active: false,
174 : num_computes: 0,
175 : last_removed_segno: 0,
176 : })
177 81 : }
178 :
179 13817 : fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
180 13817 : self.is_wal_backup_required(num_computes)
181 : // FIXME: add tracking of relevant pageservers and check them here individually,
182 : // otherwise migration won't work (we suspend too early).
183 2569 : || remote_consistent_lsn < self.sk.inmem.commit_lsn
184 13817 : }
185 :
186 : /// Mark timeline active/inactive and return whether s3 offloading requires
187 : /// start/stop action.
188 13817 : fn update_status(
189 13817 : &mut self,
190 13817 : num_computes: usize,
191 13817 : remote_consistent_lsn: Lsn,
192 13817 : ttid: TenantTimelineId,
193 13817 : ) -> bool {
194 13817 : let is_active = self.is_active(num_computes, remote_consistent_lsn);
195 13817 : if self.active != is_active {
196 1702 : info!("timeline {} active={} now", ttid, is_active);
197 12115 : }
198 13817 : self.active = is_active;
199 13817 : self.is_wal_backup_action_pending(num_computes)
200 13817 : }
201 :
202 : /// Should we run s3 offloading in the current state?
203 38737 : fn is_wal_backup_required(&self, num_computes: usize) -> bool {
204 38737 : let seg_size = self.get_wal_seg_size();
205 38737 : num_computes > 0 ||
206 : // Currently only the whole segment is offloaded, so compare segment numbers.
207 9980 : (self.sk.inmem.commit_lsn.segment_number(seg_size) >
208 9980 : self.sk.inmem.backup_lsn.segment_number(seg_size))
209 38737 : }
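
// Worked example (illustrative, assuming the usual 16 MiB = 0x100_0000 WAL
// segment size): with commit_lsn = Lsn(0x0200_0010) and backup_lsn =
// Lsn(0x0100_0000), segment_number() yields 2 and 1 respectively, so a full
// not-yet-offloaded segment exists and backup is required even when
// num_computes == 0. While both LSNs fall inside the same segment, there is
// nothing to offload yet.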
210 :
211 : /// Is the current state of s3 offloading not what it ought to be?
212 13817 : fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
213 13817 : let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
214 13817 : if res {
215 10915 : let action_pending = if self.is_wal_backup_required(num_computes) {
216 10841 : "start"
217 : } else {
218 74 : "stop"
219 : };
220 10915 : trace!(
221 0 : "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
222 0 : self.sk.state.timeline_id, action_pending, self.num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
223 0 : );
224 2902 : }
225 13817 : res
226 13817 : }
227 :
228 : /// Returns whether s3 offloading is required and sets current status as
229 : /// matching.
230 188 : fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
231 188 : self.wal_backup_active = self.is_wal_backup_required(num_computes);
232 188 : self.wal_backup_active
233 188 : }
234 :
235 38762 : fn get_wal_seg_size(&self) -> usize {
236 38762 : self.sk.state.server.wal_seg_size as usize
237 38762 : }
238 :
239 6300 : fn get_safekeeper_info(
240 6300 : &self,
241 6300 : ttid: &TenantTimelineId,
242 6300 : conf: &SafeKeeperConf,
243 6300 : remote_consistent_lsn: Lsn,
244 6300 : ) -> SafekeeperTimelineInfo {
245 6300 : SafekeeperTimelineInfo {
246 6300 : safekeeper_id: conf.my_id.0,
247 6300 : tenant_timeline_id: Some(ProtoTenantTimelineId {
248 6300 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
249 6300 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
250 6300 : }),
251 6300 : term: self.sk.state.acceptor_state.term,
252 6300 : last_log_term: self.sk.get_epoch(),
253 6300 : flush_lsn: self.sk.flush_lsn().0,
254 6300 : // note: this value is not flushed to control file yet and can be lost
255 6300 : commit_lsn: self.sk.inmem.commit_lsn.0,
256 6300 : remote_consistent_lsn: remote_consistent_lsn.0,
257 6300 : peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
258 6300 : safekeeper_connstr: conf
259 6300 : .advertise_pg_addr
260 6300 : .to_owned()
261 6300 : .unwrap_or(conf.listen_pg_addr.clone()),
262 6300 : http_connstr: conf.listen_http_addr.to_owned(),
263 6300 : backup_lsn: self.sk.inmem.backup_lsn.0,
264 6300 : local_start_lsn: self.sk.state.local_start_lsn.0,
265 6300 : availability_zone: conf.availability_zone.clone(),
266 6300 : }
267 6300 : }
268 : }
269 :
270 0 : #[derive(Debug, thiserror::Error)]
271 : pub enum TimelineError {
272 : #[error("Timeline {0} was cancelled and cannot be used anymore")]
273 : Cancelled(TenantTimelineId),
274 : #[error("Timeline {0} was not found in global map")]
275 : NotFound(TenantTimelineId),
276 : #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
277 : Invalid(TenantTimelineId),
278 : #[error("Timeline {0} is already exists")]
279 : AlreadyExists(TenantTimelineId),
280 : #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
281 : UninitializedWalSegSize(TenantTimelineId),
282 : #[error("Timeline {0} is not initialized, pg_version is unknown")]
283 : UninitializedPgVersion(TenantTimelineId),
284 : }
285 :
286 : // Convert to HTTP API error.
287 : impl From<TimelineError> for ApiError {
288 0 : fn from(te: TimelineError) -> ApiError {
289 0 : match te {
290 0 : TimelineError::NotFound(ttid) => {
291 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
292 : }
293 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
294 : }
295 0 : }
296 : }
297 :
298 : /// The Timeline struct manages the lifecycle (creation, deletion, restore) of a safekeeper timeline.
299 : /// It also holds SharedState and provides mutually exclusive access to it.
300 : pub struct Timeline {
301 : pub ttid: TenantTimelineId,
302 :
303 : /// Sending here asks for wal backup launcher attention (start/stop
304 : /// offloading). Sending the ttid instead of a concrete command allows
305 : /// sending without holding the timeline lock.
306 : pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
307 :
308 : /// Used to broadcast commit_lsn updates to all background jobs.
309 : commit_lsn_watch_tx: watch::Sender<Lsn>,
310 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
311 :
312 : /// Broadcasts (current term, flush_lsn) updates; the walsender is interested in
313 : /// them when sending in recovery mode (to walproposer or peers). Note: this
314 : /// is just a notification; WAL reading should always be done with the lock
315 : /// held, as the term can change otherwise.
316 : term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
317 : term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
318 :
319 : /// Safekeeper and other state that should remain consistent and
320 : /// synchronized with the disk. This is a tokio mutex, as we write WAL to disk
321 : /// while holding it, ensuring that consensus checks are in order.
322 : mutex: Mutex<SharedState>,
323 : walsenders: Arc<WalSenders>,
324 : walreceivers: Arc<WalReceivers>,
325 :
326 : /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
327 : cancellation_tx: watch::Sender<bool>,
328 :
329 : /// Timeline should not be used after cancellation. Background tasks should
330 : /// monitor this channel and eventually stop after receiving `true` from it.
331 : cancellation_rx: watch::Receiver<bool>,
332 :
333 : /// Directory where timeline state is stored.
334 : pub timeline_dir: PathBuf,
335 : }
336 :
337 : impl Timeline {
338 : /// Load existing timeline from disk.
339 81 : pub fn load_timeline(
340 81 : conf: &SafeKeeperConf,
341 81 : ttid: TenantTimelineId,
342 81 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
343 81 : ) -> Result<Timeline> {
344 81 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
345 :
346 81 : let shared_state = SharedState::restore(conf, &ttid)?;
347 81 : let rcl = shared_state.sk.state.remote_consistent_lsn;
348 81 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
349 81 : watch::channel(shared_state.sk.state.commit_lsn);
350 81 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
351 81 : shared_state.sk.get_term(),
352 81 : shared_state.sk.flush_lsn(),
353 81 : )));
354 81 : let (cancellation_tx, cancellation_rx) = watch::channel(false);
355 81 :
356 81 : Ok(Timeline {
357 81 : ttid,
358 81 : wal_backup_launcher_tx,
359 81 : commit_lsn_watch_tx,
360 81 : commit_lsn_watch_rx,
361 81 : term_flush_lsn_watch_tx,
362 81 : term_flush_lsn_watch_rx,
363 81 : mutex: Mutex::new(shared_state),
364 81 : walsenders: WalSenders::new(rcl),
365 81 : walreceivers: WalReceivers::new(),
366 81 : cancellation_rx,
367 81 : cancellation_tx,
368 81 : timeline_dir: conf.timeline_dir(&ttid),
369 81 : })
370 81 : }
371 :
372 : /// Create a new timeline, which is not yet persisted to disk.
373 523 : pub fn create_empty(
374 523 : conf: &SafeKeeperConf,
375 523 : ttid: TenantTimelineId,
376 523 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
377 523 : server_info: ServerInfo,
378 523 : commit_lsn: Lsn,
379 523 : local_start_lsn: Lsn,
380 523 : ) -> Result<Timeline> {
381 523 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
382 523 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
383 523 : watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
384 523 : let (cancellation_tx, cancellation_rx) = watch::channel(false);
385 523 : let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
386 523 :
387 523 : Ok(Timeline {
388 523 : ttid,
389 523 : wal_backup_launcher_tx,
390 523 : commit_lsn_watch_tx,
391 523 : commit_lsn_watch_rx,
392 523 : term_flush_lsn_watch_tx,
393 523 : term_flush_lsn_watch_rx,
394 523 : mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
395 523 : walsenders: WalSenders::new(Lsn(0)),
396 523 : walreceivers: WalReceivers::new(),
397 523 : cancellation_rx,
398 523 : cancellation_tx,
399 523 : timeline_dir: conf.timeline_dir(&ttid),
400 : })
401 523 : }
402 :
403 : /// Initialize a fresh timeline on disk and start background tasks. If init
404 : /// fails, the timeline is cancelled and cannot be used anymore.
405 : ///
406 : /// Init is transactional, so if it fails, created files will be deleted,
407 : /// and state on disk should remain unchanged.
408 523 : pub async fn init_new(
409 523 : self: &Arc<Timeline>,
410 523 : shared_state: &mut MutexGuard<'_, SharedState>,
411 523 : conf: &SafeKeeperConf,
412 523 : ) -> Result<()> {
413 595 : match fs::metadata(&self.timeline_dir).await {
414 : Ok(_) => {
415 : // The timeline directory exists on disk; we should leave state unchanged
416 : // and return an error.
417 0 : bail!(TimelineError::Invalid(self.ttid));
418 : }
419 523 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
420 0 : Err(e) => {
421 0 : return Err(e.into());
422 : }
423 : }
424 :
425 : // Create timeline directory.
426 536 : fs::create_dir_all(&self.timeline_dir).await?;
427 :
428 : // Write timeline to disk and start background tasks.
429 1687 : if let Err(e) = shared_state.sk.persist().await {
430 : // Bootstrap failed, cancel timeline and remove timeline directory.
431 0 : self.cancel(shared_state);
432 :
433 0 : if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
434 0 : warn!(
435 0 : "failed to remove timeline {} directory after bootstrap failure: {}",
436 0 : self.ttid, fs_err
437 0 : );
438 0 : }
439 :
440 0 : return Err(e);
441 523 : }
442 523 : self.bootstrap(conf);
443 523 : Ok(())
444 523 : }
445 :
446 : /// Bootstrap a new or existing timeline by starting its background tasks.
447 604 : pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
448 604 : // Start recovery task which always runs on the timeline.
449 604 : tokio::spawn(recovery_main(self.clone(), conf.clone()));
450 604 : }
451 :
452 : /// Delete the timeline from disk completely by removing the timeline directory.
453 : /// Background timeline activities will stop eventually.
454 31 : pub async fn delete_from_disk(
455 31 : &self,
456 31 : shared_state: &mut MutexGuard<'_, SharedState>,
457 31 : ) -> Result<(bool, bool)> {
458 31 : let was_active = shared_state.active;
459 31 : self.cancel(shared_state);
460 31 : let dir_existed = delete_dir(&self.timeline_dir).await?;
461 31 : Ok((dir_existed, was_active))
462 31 : }
463 :
464 : /// Cancel timeline to prevent further usage. Background tasks will stop
465 : /// eventually after receiving cancellation signal.
466 : ///
467 : /// Note that we can't notify backup launcher here while holding
468 : /// shared_state lock, as this is a potential deadlock: caller is
469 : /// responsible for that. Generally we should probably make WAL backup tasks
470 : /// to shut down on their own, checking once in a while whether it is the
471 : /// time.
472 31 : fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
473 31 : info!("timeline {} is cancelled", self.ttid);
474 31 : let _ = self.cancellation_tx.send(true);
475 31 : // Close associated FDs. Nobody will be able to touch timeline data once
476 31 : // it is cancelled, so WAL storage won't be opened again.
477 31 : shared_state.sk.wal_store.close();
478 31 : }
479 :
480 : /// Returns whether the timeline is cancelled.
481 4883812 : pub fn is_cancelled(&self) -> bool {
482 4883812 : *self.cancellation_rx.borrow()
483 4883812 : }
484 :
485 : /// Returns a watch channel which gets a value when the timeline is cancelled.
486 : /// Guaranteed not to have observed a cancelled value yet (errors otherwise).
487 604 : pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
488 604 : let rx = self.cancellation_rx.clone();
489 604 : if *rx.borrow() {
490 0 : bail!(TimelineError::Cancelled(self.ttid));
491 604 : }
492 604 : Ok(rx)
493 604 : }
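
// Sketch of the intended consumer side (illustrative; `do_step` is a
// hypothetical stand-in for one iteration of real task work):
//
//     let mut cancellation_rx = tli.get_cancellation_rx()?;
//     loop {
//         tokio::select! {
//             _ = cancellation_rx.changed() => {
//                 // Cancellation signal received (or sender dropped): shut down.
//                 break;
//             }
//             res = do_step() => res?,
//         }
//     }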
494 :
495 : /// Take an exclusive write lock on the timeline shared_state.
496 4885949 : pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
497 4885949 : self.mutex.lock().await
498 4885949 : }
499 :
500 13817 : fn update_status(&self, shared_state: &mut SharedState) -> bool {
501 13817 : shared_state.update_status(
502 13817 : self.walreceivers.get_num(),
503 13817 : self.get_walsenders().get_remote_consistent_lsn(),
504 13817 : self.ttid,
505 13817 : )
506 13817 : }
507 :
508 : /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
509 4740 : pub async fn update_status_notify(&self) -> Result<()> {
510 4740 : if self.is_cancelled() {
511 0 : bail!(TimelineError::Cancelled(self.ttid));
512 4740 : }
513 4740 : let is_wal_backup_action_pending: bool = {
514 4740 : let mut shared_state = self.write_shared_state().await;
515 4740 : self.update_status(&mut shared_state)
516 4740 : };
517 4740 : if is_wal_backup_action_pending {
518 : // Can fail only if channel to a static thread got closed, which is not normal at all.
519 3049 : self.wal_backup_launcher_tx.send(self.ttid).await?;
520 1691 : }
521 4740 : Ok(())
522 4740 : }
523 :
524 : /// Returns true if the walsender should stop sending WAL to the pageserver. We
525 : /// terminate it if remote_consistent_lsn has reached commit_lsn and there are
526 : /// no computes. While there might already be nothing to stream, we learn about
527 : /// remote_consistent_lsn updates through replication feedback, and we want
528 : /// to stop pushing to the broker if the pageserver is fully caught up.
529 1941 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
530 1941 : if self.is_cancelled() {
531 0 : return true;
532 1941 : }
533 1941 : let shared_state = self.write_shared_state().await;
534 1941 : if shared_state.num_computes == 0 {
535 1941 : return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
536 1941 : reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
537 0 : }
538 0 : false
539 1941 : }
540 :
541 : /// Ensure that the current term is t, erroring otherwise, and lock the state.
542 1 : pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
543 1 : let ss = self.write_shared_state().await;
544 1 : if ss.sk.state.acceptor_state.term != t {
545 1 : bail!(
546 1 : "failed to acquire term {}, current term {}",
547 1 : t,
548 1 : ss.sk.state.acceptor_state.term
549 1 : );
550 0 : }
551 0 : Ok(ss)
552 1 : }
553 :
554 : /// Returns whether s3 offloading is required and sets current status as
555 : /// matching it.
556 188 : pub async fn wal_backup_attend(&self) -> bool {
557 188 : if self.is_cancelled() {
558 0 : return false;
559 188 : }
560 188 :
561 188 : self.write_shared_state()
562 42 : .await
563 188 : .wal_backup_attend(self.walreceivers.get_num())
564 188 : }
565 :
566 : /// Returns commit_lsn watch channel.
567 791 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
568 791 : self.commit_lsn_watch_rx.clone()
569 791 : }
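
// Sketch: waiting until commit_lsn reaches a target using this channel
// (illustrative; `target` is a hypothetical Lsn). Every send() from
// process_msg() wakes the waiter:
//
//     let mut rx = tli.get_commit_lsn_watch_rx();
//     while *rx.borrow() < target {
//         rx.changed().await?; // errors only if the sender is dropped
//     }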
570 :
571 : /// Returns term_flush_lsn watch channel.
572 64 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
573 64 : self.term_flush_lsn_watch_rx.clone()
574 64 : }
575 :
576 : /// Pass an arrived message to the safekeeper.
577 4848632 : pub async fn process_msg(
578 4848632 : &self,
579 4848632 : msg: &ProposerAcceptorMessage,
580 4848632 : ) -> Result<Option<AcceptorProposerMessage>> {
581 4848632 : if self.is_cancelled() {
582 0 : bail!(TimelineError::Cancelled(self.ttid));
583 4848632 : }
584 :
585 : let mut rmsg: Option<AcceptorProposerMessage>;
586 : let commit_lsn: Lsn;
587 : let term_flush_lsn: TermLsn;
588 : {
589 4848632 : let mut shared_state = self.write_shared_state().await;
590 4848632 : rmsg = shared_state.sk.process_msg(msg).await?;
591 :
592 : // if this is AppendResponse, fill in proper pageserver and hot
593 : // standby feedback.
594 4848632 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
595 2200283 : let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
596 2200283 : resp.hs_feedback = hs_feedback;
597 2200283 : resp.pageserver_feedback = ps_feedback;
598 2648349 : }
599 :
600 4848632 : commit_lsn = shared_state.sk.inmem.commit_lsn;
601 4848632 : term_flush_lsn =
602 4848632 : TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
603 4848632 : }
604 4848632 : self.commit_lsn_watch_tx.send(commit_lsn)?;
605 4848632 : self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
606 4848632 : Ok(rmsg)
607 4848632 : }
608 :
609 : /// Returns wal_seg_size.
610 25 : pub async fn get_wal_seg_size(&self) -> usize {
611 25 : self.write_shared_state().await.get_wal_seg_size()
612 25 : }
613 :
614 : /// Returns true only if the timeline is loaded and active.
615 7727 : pub async fn is_active(&self) -> bool {
616 7727 : if self.is_cancelled() {
617 0 : return false;
618 7727 : }
619 7727 :
620 7727 : self.write_shared_state().await.active
621 7727 : }
622 :
623 : /// Returns state of the timeline.
624 2851 : pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
625 2851 : let state = self.write_shared_state().await;
626 2851 : (state.sk.inmem.clone(), state.sk.state.clone())
627 2851 : }
628 :
629 : /// Returns latest backup_lsn.
630 287 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
631 287 : self.write_shared_state().await.sk.inmem.backup_lsn
632 287 : }
633 :
634 : /// Sets backup_lsn to the given value.
635 23 : pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
636 23 : if self.is_cancelled() {
637 0 : bail!(TimelineError::Cancelled(self.ttid));
638 23 : }
639 :
640 23 : let mut state = self.write_shared_state().await;
641 23 : state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
642 23 : // we should check whether to shut down offloader, but this will be done
643 23 : // soon by peer communication anyway.
644 23 : Ok(())
645 23 : }
646 :
647 : /// Get safekeeper info for broadcasting to broker and other peers.
648 6300 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
649 6300 : let shared_state = self.write_shared_state().await;
650 6300 : shared_state.get_safekeeper_info(
651 6300 : &self.ttid,
652 6300 : conf,
653 6300 : self.walsenders.get_remote_consistent_lsn(),
654 6300 : )
655 6300 : }
656 :
657 : /// Update timeline state with peer safekeeper data.
658 9077 : pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
659 9077 : // Update local remote_consistent_lsn in memory (in .walsenders) and in
660 9077 : // sk_info to pass it down to control file.
661 9077 : sk_info.remote_consistent_lsn = self
662 9077 : .walsenders
663 9077 : .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
664 9077 : .0;
665 : let is_wal_backup_action_pending: bool;
666 : let commit_lsn: Lsn;
667 : {
668 9077 : let mut shared_state = self.write_shared_state().await;
669 9077 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
670 9077 : let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
671 9077 : shared_state.peers_info.upsert(&peer_info);
672 9077 : is_wal_backup_action_pending = self.update_status(&mut shared_state);
673 9077 : commit_lsn = shared_state.sk.inmem.commit_lsn;
674 9077 : }
675 9077 : self.commit_lsn_watch_tx.send(commit_lsn)?;
676 : // Wake up wal backup launcher, if it is time to stop the offloading.
677 9077 : if is_wal_backup_action_pending {
678 7866 : self.wal_backup_launcher_tx.send(self.ttid).await?;
679 1211 : }
680 9077 : Ok(())
681 9077 : }
682 :
683 : /// Get our latest view of the status of live peers on the timeline.
684 : /// We pass our own info through the broker as well, so when we have no
685 : /// connection to the broker the returned vec is empty.
686 443 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
687 443 : let shared_state = self.write_shared_state().await;
688 443 : let now = Instant::now();
689 443 : shared_state
690 443 : .peers_info
691 443 : .0
692 443 : .iter()
693 443 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
694 1082 : .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
695 443 : .cloned()
696 443 : .collect()
697 443 : }
698 :
699 15096 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
700 15096 : &self.walsenders
701 15096 : }
702 :
703 2304 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
704 2304 : &self.walreceivers
705 2304 : }
706 :
707 : /// Returns flush_lsn.
708 460 : pub async fn get_flush_lsn(&self) -> Lsn {
709 460 : self.write_shared_state().await.sk.wal_store.flush_lsn()
710 460 : }
711 :
712 : /// Delete WAL segments from disk that are no longer needed. This is determined
713 : /// based on the pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
714 1315 : pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
715 1315 : if self.is_cancelled() {
716 0 : bail!(TimelineError::Cancelled(self.ttid));
717 1315 : }
718 :
719 : let horizon_segno: XLogSegNo;
720 14 : let remover = {
721 1315 : let shared_state = self.write_shared_state().await;
722 1315 : horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
723 1315 : if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
724 1301 : return Ok(()); // nothing to do
725 14 : }
726 14 : let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
727 14 : // release the lock before removing
728 14 : remover
729 14 : };
730 14 :
731 14 : // delete old WAL files
732 54 : remover.await?;
733 :
734 : // update last_removed_segno
735 14 : let mut shared_state = self.write_shared_state().await;
736 14 : shared_state.last_removed_segno = horizon_segno;
737 14 : Ok(())
738 1315 : }
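
// The lock scoping above is deliberate: the horizon and the removal future
// are computed under the lock, while the (slow) file deletion is awaited
// after the guard is dropped. A minimal sketch of the same idiom
// (`make_io_future` is hypothetical and must not capture the guard):
//
//     let fut = {
//         let guard = self.mutex.lock().await; // short critical section
//         guard.make_io_future()
//     }; // guard dropped here
//     fut.await?; // slow I/O runs without blocking other lock users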
739 :
740 : /// Persist the control file if there is something to save and enough time has
741 : /// passed since the last save. This helps keep remote_consistent_lsn up
742 : /// to date so that a storage node restart doesn't cause many pageserver ->
743 : /// safekeeper reconnections.
744 1315 : pub async fn maybe_persist_control_file(&self) -> Result<()> {
745 1315 : let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
746 1315 : self.write_shared_state()
747 40 : .await
748 : .sk
749 1315 : .maybe_persist_control_file(remote_consistent_lsn)
750 3 : .await
751 1315 : }
752 :
753 : /// Gather timeline data for metrics. If the timeline is not active, returns
754 : /// None, as we do not collect metrics for inactive timelines.
755 51 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
756 51 : if self.is_cancelled() {
757 0 : return None;
758 51 : }
759 51 :
760 51 : let ps_feedback = self.walsenders.get_ps_feedback();
761 51 : let state = self.write_shared_state().await;
762 51 : if state.active {
763 51 : Some(FullTimelineInfo {
764 51 : ttid: self.ttid,
765 51 : ps_feedback,
766 51 : wal_backup_active: state.wal_backup_active,
767 51 : timeline_is_active: state.active,
768 51 : num_computes: state.num_computes,
769 51 : last_removed_segno: state.last_removed_segno,
770 51 : epoch_start_lsn: state.sk.epoch_start_lsn,
771 51 : mem_state: state.sk.inmem.clone(),
772 51 : persisted_state: state.sk.state.clone(),
773 51 : flush_lsn: state.sk.wal_store.flush_lsn(),
774 51 : remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
775 51 : wal_storage: state.sk.wal_store.get_metrics(),
776 51 : })
777 : } else {
778 0 : None
779 : }
780 51 : }
781 :
782 : /// Returns in-memory timeline state to build a full debug dump.
783 5 : pub async fn memory_dump(&self) -> debug_dump::Memory {
784 5 : let state = self.write_shared_state().await;
785 :
786 5 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
787 5 : state.sk.wal_store.internal_state();
788 5 :
789 5 : debug_dump::Memory {
790 5 : is_cancelled: self.is_cancelled(),
791 5 : peers_info_len: state.peers_info.0.len(),
792 5 : walsenders: self.walsenders.get_all(),
793 5 : wal_backup_active: state.wal_backup_active,
794 5 : active: state.active,
795 5 : num_computes: state.num_computes,
796 5 : last_removed_segno: state.last_removed_segno,
797 5 : epoch_start_lsn: state.sk.epoch_start_lsn,
798 5 : mem_state: state.sk.inmem.clone(),
799 5 : write_lsn,
800 5 : write_record_lsn,
801 5 : flush_lsn,
802 5 : file_open,
803 5 : }
804 5 : }
805 : }
806 :
807 : /// Deletes a directory and its contents. Returns false if the directory does not exist.
808 31 : async fn delete_dir(path: &PathBuf) -> Result<bool> {
809 31 : match fs::remove_dir_all(path).await {
810 17 : Ok(_) => Ok(true),
811 14 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
812 0 : Err(e) => Err(e.into()),
813 : }
814 31 : }
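
// Illustrative usage (hypothetical `dir` path): the NotFound arm makes the
// helper idempotent, so repeated deletion reports `false` instead of failing.
//
//     let dir = PathBuf::from("/tmp/some-timeline-dir");
//     fs::create_dir_all(&dir).await?;
//     assert!(delete_dir(&dir).await?); // removed -> true
//     assert!(!delete_dir(&dir).await?); // already gone -> false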
|