Line data Source code
1 : //! This module implements Timeline lifecycle management and has all necessary code
2 : //! to glue together SafeKeeper and all other background services.
3 :
4 : use anyhow::{anyhow, bail, Result};
5 : use camino::Utf8PathBuf;
6 : use serde::{Deserialize, Serialize};
7 : use tokio::fs::{self};
8 : use tokio_util::sync::CancellationToken;
9 : use utils::id::TenantId;
10 :
11 : use std::cmp::max;
12 : use std::ops::{Deref, DerefMut};
13 : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14 : use std::sync::Arc;
15 : use std::time::Duration;
16 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
17 : use tokio::{sync::watch, time::Instant};
18 : use tracing::*;
19 : use utils::http::error::ApiError;
20 : use utils::{
21 : id::{NodeId, TenantTimelineId},
22 : lsn::Lsn,
23 : };
24 :
25 : use storage_broker::proto::SafekeeperTimelineInfo;
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 :
28 : use crate::receive_wal::WalReceivers;
29 : use crate::safekeeper::{
30 : AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
31 : INVALID_TERM,
32 : };
33 : use crate::send_wal::WalSenders;
34 : use crate::state::{TimelineMemState, TimelinePersistentState};
35 : use crate::timelines_set::TimelinesSet;
36 : use crate::wal_backup::{self};
37 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
38 :
39 : use crate::metrics::FullTimelineInfo;
40 : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
41 : use crate::{debug_dump, timeline_manager, wal_storage};
42 : use crate::{GlobalTimelines, SafeKeeperConf};
43 :
44 : /// Things safekeeper should know about timeline state on peers.
45 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
46 : pub struct PeerInfo {
47 : pub sk_id: NodeId,
48 : pub term: Term,
49 : /// Term of the last entry.
50 : pub last_log_term: Term,
51 : /// LSN of the last record.
52 : pub flush_lsn: Lsn,
53 : pub commit_lsn: Lsn,
54 : /// Since which LSN safekeeper has WAL.
55 : pub local_start_lsn: Lsn,
56 : /// When info was received. Serde annotations are not very useful but make
57 : /// the code compile -- we don't rely on this field externally.
58 : #[serde(skip)]
59 : #[serde(default = "Instant::now")]
60 : ts: Instant,
61 : pub pg_connstr: String,
62 : pub http_connstr: String,
63 : }
64 :
65 : impl PeerInfo {
66 0 : fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
67 0 : PeerInfo {
68 0 : sk_id: NodeId(sk_info.safekeeper_id),
69 0 : term: sk_info.term,
70 0 : last_log_term: sk_info.last_log_term,
71 0 : flush_lsn: Lsn(sk_info.flush_lsn),
72 0 : commit_lsn: Lsn(sk_info.commit_lsn),
73 0 : local_start_lsn: Lsn(sk_info.local_start_lsn),
74 0 : pg_connstr: sk_info.safekeeper_connstr.clone(),
75 0 : http_connstr: sk_info.http_connstr.clone(),
76 0 : ts,
77 0 : }
78 0 : }
79 : }
80 :
81 : // vector-based node id -> peer state map with very limited functionality we
82 : // need.
83 : #[derive(Debug, Clone, Default)]
84 : pub struct PeersInfo(pub Vec<PeerInfo>);
85 :
86 : impl PeersInfo {
87 0 : fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
88 0 : self.0.iter_mut().find(|p| p.sk_id == id)
89 0 : }
90 :
91 0 : fn upsert(&mut self, p: &PeerInfo) {
92 0 : match self.get(p.sk_id) {
93 0 : Some(rp) => *rp = p.clone(),
94 0 : None => self.0.push(p.clone()),
95 : }
96 0 : }
97 : }
98 :
/// Read guard over a timeline's `SharedState`, as handed out by the
/// `RwLock` protecting it.
pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
100 :
101 : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
102 : /// automatically updates `watch::Sender` channels with state on drop.
103 : pub struct WriteGuardSharedState<'a> {
104 : tli: Arc<Timeline>,
105 : guard: RwLockWriteGuard<'a, SharedState>,
106 : skip_update: bool,
107 : }
108 :
109 : impl<'a> WriteGuardSharedState<'a> {
110 0 : fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
111 0 : WriteGuardSharedState {
112 0 : tli,
113 0 : guard,
114 0 : skip_update: false,
115 0 : }
116 0 : }
117 : }
118 :
119 : impl<'a> Deref for WriteGuardSharedState<'a> {
120 : type Target = SharedState;
121 :
122 0 : fn deref(&self) -> &Self::Target {
123 0 : &self.guard
124 0 : }
125 : }
126 :
127 : impl<'a> DerefMut for WriteGuardSharedState<'a> {
128 0 : fn deref_mut(&mut self) -> &mut Self::Target {
129 0 : &mut self.guard
130 0 : }
131 : }
132 :
133 : impl<'a> Drop for WriteGuardSharedState<'a> {
134 0 : fn drop(&mut self) {
135 0 : let term_flush_lsn = TermLsn::from((self.guard.sk.get_term(), self.guard.sk.flush_lsn()));
136 0 : let commit_lsn = self.guard.sk.state.inmem.commit_lsn;
137 0 :
138 0 : let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
139 0 : if *old != term_flush_lsn {
140 0 : *old = term_flush_lsn;
141 0 : true
142 : } else {
143 0 : false
144 : }
145 0 : });
146 0 :
147 0 : let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
148 0 : if *old != commit_lsn {
149 0 : *old = commit_lsn;
150 0 : true
151 : } else {
152 0 : false
153 : }
154 0 : });
155 0 :
156 0 : if !self.skip_update {
157 0 : // send notification about shared state update
158 0 : self.tli.shared_state_version_tx.send_modify(|old| {
159 0 : *old += 1;
160 0 : });
161 0 : }
162 0 : }
163 : }
164 :
/// Shared state associated with database instance. A `Timeline` keeps it
/// behind its `RwLock`.
pub struct SharedState {
    /// Safekeeper object, parameterized with the file-based control file
    /// storage and the physical WAL storage.
    pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
    /// In memory list containing state of peers sent in latest messages from them.
    pub(crate) peers_info: PeersInfo,
    // True value hinders old WAL removal; this is used by snapshotting. We
    // could make it a counter, but there is no need to.
    pub(crate) wal_removal_on_hold: bool,
}
175 :
176 : impl SharedState {
177 : /// Initialize fresh timeline state without persisting anything to disk.
178 0 : fn create_new(
179 0 : conf: &SafeKeeperConf,
180 0 : ttid: &TenantTimelineId,
181 0 : state: TimelinePersistentState,
182 0 : ) -> Result<Self> {
183 0 : if state.server.wal_seg_size == 0 {
184 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
185 0 : }
186 0 :
187 0 : if state.server.pg_version == UNKNOWN_SERVER_VERSION {
188 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
189 0 : }
190 0 :
191 0 : if state.commit_lsn < state.local_start_lsn {
192 0 : bail!(
193 0 : "commit_lsn {} is higher than local_start_lsn {}",
194 0 : state.commit_lsn,
195 0 : state.local_start_lsn
196 0 : );
197 0 : }
198 0 :
199 0 : // We don't want to write anything to disk, because we may have existing timeline there.
200 0 : // These functions should not change anything on disk.
201 0 : let timeline_dir = get_timeline_dir(conf, ttid);
202 0 : let control_store =
203 0 : control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
204 0 : let wal_store =
205 0 : wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
206 0 : let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
207 :
208 0 : Ok(Self {
209 0 : sk,
210 0 : peers_info: PeersInfo(vec![]),
211 0 : wal_removal_on_hold: false,
212 0 : })
213 0 : }
214 :
215 : /// Restore SharedState from control file. If file doesn't exist, bails out.
216 0 : fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
217 0 : let timeline_dir = get_timeline_dir(conf, ttid);
218 0 : let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
219 0 : if control_store.server.wal_seg_size == 0 {
220 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
221 0 : }
222 :
223 0 : let wal_store =
224 0 : wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
225 :
226 : Ok(Self {
227 0 : sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
228 0 : peers_info: PeersInfo(vec![]),
229 : wal_removal_on_hold: false,
230 : })
231 0 : }
232 :
233 0 : pub(crate) fn get_wal_seg_size(&self) -> usize {
234 0 : self.sk.state.server.wal_seg_size as usize
235 0 : }
236 :
237 0 : fn get_safekeeper_info(
238 0 : &self,
239 0 : ttid: &TenantTimelineId,
240 0 : conf: &SafeKeeperConf,
241 0 : standby_apply_lsn: Lsn,
242 0 : ) -> SafekeeperTimelineInfo {
243 0 : SafekeeperTimelineInfo {
244 0 : safekeeper_id: conf.my_id.0,
245 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
246 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
247 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
248 0 : }),
249 0 : term: self.sk.state.acceptor_state.term,
250 0 : last_log_term: self.sk.get_last_log_term(),
251 0 : flush_lsn: self.sk.flush_lsn().0,
252 0 : // note: this value is not flushed to control file yet and can be lost
253 0 : commit_lsn: self.sk.state.inmem.commit_lsn.0,
254 0 : remote_consistent_lsn: self.sk.state.inmem.remote_consistent_lsn.0,
255 0 : peer_horizon_lsn: self.sk.state.inmem.peer_horizon_lsn.0,
256 0 : safekeeper_connstr: conf
257 0 : .advertise_pg_addr
258 0 : .to_owned()
259 0 : .unwrap_or(conf.listen_pg_addr.clone()),
260 0 : http_connstr: conf.listen_http_addr.to_owned(),
261 0 : backup_lsn: self.sk.state.inmem.backup_lsn.0,
262 0 : local_start_lsn: self.sk.state.local_start_lsn.0,
263 0 : availability_zone: conf.availability_zone.clone(),
264 0 : standby_horizon: standby_apply_lsn.0,
265 0 : }
266 0 : }
267 :
268 : /// Get our latest view of alive peers status on the timeline.
269 : /// We pass our own info through the broker as well, so when we don't have connection
270 : /// to the broker returned vec is empty.
271 0 : pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
272 0 : let now = Instant::now();
273 0 : self.peers_info
274 0 : .0
275 0 : .iter()
276 0 : // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
277 0 : .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
278 0 : .cloned()
279 0 : .collect()
280 0 : }
281 : }
282 :
/// Timeline-level errors; each variant carries the id of the affected
/// timeline and renders the message given in its `#[error]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum TimelineError {
    #[error("Timeline {0} was cancelled and cannot be used anymore")]
    Cancelled(TenantTimelineId),
    #[error("Timeline {0} was not found in global map")]
    NotFound(TenantTimelineId),
    #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
    Invalid(TenantTimelineId),
    #[error("Timeline {0} is already exists")]
    AlreadyExists(TenantTimelineId),
    #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
    UninitializedWalSegSize(TenantTimelineId),
    // NOTE(review): the variant name is misspelled ("Uninitialinzed"); it is
    // part of the public interface, so it is kept as is.
    #[error("Timeline {0} is not initialized, pg_version is unknown")]
    UninitialinzedPgVersion(TenantTimelineId),
}
298 :
299 : // Convert to HTTP API error.
300 : impl From<TimelineError> for ApiError {
301 0 : fn from(te: TimelineError) -> ApiError {
302 0 : match te {
303 0 : TimelineError::NotFound(ttid) => {
304 0 : ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
305 : }
306 0 : _ => ApiError::InternalServerError(anyhow!("{}", te)),
307 : }
308 0 : }
309 : }
310 :
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
/// It also holds SharedState and guards access to it with a read-write lock.
pub struct Timeline {
    pub ttid: TenantTimelineId,

    /// Used to broadcast commit_lsn updates to all background jobs.
    commit_lsn_watch_tx: watch::Sender<Lsn>,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,

    /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
    /// them when sending in recovery mode (to walproposer or peers). Note: this
    /// is just a notification, WAL reading should always be done with lock held
    /// as term can change otherwise.
    term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
    term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,

    /// Broadcasts shared state updates.
    shared_state_version_tx: watch::Sender<usize>,
    shared_state_version_rx: watch::Receiver<usize>,

    /// Safekeeper and other state, that should remain consistent and
    /// synchronized with the disk. Despite the field name this is a tokio
    /// `RwLock`; an async lock is used as we write WAL to disk while holding
    /// it, ensuring that consensus checks are in order.
    mutex: RwLock<SharedState>,
    walsenders: Arc<WalSenders>,
    walreceivers: Arc<WalReceivers>,
    /// Directory of this timeline on disk.
    timeline_dir: Utf8PathBuf,

    /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
    pub(crate) cancel: CancellationToken,

    // timeline_manager controlled state
    pub(crate) broker_active: AtomicBool,
    pub(crate) wal_backup_active: AtomicBool,
    pub(crate) last_removed_segno: AtomicU64,
}
347 :
348 : impl Timeline {
349 : /// Load existing timeline from disk.
350 0 : pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
351 0 : let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
352 :
353 0 : let shared_state = SharedState::restore(conf, &ttid)?;
354 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
355 0 : watch::channel(shared_state.sk.state.commit_lsn);
356 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
357 0 : shared_state.sk.get_term(),
358 0 : shared_state.sk.flush_lsn(),
359 0 : )));
360 0 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
361 0 :
362 0 : let walreceivers = WalReceivers::new();
363 0 : Ok(Timeline {
364 0 : ttid,
365 0 : commit_lsn_watch_tx,
366 0 : commit_lsn_watch_rx,
367 0 : term_flush_lsn_watch_tx,
368 0 : term_flush_lsn_watch_rx,
369 0 : shared_state_version_tx,
370 0 : shared_state_version_rx,
371 0 : mutex: RwLock::new(shared_state),
372 0 : walsenders: WalSenders::new(walreceivers.clone()),
373 0 : walreceivers,
374 0 : cancel: CancellationToken::default(),
375 0 : timeline_dir: get_timeline_dir(conf, &ttid),
376 0 : broker_active: AtomicBool::new(false),
377 0 : wal_backup_active: AtomicBool::new(false),
378 0 : last_removed_segno: AtomicU64::new(0),
379 0 : })
380 0 : }
381 :
382 : /// Create a new timeline, which is not yet persisted to disk.
383 0 : pub fn create_empty(
384 0 : conf: &SafeKeeperConf,
385 0 : ttid: TenantTimelineId,
386 0 : server_info: ServerInfo,
387 0 : commit_lsn: Lsn,
388 0 : local_start_lsn: Lsn,
389 0 : ) -> Result<Timeline> {
390 0 : let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
391 0 : let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
392 0 : watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
393 0 : let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
394 0 :
395 0 : let state =
396 0 : TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
397 0 :
398 0 : let walreceivers = WalReceivers::new();
399 0 : Ok(Timeline {
400 0 : ttid,
401 0 : commit_lsn_watch_tx,
402 0 : commit_lsn_watch_rx,
403 0 : term_flush_lsn_watch_tx,
404 0 : term_flush_lsn_watch_rx,
405 0 : shared_state_version_tx,
406 0 : shared_state_version_rx,
407 0 : mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
408 0 : walsenders: WalSenders::new(walreceivers.clone()),
409 0 : walreceivers,
410 0 : cancel: CancellationToken::default(),
411 0 : timeline_dir: get_timeline_dir(conf, &ttid),
412 0 : broker_active: AtomicBool::new(false),
413 0 : wal_backup_active: AtomicBool::new(false),
414 0 : last_removed_segno: AtomicU64::new(0),
415 : })
416 0 : }
417 :
418 : /// Initialize fresh timeline on disk and start background tasks. If init
419 : /// fails, timeline is cancelled and cannot be used anymore.
420 : ///
421 : /// Init is transactional, so if it fails, created files will be deleted,
422 : /// and state on disk should remain unchanged.
423 0 : pub async fn init_new(
424 0 : self: &Arc<Timeline>,
425 0 : shared_state: &mut WriteGuardSharedState<'_>,
426 0 : conf: &SafeKeeperConf,
427 0 : broker_active_set: Arc<TimelinesSet>,
428 0 : ) -> Result<()> {
429 0 : match fs::metadata(&self.timeline_dir).await {
430 : Ok(_) => {
431 : // Timeline directory exists on disk, we should leave state unchanged
432 : // and return error.
433 0 : bail!(TimelineError::Invalid(self.ttid));
434 : }
435 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
436 0 : Err(e) => {
437 0 : return Err(e.into());
438 : }
439 : }
440 :
441 : // Create timeline directory.
442 0 : fs::create_dir_all(&self.timeline_dir).await?;
443 :
444 : // Write timeline to disk and start background tasks.
445 0 : if let Err(e) = shared_state.sk.state.flush().await {
446 : // Bootstrap failed, cancel timeline and remove timeline directory.
447 0 : self.cancel(shared_state);
448 :
449 0 : if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
450 0 : warn!(
451 0 : "failed to remove timeline {} directory after bootstrap failure: {}",
452 0 : self.ttid, fs_err
453 : );
454 0 : }
455 :
456 0 : return Err(e);
457 0 : }
458 0 : self.bootstrap(conf, broker_active_set);
459 0 : Ok(())
460 0 : }
461 :
462 : /// Bootstrap new or existing timeline starting background tasks.
463 0 : pub fn bootstrap(
464 0 : self: &Arc<Timeline>,
465 0 : conf: &SafeKeeperConf,
466 0 : broker_active_set: Arc<TimelinesSet>,
467 0 : ) {
468 0 : // Start manager task which will monitor timeline state and update
469 0 : // background tasks.
470 0 : tokio::spawn(timeline_manager::main_task(
471 0 : self.clone(),
472 0 : conf.clone(),
473 0 : broker_active_set,
474 0 : ));
475 0 : }
476 :
477 : /// Delete timeline from disk completely, by removing timeline directory.
478 : /// Background timeline activities will stop eventually.
479 : ///
480 : /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
481 : /// deletion API endpoint is retriable.
482 0 : pub async fn delete(
483 0 : &self,
484 0 : shared_state: &mut WriteGuardSharedState<'_>,
485 0 : only_local: bool,
486 0 : ) -> Result<bool> {
487 0 : self.cancel(shared_state);
488 0 :
489 0 : // TODO: It's better to wait for s3 offloader termination before
490 0 : // removing data from s3. Though since s3 doesn't have transactions it
491 0 : // still wouldn't guarantee absense of data after removal.
492 0 : let conf = GlobalTimelines::get_global_config();
493 0 : if !only_local && conf.is_wal_backup_enabled() {
494 : // Note: we concurrently delete remote storage data from multiple
495 : // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
496 : // do some retries anyway.
497 0 : wal_backup::delete_timeline(&self.ttid).await?;
498 0 : }
499 0 : let dir_existed = delete_dir(&self.timeline_dir).await?;
500 0 : Ok(dir_existed)
501 0 : }
502 :
503 : /// Cancel timeline to prevent further usage. Background tasks will stop
504 : /// eventually after receiving cancellation signal.
505 0 : fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
506 0 : info!("timeline {} is cancelled", self.ttid);
507 0 : self.cancel.cancel();
508 0 : // Close associated FDs. Nobody will be able to touch timeline data once
509 0 : // it is cancelled, so WAL storage won't be opened again.
510 0 : shared_state.sk.wal_store.close();
511 0 : }
512 :
513 : /// Returns if timeline is cancelled.
514 0 : pub fn is_cancelled(&self) -> bool {
515 0 : self.cancel.is_cancelled()
516 0 : }
517 :
518 : /// Take a writing mutual exclusive lock on timeline shared_state.
519 0 : pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
520 0 : WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
521 0 : }
522 :
523 0 : pub async fn read_shared_state(&self) -> ReadGuardSharedState {
524 0 : self.mutex.read().await
525 0 : }
526 :
527 : /// Returns commit_lsn watch channel.
528 0 : pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
529 0 : self.commit_lsn_watch_rx.clone()
530 0 : }
531 :
532 : /// Returns term_flush_lsn watch channel.
533 0 : pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
534 0 : self.term_flush_lsn_watch_rx.clone()
535 0 : }
536 :
537 : /// Returns watch channel for SharedState update version.
538 0 : pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
539 0 : self.shared_state_version_rx.clone()
540 0 : }
541 :
542 : /// Returns wal_seg_size.
543 0 : pub async fn get_wal_seg_size(&self) -> usize {
544 0 : self.read_shared_state().await.get_wal_seg_size()
545 0 : }
546 :
547 : /// Returns state of the timeline.
548 0 : pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
549 0 : let state = self.read_shared_state().await;
550 0 : (state.sk.state.inmem.clone(), state.sk.state.clone())
551 0 : }
552 :
553 : /// Returns latest backup_lsn.
554 0 : pub async fn get_wal_backup_lsn(&self) -> Lsn {
555 0 : self.read_shared_state().await.sk.state.inmem.backup_lsn
556 0 : }
557 :
558 : /// Sets backup_lsn to the given value.
559 0 : pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
560 0 : if self.is_cancelled() {
561 0 : bail!(TimelineError::Cancelled(self.ttid));
562 0 : }
563 :
564 0 : let mut state = self.write_shared_state().await;
565 0 : state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
566 0 : // we should check whether to shut down offloader, but this will be done
567 0 : // soon by peer communication anyway.
568 0 : Ok(())
569 0 : }
570 :
571 : /// Get safekeeper info for broadcasting to broker and other peers.
572 0 : pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
573 0 : let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
574 0 : let shared_state = self.read_shared_state().await;
575 0 : shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
576 0 : }
577 :
578 : /// Update timeline state with peer safekeeper data.
579 0 : pub async fn record_safekeeper_info(
580 0 : self: &Arc<Self>,
581 0 : sk_info: SafekeeperTimelineInfo,
582 0 : ) -> Result<()> {
583 : {
584 0 : let mut shared_state = self.write_shared_state().await;
585 0 : shared_state.sk.record_safekeeper_info(&sk_info).await?;
586 0 : let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
587 0 : shared_state.peers_info.upsert(&peer_info);
588 0 : }
589 0 : Ok(())
590 0 : }
591 :
592 0 : pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
593 0 : let shared_state = self.read_shared_state().await;
594 0 : shared_state.get_peers(conf.heartbeat_timeout)
595 0 : }
596 :
597 0 : pub fn get_walsenders(&self) -> &Arc<WalSenders> {
598 0 : &self.walsenders
599 0 : }
600 :
601 0 : pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
602 0 : &self.walreceivers
603 0 : }
604 :
605 : /// Returns flush_lsn.
606 0 : pub async fn get_flush_lsn(&self) -> Lsn {
607 0 : self.read_shared_state().await.sk.wal_store.flush_lsn()
608 0 : }
609 :
610 : /// Gather timeline data for metrics.
611 0 : pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
612 0 : if self.is_cancelled() {
613 0 : return None;
614 0 : }
615 0 :
616 0 : let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
617 0 : let state = self.read_shared_state().await;
618 0 : Some(FullTimelineInfo {
619 0 : ttid: self.ttid,
620 0 : ps_feedback_count,
621 0 : last_ps_feedback,
622 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
623 0 : timeline_is_active: self.broker_active.load(Ordering::Relaxed),
624 0 : num_computes: self.walreceivers.get_num() as u32,
625 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
626 0 : epoch_start_lsn: state.sk.term_start_lsn,
627 0 : mem_state: state.sk.state.inmem.clone(),
628 0 : persisted_state: state.sk.state.clone(),
629 0 : flush_lsn: state.sk.wal_store.flush_lsn(),
630 0 : wal_storage: state.sk.wal_store.get_metrics(),
631 0 : })
632 0 : }
633 :
634 : /// Returns in-memory timeline state to build a full debug dump.
635 0 : pub async fn memory_dump(&self) -> debug_dump::Memory {
636 0 : let state = self.read_shared_state().await;
637 :
638 0 : let (write_lsn, write_record_lsn, flush_lsn, file_open) =
639 0 : state.sk.wal_store.internal_state();
640 0 :
641 0 : debug_dump::Memory {
642 0 : is_cancelled: self.is_cancelled(),
643 0 : peers_info_len: state.peers_info.0.len(),
644 0 : walsenders: self.walsenders.get_all(),
645 0 : wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
646 0 : active: self.broker_active.load(Ordering::Relaxed),
647 0 : num_computes: self.walreceivers.get_num() as u32,
648 0 : last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
649 0 : epoch_start_lsn: state.sk.term_start_lsn,
650 0 : mem_state: state.sk.state.inmem.clone(),
651 0 : write_lsn,
652 0 : write_record_lsn,
653 0 : flush_lsn,
654 0 : file_open,
655 0 : }
656 0 : }
657 :
658 : /// Apply a function to the control file state and persist it.
659 0 : pub async fn map_control_file<T>(
660 0 : self: &Arc<Self>,
661 0 : f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
662 0 : ) -> Result<T> {
663 0 : let mut state = self.write_shared_state().await;
664 0 : let mut persistent_state = state.sk.state.start_change();
665 : // If f returns error, we abort the change and don't persist anything.
666 0 : let res = f(&mut persistent_state)?;
667 : // If persisting fails, we abort the change and return error.
668 0 : state.sk.state.finish_change(&persistent_state).await?;
669 0 : Ok(res)
670 0 : }
671 :
672 : /// Get the timeline guard for reading/writing WAL files.
673 : /// TODO: if WAL files are not present on disk (evicted), they will be
674 : /// downloaded from S3. Also there will logic for preventing eviction
675 : /// while someone is holding FullAccessTimeline guard.
676 0 : pub async fn full_access_guard(self: &Arc<Self>) -> Result<FullAccessTimeline> {
677 0 : if self.is_cancelled() {
678 0 : bail!(TimelineError::Cancelled(self.ttid));
679 0 : }
680 0 : Ok(FullAccessTimeline { tli: self.clone() })
681 0 : }
682 : }
683 :
/// This is a guard that allows to read/write disk timeline state.
/// All tasks that are using the disk should use this guard.
/// It is obtained through `Timeline::full_access_guard`.
#[derive(Clone)]
pub struct FullAccessTimeline {
    /// The timeline this guard gives access to.
    pub tli: Arc<Timeline>,
}
690 :
691 : impl Deref for FullAccessTimeline {
692 : type Target = Arc<Timeline>;
693 :
694 0 : fn deref(&self) -> &Self::Target {
695 0 : &self.tli
696 0 : }
697 : }
698 :
699 : impl FullAccessTimeline {
700 : /// Returns true if walsender should stop sending WAL to pageserver. We
701 : /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
702 : /// computes. While there might be nothing to stream already, we learn about
703 : /// remote_consistent_lsn update through replication feedback, and we want
704 : /// to stop pushing to the broker if pageserver is fully caughtup.
705 0 : pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
706 0 : if self.is_cancelled() {
707 0 : return true;
708 0 : }
709 0 : let shared_state = self.read_shared_state().await;
710 0 : if self.walreceivers.get_num() == 0 {
711 0 : return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
712 0 : reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
713 0 : }
714 0 : false
715 0 : }
716 :
717 : /// Ensure that current term is t, erroring otherwise, and lock the state.
718 0 : pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
719 0 : let ss = self.read_shared_state().await;
720 0 : if ss.sk.state.acceptor_state.term != t {
721 0 : bail!(
722 0 : "failed to acquire term {}, current term {}",
723 0 : t,
724 0 : ss.sk.state.acceptor_state.term
725 0 : );
726 0 : }
727 0 : Ok(ss)
728 0 : }
729 :
730 : /// Pass arrived message to the safekeeper.
731 0 : pub async fn process_msg(
732 0 : &self,
733 0 : msg: &ProposerAcceptorMessage,
734 0 : ) -> Result<Option<AcceptorProposerMessage>> {
735 0 : if self.is_cancelled() {
736 0 : bail!(TimelineError::Cancelled(self.ttid));
737 0 : }
738 :
739 : let mut rmsg: Option<AcceptorProposerMessage>;
740 : {
741 0 : let mut shared_state = self.write_shared_state().await;
742 0 : rmsg = shared_state.sk.process_msg(msg).await?;
743 :
744 : // if this is AppendResponse, fill in proper hot standby feedback.
745 0 : if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
746 0 : resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
747 0 : }
748 : }
749 0 : Ok(rmsg)
750 0 : }
751 :
752 0 : pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
753 0 : let (_, persisted_state) = self.get_state().await;
754 0 : let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
755 0 :
756 0 : WalReader::new(
757 0 : &self.ttid,
758 0 : self.timeline_dir.clone(),
759 0 : &persisted_state,
760 0 : start_lsn,
761 0 : enable_remote_read,
762 0 : )
763 0 : }
764 :
765 0 : pub fn get_timeline_dir(&self) -> Utf8PathBuf {
766 0 : self.timeline_dir.clone()
767 0 : }
768 :
769 : /// Update in memory remote consistent lsn.
770 0 : pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
771 0 : let mut shared_state = self.write_shared_state().await;
772 0 : shared_state.sk.state.inmem.remote_consistent_lsn =
773 0 : max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
774 0 : }
775 : }
776 :
777 : /// Deletes directory and it's contents. Returns false if directory does not exist.
778 0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
779 0 : match fs::remove_dir_all(path).await {
780 0 : Ok(_) => Ok(true),
781 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
782 0 : Err(e) => Err(e.into()),
783 : }
784 0 : }
785 :
786 : /// Get a path to the tenant directory. If you just need to get a timeline directory,
787 : /// use FullAccessTimeline::get_timeline_dir instead.
788 14 : pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
789 14 : conf.workdir.join(tenant_id.to_string())
790 14 : }
791 :
792 : /// Get a path to the timeline directory. If you need to read WAL files from disk,
793 : /// use FullAccessTimeline::get_timeline_dir instead. This function does not check
794 : /// timeline eviction status and WAL files might not be present on disk.
795 14 : pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
796 14 : get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
797 14 : }
|