LCOV - code coverage report
Current view: top level - safekeeper/src - wal_storage.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 85.0 % 439 373
Test Date: 2023-09-06 10:18:01 Functions: 79.2 % 53 42

            Line data    Source code
       1              : //! This module has everything to deal with WAL -- reading and writing to disk.
       2              : //!
       3              : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
       4              : //! PG timeline is always 1, so WAL segments usually have names like this:
       5              : //! - 000000010000000000000001
       6              : //! - 000000010000000000000002.partial
       7              : //!
       8              : //! Note that last file has `.partial` suffix, that's different from postgres.
       9              : 
      10              : use anyhow::{bail, Context, Result};
      11              : use bytes::Bytes;
      12              : use futures::future::BoxFuture;
      13              : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
      14              : use postgres_ffi::{XLogSegNo, PG_TLI};
      15              : use remote_storage::RemotePath;
      16              : use std::cmp::{max, min};
      17              : use std::io::{self, SeekFrom};
      18              : use std::path::{Path, PathBuf};
      19              : use std::pin::Pin;
      20              : use tokio::fs::{self, remove_file, File, OpenOptions};
      21              : use tokio::io::{AsyncRead, AsyncWriteExt};
      22              : use tokio::io::{AsyncReadExt, AsyncSeekExt};
      23              : use tracing::*;
      24              : 
      25              : use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
      26              : use crate::safekeeper::SafeKeeperState;
      27              : use crate::wal_backup::read_object;
      28              : use crate::SafeKeeperConf;
      29              : use postgres_ffi::waldecoder::WalStreamDecoder;
      30              : use postgres_ffi::XLogFileName;
      31              : use postgres_ffi::XLOG_BLCKSZ;
      32              : use pq_proto::SystemId;
      33              : use utils::{id::TenantTimelineId, lsn::Lsn};
      34              : 
/// Abstraction over WAL storage: writing, truncating, flushing and removing WAL,
/// plus access to the storage metrics.
#[async_trait::async_trait]
pub trait Storage {
    /// LSN of last durably stored WAL record.
    fn flush_lsn(&self) -> Lsn;

    /// Write a piece of WAL from `buf` to disk starting at `startpos`, but not
    /// necessarily sync it.
    async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;

    /// Truncate WAL at the specified LSN, which must be the end of a WAL record.
    async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;

    /// Durably store WAL on disk, up to the last written WAL record.
    async fn flush_wal(&mut self) -> Result<()>;

    /// Remove all segments <= given segno. Returns a future doing that, as we
    /// want to perform the removal without holding the timeline lock.
    fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;

    /// Release resources associated with the storage -- technically, close FDs.
    /// Currently we don't remove timelines until restart (#3146), so we need to
    /// spare descriptors. This would be useful for temporary tli detach as
    /// well.
    ///
    /// The default implementation does nothing.
    fn close(&mut self) {}

    /// Get metrics for this timeline.
    fn get_metrics(&self) -> WalStorageMetrics;
}
      62              : 
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage is initialized in the constructor.
///
/// WAL is stored in segments, each segment is a file. The last segment has a ".partial"
/// suffix in its filename and may not be fully flushed.
///
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is created for the first time, all LSNs are zeroes and there are no
/// segments on disk.
pub struct PhysicalStorage {
    metrics: WalStorageMetrics,
    timeline_dir: PathBuf,
    conf: SafeKeeperConf,

    /// Size of WAL segment in bytes.
    wal_seg_size: usize,

    /// Written to disk, but possibly still in the cache and not fully persisted.
    /// It can also be ahead of `write_record_lsn` if we happen to be in the middle
    /// of a WAL record.
    write_lsn: Lsn,

    /// The LSN of the last WAL record written to disk. It may not be fully flushed yet.
    write_record_lsn: Lsn,

    /// The LSN of the last WAL record flushed to disk.
    flush_record_lsn: Lsn,

    /// Decoder is required for detecting boundaries of WAL records.
    decoder: WalStreamDecoder,

    /// Cached open file for the last segment.
    ///
    /// If Some(file) is open, then it always:
    /// - has ".partial" suffix
    /// - points to write_lsn, so no seek is needed for writing
    /// - doesn't point to the end of the segment
    file: Option<File>,

    /// When false, we have just initialized storage using the LSN from find_end_of_wal().
    /// In this case, [`write_lsn`] can be less than the WAL actually written on disk.
    /// In particular, there can be a case with an unexpected .partial file.
    ///
    /// Imagine the following:
    /// - 000000010000000000000001
    ///   - it was fully written, but the last record is split between 2 segments
    ///   - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
    ///   - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
    /// - 000000010000000000000002.partial
    ///   - it has only 1 byte written, which is not enough to make a full WAL record
    ///
    /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
    /// This flag will be set to true after the first truncate_wal() call.
    ///
    /// [`write_lsn`]: Self::write_lsn
    is_truncated_after_restart: bool,
}
     120              : 
     121              : impl PhysicalStorage {
     122              :     /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
     123              :     /// the disk. Otherwise, all LSNs are set to zero.
     124          605 :     pub fn new(
     125          605 :         ttid: &TenantTimelineId,
     126          605 :         timeline_dir: PathBuf,
     127          605 :         conf: &SafeKeeperConf,
     128          605 :         state: &SafeKeeperState,
     129          605 :     ) -> Result<PhysicalStorage> {
     130          605 :         let wal_seg_size = state.server.wal_seg_size as usize;
     131              : 
     132              :         // Find out where stored WAL ends, starting at commit_lsn which is a
     133              :         // known recent record boundary (unless we don't have WAL at all).
     134              :         //
     135              :         // NB: find_end_of_wal MUST be backwards compatible with the previously
     136              :         // written WAL. If find_end_of_wal fails to read any WAL written by an
     137              :         // older version of the code, we could lose data forever.
     138          605 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     139          514 :             Lsn(0)
     140              :         } else {
     141           91 :             match state.server.pg_version / 10000 {
     142           91 :                 14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
     143           91 :                     &timeline_dir,
     144           91 :                     wal_seg_size,
     145           91 :                     state.commit_lsn,
     146           91 :                 )?,
     147            0 :                 15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
     148            0 :                     &timeline_dir,
     149            0 :                     wal_seg_size,
     150            0 :                     state.commit_lsn,
     151            0 :                 )?,
     152            0 :                 _ => bail!("unsupported postgres version: {}", state.server.pg_version),
     153              :             }
     154              :         };
     155              : 
     156              :         // TODO: do we really know that write_lsn is fully flushed to disk?
     157              :         //      If not, maybe it's better to call fsync() here to be sure?
     158          605 :         let flush_lsn = write_lsn;
     159          605 : 
     160          605 :         debug!(
     161            0 :             "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
     162            0 :             ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
     163            0 :         );
     164          605 :         if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
     165            0 :             warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
     166          605 :         }
     167              : 
     168          605 :         Ok(PhysicalStorage {
     169          605 :             metrics: WalStorageMetrics::default(),
     170          605 :             timeline_dir,
     171          605 :             conf: conf.clone(),
     172          605 :             wal_seg_size,
     173          605 :             write_lsn,
     174          605 :             write_record_lsn: write_lsn,
     175          605 :             flush_record_lsn: flush_lsn,
     176          605 :             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
     177          605 :             file: None,
     178          605 :             is_truncated_after_restart: false,
     179          605 :         })
     180          605 :     }
     181              : 
     182              :     /// Get all known state of the storage.
     183            5 :     pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     184            5 :         (
     185            5 :             self.write_lsn,
     186            5 :             self.write_record_lsn,
     187            5 :             self.flush_record_lsn,
     188            5 :             self.file.is_some(),
     189            5 :         )
     190            5 :     }
     191              : 
     192              :     /// Call fdatasync if config requires so.
     193      1187301 :     async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
     194      1187301 :         if !self.conf.no_sync {
     195              :             self.metrics
     196            0 :                 .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
     197      1187301 :         }
     198      1187301 :         Ok(())
     199      1187301 :     }
     200              : 
     201              :     /// Call fsync if config requires so.
     202         1331 :     async fn fsync_file(&mut self, file: &mut File) -> Result<()> {
     203         1331 :         if !self.conf.no_sync {
     204              :             self.metrics
     205            0 :                 .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
     206         1331 :         }
     207         1331 :         Ok(())
     208         1331 :     }
     209              : 
     210              :     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     211              :     /// Returns `file` and `is_partial`.
     212         1981 :     async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
     213         1981 :         let (wal_file_path, wal_file_partial_path) =
     214         1981 :             wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     215              : 
     216              :         // Try to open already completed segment
     217         1981 :         if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
     218            0 :             Ok((file, false))
     219         1981 :         } else if let Ok(file) = OpenOptions::new()
     220         1981 :             .write(true)
     221         1981 :             .open(&wal_file_partial_path)
     222         1957 :             .await
     223              :         {
     224              :             // Try to open existing partial file
     225          650 :             Ok((file, true))
     226              :         } else {
     227              :             // Create and fill new partial file
     228         1331 :             let mut file = OpenOptions::new()
     229         1331 :                 .create(true)
     230         1331 :                 .write(true)
     231         1331 :                 .open(&wal_file_partial_path)
     232         1317 :                 .await
     233         1331 :                 .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
     234              : 
     235      2632805 :             write_zeroes(&mut file, self.wal_seg_size).await?;
     236         1331 :             self.fsync_file(&mut file).await?;
     237         1331 :             Ok((file, true))
     238              :         }
     239         1981 :     }
     240              : 
     241              :     /// Write WAL bytes, which are known to be located in a single WAL segment.
     242      1461159 :     async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
     243      1461159 :         let mut file = if let Some(file) = self.file.take() {
     244      1459781 :             file
     245              :         } else {
     246      1598117 :             let (mut file, is_partial) = self.open_or_create(segno).await?;
     247         1378 :             assert!(is_partial, "unexpected write into non-partial segment file");
     248         1378 :             file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     249         1378 :             file
     250              :         };
     251              : 
     252      1461159 :         file.write_all(buf).await?;
     253              :         // Note: flush just ensures write above reaches the OS (this is not
     254              :         // needed in case of sync IO as Write::write there calls directly write
     255              :         // syscall, but needed in case of async). It does *not* fsyncs the file.
     256      1461159 :         file.flush().await?;
     257              : 
     258      1461159 :         if xlogoff + buf.len() == self.wal_seg_size {
     259              :             // If we reached the end of a WAL segment, flush and close it.
     260          818 :             self.fdatasync_file(&mut file).await?;
     261              : 
     262              :             // Rename partial file to completed file
     263          818 :             let (wal_file_path, wal_file_partial_path) =
     264          818 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     265          818 :             fs::rename(wal_file_partial_path, wal_file_path).await?;
     266      1460341 :         } else {
     267      1460341 :             // otherwise, file can be reused later
     268      1460341 :             self.file = Some(file);
     269      1460341 :         }
     270              : 
     271      1461159 :         Ok(())
     272      1461159 :     }
     273              : 
     274              :     /// Writes WAL to the segment files, until everything is writed. If some segments
     275              :     /// are fully written, they are flushed to disk. The last (partial) segment can
     276              :     /// be flushed separately later.
     277              :     ///
     278              :     /// Updates `write_lsn`.
     279      1460402 :     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
     280      1460402 :         if self.write_lsn != pos {
     281              :             // need to flush the file before discarding it
     282            0 :             if let Some(mut file) = self.file.take() {
     283            0 :                 self.fdatasync_file(&mut file).await?;
     284            0 :             }
     285              : 
     286            0 :             self.write_lsn = pos;
     287      1460402 :         }
     288              : 
     289      2921561 :         while !buf.is_empty() {
     290              :             // Extract WAL location for this block
     291      1461159 :             let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
     292      1461159 :             let segno = self.write_lsn.segment_number(self.wal_seg_size);
     293              : 
     294              :             // If crossing a WAL boundary, only write up until we reach wal segment size.
     295      1461159 :             let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
     296          757 :                 self.wal_seg_size - xlogoff
     297              :             } else {
     298      1460402 :                 buf.len()
     299              :             };
     300              : 
     301      1461159 :             self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
     302      3041998 :                 .await?;
     303      1461159 :             self.write_lsn += bytes_write as u64;
     304      1461159 :             buf = &buf[bytes_write..];
     305              :         }
     306              : 
     307      1460402 :         Ok(())
     308      1460402 :     }
     309              : }
     310              : 
     311              : #[async_trait::async_trait]
     312              : impl Storage for PhysicalStorage {
     313              :     /// flush_lsn returns LSN of last durably stored WAL record.
     314      9726306 :     fn flush_lsn(&self) -> Lsn {
     315      9726306 :         self.flush_record_lsn
     316      9726306 :     }
     317              : 
     318              :     /// Write WAL to disk.
     319      1460402 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     320              :         // Disallow any non-sequential writes, which can result in gaps or overwrites.
     321              :         // If we need to move the pointer, use truncate_wal() instead.
     322      1460402 :         if self.write_lsn > startpos {
     323            0 :             bail!(
     324            0 :                 "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
     325            0 :                 self.write_lsn,
     326            0 :                 startpos
     327            0 :             );
     328      1460402 :         }
     329      1460402 :         if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
     330            0 :             bail!(
     331            0 :                 "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
     332            0 :                 self.write_lsn,
     333            0 :                 startpos
     334            0 :             );
     335      1460402 :         }
     336              : 
     337      3041998 :         let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
     338              :         // WAL is written, updating write metrics
     339      1460402 :         self.metrics.observe_write_seconds(write_seconds);
     340      1460402 :         self.metrics.observe_write_bytes(buf.len());
     341      1460402 : 
     342      1460402 :         // figure out last record's end lsn for reporting (if we got the
     343      1460402 :         // whole record)
     344      1460402 :         if self.decoder.available() != startpos {
     345          492 :             info!(
     346          492 :                 "restart decoder from {} to {}",
     347          492 :                 self.decoder.available(),
     348          492 :                 startpos,
     349          492 :             );
     350          492 :             let pg_version = self.decoder.pg_version;
     351          492 :             self.decoder = WalStreamDecoder::new(startpos, pg_version);
     352      1459910 :         }
     353      1460402 :         self.decoder.feed_bytes(buf);
     354              :         loop {
     355    103016069 :             match self.decoder.poll_decode()? {
     356      1460402 :                 None => break, // no full record yet
     357    101555667 :                 Some((lsn, _rec)) => {
     358    101555667 :                     self.write_record_lsn = lsn;
     359    101555667 :                 }
     360              :             }
     361              :         }
     362              : 
     363      1460402 :         Ok(())
     364      2920804 :     }
     365              : 
     366      2202314 :     async fn flush_wal(&mut self) -> Result<()> {
     367      2202314 :         if self.flush_record_lsn == self.write_record_lsn {
     368              :             // no need to do extra flush
     369      1016424 :             return Ok(());
     370      1185890 :         }
     371              : 
     372      1185890 :         if let Some(mut unflushed_file) = self.file.take() {
     373      1185871 :             self.fdatasync_file(&mut unflushed_file).await?;
     374      1185871 :             self.file = Some(unflushed_file);
     375              :         } else {
     376              :             // We have unflushed data (write_lsn != flush_lsn), but no file.
     377              :             // This should only happen if last file was fully written and flushed,
     378              :             // but haven't updated flush_lsn yet.
     379           19 :             if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
     380            0 :                 bail!(
     381            0 :                     "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
     382            0 :                     self.write_lsn,
     383            0 :                     self.flush_record_lsn
     384            0 :                 );
     385           19 :             }
     386              :         }
     387              : 
     388              :         // everything is flushed now, let's update flush_lsn
     389      1185890 :         self.flush_record_lsn = self.write_record_lsn;
     390      1185890 :         Ok(())
     391      4404628 :     }
     392              : 
    /// Truncate written WAL by removing all WAL segments after the given LSN.
    /// end_pos must point to the end of the WAL record.
    ///
    /// Steps, in order: sync and drop the cached file, remove every segment with a
    /// number greater than the one containing `end_pos`, zero the tail of that
    /// segment from `end_pos` on and sync it, rename it back to its partial name if
    /// it was a completed segment, and finally set `write_lsn`, `write_record_lsn`
    /// and `flush_record_lsn` to `end_pos`.
    ///
    /// # Errors
    /// Fails if `end_pos` is beyond a non-zero `write_lsn`, or if any of the file
    /// operations fails.
    async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
        // Streaming must not create a hole, so truncate cannot be called on non-written lsn
        if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
            bail!(
                "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
                self.write_lsn,
                end_pos
            );
        }

        // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
        // disk (this happens on each connect). Not taken before the first
        // truncate after restart, because write_lsn may then be behind what is
        // actually on disk (see `is_truncated_after_restart`).
        if self.is_truncated_after_restart
            && end_pos == self.write_lsn
            && end_pos == self.flush_record_lsn
        {
            return Ok(());
        }

        // Close previously opened file, if any
        if let Some(mut unflushed_file) = self.file.take() {
            self.fdatasync_file(&mut unflushed_file).await?;
        }

        let xlogoff = end_pos.segment_offset(self.wal_seg_size);
        let segno = end_pos.segment_number(self.wal_seg_size);

        // Remove all segments after the given LSN.
        remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;

        // The segment holding end_pos may be completed or partial, or may not
        // exist yet (open_or_create then creates a fresh partial file).
        let (mut file, is_partial) = self.open_or_create(segno).await?;

        // Fill end with zeroes
        file.seek(SeekFrom::Start(xlogoff as u64)).await?;
        write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
        self.fdatasync_file(&mut file).await?;

        if !is_partial {
            // Make segment partial once again
            let (wal_file_path, wal_file_partial_path) =
                wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
            fs::rename(wal_file_path, wal_file_partial_path).await?;
        }

        // Update LSNs. The file opened above is not cached in self.file.
        self.write_lsn = end_pos;
        self.write_record_lsn = end_pos;
        self.flush_record_lsn = end_pos;
        self.is_truncated_after_restart = true;
        Ok(())
    }
     446              : 
     447           14 :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     448           14 :         let timeline_dir = self.timeline_dir.clone();
     449           14 :         let wal_seg_size = self.wal_seg_size;
     450           14 :         Box::pin(async move {
     451           54 :             remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
     452           14 :         })
     453           14 :     }
     454              : 
     455           31 :     fn close(&mut self) {
     456           31 :         // close happens in destructor
     457           31 :         let _open_file = self.file.take();
     458           31 :     }
     459              : 
     460           51 :     fn get_metrics(&self) -> WalStorageMetrics {
     461           51 :         self.metrics.clone()
     462           51 :     }
     463              : }
     464              : 
     465              : /// Remove all WAL segments in timeline_dir that match the given predicate.
     466          617 : async fn remove_segments_from_disk(
     467          617 :     timeline_dir: &Path,
     468          617 :     wal_seg_size: usize,
     469          617 :     remove_predicate: impl Fn(XLogSegNo) -> bool,
     470          617 : ) -> Result<()> {
     471          617 :     let mut n_removed = 0;
     472          617 :     let mut min_removed = u64::MAX;
     473          617 :     let mut max_removed = u64::MIN;
     474              : 
     475          617 :     let mut entries = fs::read_dir(timeline_dir).await?;
     476         1466 :     while let Some(entry) = entries.next_entry().await? {
     477          849 :         let entry_path = entry.path();
     478          849 :         let fname = entry_path.file_name().unwrap();
     479              : 
     480          849 :         if let Some(fname_str) = fname.to_str() {
     481              :             /* Ignore files that are not XLOG segments */
     482          849 :             if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
     483          617 :                 continue;
     484          232 :             }
     485          232 :             let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
     486          232 :             if remove_predicate(segno) {
     487           26 :                 remove_file(entry_path).await?;
     488           26 :                 n_removed += 1;
     489           26 :                 min_removed = min(min_removed, segno);
     490           26 :                 max_removed = max(max_removed, segno);
     491           26 :                 REMOVED_WAL_SEGMENTS.inc();
     492          206 :             }
     493            0 :         }
     494              :     }
     495              : 
     496          617 :     if n_removed > 0 {
     497           13 :         info!(
     498           13 :             "removed {} WAL segments [{}; {}]",
     499           13 :             n_removed, min_removed, max_removed
     500           13 :         );
     501          604 :     }
     502          617 :     Ok(())
     503          617 : }
     504              : 
/// State of a WAL reader.
// NOTE(review): the reading logic lives in `impl WalReader`; field notes without an
// original comment are inferred from names and types only -- confirm there.
pub struct WalReader {
    workdir: PathBuf,
    timeline_dir: PathBuf,
    // Presumably the size of a WAL segment in bytes -- TODO confirm.
    wal_seg_size: usize,
    // Presumably the current read position -- TODO confirm.
    pos: Lsn,
    // Presumably the currently open segment stream, if any -- TODO confirm.
    wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,

    // S3 will be used to read WAL if LSN is not available locally
    enable_remote_read: bool,

    // We don't have WAL locally if LSN is less than local_start_lsn
    local_start_lsn: Lsn,
    // We will respond with zeroed bytes before this Lsn as long as
    // pos is in the same segment as timeline_start_lsn.
    timeline_start_lsn: Lsn,
    // integer version number of PostgreSQL, e.g. 14; 15; 16
    pg_version: u32,
    system_id: SystemId,
    timeline_start_segment: Option<Bytes>,
}
     525              : 
     526              : impl WalReader {
     527          830 :     pub fn new(
     528          830 :         workdir: PathBuf,
     529          830 :         timeline_dir: PathBuf,
     530          830 :         state: &SafeKeeperState,
     531          830 :         start_pos: Lsn,
     532          830 :         enable_remote_read: bool,
     533          830 :     ) -> Result<Self> {
     534          830 :         if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
     535            0 :             bail!("state uninitialized, no data to read");
     536          830 :         }
     537          830 : 
     538          830 :         // TODO: Upgrade to bail!() once we know this couldn't possibly happen
     539          830 :         if state.timeline_start_lsn == Lsn(0) {
     540            8 :             warn!("timeline_start_lsn uninitialized before initializing wal reader");
     541          822 :         }
     542              : 
     543          830 :         if start_pos
     544          830 :             < state
     545          830 :                 .timeline_start_lsn
     546          830 :                 .segment_lsn(state.server.wal_seg_size as usize)
     547              :         {
     548            0 :             bail!(
     549            0 :                 "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
     550            0 :                 start_pos,
     551            0 :                 state.timeline_start_lsn
     552            0 :             );
     553          830 :         }
     554          830 : 
     555          830 :         Ok(Self {
     556          830 :             workdir,
     557          830 :             timeline_dir,
     558          830 :             wal_seg_size: state.server.wal_seg_size as usize,
     559          830 :             pos: start_pos,
     560          830 :             wal_segment: None,
     561          830 :             enable_remote_read,
     562          830 :             local_start_lsn: state.local_start_lsn,
     563          830 :             timeline_start_lsn: state.timeline_start_lsn,
     564          830 :             pg_version: state.server.pg_version / 10000,
     565          830 :             system_id: state.server.system_id,
     566          830 :             timeline_start_segment: None,
     567          830 :         })
     568          830 :     }
     569              : 
     570       740438 :     pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
     571       740438 :         // If this timeline is new, we may not have a full segment yet, so
     572       740438 :         // we pad the first bytes of the timeline's first WAL segment with 0s
     573       740438 :         if self.pos < self.timeline_start_lsn {
     574              :             debug_assert_eq!(
     575           53 :                 self.pos.segment_number(self.wal_seg_size),
     576           53 :                 self.timeline_start_lsn.segment_number(self.wal_seg_size)
     577              :             );
     578              : 
     579              :             // All bytes after timeline_start_lsn are in WAL, but those before
     580              :             // are not, so we manually construct an empty segment for the bytes
     581              :             // not available in this timeline.
     582           53 :             if self.timeline_start_segment.is_none() {
     583            1 :                 let it = postgres_ffi::generate_wal_segment(
     584            1 :                     self.timeline_start_lsn.segment_number(self.wal_seg_size),
     585            1 :                     self.system_id,
     586            1 :                     self.pg_version,
     587            1 :                     self.timeline_start_lsn,
     588            1 :                 )?;
     589            1 :                 self.timeline_start_segment = Some(it);
     590           52 :             }
     591              : 
     592           53 :             assert!(self.timeline_start_segment.is_some());
     593           53 :             let segment = self.timeline_start_segment.take().unwrap();
     594           53 : 
     595           53 :             let seg_bytes = &segment[..];
     596           53 : 
     597           53 :             // How much of the current segment have we already consumed?
     598           53 :             let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
     599           53 : 
     600           53 :             // How many bytes may we consume in total?
     601           53 :             let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
     602              : 
     603           53 :             debug_assert!(seg_bytes.len() > pos_seg_offset);
     604           53 :             debug_assert!(seg_bytes.len() > tl_start_seg_offset);
     605              : 
     606              :             // Copy as many bytes as possible into the buffer
     607           53 :             let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
     608           53 :             buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
     609           53 : 
     610           53 :             self.pos += len as u64;
     611           53 : 
     612           53 :             // If we're done with the segment, we can release it's memory.
     613           53 :             // However, if we're not yet done, store it so that we don't have to
     614           53 :             // construct the segment the next time this function is called.
     615           53 :             if self.pos < self.timeline_start_lsn {
     616           52 :                 self.timeline_start_segment = Some(segment);
     617           52 :             }
     618              : 
     619           53 :             return Ok(len);
     620       740385 :         }
     621              : 
     622       740385 :         let mut wal_segment = match self.wal_segment.take() {
     623       739038 :             Some(reader) => reader,
     624         2893 :             None => self.open_segment().await?,
     625              :         };
     626              : 
     627              :         // How much to read and send in message? We cannot cross the WAL file
     628              :         // boundary, and we don't want send more than provided buffer.
     629       740385 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     630       740385 :         let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
     631       740385 : 
     632       740385 :         // Read some data from the file.
     633       740385 :         let buf = &mut buf[0..send_size];
     634       740385 :         let send_size = wal_segment.read_exact(buf).await?;
     635       740384 :         self.pos += send_size as u64;
     636       740384 : 
     637       740384 :         // Decide whether to reuse this file. If we don't set wal_segment here
     638       740384 :         // a new reader will be opened next time.
     639       740384 :         if self.pos.segment_offset(self.wal_seg_size) != 0 {
     640       739652 :             self.wal_segment = Some(wal_segment);
     641       739652 :         }
     642              : 
     643       740384 :         Ok(send_size)
     644       740437 :     }
     645              : 
     646              :     /// Open WAL segment at the current position of the reader.
     647         1347 :     async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
     648         1347 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     649         1347 :         let segno = self.pos.segment_number(self.wal_seg_size);
     650         1347 :         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
     651         1347 :         let wal_file_path = self.timeline_dir.join(wal_file_name);
     652         1347 : 
     653         1347 :         // Try to open local file, if we may have WAL locally
     654         1347 :         if self.pos >= self.local_start_lsn {
     655         1566 :             let res = Self::open_wal_file(&wal_file_path).await;
     656         1341 :             match res {
     657         1341 :                 Ok(mut file) => {
     658         1341 :                     file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     659         1341 :                     return Ok(Box::pin(file));
     660              :                 }
     661            0 :                 Err(e) => {
     662            0 :                     let is_not_found = e.chain().any(|e| {
     663            0 :                         if let Some(e) = e.downcast_ref::<io::Error>() {
     664            0 :                             e.kind() == io::ErrorKind::NotFound
     665              :                         } else {
     666            0 :                             false
     667              :                         }
     668            0 :                     });
     669            0 :                     if !is_not_found {
     670            0 :                         return Err(e);
     671            0 :                     }
     672              :                     // NotFound is expected, fall through to remote read
     673              :                 }
     674              :             };
     675            6 :         }
     676              : 
     677              :         // Try to open remote file, if remote reads are enabled
     678            6 :         if self.enable_remote_read {
     679            6 :             let remote_wal_file_path = wal_file_path
     680            6 :                 .strip_prefix(&self.workdir)
     681            6 :                 .context("Failed to strip workdir prefix")
     682            6 :                 .and_then(RemotePath::new)
     683            6 :                 .with_context(|| {
     684            0 :                     format!(
     685            0 :                         "Failed to resolve remote part of path {:?} for base {:?}",
     686            0 :                         wal_file_path, self.workdir,
     687            0 :                     )
     688            6 :                 })?;
     689           21 :             return read_object(&remote_wal_file_path, xlogoff as u64).await;
     690            0 :         }
     691            0 : 
     692            0 :         bail!("WAL segment is not found")
     693         1347 :     }
     694              : 
     695              :     /// Helper function for opening a wal file.
     696         1341 :     async fn open_wal_file(wal_file_path: &Path) -> Result<tokio::fs::File> {
     697         1341 :         // First try to open the .partial file.
     698         1341 :         let mut partial_path = wal_file_path.to_owned();
     699         1341 :         partial_path.set_extension("partial");
     700         1341 :         if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
     701         1101 :             return Ok(opened_file);
     702          240 :         }
     703          240 : 
     704          240 :         // If that failed, try it without the .partial extension.
     705          240 :         tokio::fs::File::open(&wal_file_path)
     706          241 :             .await
     707          240 :             .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
     708          240 :             .map_err(|e| {
     709            0 :                 warn!("{}", e);
     710            0 :                 e
     711          240 :             })
     712         1341 :     }
     713              : }
     714              : 
     715              : /// Zero block for filling created WAL segments.
     716              : const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
     717              : 
     718              : /// Helper for filling file with zeroes.
     719         1934 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
     720      3433682 :     while count >= XLOG_BLCKSZ {
     721      3431748 :         file.write_all(ZERO_BLOCK).await?;
     722      3431748 :         count -= XLOG_BLCKSZ;
     723              :     }
     724         1934 :     file.write_all(&ZERO_BLOCK[0..count]).await?;
     725         1934 :     file.flush().await?;
     726         1934 :     Ok(())
     727         1934 : }
     728              : 
     729              : /// Helper returning full path to WAL segment file and its .partial brother.
     730         2799 : fn wal_file_paths(
     731         2799 :     timeline_dir: &Path,
     732         2799 :     segno: XLogSegNo,
     733         2799 :     wal_seg_size: usize,
     734         2799 : ) -> Result<(PathBuf, PathBuf)> {
     735         2799 :     let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     736         2799 :     let wal_file_path = timeline_dir.join(wal_file_name.clone());
     737         2799 :     let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
     738         2799 :     Ok((wal_file_path, wal_file_partial_path))
     739         2799 : }
        

Generated by: LCOV version 2.1-beta