LCOV - code coverage report
Current view: top level - safekeeper/src - wal_storage.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 87.1 % 442 385
Test Date: 2024-02-12 20:26:03 Functions: 86.8 % 53 46

            Line data    Source code
       1              : //! This module has everything to deal with WAL -- reading and writing to disk.
       2              : //!
       3              : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
       4              : //! PG timeline is always 1, so WAL segments usually have names like this:
       5              : //! - 000000010000000000000001
       6              : //! - 000000010000000000000002.partial
       7              : //!
       8              : //! Note that the last file has a `.partial` suffix, which differs from postgres.
       9              : 
      10              : use anyhow::{bail, Context, Result};
      11              : use bytes::Bytes;
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use futures::future::BoxFuture;
      14              : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
      15              : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
      16              : use remote_storage::RemotePath;
      17              : use std::cmp::{max, min};
      18              : use std::io::{self, SeekFrom};
      19              : use std::pin::Pin;
      20              : use tokio::fs::{self, remove_file, File, OpenOptions};
      21              : use tokio::io::{AsyncRead, AsyncWriteExt};
      22              : use tokio::io::{AsyncReadExt, AsyncSeekExt};
      23              : use tracing::*;
      24              : use utils::crashsafe::durable_rename;
      25              : 
      26              : use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
      27              : use crate::state::TimelinePersistentState;
      28              : use crate::wal_backup::read_object;
      29              : use crate::SafeKeeperConf;
      30              : use postgres_ffi::waldecoder::WalStreamDecoder;
      31              : use postgres_ffi::XLogFileName;
      32              : use postgres_ffi::XLOG_BLCKSZ;
      33              : use pq_proto::SystemId;
      34              : use utils::{id::TenantTimelineId, lsn::Lsn};
      35              : 
      36              : #[async_trait::async_trait]
      37              : pub trait Storage {
      38              :     /// LSN of last durably stored WAL record.
      39              :     fn flush_lsn(&self) -> Lsn;
      40              : 
      41              :     /// Write piece of WAL from buf to disk, but not necessarily sync it.
      42              :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
      43              : 
      44              :     /// Truncate WAL at specified LSN, which must be the end of WAL record.
      45              :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
      46              : 
      47              :     /// Durably store WAL on disk, up to the last written WAL record.
      48              :     async fn flush_wal(&mut self) -> Result<()>;
      49              : 
      50              :     /// Remove all segments <= given segno. Returns function doing that as we
      51              :     /// want to perform it without timeline lock.
      52              :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
      53              : 
      54              :     /// Release resources associated with the storage -- technically, close FDs.
      55              :     /// Currently we don't remove timelines until restart (#3146), so need to
      56              :     /// spare descriptors. This would be useful for temporary tli detach as
      57              :     /// well.
      58            0 :     fn close(&mut self) {}
      59              : 
      60              :     /// Get metrics for this timeline.
      61              :     fn get_metrics(&self) -> WalStorageMetrics;
      62              : }
      63              : 
      64              : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
      65              : /// for better performance. Storage is initialized in the constructor.
      66              : ///
      67              : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
      68              : /// its filename and may not be fully flushed.
      69              : ///
      70              : /// Relationship of LSNs:
      71              : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
      72              : ///
      73              : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
      74              : pub struct PhysicalStorage {
      75              :     metrics: WalStorageMetrics,
      76              :     timeline_dir: Utf8PathBuf,
      77              :     conf: SafeKeeperConf,
      78              : 
      79              :     /// Size of WAL segment in bytes.
      80              :     wal_seg_size: usize,
      81              : 
      82              :     /// Written to disk, but possibly still in the cache and not fully persisted.
      83              :     /// It can also be ahead of write_record_lsn if we happen to be in the middle of a WAL record.
      84              :     write_lsn: Lsn,
      85              : 
      86              :     /// The LSN of the last WAL record written to disk. It may not be fully flushed yet.
      87              :     write_record_lsn: Lsn,
      88              : 
      89              :     /// The LSN of the last WAL record flushed to disk.
      90              :     flush_record_lsn: Lsn,
      91              : 
      92              :     /// Decoder is required for detecting boundaries of WAL records.
      93              :     decoder: WalStreamDecoder,
      94              : 
      95              :     /// Cached open file for the last segment.
      96              :     ///
      97              :     /// If Some(file) is open, then it always:
      98              :     /// - has ".partial" suffix
      99              :     /// - points to write_lsn, so no seek is needed for writing
     100              :     /// - doesn't point to the end of the segment
     101              :     file: Option<File>,
     102              : 
     103              :     /// When false, we have just initialized storage using the LSN from find_end_of_wal().
     104              :     /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
     105              :     /// there can be a case with unexpected .partial file.
     106              :     ///
     107              :     /// Imagine the following:
     108              :     /// - 000000010000000000000001
     109              :     ///   - it was fully written, but the last record is split between 2 segments
     110              :     ///   - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
     111              :     ///   - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
     112              :     /// - 000000010000000000000002.partial
     113              :     ///   - it has only 1 byte written, which is not enough to make a full WAL record
     114              :     ///
     115              :     /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
     116              :     /// This flag will be set to true after the first truncate_wal() call.
     117              :     ///
     118              :     /// [`write_lsn`]: Self::write_lsn
     119              :     is_truncated_after_restart: bool,
     120              : }
     121              : 
     122              : impl PhysicalStorage {
     123              :     /// Create new storage. If commit_lsn is not zero, we try to restore flush_lsn from
     124              :     /// the disk. Otherwise, all LSNs are set to zero.
     125          661 :     pub fn new(
     126          661 :         ttid: &TenantTimelineId,
     127          661 :         timeline_dir: Utf8PathBuf,
     128          661 :         conf: &SafeKeeperConf,
     129          661 :         state: &TimelinePersistentState,
     130          661 :     ) -> Result<PhysicalStorage> {
     131          661 :         let wal_seg_size = state.server.wal_seg_size as usize;
     132              : 
     133              :         // Find out where stored WAL ends, starting at commit_lsn which is a
     134              :         // known recent record boundary (unless we don't have WAL at all).
     135              :         //
     136              :         // NB: find_end_of_wal MUST be backwards compatible with the previously
     137              :         // written WAL. If find_end_of_wal fails to read any WAL written by an
     138              :         // older version of the code, we could lose data forever.
     139          661 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     140          476 :             Lsn(0)
     141              :         } else {
     142          185 :             let version = state.server.pg_version / 10000;
     143          185 : 
     144          185 :             dispatch_pgversion!(
     145          185 :                 version,
     146              :                 pgv::xlog_utils::find_end_of_wal(
     147          185 :                     timeline_dir.as_std_path(),
     148          185 :                     wal_seg_size,
     149          185 :                     state.commit_lsn,
     150            0 :                 )?,
     151            0 :                 bail!("unsupported postgres version: {}", version)
     152              :             )
     153              :         };
     154              : 
     155              :         // TODO: do we really know that write_lsn is fully flushed to disk?
     156              :         //      If not, maybe it's better to call fsync() here to be sure?
     157          661 :         let flush_lsn = write_lsn;
     158          661 : 
     159          661 :         debug!(
     160            0 :             "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
     161            0 :             ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
     162            0 :         );
     163          661 :         if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
     164            0 :             warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
     165          661 :         }
     166              : 
     167          661 :         Ok(PhysicalStorage {
     168          661 :             metrics: WalStorageMetrics::default(),
     169          661 :             timeline_dir,
     170          661 :             conf: conf.clone(),
     171          661 :             wal_seg_size,
     172          661 :             write_lsn,
     173          661 :             write_record_lsn: write_lsn,
     174          661 :             flush_record_lsn: flush_lsn,
     175          661 :             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
     176          661 :             file: None,
     177          661 :             is_truncated_after_restart: false,
     178          661 :         })
     179          661 :     }
     180              : 
     181              :     /// Get all known state of the storage.
     182            5 :     pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     183            5 :         (
     184            5 :             self.write_lsn,
     185            5 :             self.write_record_lsn,
     186            5 :             self.flush_record_lsn,
     187            5 :             self.file.is_some(),
     188            5 :         )
     189            5 :     }
     190              : 
     191              :     /// Call fdatasync if config requires so.
     192       977169 :     async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
     193       977169 :         if !self.conf.no_sync {
     194            0 :             self.metrics
     195            0 :                 .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
     196       977169 :         }
     197       977169 :         Ok(())
     198       977169 :     }
     199              : 
     200              :     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     201              :     /// Returns `file` and `is_partial`.
     202         1900 :     async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
     203         1900 :         let (wal_file_path, wal_file_partial_path) =
     204         1900 :             wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     205              : 
     206              :         // Try to open already completed segment
     207         1900 :         if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
     208            0 :             Ok((file, false))
     209         1900 :         } else if let Ok(file) = OpenOptions::new()
     210         1900 :             .write(true)
     211         1900 :             .open(&wal_file_partial_path)
     212         1855 :             .await
     213              :         {
     214              :             // Try to open existing partial file
     215          644 :             Ok((file, true))
     216              :         } else {
     217              :             // Create and fill new partial file
     218              :             //
     219              :             // We're using fdatasync during WAL writing, so file size must not
     220              :             // change; to this end it is filled with zeros here. To avoid using
     221              :             // half initialized segment, first bake it under tmp filename and
     222              :             // then rename.
     223         1256 :             let tmp_path = self.timeline_dir.join("waltmp");
     224         1256 :             let mut file = OpenOptions::new()
     225         1256 :                 .create(true)
     226         1256 :                 .write(true)
     227         1256 :                 .open(&tmp_path)
     228         1220 :                 .await
     229         1256 :                 .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
     230              : 
     231      2414878 :             write_zeroes(&mut file, self.wal_seg_size).await?;
     232              : 
     233              :             // Note: this doesn't get into observe_flush_seconds metric. But
     234              :             // segment init should be separate metric, if any.
     235            0 :             if let Err(e) =
     236         1254 :                 durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
     237              :             {
     238              :                 // Probably rename succeeded, but fsync of it failed. Remove
     239              :                 // the file then to avoid using it.
     240            0 :                 remove_file(wal_file_partial_path)
     241            0 :                     .await
     242            0 :                     .or_else(utils::fs_ext::ignore_not_found)?;
     243            0 :                 return Err(e.into());
     244         1254 :             }
     245         1254 :             Ok((file, true))
     246              :         }
     247         1900 :     }
     248              : 
     249              :     /// Write WAL bytes, which are known to be located in a single WAL segment.
     250      1713279 :     async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
     251      1713279 :         let mut file = if let Some(file) = self.file.take() {
     252      1711940 :             file
     253              :         } else {
     254      1479199 :             let (mut file, is_partial) = self.open_or_create(segno).await?;
     255         1337 :             assert!(is_partial, "unexpected write into non-partial segment file");
     256         1337 :             file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     257         1337 :             file
     258              :         };
     259              : 
     260      1713277 :         file.write_all(buf).await?;
     261              :         // Note: flush just ensures write above reaches the OS (this is not
     262              :         // needed in case of sync IO as Write::write there calls directly write
     263              :         // syscall, but needed in case of async). It does *not* fsync the file.
     264      1713277 :         file.flush().await?;
     265              : 
     266      1713277 :         if xlogoff + buf.len() == self.wal_seg_size {
     267              :             // If we reached the end of a WAL segment, flush and close it.
     268          780 :             self.fdatasync_file(&file).await?;
     269              : 
     270              :             // Rename partial file to completed file
     271          780 :             let (wal_file_path, wal_file_partial_path) =
     272          780 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     273          780 :             fs::rename(wal_file_partial_path, wal_file_path).await?;
     274      1712497 :         } else {
     275      1712497 :             // otherwise, file can be reused later
     276      1712497 :             self.file = Some(file);
     277      1712497 :         }
     278              : 
     279      1713277 :         Ok(())
     280      1713279 :     }
     281              : 
     282              :     /// Writes WAL to the segment files, until everything is written. If some segments
     283              :     /// are fully written, they are flushed to disk. The last (partial) segment can
     284              :     /// be flushed separately later.
     285              :     ///
     286              :     /// Updates `write_lsn`.
     287      1712564 :     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
     288      1712564 :         if self.write_lsn != pos {
     289              :             // need to flush the file before discarding it
     290            0 :             if let Some(file) = self.file.take() {
     291            0 :                 self.fdatasync_file(&file).await?;
     292            0 :             }
     293              : 
     294            0 :             self.write_lsn = pos;
     295      1712564 :         }
     296              : 
     297      3425841 :         while !buf.is_empty() {
     298              :             // Extract WAL location for this block
     299      1713279 :             let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
     300      1713279 :             let segno = self.write_lsn.segment_number(self.wal_seg_size);
     301              : 
     302              :             // If crossing a WAL boundary, only write up until we reach wal segment size.
     303      1713279 :             let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
     304          715 :                 self.wal_seg_size - xlogoff
     305              :             } else {
     306      1712564 :                 buf.len()
     307              :             };
     308              : 
     309      1713279 :             self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
     310      3147876 :                 .await?;
     311      1713277 :             self.write_lsn += bytes_write as u64;
     312      1713277 :             buf = &buf[bytes_write..];
     313              :         }
     314              : 
     315      1712562 :         Ok(())
     316      1712564 :     }
     317              : }
     318              : 
     319              : #[async_trait::async_trait]
     320              : impl Storage for PhysicalStorage {
     321              :     /// flush_lsn returns LSN of last durably stored WAL record.
     322      8710085 :     fn flush_lsn(&self) -> Lsn {
     323      8710085 :         self.flush_record_lsn
     324      8710085 :     }
     325              : 
     326              :     /// Write WAL to disk.
     327      1712564 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     328              :         // Disallow any non-sequential writes, which can result in gaps or overwrites.
     329              :         // If we need to move the pointer, use truncate_wal() instead.
     330      1712564 :         if self.write_lsn > startpos {
     331            0 :             bail!(
     332            0 :                 "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
     333            0 :                 self.write_lsn,
     334            0 :                 startpos
     335            0 :             );
     336      1712564 :         }
     337      1712564 :         if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
     338            0 :             bail!(
     339            0 :                 "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
     340            0 :                 self.write_lsn,
     341            0 :                 startpos
     342            0 :             );
     343      1712564 :         }
     344              : 
     345      3147876 :         let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
     346              :         // WAL is written, updating write metrics
     347      1712562 :         self.metrics.observe_write_seconds(write_seconds);
     348      1712562 :         self.metrics.observe_write_bytes(buf.len());
     349      1712562 : 
     350      1712562 :         // figure out last record's end lsn for reporting (if we got the
     351      1712562 :         // whole record)
     352      1712562 :         if self.decoder.available() != startpos {
     353          483 :             info!(
     354          483 :                 "restart decoder from {} to {}",
     355          483 :                 self.decoder.available(),
     356          483 :                 startpos,
     357          483 :             );
     358          483 :             let pg_version = self.decoder.pg_version;
     359          483 :             self.decoder = WalStreamDecoder::new(startpos, pg_version);
     360      1712079 :         }
     361      1712562 :         self.decoder.feed_bytes(buf);
     362              :         loop {
     363     83977895 :             match self.decoder.poll_decode()? {
     364      1712562 :                 None => break, // no full record yet
     365     82265333 :                 Some((lsn, _rec)) => {
     366     82265333 :                     self.write_record_lsn = lsn;
     367     82265333 :                 }
     368              :             }
     369              :         }
     370              : 
     371      1712562 :         Ok(())
     372      5137692 :     }
     373              : 
     374      1662134 :     async fn flush_wal(&mut self) -> Result<()> {
     375      1662134 :         if self.flush_record_lsn == self.write_record_lsn {
     376              :             // no need to do extra flush
     377       686298 :             return Ok(());
     378       975836 :         }
     379              : 
     380       975836 :         if let Some(unflushed_file) = self.file.take() {
     381       975819 :             self.fdatasync_file(&unflushed_file).await?;
     382       975819 :             self.file = Some(unflushed_file);
     383              :         } else {
     384              :             // We have unflushed data (write_lsn != flush_lsn), but no file.
     385              :             // This should only happen if the last file was fully written and flushed,
     386              :             // but flush_lsn has not been updated yet.
     387           17 :             if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
     388            0 :                 bail!(
     389            0 :                     "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
     390            0 :                     self.write_lsn,
     391            0 :                     self.flush_record_lsn
     392            0 :                 );
     393           17 :             }
     394              :         }
     395              : 
     396              :         // everything is flushed now, let's update flush_lsn
     397       975836 :         self.flush_record_lsn = self.write_record_lsn;
     398       975836 :         Ok(())
     399      4986402 :     }
     400              : 
     401              :     /// Truncate written WAL by removing all WAL segments after the given LSN.
     402              :     /// end_pos must point to the end of the WAL record.
     403          868 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     404              :         // Streaming must not create a hole, so truncate cannot be called on non-written lsn
     405          868 :         if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
     406            0 :             bail!(
     407            0 :                 "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
     408            0 :                 self.write_lsn,
     409            0 :                 end_pos
     410            0 :             );
     411          868 :         }
     412          868 : 
     413          868 :         // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
     414          868 :         // disk (this happens on each connect).
     415          868 :         if self.is_truncated_after_restart
     416          316 :             && end_pos == self.write_lsn
     417          307 :             && end_pos == self.flush_record_lsn
     418              :         {
     419          307 :             return Ok(());
     420          561 :         }
     421              : 
     422              :         // Close previously opened file, if any
     423          561 :         if let Some(unflushed_file) = self.file.take() {
     424            9 :             self.fdatasync_file(&unflushed_file).await?;
     425          552 :         }
     426              : 
     427          561 :         let xlogoff = end_pos.segment_offset(self.wal_seg_size);
     428          561 :         let segno = end_pos.segment_number(self.wal_seg_size);
     429          561 : 
     430          561 :         // Remove all segments after the given LSN.
     431          561 :         remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
     432              : 
     433       941791 :         let (mut file, is_partial) = self.open_or_create(segno).await?;
     434              : 
     435              :         // Fill end with zeroes
     436          561 :         file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     437       645393 :         write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
     438          561 :         self.fdatasync_file(&file).await?;
     439              : 
     440          561 :         if !is_partial {
     441              :             // Make segment partial once again
     442            0 :             let (wal_file_path, wal_file_partial_path) =
     443            0 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     444            0 :             fs::rename(wal_file_path, wal_file_partial_path).await?;
     445          561 :         }
     446              : 
     447              :         // Update LSNs
     448          561 :         self.write_lsn = end_pos;
     449          561 :         self.write_record_lsn = end_pos;
     450          561 :         self.flush_record_lsn = end_pos;
     451          561 :         self.is_truncated_after_restart = true;
     452          561 :         Ok(())
     453         2604 :     }
     454              : 
     455           31 :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     456           31 :         let timeline_dir = self.timeline_dir.clone();
     457           31 :         let wal_seg_size = self.wal_seg_size;
     458           31 :         Box::pin(async move {
     459           65 :             remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
     460           31 :         })
     461           31 :     }
     462              : 
     463           28 :     fn close(&mut self) {
     464           28 :         // close happens in destructor
     465           28 :         let _open_file = self.file.take();
     466           28 :     }
     467              : 
     468           51 :     fn get_metrics(&self) -> WalStorageMetrics {
     469           51 :         self.metrics.clone()
     470           51 :     }
     471              : }
     472              : 
     473              : /// Remove all WAL segments in timeline_dir that match the given predicate.
     474          592 : async fn remove_segments_from_disk(
     475          592 :     timeline_dir: &Utf8Path,
     476          592 :     wal_seg_size: usize,
     477          592 :     remove_predicate: impl Fn(XLogSegNo) -> bool,
     478          592 : ) -> Result<()> {
     479          592 :     let mut n_removed = 0;
     480          592 :     let mut min_removed = u64::MAX;
     481          592 :     let mut max_removed = u64::MIN;
     482              : 
     483          592 :     let mut entries = fs::read_dir(timeline_dir).await?;
     484         1446 :     while let Some(entry) = entries.next_entry().await? {
     485          854 :         let entry_path = entry.path();
     486          854 :         let fname = entry_path.file_name().unwrap();
     487              : 
     488          854 :         if let Some(fname_str) = fname.to_str() {
     489              :             /* Ignore files that are not XLOG segments */
     490          854 :             if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
     491          593 :                 continue;
     492          261 :             }
     493          261 :             let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
     494          261 :             if remove_predicate(segno) {
     495           14 :                 remove_file(entry_path).await?;
     496           14 :                 n_removed += 1;
     497           14 :                 min_removed = min(min_removed, segno);
     498           14 :                 max_removed = max(max_removed, segno);
     499           14 :                 REMOVED_WAL_SEGMENTS.inc();
     500          247 :             }
     501            0 :         }
     502              :     }
     503              : 
     504          592 :     if n_removed > 0 {
     505            7 :         info!(
     506            7 :             "removed {} WAL segments [{}; {}]",
     507            7 :             n_removed, min_removed, max_removed
     508            7 :         );
     509          585 :     }
     510          592 :     Ok(())
     511          592 : }
     512              : 
/// Sequential reader of a timeline's WAL.
///
/// Reads start at `pos` and advance it. Segments are opened from local files
/// under `timeline_dir` and, when `enable_remote_read` is set, from remote
/// storage as a fallback.
pub struct WalReader {
    /// Prefix stripped from the local segment path to build the remote path.
    workdir: Utf8PathBuf,
    /// Directory holding this timeline's WAL segment files.
    timeline_dir: Utf8PathBuf,
    /// WAL segment size in bytes, taken from the persistent state.
    wal_seg_size: usize,
    /// Current read position; advanced by every successful `read`.
    pos: Lsn,
    /// Currently open segment reader, kept between `read` calls while `pos`
    /// is inside the segment. `None` means a segment is opened on the next read.
    wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,

    // S3 will be used to read WAL if LSN is not available locally
    enable_remote_read: bool,

    // We don't have WAL locally if LSN is less than local_start_lsn
    local_start_lsn: Lsn,
    // We will respond with zero-ed bytes before this Lsn as long as
    // pos is in the same segment as timeline_start_lsn.
    timeline_start_lsn: Lsn,
    // integer version number of PostgreSQL, e.g. 14; 15; 16
    pg_version: u32,
    /// System identifier passed to `generate_wal_segment` when building the
    /// synthetic first segment.
    system_id: SystemId,
    /// Cached generated first segment, kept only while `pos` is still below
    /// `timeline_start_lsn`.
    timeline_start_segment: Option<Bytes>,
}
     533              : 
     534              : impl WalReader {
     535          868 :     pub fn new(
     536          868 :         workdir: Utf8PathBuf,
     537          868 :         timeline_dir: Utf8PathBuf,
     538          868 :         state: &TimelinePersistentState,
     539          868 :         start_pos: Lsn,
     540          868 :         enable_remote_read: bool,
     541          868 :     ) -> Result<Self> {
     542          868 :         if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
     543            0 :             bail!("state uninitialized, no data to read");
     544          868 :         }
     545          868 : 
     546          868 :         // TODO: Upgrade to bail!() once we know this couldn't possibly happen
     547          868 :         if state.timeline_start_lsn == Lsn(0) {
     548            2 :             warn!("timeline_start_lsn uninitialized before initializing wal reader");
     549          866 :         }
     550              : 
     551          868 :         if start_pos
     552          868 :             < state
     553          868 :                 .timeline_start_lsn
     554          868 :                 .segment_lsn(state.server.wal_seg_size as usize)
     555              :         {
     556            0 :             bail!(
     557            0 :                 "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
     558            0 :                 start_pos,
     559            0 :                 state.timeline_start_lsn
     560            0 :             );
     561          868 :         }
     562          868 : 
     563          868 :         Ok(Self {
     564          868 :             workdir,
     565          868 :             timeline_dir,
     566          868 :             wal_seg_size: state.server.wal_seg_size as usize,
     567          868 :             pos: start_pos,
     568          868 :             wal_segment: None,
     569          868 :             enable_remote_read,
     570          868 :             local_start_lsn: state.local_start_lsn,
     571          868 :             timeline_start_lsn: state.timeline_start_lsn,
     572          868 :             pg_version: state.server.pg_version / 10000,
     573          868 :             system_id: state.server.system_id,
     574          868 :             timeline_start_segment: None,
     575          868 :         })
     576          868 :     }
     577              : 
     578              :     /// Read WAL at current position into provided buf, returns number of bytes
     579              :     /// read. It can be smaller than buf size only if segment boundary is
     580              :     /// reached.
     581       807471 :     pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
     582       807471 :         // If this timeline is new, we may not have a full segment yet, so
     583       807471 :         // we pad the first bytes of the timeline's first WAL segment with 0s
     584       807471 :         if self.pos < self.timeline_start_lsn {
     585              :             debug_assert_eq!(
     586          318 :                 self.pos.segment_number(self.wal_seg_size),
     587          318 :                 self.timeline_start_lsn.segment_number(self.wal_seg_size)
     588              :             );
     589              : 
     590              :             // All bytes after timeline_start_lsn are in WAL, but those before
     591              :             // are not, so we manually construct an empty segment for the bytes
     592              :             // not available in this timeline.
     593          318 :             if self.timeline_start_segment.is_none() {
     594            6 :                 let it = postgres_ffi::generate_wal_segment(
     595            6 :                     self.timeline_start_lsn.segment_number(self.wal_seg_size),
     596            6 :                     self.system_id,
     597            6 :                     self.pg_version,
     598            6 :                     self.timeline_start_lsn,
     599            6 :                 )?;
     600            6 :                 self.timeline_start_segment = Some(it);
     601          312 :             }
     602              : 
     603          318 :             assert!(self.timeline_start_segment.is_some());
     604          318 :             let segment = self.timeline_start_segment.take().unwrap();
     605          318 : 
     606          318 :             let seg_bytes = &segment[..];
     607          318 : 
     608          318 :             // How much of the current segment have we already consumed?
     609          318 :             let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
     610          318 : 
     611          318 :             // How many bytes may we consume in total?
     612          318 :             let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
     613              : 
     614          318 :             debug_assert!(seg_bytes.len() > pos_seg_offset);
     615          318 :             debug_assert!(seg_bytes.len() > tl_start_seg_offset);
     616              : 
     617              :             // Copy as many bytes as possible into the buffer
     618          318 :             let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
     619          318 :             buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
     620          318 : 
     621          318 :             self.pos += len as u64;
     622          318 : 
     623          318 :             // If we're done with the segment, we can release it's memory.
     624          318 :             // However, if we're not yet done, store it so that we don't have to
     625          318 :             // construct the segment the next time this function is called.
     626          318 :             if self.pos < self.timeline_start_lsn {
     627          312 :                 self.timeline_start_segment = Some(segment);
     628          312 :             }
     629              : 
     630          318 :             return Ok(len);
     631       807153 :         }
     632              : 
     633       807153 :         let mut wal_segment = match self.wal_segment.take() {
     634       805520 :             Some(reader) => reader,
     635         3499 :             None => self.open_segment().await?,
     636              :         };
     637              : 
     638              :         // How much to read and send in message? We cannot cross the WAL file
     639              :         // boundary, and we don't want send more than provided buffer.
     640       807153 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     641       807153 :         let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
     642       807153 : 
     643       807153 :         // Read some data from the file.
     644       807153 :         let buf = &mut buf[0..send_size];
     645       807153 :         let send_size = wal_segment.read_exact(buf).await?;
     646       807150 :         self.pos += send_size as u64;
     647       807150 : 
     648       807150 :         // Decide whether to reuse this file. If we don't set wal_segment here
     649       807150 :         // a new reader will be opened next time.
     650       807150 :         if self.pos.segment_offset(self.wal_seg_size) != 0 {
     651       806295 :             self.wal_segment = Some(wal_segment);
     652       806295 :         }
     653              : 
     654       807150 :         Ok(send_size)
     655       807468 :     }
     656              : 
     657              :     /// Open WAL segment at the current position of the reader.
     658         1633 :     async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
     659         1633 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     660         1633 :         let segno = self.pos.segment_number(self.wal_seg_size);
     661         1633 :         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
     662         1633 :         let wal_file_path = self.timeline_dir.join(wal_file_name);
     663         1633 : 
     664         1633 :         // Try to open local file, if we may have WAL locally
     665         1633 :         if self.pos >= self.local_start_lsn {
     666         1824 :             let res = Self::open_wal_file(&wal_file_path).await;
     667         1631 :             match res {
     668         1595 :                 Ok(mut file) => {
     669         1595 :                     file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     670         1595 :                     return Ok(Box::pin(file));
     671              :                 }
     672           36 :                 Err(e) => {
     673           72 :                     let is_not_found = e.chain().any(|e| {
     674           72 :                         if let Some(e) = e.downcast_ref::<io::Error>() {
     675           36 :                             e.kind() == io::ErrorKind::NotFound
     676              :                         } else {
     677           36 :                             false
     678              :                         }
     679           72 :                     });
     680           36 :                     if !is_not_found {
     681            0 :                         return Err(e);
     682           36 :                     }
     683              :                     // NotFound is expected, fall through to remote read
     684              :                 }
     685              :             };
     686            2 :         }
     687              : 
     688              :         // Try to open remote file, if remote reads are enabled
     689           38 :         if self.enable_remote_read {
     690           38 :             let remote_wal_file_path = wal_file_path
     691           38 :                 .strip_prefix(&self.workdir)
     692           38 :                 .context("Failed to strip workdir prefix")
     693           38 :                 .and_then(RemotePath::new)
     694           38 :                 .with_context(|| {
     695            0 :                     format!(
     696            0 :                         "Failed to resolve remote part of path {:?} for base {:?}",
     697            0 :                         wal_file_path, self.workdir,
     698            0 :                     )
     699           38 :                 })?;
     700          113 :             return read_object(&remote_wal_file_path, xlogoff as u64).await;
     701            0 :         }
     702            0 : 
     703            0 :         bail!("WAL segment is not found")
     704         1633 :     }
     705              : 
     706              :     /// Helper function for opening a wal file.
     707         1631 :     async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
     708         1631 :         // First try to open the .partial file.
     709         1631 :         let mut partial_path = wal_file_path.to_owned();
     710         1631 :         partial_path.set_extension("partial");
     711         1631 :         if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
     712         1410 :             return Ok(opened_file);
     713          221 :         }
     714          221 : 
     715          221 :         // If that failed, try it without the .partial extension.
     716          221 :         tokio::fs::File::open(&wal_file_path)
     717          218 :             .await
     718          221 :             .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
     719          221 :             .map_err(|e| {
     720           36 :                 warn!("{}", e);
     721           36 :                 e
     722          221 :             })
     723         1631 :     }
     724              : }
     725              : 
/// Zero block for filling created WAL segments: `XLOG_BLCKSZ` zero bytes,
/// written repeatedly by `write_zeroes`.
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
     728              : 
     729              : /// Helper for filling file with zeroes.
     730         1817 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
     731         1817 :     fail::fail_point!("sk-write-zeroes", |_| {
     732            2 :         info!("write_zeroes hit failpoint");
     733            2 :         Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
     734         1817 :     });
     735              : 
     736      3235841 :     while count >= XLOG_BLCKSZ {
     737      3234026 :         file.write_all(ZERO_BLOCK).await?;
     738      3234026 :         count -= XLOG_BLCKSZ;
     739              :     }
     740         1815 :     file.write_all(&ZERO_BLOCK[0..count]).await?;
     741         1815 :     file.flush().await?;
     742         1815 :     Ok(())
     743         1817 : }
     744              : 
     745              : /// Helper returning full path to WAL segment file and its .partial brother.
     746         2728 : pub fn wal_file_paths(
     747         2728 :     timeline_dir: &Utf8Path,
     748         2728 :     segno: XLogSegNo,
     749         2728 :     wal_seg_size: usize,
     750         2728 : ) -> Result<(Utf8PathBuf, Utf8PathBuf)> {
     751         2728 :     let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     752         2728 :     let wal_file_path = timeline_dir.join(wal_file_name.clone());
     753         2728 :     let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
     754         2728 :     Ok((wal_file_path, wal_file_partial_path))
     755         2728 : }
        

Generated by: LCOV version 2.1-beta