LCOV - differential code coverage report
Current view: top level - safekeeper/src - wal_storage.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 85.4 % 438 374 64 374
Current Date: 2023-10-19 02:04:12 Functions: 79.2 % 53 42 11 42
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! This module has everything to deal with WAL -- reading and writing to disk.
       2                 : //!
       3                 : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
       4                 : //! PG timeline is always 1, so WAL segments are usually have names like this:
       5                 : //! - 000000010000000000000001
       6                 : //! - 000000010000000000000002.partial
       7                 : //!
       8                 : //! Note that last file has `.partial` suffix, that's different from postgres.
       9                 : 
      10                 : use anyhow::{bail, Context, Result};
      11                 : use bytes::Bytes;
      12                 : use camino::{Utf8Path, Utf8PathBuf};
      13                 : use futures::future::BoxFuture;
      14                 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
      15                 : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
      16                 : use remote_storage::RemotePath;
      17                 : use std::cmp::{max, min};
      18                 : use std::io::{self, SeekFrom};
      19                 : use std::pin::Pin;
      20                 : use tokio::fs::{self, remove_file, File, OpenOptions};
      21                 : use tokio::io::{AsyncRead, AsyncWriteExt};
      22                 : use tokio::io::{AsyncReadExt, AsyncSeekExt};
      23                 : use tracing::*;
      24                 : 
      25                 : use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
      26                 : use crate::safekeeper::SafeKeeperState;
      27                 : use crate::wal_backup::read_object;
      28                 : use crate::SafeKeeperConf;
      29                 : use postgres_ffi::waldecoder::WalStreamDecoder;
      30                 : use postgres_ffi::XLogFileName;
      31                 : use postgres_ffi::XLOG_BLCKSZ;
      32                 : use pq_proto::SystemId;
      33                 : use utils::{id::TenantTimelineId, lsn::Lsn};
      34                 : 
      35                 : #[async_trait::async_trait]
      36                 : pub trait Storage {
      37                 :     /// LSN of last durably stored WAL record.
      38                 :     fn flush_lsn(&self) -> Lsn;
      39                 : 
      40                 :     /// Write piece of WAL from buf to disk, but not necessarily sync it.
      41                 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
      42                 : 
      43                 :     /// Truncate WAL at specified LSN, which must be the end of WAL record.
      44                 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
      45                 : 
      46                 :     /// Durably store WAL on disk, up to the last written WAL record.
      47                 :     async fn flush_wal(&mut self) -> Result<()>;
      48                 : 
      49                 :     /// Remove all segments <= given segno. Returns function doing that as we
      50                 :     /// want to perform it without timeline lock.
      51                 :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
      52                 : 
      53                 :     /// Release resources associated with the storage -- technically, close FDs.
      54                 :     /// Currently we don't remove timelines until restart (#3146), so need to
      55                 :     /// spare descriptors. This would be useful for temporary tli detach as
      56                 :     /// well.
      57 UBC           0 :     fn close(&mut self) {}
      58                 : 
      59                 :     /// Get metrics for this timeline.
      60                 :     fn get_metrics(&self) -> WalStorageMetrics;
      61                 : }
      62                 : 
      63                 : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
      64                 : /// for better performance. Storage is initialized in the constructor.
      65                 : ///
      66                 : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
      67                 : /// its filename and may be not fully flushed.
      68                 : ///
      69                 : /// Relationship of LSNs:
      70                 : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
      71                 : ///
      72                 : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
      73                 : pub struct PhysicalStorage {
      74                 :     metrics: WalStorageMetrics,
      75                 :     timeline_dir: Utf8PathBuf,
      76                 :     conf: SafeKeeperConf,
      77                 : 
      78                 :     /// Size of WAL segment in bytes.
      79                 :     wal_seg_size: usize,
      80                 : 
      81                 :     /// Written to disk, but possibly still in the cache and not fully persisted.
      82                 :     /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
      83                 :     write_lsn: Lsn,
      84                 : 
      85                 :     /// The LSN of the last WAL record written to disk. Still can be not fully flushed.
      86                 :     write_record_lsn: Lsn,
      87                 : 
      88                 :     /// The LSN of the last WAL record flushed to disk.
      89                 :     flush_record_lsn: Lsn,
      90                 : 
      91                 :     /// Decoder is required for detecting boundaries of WAL records.
      92                 :     decoder: WalStreamDecoder,
      93                 : 
      94                 :     /// Cached open file for the last segment.
      95                 :     ///
      96                 :     /// If Some(file) is open, then it always:
      97                 :     /// - has ".partial" suffix
      98                 :     /// - points to write_lsn, so no seek is needed for writing
      99                 :     /// - doesn't point to the end of the segment
     100                 :     file: Option<File>,
     101                 : 
     102                 :     /// When false, we have just initialized storage using the LSN from find_end_of_wal().
     103                 :     /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
     104                 :     /// there can be a case with unexpected .partial file.
     105                 :     ///
     106                 :     /// Imagine the following:
     107                 :     /// - 000000010000000000000001
     108                 :     ///   - it was fully written, but the last record is split between 2 segments
     109                 :     ///   - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
     110                 :     ///   - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
     111                 :     /// - 000000010000000000000002.partial
     112                 :     ///   - it has only 1 byte written, which is not enough to make a full WAL record
     113                 :     ///
     114                 :     /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
     115                 :     /// This flag will be set to true after the first truncate_wal() call.
     116                 :     ///
     117                 :     /// [`write_lsn`]: Self::write_lsn
     118                 :     is_truncated_after_restart: bool,
     119                 : }
     120                 : 
     121                 : impl PhysicalStorage {
     122                 :     /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
     123                 :     /// the disk. Otherwise, all LSNs are set to zero.
     124 CBC         566 :     pub fn new(
     125             566 :         ttid: &TenantTimelineId,
     126             566 :         timeline_dir: Utf8PathBuf,
     127             566 :         conf: &SafeKeeperConf,
     128             566 :         state: &SafeKeeperState,
     129             566 :     ) -> Result<PhysicalStorage> {
     130             566 :         let wal_seg_size = state.server.wal_seg_size as usize;
     131                 : 
     132                 :         // Find out where stored WAL ends, starting at commit_lsn which is a
     133                 :         // known recent record boundary (unless we don't have WAL at all).
     134                 :         //
     135                 :         // NB: find_end_of_wal MUST be backwards compatible with the previously
     136                 :         // written WAL. If find_end_of_wal fails to read any WAL written by an
     137                 :         // older version of the code, we could lose data forever.
     138             566 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     139             472 :             Lsn(0)
     140                 :         } else {
     141              94 :             let version = state.server.pg_version / 10000;
     142              94 : 
     143              94 :             dispatch_pgversion!(
     144              94 :                 version,
     145                 :                 pgv::xlog_utils::find_end_of_wal(
     146              94 :                     timeline_dir.as_std_path(),
     147              94 :                     wal_seg_size,
     148              94 :                     state.commit_lsn,
     149 UBC           0 :                 )?,
     150               0 :                 bail!("unsupported postgres version: {}", version)
     151                 :             )
     152                 :         };
     153                 : 
     154                 :         // TODO: do we really know that write_lsn is fully flushed to disk?
     155                 :         //      If not, maybe it's better to call fsync() here to be sure?
     156 CBC         566 :         let flush_lsn = write_lsn;
     157             566 : 
     158             566 :         debug!(
     159 UBC           0 :             "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
     160               0 :             ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
     161               0 :         );
     162 CBC         566 :         if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
     163 UBC           0 :             warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
     164 CBC         566 :         }
     165                 : 
     166             566 :         Ok(PhysicalStorage {
     167             566 :             metrics: WalStorageMetrics::default(),
     168             566 :             timeline_dir,
     169             566 :             conf: conf.clone(),
     170             566 :             wal_seg_size,
     171             566 :             write_lsn,
     172             566 :             write_record_lsn: write_lsn,
     173             566 :             flush_record_lsn: flush_lsn,
     174             566 :             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
     175             566 :             file: None,
     176             566 :             is_truncated_after_restart: false,
     177             566 :         })
     178             566 :     }
     179                 : 
     180                 :     /// Get all known state of the storage.
     181               5 :     pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     182               5 :         (
     183               5 :             self.write_lsn,
     184               5 :             self.write_record_lsn,
     185               5 :             self.flush_record_lsn,
     186               5 :             self.file.is_some(),
     187               5 :         )
     188               5 :     }
     189                 : 
     190                 :     /// Call fdatasync if config requires so.
     191         1319339 :     async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
     192         1319339 :         if !self.conf.no_sync {
     193 UBC           0 :             self.metrics
     194               0 :                 .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
     195 CBC     1319339 :         }
     196         1319339 :         Ok(())
     197         1319339 :     }
     198                 : 
     199                 :     /// Call fsync if config requires so.
     200            1255 :     async fn fsync_file(&mut self, file: &mut File) -> Result<()> {
     201            1255 :         if !self.conf.no_sync {
     202 UBC           0 :             self.metrics
     203               0 :                 .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
     204 CBC        1255 :         }
     205            1255 :         Ok(())
     206            1255 :     }
     207                 : 
     208                 :     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     209                 :     /// Returns `file` and `is_partial`.
     210            1917 :     async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
     211            1917 :         let (wal_file_path, wal_file_partial_path) =
     212            1917 :             wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     213                 : 
     214                 :         // Try to open already completed segment
     215            1917 :         if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
     216 UBC           0 :             Ok((file, false))
     217 CBC        1917 :         } else if let Ok(file) = OpenOptions::new()
     218            1917 :             .write(true)
     219            1917 :             .open(&wal_file_partial_path)
     220            1896 :             .await
     221                 :         {
     222                 :             // Try to open existing partial file
     223             662 :             Ok((file, true))
     224                 :         } else {
     225                 :             // Create and fill new partial file
     226            1255 :             let mut file = OpenOptions::new()
     227            1255 :                 .create(true)
     228            1255 :                 .write(true)
     229            1255 :                 .open(&wal_file_partial_path)
     230            1240 :                 .await
     231            1255 :                 .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
     232                 : 
     233         2504013 :             write_zeroes(&mut file, self.wal_seg_size).await?;
     234            1255 :             self.fsync_file(&mut file).await?;
     235            1255 :             Ok((file, true))
     236                 :         }
     237            1917 :     }
     238                 : 
     239                 :     /// Write WAL bytes, which are known to be located in a single WAL segment.
     240         1598081 :     async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
     241         1598081 :         let mut file = if let Some(file) = self.file.take() {
     242         1596733 :             file
     243                 :         } else {
     244         1550306 :             let (mut file, is_partial) = self.open_or_create(segno).await?;
     245            1348 :             assert!(is_partial, "unexpected write into non-partial segment file");
     246            1348 :             file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     247            1348 :             file
     248                 :         };
     249                 : 
     250         1598081 :         file.write_all(buf).await?;
     251                 :         // Note: flush just ensures write above reaches the OS (this is not
     252                 :         // needed in case of sync IO as Write::write there calls directly write
     253                 :         // syscall, but needed in case of async). It does *not* fsyncs the file.
     254         1598081 :         file.flush().await?;
     255                 : 
     256         1598081 :         if xlogoff + buf.len() == self.wal_seg_size {
     257                 :             // If we reached the end of a WAL segment, flush and close it.
     258             785 :             self.fdatasync_file(&mut file).await?;
     259                 : 
     260                 :             // Rename partial file to completed file
     261             785 :             let (wal_file_path, wal_file_partial_path) =
     262             785 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     263             785 :             fs::rename(wal_file_partial_path, wal_file_path).await?;
     264         1597296 :         } else {
     265         1597296 :             // otherwise, file can be reused later
     266         1597296 :             self.file = Some(file);
     267         1597296 :         }
     268                 : 
     269         1598081 :         Ok(())
     270         1598081 :     }
     271                 : 
     272                 :     /// Writes WAL to the segment files, until everything is writed. If some segments
     273                 :     /// are fully written, they are flushed to disk. The last (partial) segment can
     274                 :     /// be flushed separately later.
     275                 :     ///
     276                 :     /// Updates `write_lsn`.
     277         1597352 :     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
     278         1597352 :         if self.write_lsn != pos {
     279                 :             // need to flush the file before discarding it
     280 UBC           0 :             if let Some(mut file) = self.file.take() {
     281               0 :                 self.fdatasync_file(&mut file).await?;
     282               0 :             }
     283                 : 
     284               0 :             self.write_lsn = pos;
     285 CBC     1597352 :         }
     286                 : 
     287         3195433 :         while !buf.is_empty() {
     288                 :             // Extract WAL location for this block
     289         1598081 :             let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
     290         1598081 :             let segno = self.write_lsn.segment_number(self.wal_seg_size);
     291                 : 
     292                 :             // If crossing a WAL boundary, only write up until we reach wal segment size.
     293         1598081 :             let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
     294             729 :                 self.wal_seg_size - xlogoff
     295                 :             } else {
     296         1597352 :                 buf.len()
     297                 :             };
     298                 : 
     299         1598081 :             self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
     300         3131744 :                 .await?;
     301         1598081 :             self.write_lsn += bytes_write as u64;
     302         1598081 :             buf = &buf[bytes_write..];
     303                 :         }
     304                 : 
     305         1597352 :         Ok(())
     306         1597352 :     }
     307                 : }
     308                 : 
     309                 : #[async_trait::async_trait]
     310                 : impl Storage for PhysicalStorage {
     311                 :     /// flush_lsn returns LSN of last durably stored WAL record.
     312        10785530 :     fn flush_lsn(&self) -> Lsn {
     313        10785530 :         self.flush_record_lsn
     314        10785530 :     }
     315                 : 
     316                 :     /// Write WAL to disk.
     317         1597352 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     318                 :         // Disallow any non-sequential writes, which can result in gaps or overwrites.
     319                 :         // If we need to move the pointer, use truncate_wal() instead.
     320         1597352 :         if self.write_lsn > startpos {
     321 UBC           0 :             bail!(
     322               0 :                 "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
     323               0 :                 self.write_lsn,
     324               0 :                 startpos
     325               0 :             );
     326 CBC     1597352 :         }
     327         1597352 :         if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
     328 UBC           0 :             bail!(
     329               0 :                 "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
     330               0 :                 self.write_lsn,
     331               0 :                 startpos
     332               0 :             );
     333 CBC     1597352 :         }
     334                 : 
     335         3131744 :         let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
     336                 :         // WAL is written, updating write metrics
     337         1597352 :         self.metrics.observe_write_seconds(write_seconds);
     338         1597352 :         self.metrics.observe_write_bytes(buf.len());
     339         1597352 : 
     340         1597352 :         // figure out last record's end lsn for reporting (if we got the
     341         1597352 :         // whole record)
     342         1597352 :         if self.decoder.available() != startpos {
     343             479 :             info!(
     344             479 :                 "restart decoder from {} to {}",
     345             479 :                 self.decoder.available(),
     346             479 :                 startpos,
     347             479 :             );
     348             479 :             let pg_version = self.decoder.pg_version;
     349             479 :             self.decoder = WalStreamDecoder::new(startpos, pg_version);
     350         1596873 :         }
     351         1597352 :         self.decoder.feed_bytes(buf);
     352                 :         loop {
     353        97566162 :             match self.decoder.poll_decode()? {
     354         1597352 :                 None => break, // no full record yet
     355        95968810 :                 Some((lsn, _rec)) => {
     356        95968810 :                     self.write_record_lsn = lsn;
     357        95968810 :                 }
     358                 :             }
     359                 :         }
     360                 : 
     361         1597352 :         Ok(())
     362         3194704 :     }
     363                 : 
     364         2461712 :     async fn flush_wal(&mut self) -> Result<()> {
     365         2461712 :         if self.flush_record_lsn == self.write_record_lsn {
     366                 :             // no need to do extra flush
     367         1143723 :             return Ok(());
     368         1317989 :         }
     369                 : 
     370         1317989 :         if let Some(mut unflushed_file) = self.file.take() {
     371         1317973 :             self.fdatasync_file(&mut unflushed_file).await?;
     372         1317973 :             self.file = Some(unflushed_file);
     373                 :         } else {
     374                 :             // We have unflushed data (write_lsn != flush_lsn), but no file.
     375                 :             // This should only happen if last file was fully written and flushed,
     376                 :             // but haven't updated flush_lsn yet.
     377              16 :             if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
     378 UBC           0 :                 bail!(
     379               0 :                     "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
     380               0 :                     self.write_lsn,
     381               0 :                     self.flush_record_lsn
     382               0 :                 );
     383 CBC          16 :             }
     384                 :         }
     385                 : 
     386                 :         // everything is flushed now, let's update flush_lsn
     387         1317989 :         self.flush_record_lsn = self.write_record_lsn;
     388         1317989 :         Ok(())
     389         4923424 :     }
     390                 : 
     391                 :     /// Truncate written WAL by removing all WAL segments after the given LSN.
     392                 :     /// end_pos must point to the end of the WAL record.
     393            1019 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     394                 :         // Streaming must not create a hole, so truncate cannot be called on non-written lsn
     395            1019 :         if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
     396 UBC           0 :             bail!(
     397               0 :                 "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
     398               0 :                 self.write_lsn,
     399               0 :                 end_pos
     400               0 :             );
     401 CBC        1019 :         }
     402            1019 : 
     403            1019 :         // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
     404            1019 :         // disk (this happens on each connect).
     405            1019 :         if self.is_truncated_after_restart
     406             462 :             && end_pos == self.write_lsn
     407             450 :             && end_pos == self.flush_record_lsn
     408                 :         {
     409             450 :             return Ok(());
     410             569 :         }
     411                 : 
     412                 :         // Close previously opened file, if any
     413             569 :         if let Some(mut unflushed_file) = self.file.take() {
     414              12 :             self.fdatasync_file(&mut unflushed_file).await?;
     415             557 :         }
     416                 : 
     417             569 :         let xlogoff = end_pos.segment_offset(self.wal_seg_size);
     418             569 :         let segno = end_pos.segment_number(self.wal_seg_size);
     419             569 : 
     420             569 :         // Remove all segments after the given LSN.
     421            1137 :         remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
     422                 : 
     423          958736 :         let (mut file, is_partial) = self.open_or_create(segno).await?;
     424                 : 
     425                 :         // Fill end with zeroes
     426             569 :         file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     427          661101 :         write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
     428             569 :         self.fdatasync_file(&mut file).await?;
     429                 : 
     430             569 :         if !is_partial {
     431                 :             // Make segment partial once again
     432 UBC           0 :             let (wal_file_path, wal_file_partial_path) =
     433               0 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
     434               0 :             fs::rename(wal_file_path, wal_file_partial_path).await?;
     435 CBC         569 :         }
     436                 : 
     437                 :         // Update LSNs
     438             569 :         self.write_lsn = end_pos;
     439             569 :         self.write_record_lsn = end_pos;
     440             569 :         self.flush_record_lsn = end_pos;
     441             569 :         self.is_truncated_after_restart = true;
     442             569 :         Ok(())
     443            2038 :     }
     444                 : 
     445              35 :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     446              35 :         let timeline_dir = self.timeline_dir.clone();
     447              35 :         let wal_seg_size = self.wal_seg_size;
     448              35 :         Box::pin(async move {
     449              96 :             remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
     450              35 :         })
     451              35 :     }
     452                 : 
     453              31 :     fn close(&mut self) {
     454              31 :         // close happens in destructor
     455              31 :         let _open_file = self.file.take();
     456              31 :     }
     457                 : 
     458              51 :     fn get_metrics(&self) -> WalStorageMetrics {
     459              51 :         self.metrics.clone()
     460              51 :     }
     461                 : }
     462                 : 
     463                 : /// Remove all WAL segments in timeline_dir that match the given predicate.
     464             604 : async fn remove_segments_from_disk(
     465             604 :     timeline_dir: &Utf8Path,
     466             604 :     wal_seg_size: usize,
     467             604 :     remove_predicate: impl Fn(XLogSegNo) -> bool,
     468             604 : ) -> Result<()> {
     469             604 :     let mut n_removed = 0;
     470             604 :     let mut min_removed = u64::MAX;
     471             604 :     let mut max_removed = u64::MIN;
     472                 : 
     473             604 :     let mut entries = fs::read_dir(timeline_dir).await?;
     474            1488 :     while let Some(entry) = entries.next_entry().await? {
     475             884 :         let entry_path = entry.path();
     476             884 :         let fname = entry_path.file_name().unwrap();
     477                 : 
     478             884 :         if let Some(fname_str) = fname.to_str() {
     479                 :             /* Ignore files that are not XLOG segments */
     480             884 :             if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
     481             604 :                 continue;
     482             280 :             }
     483             280 :             let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
     484             280 :             if remove_predicate(segno) {
     485              26 :                 remove_file(entry_path).await?;
     486              26 :                 n_removed += 1;
     487              26 :                 min_removed = min(min_removed, segno);
     488              26 :                 max_removed = max(max_removed, segno);
     489              26 :                 REMOVED_WAL_SEGMENTS.inc();
     490             254 :             }
     491 UBC           0 :         }
     492                 :     }
     493                 : 
     494 CBC         604 :     if n_removed > 0 {
     495              13 :         info!(
     496              13 :             "removed {} WAL segments [{}; {}]",
     497              13 :             n_removed, min_removed, max_removed
     498              13 :         );
     499             591 :     }
     500             604 :     Ok(())
     501             604 : }
     502                 : 
     503                 : pub struct WalReader {
     504                 :     workdir: Utf8PathBuf,
     505                 :     timeline_dir: Utf8PathBuf,
     506                 :     wal_seg_size: usize,
     507                 :     pos: Lsn,
     508                 :     wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
     509                 : 
     510                 :     // S3 will be used to read WAL if LSN is not available locally
     511                 :     enable_remote_read: bool,
     512                 : 
     513                 :     // We don't have WAL locally if LSN is less than local_start_lsn
     514                 :     local_start_lsn: Lsn,
     515                 :     // We will respond with zero-ed bytes before this Lsn as long as
     516                 :     // pos is in the same segment as timeline_start_lsn.
     517                 :     timeline_start_lsn: Lsn,
     518                 :     // integer version number of PostgreSQL, e.g. 14; 15; 16
     519                 :     pg_version: u32,
     520                 :     system_id: SystemId,
     521                 :     timeline_start_segment: Option<Bytes>,
     522                 : }
     523                 : 
     524                 : impl WalReader {
     525             783 :     pub fn new(
     526             783 :         workdir: Utf8PathBuf,
     527             783 :         timeline_dir: Utf8PathBuf,
     528             783 :         state: &SafeKeeperState,
     529             783 :         start_pos: Lsn,
     530             783 :         enable_remote_read: bool,
     531             783 :     ) -> Result<Self> {
     532             783 :         if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
     533 UBC           0 :             bail!("state uninitialized, no data to read");
     534 CBC         783 :         }
     535             783 : 
     536             783 :         // TODO: Upgrade to bail!() once we know this couldn't possibly happen
     537             783 :         if state.timeline_start_lsn == Lsn(0) {
     538               8 :             warn!("timeline_start_lsn uninitialized before initializing wal reader");
     539             775 :         }
     540                 : 
     541             783 :         if start_pos
     542             783 :             < state
     543             783 :                 .timeline_start_lsn
     544             783 :                 .segment_lsn(state.server.wal_seg_size as usize)
     545                 :         {
     546 UBC           0 :             bail!(
     547               0 :                 "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
     548               0 :                 start_pos,
     549               0 :                 state.timeline_start_lsn
     550               0 :             );
     551 CBC         783 :         }
     552             783 : 
     553             783 :         Ok(Self {
     554             783 :             workdir,
     555             783 :             timeline_dir,
     556             783 :             wal_seg_size: state.server.wal_seg_size as usize,
     557             783 :             pos: start_pos,
     558             783 :             wal_segment: None,
     559             783 :             enable_remote_read,
     560             783 :             local_start_lsn: state.local_start_lsn,
     561             783 :             timeline_start_lsn: state.timeline_start_lsn,
     562             783 :             pg_version: state.server.pg_version / 10000,
     563             783 :             system_id: state.server.system_id,
     564             783 :             timeline_start_segment: None,
     565             783 :         })
     566             783 :     }
     567                 : 
     568          783639 :     pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
     569          783639 :         // If this timeline is new, we may not have a full segment yet, so
     570          783639 :         // we pad the first bytes of the timeline's first WAL segment with 0s
     571          783639 :         if self.pos < self.timeline_start_lsn {
     572                 :             debug_assert_eq!(
     573             159 :                 self.pos.segment_number(self.wal_seg_size),
     574             159 :                 self.timeline_start_lsn.segment_number(self.wal_seg_size)
     575                 :             );
     576                 : 
     577                 :             // All bytes after timeline_start_lsn are in WAL, but those before
     578                 :             // are not, so we manually construct an empty segment for the bytes
     579                 :             // not available in this timeline.
     580             159 :             if self.timeline_start_segment.is_none() {
     581               3 :                 let it = postgres_ffi::generate_wal_segment(
     582               3 :                     self.timeline_start_lsn.segment_number(self.wal_seg_size),
     583               3 :                     self.system_id,
     584               3 :                     self.pg_version,
     585               3 :                     self.timeline_start_lsn,
     586               3 :                 )?;
     587               3 :                 self.timeline_start_segment = Some(it);
     588             156 :             }
     589                 : 
     590             159 :             assert!(self.timeline_start_segment.is_some());
     591             159 :             let segment = self.timeline_start_segment.take().unwrap();
     592             159 : 
     593             159 :             let seg_bytes = &segment[..];
     594             159 : 
     595             159 :             // How much of the current segment have we already consumed?
     596             159 :             let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
     597             159 : 
     598             159 :             // How many bytes may we consume in total?
     599             159 :             let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
     600                 : 
     601             159 :             debug_assert!(seg_bytes.len() > pos_seg_offset);
     602             159 :             debug_assert!(seg_bytes.len() > tl_start_seg_offset);
     603                 : 
     604                 :             // Copy as many bytes as possible into the buffer
     605             159 :             let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
     606             159 :             buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
     607             159 : 
     608             159 :             self.pos += len as u64;
     609             159 : 
     610             159 :             // If we're done with the segment, we can release it's memory.
     611             159 :             // However, if we're not yet done, store it so that we don't have to
     612             159 :             // construct the segment the next time this function is called.
     613             159 :             if self.pos < self.timeline_start_lsn {
     614             156 :                 self.timeline_start_segment = Some(segment);
     615             156 :             }
     616                 : 
     617             159 :             return Ok(len);
     618          783480 :         }
     619                 : 
     620          783480 :         let mut wal_segment = match self.wal_segment.take() {
     621          782124 :             Some(reader) => reader,
     622            2913 :             None => self.open_segment().await?,
     623                 :         };
     624                 : 
     625                 :         // How much to read and send in message? We cannot cross the WAL file
     626                 :         // boundary, and we don't want send more than provided buffer.
     627          783479 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     628          783479 :         let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
     629          783479 : 
     630          783479 :         // Read some data from the file.
     631          783479 :         let buf = &mut buf[0..send_size];
     632          783479 :         let send_size = wal_segment.read_exact(buf).await?;
     633          783479 :         self.pos += send_size as u64;
     634          783479 : 
     635          783479 :         // Decide whether to reuse this file. If we don't set wal_segment here
     636          783479 :         // a new reader will be opened next time.
     637          783479 :         if self.pos.segment_offset(self.wal_seg_size) != 0 {
     638          782776 :             self.wal_segment = Some(wal_segment);
     639          782776 :         }
     640                 : 
     641          783479 :         Ok(send_size)
     642          783638 :     }
     643                 : 
     644                 :     /// Open WAL segment at the current position of the reader.
     645            1356 :     async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
     646            1356 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     647            1356 :         let segno = self.pos.segment_number(self.wal_seg_size);
     648            1356 :         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
     649            1356 :         let wal_file_path = self.timeline_dir.join(wal_file_name);
     650            1356 : 
     651            1356 :         // Try to open local file, if we may have WAL locally
     652            1356 :         if self.pos >= self.local_start_lsn {
     653            1555 :             let res = Self::open_wal_file(&wal_file_path).await;
     654            1349 :             match res {
     655            1349 :                 Ok(mut file) => {
     656            1349 :                     file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     657            1349 :                     return Ok(Box::pin(file));
     658                 :                 }
     659 UBC           0 :                 Err(e) => {
     660               0 :                     let is_not_found = e.chain().any(|e| {
     661               0 :                         if let Some(e) = e.downcast_ref::<io::Error>() {
     662               0 :                             e.kind() == io::ErrorKind::NotFound
     663                 :                         } else {
     664               0 :                             false
     665                 :                         }
     666               0 :                     });
     667               0 :                     if !is_not_found {
     668               0 :                         return Err(e);
     669               0 :                     }
     670                 :                     // NotFound is expected, fall through to remote read
     671                 :                 }
     672                 :             };
     673 CBC           6 :         }
     674                 : 
     675                 :         // Try to open remote file, if remote reads are enabled
     676               6 :         if self.enable_remote_read {
     677               6 :             let remote_wal_file_path = wal_file_path
     678               6 :                 .strip_prefix(&self.workdir)
     679               6 :                 .context("Failed to strip workdir prefix")
     680               6 :                 .and_then(RemotePath::new)
     681               6 :                 .with_context(|| {
     682 UBC           0 :                     format!(
     683               0 :                         "Failed to resolve remote part of path {:?} for base {:?}",
     684               0 :                         wal_file_path, self.workdir,
     685               0 :                     )
     686 CBC           6 :                 })?;
     687              23 :             return read_object(&remote_wal_file_path, xlogoff as u64).await;
     688 UBC           0 :         }
     689               0 : 
     690               0 :         bail!("WAL segment is not found")
     691 CBC        1355 :     }
     692                 : 
     693                 :     /// Helper function for opening a wal file.
     694            1350 :     async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
     695            1350 :         // First try to open the .partial file.
     696            1350 :         let mut partial_path = wal_file_path.to_owned();
     697            1350 :         partial_path.set_extension("partial");
     698            1355 :         if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
     699            1148 :             return Ok(opened_file);
     700             201 :         }
     701             201 : 
     702             201 :         // If that failed, try it without the .partial extension.
     703             201 :         tokio::fs::File::open(&wal_file_path)
     704             200 :             .await
     705             201 :             .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
     706             201 :             .map_err(|e| {
     707 UBC           0 :                 warn!("{}", e);
     708               0 :                 e
     709 CBC         201 :             })
     710            1349 :     }
     711                 : }
     712                 : 
     713                 : /// Zero block for filling created WAL segments.
     714                 : const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
     715                 : 
     716                 : /// Helper for filling file with zeroes.
     717            1824 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
     718         3241223 :     while count >= XLOG_BLCKSZ {
     719         3239399 :         file.write_all(ZERO_BLOCK).await?;
     720         3239399 :         count -= XLOG_BLCKSZ;
     721                 :     }
     722            1824 :     file.write_all(&ZERO_BLOCK[0..count]).await?;
     723            1824 :     file.flush().await?;
     724            1824 :     Ok(())
     725            1824 : }
     726                 : 
     727                 : /// Helper returning full path to WAL segment file and its .partial brother.
     728            2702 : fn wal_file_paths(
     729            2702 :     timeline_dir: &Utf8Path,
     730            2702 :     segno: XLogSegNo,
     731            2702 :     wal_seg_size: usize,
     732            2702 : ) -> Result<(Utf8PathBuf, Utf8PathBuf)> {
     733            2702 :     let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     734            2702 :     let wal_file_path = timeline_dir.join(wal_file_name.clone());
     735            2702 :     let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
     736            2702 :     Ok((wal_file_path, wal_file_partial_path))
     737            2702 : }
        

Generated by: LCOV version 2.1-beta