LCOV - code coverage report
Current view: top level - safekeeper/src - wal_storage.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 66.8 % 425 284
Test Date: 2025-07-16 12:29:03 Functions: 66.7 % 48 32

            Line data    Source code
       1              : //! This module has everything to deal with WAL -- reading and writing to disk.
       2              : //!
       3              : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
       4              : //! PG timeline is always 1, so WAL segments usually have names like this:
       5              : //! - 000000010000000000000001
       6              : //! - 000000010000000000000002.partial
       7              : //!
       8              : //! Note that the last file has a `.partial` suffix, which is different from postgres.
       9              : 
      10              : use std::cmp::{max, min};
      11              : use std::future::Future;
      12              : use std::io::{ErrorKind, SeekFrom};
      13              : use std::pin::Pin;
      14              : 
      15              : use anyhow::{Context, Result, bail};
      16              : use bytes::Bytes;
      17              : use camino::{Utf8Path, Utf8PathBuf};
      18              : use futures::future::BoxFuture;
      19              : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
      20              : use postgres_ffi::waldecoder::WalStreamDecoder;
      21              : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo, dispatch_pgversion};
      22              : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
      23              : use pq_proto::SystemId;
      24              : use remote_storage::RemotePath;
      25              : use std::sync::Arc;
      26              : use tokio::fs::{self, File, OpenOptions, remove_file};
      27              : use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
      28              : use tracing::*;
      29              : use utils::crashsafe::durable_rename;
      30              : use utils::id::TenantTimelineId;
      31              : use utils::lsn::Lsn;
      32              : 
      33              : use crate::metrics::{
      34              :     REMOVED_WAL_SEGMENTS, WAL_DISK_IO_ERRORS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics,
      35              :     time_io_closure,
      36              : };
      37              : use crate::state::TimelinePersistentState;
      38              : use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
      39              : 
      40              : pub trait Storage {
      41              :     // Last written LSN.
      42              :     fn write_lsn(&self) -> Lsn;
      43              :     /// LSN of last durably stored WAL record.
      44              :     fn flush_lsn(&self) -> Lsn;
      45              : 
      46              :     /// Initialize segment by creating proper long header at the beginning of
      47              :     /// the segment and short header at the page of given LSN. This is only used
      48              :     /// for timeline initialization because compute will stream data only since
      49              :     /// init_lsn. Other segment headers are included in compute stream.
      50              :     fn initialize_first_segment(
      51              :         &mut self,
      52              :         init_lsn: Lsn,
      53              :     ) -> impl Future<Output = Result<()>> + Send;
      54              : 
      55              :     /// Write piece of WAL from buf to disk, but not necessarily sync it.
      56              :     fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> impl Future<Output = Result<()>> + Send;
      57              : 
      58              :     /// Truncate WAL at specified LSN, which must be the end of WAL record.
      59              :     fn truncate_wal(&mut self, end_pos: Lsn) -> impl Future<Output = Result<()>> + Send;
      60              : 
      61              :     /// Durably store WAL on disk, up to the last written WAL record.
      62              :     fn flush_wal(&mut self) -> impl Future<Output = Result<()>> + Send;
      63              : 
      64              :     /// Remove all segments <= given segno. Returns a future doing that, as we
      65              :     /// want to perform it without the timeline lock.
      66              :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
      67              : 
      68              :     /// Release resources associated with the storage -- technically, close FDs.
      69              :     /// Currently we don't remove timelines until restart (#3146), so need to
      70              :     /// spare descriptors. This would be useful for temporary tli detach as
      71              :     /// well.
      72            0 :     fn close(&mut self) {}
      73              : 
      74              :     /// Get metrics for this timeline.
      75              :     fn get_metrics(&self) -> WalStorageMetrics;
      76              : }
      77              : 
      78              : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
      79              : /// for better performance. Storage is initialized in the constructor.
      80              : ///
      81              : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
      82              : /// its filename and may not be fully flushed.
      83              : ///
      84              : /// Relationship of LSNs:
      85              : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
      86              : ///
      87              : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
      88              : pub struct PhysicalStorage {
      89              :     metrics: WalStorageMetrics,
      90              :     timeline_dir: Utf8PathBuf,
      91              : 
      92              :     /// Disables fsync if true.
      93              :     no_sync: bool,
      94              : 
      95              :     /// Size of WAL segment in bytes.
      96              :     wal_seg_size: usize,
      97              :     pg_version: PgVersionId,
      98              :     system_id: u64,
      99              : 
     100              :     /// Written to disk, but possibly still in the cache and not fully persisted.
     101              :     /// Also can be ahead of record_lsn, if we happen to be in the middle of a WAL record.
     102              :     write_lsn: Lsn,
     103              : 
     104              :     /// The LSN of the last WAL record written to disk. Still can be not fully
     105              :     /// flushed.
     106              :     ///
     107              :     /// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
     108              :     /// switch ingest the reverse is true because we don't bump write_lsn up to
     109              :     /// the next segment: WAL stream from the compute doesn't have the gap and
     110              :     /// for simplicity / as a sanity check we disallow any non-sequential
     111              :     /// writes, so write zeros as is.
     112              :     ///
     113              :     /// Similar effect is in theory possible due to LSN alignment: if record
     114              :     /// ends at *2, decoder will report end lsn as *8 even though we haven't
     115              :     /// written these zeros yet. In practice compute likely never sends
     116              :     /// non-aligned chunks of data.
     117              :     write_record_lsn: Lsn,
     118              : 
     119              :     /// The last LSN flushed to disk. May be in the middle of a record.
     120              :     ///
     121              :     /// NB: when the rest of the system refers to `flush_lsn`, it usually
     122              :     /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
     123              :     /// and should be resolved.
     124              :     flush_lsn: Lsn,
     125              : 
     126              :     /// The LSN of the last WAL record flushed to disk.
     127              :     flush_record_lsn: Lsn,
     128              : 
     129              :     /// Decoder is required for detecting boundaries of WAL records.
     130              :     decoder: WalStreamDecoder,
     131              : 
     132              :     /// Cached open file for the last segment.
     133              :     ///
     134              :     /// If Some(file) is open, then it always:
     135              :     /// - has ".partial" suffix
     136              :     /// - points to write_lsn, so no seek is needed for writing
     137              :     /// - doesn't point to the end of the segment
     138              :     file: Option<File>,
     139              : 
     140              :     /// When true, WAL truncation potentially has been interrupted and we need
     141              :     /// to finish it before allowing WAL writes; see truncate_wal for details.
     142              :     /// In this case [`write_lsn`] can be less than actually written WAL on
     143              :     /// disk. In particular, there can be a case with unexpected .partial file.
     144              :     ///
     145              :     /// Imagine the following:
     146              :     /// - 000000010000000000000001
     147              :     ///   - it was fully written, but the last record is split between 2
     148              :     ///     segments
     149              :     ///   - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in
     150              :     ///     the end of this segment
     151              :     ///   - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were
     152              :     ///     initialized to 0/1FFFFF0
     153              :     /// - 000000010000000000000002.partial
     154              :     ///   - it has only 1 byte written, which is not enough to make a full WAL
     155              :     ///     record
     156              :     ///
     157              :     /// Partial segment 002 has no WAL records, and it will be removed by the
     158              :     /// next truncate_wal(). This flag will be set to false after the first
     159              :     /// successful truncate_wal() call.
     160              :     ///
     161              :     /// [`write_lsn`]: Self::write_lsn
     162              :     pending_wal_truncation: bool,
     163              : }
     164              : 
     165              : impl PhysicalStorage {
     166              :     /// Create new storage. If commit_lsn is not zero, we try to restore flush_lsn from
     167              :     /// the disk. Otherwise, all LSNs are set to zero.
     168            5 :     pub fn new(
     169            5 :         ttid: &TenantTimelineId,
     170            5 :         timeline_dir: &Utf8Path,
     171            5 :         state: &TimelinePersistentState,
     172            5 :         no_sync: bool,
     173            5 :     ) -> Result<PhysicalStorage> {
     174            5 :         let wal_seg_size = state.server.wal_seg_size as usize;
     175              : 
     176              :         // Find out where stored WAL ends, starting at commit_lsn which is a
     177              :         // known recent record boundary (unless we don't have WAL at all).
     178              :         //
     179              :         // NB: find_end_of_wal MUST be backwards compatible with the previously
     180              :         // written WAL. If find_end_of_wal fails to read any WAL written by an
     181              :         // older version of the code, we could lose data forever.
     182            5 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     183            5 :             Lsn(0)
     184              :         } else {
     185            0 :             let version = PgMajorVersion::try_from(state.server.pg_version).unwrap();
     186              : 
     187            0 :             dispatch_pgversion!(
     188            0 :                 version,
     189            0 :                 pgv::xlog_utils::find_end_of_wal(
     190            0 :                     timeline_dir.as_std_path(),
     191            0 :                     wal_seg_size,
     192            0 :                     state.commit_lsn,
     193            0 :                 )?,
     194            0 :                 bail!("unsupported postgres version: {}", version)
     195              :             )
     196              :         };
     197              : 
     198              :         // note: this assumes we fsync'ed whole datadir on start.
     199            5 :         let flush_lsn = write_lsn;
     200              : 
     201            5 :         debug!(
     202            0 :             "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
     203              :             ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
     204              :         );
     205            5 :         if flush_lsn < state.commit_lsn {
     206              :             // note: can never happen. find_end_of_wal returns provided start_lsn
     207              :             // (state.commit_lsn in our case) if it doesn't find anything.
     208            0 :             bail!(
     209            0 :                 "timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn  {} from control file",
     210              :                 ttid.timeline_id,
     211              :                 flush_lsn,
     212              :                 state.commit_lsn
     213              :             );
     214            5 :         }
     215            5 :         if flush_lsn < state.peer_horizon_lsn {
     216            0 :             warn!(
     217            0 :                 "timeline {}: flush_lsn {} is less than cfile peer_horizon_lsn {}",
     218              :                 ttid.timeline_id, flush_lsn, state.peer_horizon_lsn
     219              :             );
     220            5 :         }
     221              : 
     222            5 :         Ok(PhysicalStorage {
     223            5 :             metrics: WalStorageMetrics::default(),
     224            5 :             timeline_dir: timeline_dir.to_path_buf(),
     225            5 :             no_sync,
     226            5 :             wal_seg_size,
     227            5 :             pg_version: state.server.pg_version,
     228            5 :             system_id: state.server.system_id,
     229            5 :             write_lsn,
     230            5 :             write_record_lsn: write_lsn,
     231            5 :             flush_lsn,
     232            5 :             flush_record_lsn: flush_lsn,
     233            5 :             decoder: WalStreamDecoder::new(
     234            5 :                 write_lsn,
     235            5 :                 PgMajorVersion::try_from(state.server.pg_version).unwrap(),
     236            5 :             ),
     237            5 :             file: None,
     238            5 :             pending_wal_truncation: true,
     239            5 :         })
     240            5 :     }
     241              : 
     242              :     /// Get all known state of the storage.
     243            0 :     pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     244            0 :         (
     245            0 :             self.write_lsn,
     246            0 :             self.write_record_lsn,
     247            0 :             self.flush_record_lsn,
     248            0 :             self.file.is_some(),
     249            0 :         )
     250            0 :     }
     251              : 
     252              :     /// Call fsync if config requires so.
     253            5 :     async fn fsync_file(&mut self, file: &File) -> Result<()> {
     254            5 :         if !self.no_sync {
     255            5 :             self.metrics
     256            5 :                 .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
     257            0 :         }
     258            5 :         Ok(())
     259            5 :     }
     260              : 
     261              :     /// Call fdatasync if config requires so.
     262          620 :     async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
     263          620 :         if !self.no_sync {
     264          620 :             self.metrics
     265          620 :                 .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
     266            0 :         }
     267          620 :         Ok(())
     268          620 :     }
     269              : 
     270              :     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     271              :     /// Returns `file` and `is_partial`.
     272           15 :     async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
     273           15 :         let (wal_file_path, wal_file_partial_path) =
     274           15 :             wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
     275              : 
     276              :         // Try to open already completed segment
     277           15 :         if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
     278            0 :             Ok((file, false))
     279           15 :         } else if let Ok(file) = OpenOptions::new()
     280           15 :             .write(true)
     281           15 :             .open(&wal_file_partial_path)
     282           15 :             .await
     283              :         {
     284              :             // Try to open existing partial file
     285           10 :             Ok((file, true))
     286              :         } else {
     287            5 :             let _timer = WAL_STORAGE_OPERATION_SECONDS
     288            5 :                 .with_label_values(&["initialize_segment"])
     289            5 :                 .start_timer();
     290              :             // Create and fill new partial file
     291              :             //
     292              :             // We're using fdatasync during WAL writing, so file size must not
     293              :             // change; to this end it is filled with zeros here. To avoid using
     294              :             // half initialized segment, first bake it under tmp filename and
     295              :             // then rename.
     296            5 :             let tmp_path = self.timeline_dir.join("waltmp");
     297            5 :             let file: File = File::create(&tmp_path).await.with_context(|| {
     298              :                 /* BEGIN_HADRON */
     299            0 :                 WAL_DISK_IO_ERRORS.inc();
     300              :                 /* END_HADRON */
     301            0 :                 format!("Failed to open tmp wal file {:?}", &tmp_path)
     302            0 :             })?;
     303              : 
     304            5 :             fail::fail_point!("sk-zero-segment", |_| {
     305            0 :                 info!("sk-zero-segment failpoint hit");
     306            0 :                 Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
     307            0 :             });
     308            5 :             file.set_len(self.wal_seg_size as u64).await?;
     309              : 
     310            5 :             if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
     311              :                 // Probably rename succeeded, but fsync of it failed. Remove
     312              :                 // the file then to avoid using it.
     313            0 :                 remove_file(wal_file_partial_path)
     314            0 :                     .await
     315            0 :                     .or_else(utils::fs_ext::ignore_not_found)?;
     316            0 :                 return Err(e.into());
     317            5 :             }
     318            5 :             Ok((file, true))
     319              :         }
     320           15 :     }
     321              : 
     322              :     /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
     323              :     /// segment was completed, closed, and flushed to disk.
     324          620 :     async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
     325          620 :         let mut file = if let Some(file) = self.file.take() {
     326          615 :             file
     327              :         } else {
     328            5 :             let (mut file, is_partial) = self.open_or_create(segno).await?;
     329            5 :             assert!(is_partial, "unexpected write into non-partial segment file");
     330            5 :             file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     331            5 :             file
     332              :         };
     333              : 
     334          620 :         file.write_all(buf).await?;
     335              :         // Note: flush just ensures write above reaches the OS (this is not
     336              :         // needed in case of sync IO as Write::write there calls directly write
     337              :         // syscall, but needed in case of async). It does *not* fsync the file.
     338          620 :         file.flush().await?;
     339              : 
     340          620 :         if xlogoff + buf.len() == self.wal_seg_size {
     341              :             // If we reached the end of a WAL segment, flush and close it.
     342            0 :             self.fdatasync_file(&file).await?;
     343              : 
     344              :             // Rename partial file to completed file
     345            0 :             let (wal_file_path, wal_file_partial_path) =
     346            0 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
     347            0 :             fs::rename(wal_file_partial_path, wal_file_path).await?;
     348            0 :             Ok(true)
     349              :         } else {
     350              :             // otherwise, file can be reused later
     351          620 :             self.file = Some(file);
     352          620 :             Ok(false)
     353              :         }
     354          620 :     }
     355              : 
     356              :     /// Writes WAL to the segment files, until everything is written. If some segments
     357              :     /// are fully written, they are flushed to disk. The last (partial) segment can
     358              :     /// be flushed separately later.
     359              :     ///
     360              :     /// Updates `write_lsn` and `flush_lsn`.
     361          620 :     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
     362              :         // TODO: this shouldn't be possible, except possibly with write_lsn == 0.
     363              :         // Rename this method to `append_exact`, and make it append-only, removing
     364              :         // the `pos` parameter and this check. For this reason, we don't update
     365              :         // `flush_lsn` here.
     366          620 :         if self.write_lsn != pos {
     367              :             // need to flush the file before discarding it
     368            0 :             if let Some(file) = self.file.take() {
     369            0 :                 self.fdatasync_file(&file).await?;
     370            0 :             }
     371              : 
     372            0 :             self.write_lsn = pos;
     373          620 :         }
     374              : 
     375         1240 :         while !buf.is_empty() {
     376              :             // Extract WAL location for this block
     377          620 :             let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
     378          620 :             let segno = self.write_lsn.segment_number(self.wal_seg_size);
     379              : 
     380              :             // If crossing a WAL boundary, only write up until we reach wal segment size.
     381          620 :             let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
     382            0 :                 self.wal_seg_size - xlogoff
     383              :             } else {
     384          620 :                 buf.len()
     385              :             };
     386              : 
     387          620 :             let flushed = self
     388          620 :                 .write_in_segment(segno, xlogoff, &buf[..bytes_write])
     389          620 :                 .await
     390              :                 /* BEGIN_HADRON */
     391          620 :                 .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
     392              :             /* END_HADRON */
     393              : 
     394          620 :             self.write_lsn += bytes_write as u64;
     395          620 :             if flushed {
     396            0 :                 self.flush_lsn = self.write_lsn;
     397          620 :             }
     398          620 :             buf = &buf[bytes_write..];
     399              :         }
     400              : 
     401          620 :         Ok(())
     402          620 :     }
     403              : }
     404              : 
     405              : impl Storage for PhysicalStorage {
     406              :     // Last written LSN.
     407          620 :     fn write_lsn(&self) -> Lsn {
     408          620 :         self.write_lsn
     409          620 :     }
     410              :     /// flush_lsn returns LSN of last durably stored WAL record.
     411              :     ///
     412              :     /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
     413              :     #[allow(clippy::misnamed_getters)]
     414         5161 :     fn flush_lsn(&self) -> Lsn {
     415         5161 :         self.flush_record_lsn
     416         5161 :     }
     417              : 
     418            5 :     async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
     419            5 :         let _timer = WAL_STORAGE_OPERATION_SECONDS
     420            5 :             .with_label_values(&["initialize_first_segment"])
     421            5 :             .start_timer();
     422              : 
     423            5 :         let segno = init_lsn.segment_number(self.wal_seg_size);
     424            5 :         let (mut file, _) = self.open_or_create(segno).await?;
     425            5 :         let major_pg_version = PgMajorVersion::try_from(self.pg_version).unwrap();
     426            5 :         let wal_seg =
     427            5 :             postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
     428            5 :         file.seek(SeekFrom::Start(0)).await?;
     429            5 :         file.write_all(&wal_seg).await?;
     430            5 :         file.flush().await?;
     431            5 :         info!("initialized segno {} at lsn {}", segno, init_lsn);
     432              :         // note: file is *not* fsynced
     433            5 :         Ok(())
     434            5 :     }
     435              : 
     436              :     /// Write WAL to disk.
     437          620 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     438              :         // Disallow any non-sequential writes, which can result in gaps or overwrites.
     439              :         // If we need to move the pointer, use truncate_wal() instead.
     440          620 :         if self.write_lsn > startpos {
     441            0 :             bail!(
     442            0 :                 "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
     443              :                 self.write_lsn,
     444              :                 startpos
     445              :             );
     446          620 :         }
     447          620 :         if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
     448            0 :             bail!(
     449            0 :                 "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
     450              :                 self.write_lsn,
     451              :                 startpos
     452              :             );
     453          620 :         }
     454          620 :         if self.pending_wal_truncation {
     455            0 :             bail!(
     456            0 :                 "write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
     457              :                 self.write_lsn,
     458              :                 startpos
     459              :             );
     460          620 :         }
     461              : 
     462          620 :         let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
     463              :         // WAL is written, updating write metrics
     464          620 :         self.metrics.observe_write_seconds(write_seconds);
     465          620 :         self.metrics.observe_write_bytes(buf.len());
     466              : 
     467              :         // Figure out the last record's end LSN and update `write_record_lsn`
     468              :         // (if we got a whole record). The write may also have closed and
     469              :         // flushed a segment, so update `flush_record_lsn` as well.
     470          620 :         if self.decoder.available() != startpos {
     471            5 :             info!(
     472            0 :                 "restart decoder from {} to {}",
     473            0 :                 self.decoder.available(),
     474              :                 startpos,
     475              :             );
     476            5 :             let pg_version = self.decoder.pg_version;
     477            5 :             self.decoder = WalStreamDecoder::new(startpos, pg_version);
     478          615 :         }
     479          620 :         self.decoder.feed_bytes(buf);
     480              : 
     481          620 :         if self.write_record_lsn <= self.flush_lsn {
     482          620 :             // We may have flushed a previously written record.
     483          620 :             self.flush_record_lsn = self.write_record_lsn;
     484          620 :         }
     485         1240 :         while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
     486          620 :             self.write_record_lsn = lsn;
     487          620 :             if lsn <= self.flush_lsn {
     488            0 :                 self.flush_record_lsn = lsn;
     489          620 :             }
     490              :         }
     491              : 
     492          620 :         Ok(())
     493          620 :     }
     494              : 
     495          620 :     async fn flush_wal(&mut self) -> Result<()> {
     496          620 :         if self.flush_record_lsn == self.write_record_lsn {
     497              :             // no need to do extra flush
     498            0 :             return Ok(());
     499          620 :         }
     500              : 
     501          620 :         if let Some(unflushed_file) = self.file.take() {
     502          620 :             self.fdatasync_file(&unflushed_file)
     503          620 :                 .await
     504              :                 /* BEGIN_HADRON */
     505          620 :                 .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
     506              :             /* END_HADRON */
     507          620 :             self.file = Some(unflushed_file);
     508              :         } else {
     509              :             // We have unflushed data (write_lsn != flush_lsn), but no file. This
     510              :             // shouldn't happen, since the segment is flushed on close.
     511            0 :             bail!(
     512            0 :                 "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
     513              :                 self.write_lsn,
     514              :                 self.flush_record_lsn
     515              :             );
     516              :         }
     517              : 
     518              :         // everything is flushed now, let's update flush_lsn
     519          620 :         self.flush_lsn = self.write_lsn;
     520          620 :         self.flush_record_lsn = self.write_record_lsn;
     521          620 :         Ok(())
     522          620 :     }
     523              : 
     524              :     /// Truncate written WAL by removing all WAL segments after the given LSN.
     525              :     /// end_pos must point to the end of the WAL record.
     526            5 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     527            5 :         let _timer = WAL_STORAGE_OPERATION_SECONDS
     528            5 :             .with_label_values(&["truncate_wal"])
     529            5 :             .start_timer();
     530              : 
     531              :         // Streaming must not create a hole, so truncate cannot be called on
     532              :         // non-written lsn.
     533            5 :         if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
     534            0 :             bail!(
     535            0 :                 "truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
     536              :                 self.write_record_lsn,
     537              :                 end_pos
     538              :             );
     539            5 :         }
     540              : 
     541              :         // Quick exit if nothing to do and we know that the state is clean to
     542              :         // avoid writing up to 16 MiB of zeros on disk (this happens on each
     543              :         // connect).
     544            5 :         if !self.pending_wal_truncation
     545            0 :             && end_pos == self.write_lsn
     546            0 :             && end_pos == self.flush_record_lsn
     547              :         {
     548            0 :             return Ok(());
     549            5 :         }
     550              : 
     551              :         // Atomicity: we start with LSNs reset because once on disk deletion is
     552              :         // started it can't be reversed. However, we might crash/error in the
     553              :         // middle, leaving garbage above the truncation point. In theory,
     554              :         // concatenated with previous records it might form bogus WAL (though
     555              :         // very unlikely in practice because CRC would guard from that). To
     556              :         // protect, set pending_wal_truncation flag before beginning: it means
     557              :         // truncation must be retried and WAL writes are prohibited until it
     558              :         // succeeds. Flag is also set on boot because we don't know if the last
     559              :         // state was clean.
     560              :         //
     561              :         // Protocol (HandleElected before first AppendRequest) ensures we'll
     562              :         // always try to ensure clean truncation before any writes.
     563            5 :         self.pending_wal_truncation = true;
     564              : 
     565            5 :         self.write_lsn = end_pos;
     566            5 :         self.flush_lsn = end_pos;
     567            5 :         self.write_record_lsn = end_pos;
     568            5 :         self.flush_record_lsn = end_pos;
     569              : 
     570              :         // Close previously opened file, if any
     571            5 :         if let Some(unflushed_file) = self.file.take() {
     572            0 :             self.fdatasync_file(&unflushed_file).await?;
     573            5 :         }
     574              : 
     575            5 :         let xlogoff = end_pos.segment_offset(self.wal_seg_size);
     576            5 :         let segno = end_pos.segment_number(self.wal_seg_size);
     577              : 
     578              :         // Remove all segments after the given LSN.
     579            5 :         remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
     580              : 
     581            5 :         let (file, is_partial) = self.open_or_create(segno).await?;
     582              : 
     583              :         // Fill end with zeroes
     584            5 :         file.set_len(xlogoff as u64).await?;
     585            5 :         file.set_len(self.wal_seg_size as u64).await?;
     586            5 :         self.fsync_file(&file).await?;
     587              : 
     588            5 :         if !is_partial {
     589              :             // Make segment partial once again
     590            0 :             let (wal_file_path, wal_file_partial_path) =
     591            0 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
     592            0 :             fs::rename(wal_file_path, wal_file_partial_path).await?;
     593            5 :         }
     594              : 
     595            5 :         self.pending_wal_truncation = false;
     596            5 :         info!("truncated WAL to {}", end_pos);
     597            5 :         Ok(())
     598            5 :     }
     599              : 
     600            0 :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     601            0 :         let timeline_dir = self.timeline_dir.clone();
     602            0 :         let wal_seg_size = self.wal_seg_size;
     603            0 :         Box::pin(async move {
     604            0 :             remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
     605            0 :         })
     606            0 :     }
     607              : 
     608            0 :     fn close(&mut self) {
     609              :         // close happens in destructor
     610            0 :         let _open_file = self.file.take();
     611            0 :     }
     612              : 
     613            0 :     fn get_metrics(&self) -> WalStorageMetrics {
     614            0 :         self.metrics.clone()
     615            0 :     }
     616              : }
     617              : 
     618              : /// Remove all WAL segments in timeline_dir that match the given predicate.
     619            5 : async fn remove_segments_from_disk(
     620            5 :     timeline_dir: &Utf8Path,
     621            5 :     wal_seg_size: usize,
     622            5 :     remove_predicate: impl Fn(XLogSegNo) -> bool,
     623            5 : ) -> Result<()> {
     624            5 :     let _timer = WAL_STORAGE_OPERATION_SECONDS
     625            5 :         .with_label_values(&["remove_segments_from_disk"])
     626            5 :         .start_timer();
     627              : 
     628            5 :     let mut n_removed = 0;
     629            5 :     let mut min_removed = u64::MAX;
     630            5 :     let mut max_removed = u64::MIN;
     631              : 
     632            5 :     let mut entries = fs::read_dir(timeline_dir).await?;
     633           15 :     while let Some(entry) = entries.next_entry().await? {
     634           10 :         let entry_path = entry.path();
     635           10 :         let fname = entry_path.file_name().unwrap();
     636              :         /* Ignore files that are not XLOG segments */
     637           10 :         if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
     638            5 :             continue;
     639            5 :         }
     640            5 :         let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
     641            5 :         if remove_predicate(segno) {
     642            0 :             remove_file(entry_path).await?;
     643            0 :             n_removed += 1;
     644            0 :             min_removed = min(min_removed, segno);
     645            0 :             max_removed = max(max_removed, segno);
     646            0 :             REMOVED_WAL_SEGMENTS.inc();
     647            5 :         }
     648              :     }
     649              : 
     650            5 :     if n_removed > 0 {
     651            0 :         info!(
     652            0 :             "removed {} WAL segments [{}; {}]",
     653              :             n_removed, min_removed, max_removed
     654              :         );
     655            5 :     }
     656            5 :     Ok(())
     657            5 : }
     658              : 
/// Sequential reader of a timeline's WAL, starting at a given LSN.
///
/// Segments are opened from the local timeline directory when the position
/// is at or above `local_start_lsn`, and from remote storage otherwise (or
/// when the local file is missing).
pub struct WalReader {
    // Remote path of the timeline; the segment file name is joined onto it
    // for remote reads.
    remote_path: RemotePath,
    // Local directory in which segment files are looked up.
    timeline_dir: Utf8PathBuf,
    // WAL segment size in bytes, taken from the persistent timeline state.
    wal_seg_size: usize,
    // Current read position; advanced by every successful read.
    pos: Lsn,
    // Currently open segment reader, kept between reads while `pos` is in the
    // middle of a segment.
    wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,

    // S3 will be used to read WAL if LSN is not available locally
    wal_backup: Arc<WalBackup>,

    // We don't have WAL locally if LSN is less than local_start_lsn
    local_start_lsn: Lsn,
    // We will respond with zero-ed bytes before this Lsn as long as
    // pos is in the same segment as timeline_start_lsn.
    timeline_start_lsn: Lsn,
    // integer version number of PostgreSQL, e.g. 14; 15; 16
    pg_version: PgMajorVersion,
    // Passed to `generate_wal_segment` when the first segment is synthesized.
    system_id: SystemId,
    // Cached synthesized first segment, kept only while `pos` is still below
    // `timeline_start_lsn`.
    timeline_start_segment: Option<Bytes>,
}
     679              : 
     680              : impl WalReader {
     681            9 :     pub fn new(
     682            9 :         ttid: &TenantTimelineId,
     683            9 :         timeline_dir: Utf8PathBuf,
     684            9 :         state: &TimelinePersistentState,
     685            9 :         start_pos: Lsn,
     686            9 :         wal_backup: Arc<WalBackup>,
     687            9 :     ) -> Result<Self> {
     688            9 :         if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
     689            0 :             bail!("state uninitialized, no data to read");
     690            9 :         }
     691              : 
     692              :         // TODO: Upgrade to bail!() once we know this couldn't possibly happen
     693            9 :         if state.timeline_start_lsn == Lsn(0) {
     694            0 :             warn!("timeline_start_lsn uninitialized before initializing wal reader");
     695            9 :         }
     696              : 
     697            9 :         if start_pos
     698            9 :             < state
     699            9 :                 .timeline_start_lsn
     700            9 :                 .segment_lsn(state.server.wal_seg_size as usize)
     701              :         {
     702            0 :             bail!(
     703            0 :                 "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
     704              :                 start_pos,
     705              :                 state.timeline_start_lsn
     706              :             );
     707            9 :         }
     708              : 
     709              :         Ok(Self {
     710            9 :             remote_path: remote_timeline_path(ttid)?,
     711            9 :             timeline_dir,
     712            9 :             wal_seg_size: state.server.wal_seg_size as usize,
     713            9 :             pos: start_pos,
     714            9 :             wal_segment: None,
     715            9 :             wal_backup,
     716            9 :             local_start_lsn: state.local_start_lsn,
     717            9 :             timeline_start_lsn: state.timeline_start_lsn,
     718            9 :             pg_version: PgMajorVersion::try_from(state.server.pg_version).unwrap(),
     719            9 :             system_id: state.server.system_id,
     720            9 :             timeline_start_segment: None,
     721              :         })
     722            9 :     }
     723              : 
     724              :     /// Read WAL at current position into provided buf, returns number of bytes
     725              :     /// read. It can be smaller than buf size only if segment boundary is
     726              :     /// reached.
     727           79 :     pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
     728              :         // If this timeline is new, we may not have a full segment yet, so
     729              :         // we pad the first bytes of the timeline's first WAL segment with 0s
     730           79 :         if self.pos < self.timeline_start_lsn {
     731            0 :             debug_assert_eq!(
     732            0 :                 self.pos.segment_number(self.wal_seg_size),
     733            0 :                 self.timeline_start_lsn.segment_number(self.wal_seg_size)
     734              :             );
     735              : 
     736              :             // All bytes after timeline_start_lsn are in WAL, but those before
     737              :             // are not, so we manually construct an empty segment for the bytes
     738              :             // not available in this timeline.
     739            0 :             if self.timeline_start_segment.is_none() {
     740            0 :                 let it = postgres_ffi::generate_wal_segment(
     741            0 :                     self.timeline_start_lsn.segment_number(self.wal_seg_size),
     742            0 :                     self.system_id,
     743            0 :                     self.pg_version,
     744            0 :                     self.timeline_start_lsn,
     745            0 :                 )?;
     746            0 :                 self.timeline_start_segment = Some(it);
     747            0 :             }
     748              : 
     749            0 :             assert!(self.timeline_start_segment.is_some());
     750            0 :             let segment = self.timeline_start_segment.take().unwrap();
     751              : 
     752            0 :             let seg_bytes = &segment[..];
     753              : 
     754              :             // How much of the current segment have we already consumed?
     755            0 :             let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
     756              : 
     757              :             // How many bytes may we consume in total?
     758            0 :             let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
     759              : 
     760            0 :             debug_assert!(seg_bytes.len() > pos_seg_offset);
     761            0 :             debug_assert!(seg_bytes.len() > tl_start_seg_offset);
     762              : 
     763              :             // Copy as many bytes as possible into the buffer
     764            0 :             let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
     765            0 :             buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
     766              : 
     767            0 :             self.pos += len as u64;
     768              : 
     769              :             // If we're done with the segment, we can release it's memory.
     770              :             // However, if we're not yet done, store it so that we don't have to
     771              :             // construct the segment the next time this function is called.
     772            0 :             if self.pos < self.timeline_start_lsn {
     773            0 :                 self.timeline_start_segment = Some(segment);
     774            0 :             }
     775              : 
     776            0 :             return Ok(len);
     777           79 :         }
     778              : 
     779           79 :         let mut wal_segment = match self.wal_segment.take() {
     780           70 :             Some(reader) => reader,
     781            9 :             None => self.open_segment().await?,
     782              :         };
     783              : 
     784              :         // How much to read and send in message? We cannot cross the WAL file
     785              :         // boundary, and we don't want send more than provided buffer.
     786           78 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     787           78 :         let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
     788              : 
     789              :         // Read some data from the file.
     790           78 :         let buf = &mut buf[0..send_size];
     791           78 :         let send_size = wal_segment.read_exact(buf).await?;
     792           76 :         self.pos += send_size as u64;
     793              : 
     794              :         // Decide whether to reuse this file. If we don't set wal_segment here
     795              :         // a new reader will be opened next time.
     796           76 :         if self.pos.segment_offset(self.wal_seg_size) != 0 {
     797           76 :             self.wal_segment = Some(wal_segment);
     798           76 :         }
     799              : 
     800           76 :         Ok(send_size)
     801           76 :     }
     802              : 
     803              :     /// Open WAL segment at the current position of the reader.
     804            9 :     async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
     805            9 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     806            9 :         let segno = self.pos.segment_number(self.wal_seg_size);
     807            9 :         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
     808              : 
     809              :         // Try to open local file, if we may have WAL locally
     810            9 :         if self.pos >= self.local_start_lsn {
     811            9 :             let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await?;
     812            8 :             if let Some((mut file, _)) = res {
     813            8 :                 file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     814            8 :                 return Ok(Box::pin(file));
     815            0 :             } else {
     816            0 :                 // NotFound is expected, fall through to remote read
     817            0 :             }
     818            0 :         }
     819              : 
     820              :         // Try to open remote file, if remote reads are enabled
     821            0 :         if let Some(storage) = self.wal_backup.get_storage() {
     822            0 :             let remote_wal_file_path = self.remote_path.join(&wal_file_name);
     823            0 :             return read_object(&storage, &remote_wal_file_path, xlogoff as u64).await;
     824            0 :         }
     825              : 
     826            0 :         bail!("WAL segment is not found")
     827            8 :     }
     828              : }
     829              : 
     830              : /// Helper function for opening WAL segment `segno` in `dir`. Returns file and
     831              : /// whether it is .partial.
     832            9 : pub(crate) async fn open_wal_file(
     833            9 :     timeline_dir: &Utf8Path,
     834            9 :     segno: XLogSegNo,
     835            9 :     wal_seg_size: usize,
     836            9 : ) -> Result<Option<(tokio::fs::File, bool)>> {
     837            9 :     let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size);
     838              : 
     839              :     // First try to open the .partial file.
     840            9 :     let mut partial_path = wal_file_path.to_owned();
     841            9 :     partial_path.set_extension("partial");
     842            9 :     if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
     843            8 :         return Ok(Some((opened_file, true)));
     844            0 :     }
     845              : 
     846              :     // If that failed, try it without the .partial extension.
     847            0 :     let pf_res = tokio::fs::File::open(&wal_file_path).await;
     848            0 :     if let Err(e) = &pf_res {
     849            0 :         if e.kind() == ErrorKind::NotFound {
     850            0 :             return Ok(None);
     851            0 :         }
     852            0 :     }
     853            0 :     let pf = pf_res
     854            0 :         .with_context(|| format!("failed to open WAL file {wal_file_path:#}"))
     855            0 :         .map_err(|e| {
     856            0 :             warn!("{e}");
     857            0 :             e
     858            0 :         })?;
     859              : 
     860            0 :     Ok(Some((pf, false)))
     861            8 : }
     862              : 
     863              : /// Helper returning full path to WAL segment file and its .partial brother.
     864           24 : pub fn wal_file_paths(
     865           24 :     timeline_dir: &Utf8Path,
     866           24 :     segno: XLogSegNo,
     867           24 :     wal_seg_size: usize,
     868           24 : ) -> (Utf8PathBuf, Utf8PathBuf) {
     869           24 :     let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     870           24 :     let wal_file_path = timeline_dir.join(wal_file_name.clone());
     871           24 :     let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
     872           24 :     (wal_file_path, wal_file_partial_path)
     873           24 : }
        

Generated by: LCOV version 2.1-beta