LCOV - code coverage report
Current view: top level - safekeeper/src - wal_storage.rs (source / functions) Coverage Total Hit
Test: 2453312769e0b6b061a2008879e6693300d0b938.info Lines: 0.0 % 462 0
Test Date: 2024-09-06 16:40:18 Functions: 0.0 % 47 0

            Line data    Source code
       1              : //! This module has everything to deal with WAL -- reading and writing to disk.
       2              : //!
       3              : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
       4              : //! PG timeline is always 1, so WAL segments usually have names like this:
       5              : //! - 000000010000000000000001
       6              : //! - 000000010000000000000002.partial
       7              : //!
       8              : //! Note that last file has `.partial` suffix, that's different from postgres.
       9              : 
      10              : use anyhow::{bail, Context, Result};
      11              : use bytes::Bytes;
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use futures::future::BoxFuture;
      14              : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
      15              : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
      16              : use remote_storage::RemotePath;
      17              : use std::cmp::{max, min};
      18              : use std::future::Future;
      19              : use std::io::{self, SeekFrom};
      20              : use std::pin::Pin;
      21              : use tokio::fs::{self, remove_file, File, OpenOptions};
      22              : use tokio::io::{AsyncRead, AsyncWriteExt};
      23              : use tokio::io::{AsyncReadExt, AsyncSeekExt};
      24              : use tracing::*;
      25              : use utils::crashsafe::durable_rename;
      26              : 
      27              : use crate::metrics::{
      28              :     time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS,
      29              : };
      30              : use crate::state::TimelinePersistentState;
      31              : use crate::wal_backup::{read_object, remote_timeline_path};
      32              : use crate::SafeKeeperConf;
      33              : use postgres_ffi::waldecoder::WalStreamDecoder;
      34              : use postgres_ffi::XLogFileName;
      35              : use postgres_ffi::XLOG_BLCKSZ;
      36              : use pq_proto::SystemId;
      37              : use utils::{id::TenantTimelineId, lsn::Lsn};
      38              : 
/// Interface of a WAL store: sequential writes, flushing, truncation and removal of old
/// segments. [`PhysicalStorage`] in this file implements it on top of the local filesystem.
pub trait Storage {
    /// Last written LSN. The data up to it is not necessarily durable yet; see
    /// [`Storage::flush_lsn`] for the durable position.
    fn write_lsn(&self) -> Lsn;
    /// LSN of last durably stored WAL record.
    fn flush_lsn(&self) -> Lsn;

    /// Initialize segment by creating proper long header at the beginning of
    /// the segment and short header at the page of given LSN. This is only used
    /// for timeline initialization because compute will stream data only since
    /// init_lsn. Other segment headers are included in compute stream.
    fn initialize_first_segment(
        &mut self,
        init_lsn: Lsn,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Write piece of WAL from buf to disk, but not necessarily sync it.
    /// `startpos` is the LSN at which the first byte of `buf` belongs.
    fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> impl Future<Output = Result<()>> + Send;

    /// Truncate WAL at specified LSN, which must be the end of WAL record.
    fn truncate_wal(&mut self, end_pos: Lsn) -> impl Future<Output = Result<()>> + Send;

    /// Durably store WAL on disk, up to the last written WAL record.
    fn flush_wal(&mut self) -> impl Future<Output = Result<()>> + Send;

    /// Remove all segments <= given segno. Returns a boxed `'static` future doing
    /// that (rather than doing it directly) as we want to perform it without
    /// timeline lock.
    fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;

    /// Release resources associated with the storage -- technically, close FDs.
    /// Currently we don't remove timelines until restart (#3146), so need to
    /// spare descriptors. This would be useful for temporary tli detach as
    /// well.
    ///
    /// The default implementation does nothing.
    fn close(&mut self) {}

    /// Get metrics for this timeline.
    fn get_metrics(&self) -> WalStorageMetrics;
}
      76              : 
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage is initialized in the constructor.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
///
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
    /// Write/flush timing and volume counters, handed out by `get_metrics()`.
    metrics: WalStorageMetrics,
    /// Directory holding the segment files (and the temporary `waltmp` file
    /// used while a new segment is being prepared).
    timeline_dir: Utf8PathBuf,
    /// Safekeeper configuration; consulted here for the `no_sync` switch.
    conf: SafeKeeperConf,

    /// Size of WAL segment in bytes.
    wal_seg_size: usize,
    /// Postgres version as stored in the timeline state; divided by 10000
    /// wherever the major version is needed.
    pg_version: u32,
    /// System identifier passed to WAL segment generation when the first
    /// segment is initialized.
    system_id: u64,

    /// Written to disk, but possibly still in the cache and not fully persisted.
    /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
    write_lsn: Lsn,

    /// The LSN of the last WAL record written to disk. Still can be not fully flushed.
    write_record_lsn: Lsn,

    /// The LSN of the last WAL record flushed to disk.
    flush_record_lsn: Lsn,

    /// Decoder is required for detecting boundaries of WAL records.
    decoder: WalStreamDecoder,

    /// Cached open file for the last segment.
    ///
    /// If Some(file) is open, then it always:
    /// - has ".partial" suffix
    /// - points to write_lsn, so no seek is needed for writing
    /// - doesn't point to the end of the segment
    file: Option<File>,

    /// When false, we have just initialized storage using the LSN from find_end_of_wal().
    /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
    /// there can be a case with unexpected .partial file.
    ///
    /// Imagine the following:
    /// - 000000010000000000000001
    ///   - it was fully written, but the last record is split between 2 segments
    ///   - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
    ///   - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
    /// - 000000010000000000000002.partial
    ///   - it has only 1 byte written, which is not enough to make a full WAL record
    ///
    /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
    /// This flag will be set to true after the first truncate_wal() call.
    ///
    /// [`write_lsn`]: Self::write_lsn
    is_truncated_after_restart: bool,
}
     136              : 
     137              : impl PhysicalStorage {
     138              :     /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
     139              :     /// the disk. Otherwise, all LSNs are set to zero.
     140            0 :     pub fn new(
     141            0 :         ttid: &TenantTimelineId,
     142            0 :         timeline_dir: Utf8PathBuf,
     143            0 :         conf: &SafeKeeperConf,
     144            0 :         state: &TimelinePersistentState,
     145            0 :     ) -> Result<PhysicalStorage> {
     146            0 :         let wal_seg_size = state.server.wal_seg_size as usize;
     147              : 
     148              :         // Find out where stored WAL ends, starting at commit_lsn which is a
     149              :         // known recent record boundary (unless we don't have WAL at all).
     150              :         //
     151              :         // NB: find_end_of_wal MUST be backwards compatible with the previously
     152              :         // written WAL. If find_end_of_wal fails to read any WAL written by an
     153              :         // older version of the code, we could lose data forever.
     154            0 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     155            0 :             Lsn(0)
     156              :         } else {
     157            0 :             let version = state.server.pg_version / 10000;
     158            0 : 
     159            0 :             dispatch_pgversion!(
     160            0 :                 version,
     161            0 :                 pgv::xlog_utils::find_end_of_wal(
     162            0 :                     timeline_dir.as_std_path(),
     163            0 :                     wal_seg_size,
     164            0 :                     state.commit_lsn,
     165            0 :                 )?,
     166            0 :                 bail!("unsupported postgres version: {}", version)
     167              :             )
     168              :         };
     169              : 
     170              :         // TODO: do we really know that write_lsn is fully flushed to disk?
     171              :         //      If not, maybe it's better to call fsync() here to be sure?
     172            0 :         let flush_lsn = write_lsn;
     173            0 : 
     174            0 :         debug!(
     175            0 :             "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
     176              :             ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
     177              :         );
     178            0 :         if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
     179            0 :             warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
     180            0 :         }
     181              : 
     182            0 :         Ok(PhysicalStorage {
     183            0 :             metrics: WalStorageMetrics::default(),
     184            0 :             timeline_dir,
     185            0 :             conf: conf.clone(),
     186            0 :             wal_seg_size,
     187            0 :             pg_version: state.server.pg_version,
     188            0 :             system_id: state.server.system_id,
     189            0 :             write_lsn,
     190            0 :             write_record_lsn: write_lsn,
     191            0 :             flush_record_lsn: flush_lsn,
     192            0 :             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
     193            0 :             file: None,
     194            0 :             is_truncated_after_restart: false,
     195            0 :         })
     196            0 :     }
     197              : 
     198              :     /// Get all known state of the storage.
     199            0 :     pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     200            0 :         (
     201            0 :             self.write_lsn,
     202            0 :             self.write_record_lsn,
     203            0 :             self.flush_record_lsn,
     204            0 :             self.file.is_some(),
     205            0 :         )
     206            0 :     }
     207              : 
     208              :     /// Call fdatasync if config requires so.
     209            0 :     async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
     210            0 :         if !self.conf.no_sync {
     211            0 :             self.metrics
     212            0 :                 .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
     213            0 :         }
     214            0 :         Ok(())
     215            0 :     }
     216              : 
     217              :     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     218              :     /// Returns `file` and `is_partial`.
     219            0 :     async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
     220            0 :         let (wal_file_path, wal_file_partial_path) =
     221            0 :             wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
     222              : 
     223              :         // Try to open already completed segment
     224            0 :         if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
     225            0 :             Ok((file, false))
     226            0 :         } else if let Ok(file) = OpenOptions::new()
     227            0 :             .write(true)
     228            0 :             .open(&wal_file_partial_path)
     229            0 :             .await
     230              :         {
     231              :             // Try to open existing partial file
     232            0 :             Ok((file, true))
     233              :         } else {
     234              :             // Create and fill new partial file
     235              :             //
     236              :             // We're using fdatasync during WAL writing, so file size must not
     237              :             // change; to this end it is filled with zeros here. To avoid using
     238              :             // half initialized segment, first bake it under tmp filename and
     239              :             // then rename.
     240            0 :             let tmp_path = self.timeline_dir.join("waltmp");
     241            0 :             let mut file = File::create(&tmp_path)
     242            0 :                 .await
     243            0 :                 .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
     244              : 
     245            0 :             write_zeroes(&mut file, self.wal_seg_size).await?;
     246              : 
     247              :             // Note: this doesn't get into observe_flush_seconds metric. But
     248              :             // segment init should be separate metric, if any.
     249            0 :             if let Err(e) =
     250            0 :                 durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
     251              :             {
     252              :                 // Probably rename succeeded, but fsync of it failed. Remove
     253              :                 // the file then to avoid using it.
     254            0 :                 remove_file(wal_file_partial_path)
     255            0 :                     .await
     256            0 :                     .or_else(utils::fs_ext::ignore_not_found)?;
     257            0 :                 return Err(e.into());
     258            0 :             }
     259            0 :             Ok((file, true))
     260              :         }
     261            0 :     }
     262              : 
     263              :     /// Write WAL bytes, which are known to be located in a single WAL segment.
     264            0 :     async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
     265            0 :         let mut file = if let Some(file) = self.file.take() {
     266            0 :             file
     267              :         } else {
     268            0 :             let (mut file, is_partial) = self.open_or_create(segno).await?;
     269            0 :             assert!(is_partial, "unexpected write into non-partial segment file");
     270            0 :             file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     271            0 :             file
     272              :         };
     273              : 
     274            0 :         file.write_all(buf).await?;
     275              :         // Note: flush just ensures write above reaches the OS (this is not
     276              :         // needed in case of sync IO as Write::write there calls directly write
     277              :         // syscall, but needed in case of async). It does *not* fsyncs the file.
     278            0 :         file.flush().await?;
     279              : 
     280            0 :         if xlogoff + buf.len() == self.wal_seg_size {
     281              :             // If we reached the end of a WAL segment, flush and close it.
     282            0 :             self.fdatasync_file(&file).await?;
     283              : 
     284              :             // Rename partial file to completed file
     285            0 :             let (wal_file_path, wal_file_partial_path) =
     286            0 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
     287            0 :             fs::rename(wal_file_partial_path, wal_file_path).await?;
     288            0 :         } else {
     289            0 :             // otherwise, file can be reused later
     290            0 :             self.file = Some(file);
     291            0 :         }
     292              : 
     293            0 :         Ok(())
     294            0 :     }
     295              : 
     296              :     /// Writes WAL to the segment files, until everything is writed. If some segments
     297              :     /// are fully written, they are flushed to disk. The last (partial) segment can
     298              :     /// be flushed separately later.
     299              :     ///
     300              :     /// Updates `write_lsn`.
     301            0 :     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
     302            0 :         if self.write_lsn != pos {
     303              :             // need to flush the file before discarding it
     304            0 :             if let Some(file) = self.file.take() {
     305            0 :                 self.fdatasync_file(&file).await?;
     306            0 :             }
     307              : 
     308            0 :             self.write_lsn = pos;
     309            0 :         }
     310              : 
     311            0 :         while !buf.is_empty() {
     312              :             // Extract WAL location for this block
     313            0 :             let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
     314            0 :             let segno = self.write_lsn.segment_number(self.wal_seg_size);
     315              : 
     316              :             // If crossing a WAL boundary, only write up until we reach wal segment size.
     317            0 :             let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
     318            0 :                 self.wal_seg_size - xlogoff
     319              :             } else {
     320            0 :                 buf.len()
     321              :             };
     322              : 
     323            0 :             self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
     324            0 :                 .await?;
     325            0 :             self.write_lsn += bytes_write as u64;
     326            0 :             buf = &buf[bytes_write..];
     327              :         }
     328              : 
     329            0 :         Ok(())
     330            0 :     }
     331              : }
     332              : 
     333              : impl Storage for PhysicalStorage {
     334              :     // Last written LSN.
     335            0 :     fn write_lsn(&self) -> Lsn {
     336            0 :         self.write_lsn
     337            0 :     }
     338              :     /// flush_lsn returns LSN of last durably stored WAL record.
     339            0 :     fn flush_lsn(&self) -> Lsn {
     340            0 :         self.flush_record_lsn
     341            0 :     }
     342              : 
     343            0 :     async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
     344            0 :         let _timer = WAL_STORAGE_OPERATION_SECONDS
     345            0 :             .with_label_values(&["initialize_first_segment"])
     346            0 :             .start_timer();
     347            0 : 
     348            0 :         let segno = init_lsn.segment_number(self.wal_seg_size);
     349            0 :         let (mut file, _) = self.open_or_create(segno).await?;
     350            0 :         let major_pg_version = self.pg_version / 10000;
     351            0 :         let wal_seg =
     352            0 :             postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
     353            0 :         file.seek(SeekFrom::Start(0)).await?;
     354            0 :         file.write_all(&wal_seg).await?;
     355            0 :         file.flush().await?;
     356            0 :         info!("initialized segno {} at lsn {}", segno, init_lsn);
     357              :         // note: file is *not* fsynced
     358            0 :         Ok(())
     359            0 :     }
     360              : 
     361              :     /// Write WAL to disk.
     362            0 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     363            0 :         // Disallow any non-sequential writes, which can result in gaps or overwrites.
     364            0 :         // If we need to move the pointer, use truncate_wal() instead.
     365            0 :         if self.write_lsn > startpos {
     366            0 :             bail!(
     367            0 :                 "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
     368            0 :                 self.write_lsn,
     369            0 :                 startpos
     370            0 :             );
     371            0 :         }
     372            0 :         if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
     373            0 :             bail!(
     374            0 :                 "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
     375            0 :                 self.write_lsn,
     376            0 :                 startpos
     377            0 :             );
     378            0 :         }
     379              : 
     380            0 :         let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
     381              :         // WAL is written, updating write metrics
     382            0 :         self.metrics.observe_write_seconds(write_seconds);
     383            0 :         self.metrics.observe_write_bytes(buf.len());
     384            0 : 
     385            0 :         // figure out last record's end lsn for reporting (if we got the
     386            0 :         // whole record)
     387            0 :         if self.decoder.available() != startpos {
     388            0 :             info!(
     389            0 :                 "restart decoder from {} to {}",
     390            0 :                 self.decoder.available(),
     391              :                 startpos,
     392              :             );
     393            0 :             let pg_version = self.decoder.pg_version;
     394            0 :             self.decoder = WalStreamDecoder::new(startpos, pg_version);
     395            0 :         }
     396            0 :         self.decoder.feed_bytes(buf);
     397              :         loop {
     398            0 :             match self.decoder.poll_decode()? {
     399            0 :                 None => break, // no full record yet
     400            0 :                 Some((lsn, _rec)) => {
     401            0 :                     self.write_record_lsn = lsn;
     402            0 :                 }
     403              :             }
     404              :         }
     405              : 
     406            0 :         Ok(())
     407            0 :     }
     408              : 
     409            0 :     async fn flush_wal(&mut self) -> Result<()> {
     410            0 :         if self.flush_record_lsn == self.write_record_lsn {
     411              :             // no need to do extra flush
     412            0 :             return Ok(());
     413            0 :         }
     414              : 
     415            0 :         if let Some(unflushed_file) = self.file.take() {
     416            0 :             self.fdatasync_file(&unflushed_file).await?;
     417            0 :             self.file = Some(unflushed_file);
     418              :         } else {
     419              :             // We have unflushed data (write_lsn != flush_lsn), but no file.
     420              :             // This should only happen if last file was fully written and flushed,
     421              :             // but haven't updated flush_lsn yet.
     422            0 :             if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
     423            0 :                 bail!(
     424            0 :                     "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
     425            0 :                     self.write_lsn,
     426            0 :                     self.flush_record_lsn
     427            0 :                 );
     428            0 :             }
     429              :         }
     430              : 
     431              :         // everything is flushed now, let's update flush_lsn
     432            0 :         self.flush_record_lsn = self.write_record_lsn;
     433            0 :         Ok(())
     434            0 :     }
     435              : 
     436              :     /// Truncate written WAL by removing all WAL segments after the given LSN.
     437              :     /// end_pos must point to the end of the WAL record.
     438            0 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     439            0 :         let _timer = WAL_STORAGE_OPERATION_SECONDS
     440            0 :             .with_label_values(&["truncate_wal"])
     441            0 :             .start_timer();
     442            0 : 
     443            0 :         // Streaming must not create a hole, so truncate cannot be called on non-written lsn
     444            0 :         if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
     445            0 :             bail!(
     446            0 :                 "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
     447            0 :                 self.write_lsn,
     448            0 :                 end_pos
     449            0 :             );
     450            0 :         }
     451            0 : 
     452            0 :         // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
     453            0 :         // disk (this happens on each connect).
     454            0 :         if self.is_truncated_after_restart
     455            0 :             && end_pos == self.write_lsn
     456            0 :             && end_pos == self.flush_record_lsn
     457              :         {
     458            0 :             return Ok(());
     459            0 :         }
     460              : 
     461              :         // Close previously opened file, if any
     462            0 :         if let Some(unflushed_file) = self.file.take() {
     463            0 :             self.fdatasync_file(&unflushed_file).await?;
     464            0 :         }
     465              : 
     466            0 :         let xlogoff = end_pos.segment_offset(self.wal_seg_size);
     467            0 :         let segno = end_pos.segment_number(self.wal_seg_size);
     468            0 : 
     469            0 :         // Remove all segments after the given LSN.
     470            0 :         remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
     471              : 
     472            0 :         let (mut file, is_partial) = self.open_or_create(segno).await?;
     473              : 
     474              :         // Fill end with zeroes
     475            0 :         file.seek(SeekFrom::Start(xlogoff as u64)).await?;
     476            0 :         write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
     477            0 :         self.fdatasync_file(&file).await?;
     478              : 
     479            0 :         if !is_partial {
     480              :             // Make segment partial once again
     481            0 :             let (wal_file_path, wal_file_partial_path) =
     482            0 :                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
     483            0 :             fs::rename(wal_file_path, wal_file_partial_path).await?;
     484            0 :         }
     485              : 
     486              :         // Update LSNs
     487            0 :         self.write_lsn = end_pos;
     488            0 :         self.write_record_lsn = end_pos;
     489            0 :         self.flush_record_lsn = end_pos;
     490            0 :         self.is_truncated_after_restart = true;
     491            0 :         Ok(())
     492            0 :     }
     493              : 
     494            0 :     fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     495            0 :         let timeline_dir = self.timeline_dir.clone();
     496            0 :         let wal_seg_size = self.wal_seg_size;
     497            0 :         Box::pin(async move {
     498            0 :             remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
     499            0 :         })
     500            0 :     }
     501              : 
     502            0 :     fn close(&mut self) {
     503            0 :         // Move the handle out of `self.file` (leaving `None`); the actual close happens when `_open_file` is dropped at the end of this scope.
     504            0 :         let _open_file = self.file.take();
     505            0 :     }
     506              : 
     507            0 :     fn get_metrics(&self) -> WalStorageMetrics {
     508            0 :         self.metrics.clone() // hands out a clone; the stored metrics stay in `self`
     509            0 :     }
     510              : }
     511              : 
     512              : /// Remove every WAL segment file (complete or `.partial`) in `timeline_dir` whose segment number satisfies `remove_predicate`; the first I/O error aborts the scan and is returned.
     513            0 : async fn remove_segments_from_disk(
     514            0 :     timeline_dir: &Utf8Path,
     515            0 :     wal_seg_size: usize,
     516            0 :     remove_predicate: impl Fn(XLogSegNo) -> bool,
     517            0 : ) -> Result<()> {
     518            0 :     let _timer = WAL_STORAGE_OPERATION_SECONDS // kept alive for the whole function; presumably records the duration on drop — TODO confirm
     519            0 :         .with_label_values(&["remove_segments_from_disk"])
     520            0 :         .start_timer();
     521            0 : 
     522            0 :     let mut n_removed = 0;
     523            0 :     let mut min_removed = u64::MAX; // sentinel: only meaningful (and only logged) when n_removed > 0
     524            0 :     let mut max_removed = u64::MIN; // sentinel: only meaningful (and only logged) when n_removed > 0
     525              : 
     526            0 :     let mut entries = fs::read_dir(timeline_dir).await?;
     527            0 :     while let Some(entry) = entries.next_entry().await? {
     528            0 :         let entry_path = entry.path();
     529            0 :         let fname = entry_path.file_name().unwrap(); // panics if the path has no final component; not expected for read_dir entries
     530              : 
     531            0 :         if let Some(fname_str) = fname.to_str() { // names that are not valid UTF-8 are silently skipped
     532              :             /* Ignore files that are neither complete nor partial XLOG segments */
     533            0 :             if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
     534            0 :                 continue;
     535            0 :             }
     536            0 :             let (segno, _) = XLogFromFileName(fname_str, wal_seg_size); // second tuple element is not needed here
     537            0 :             if remove_predicate(segno) {
     538            0 :                 remove_file(entry_path).await?;
     539            0 :                 n_removed += 1;
     540            0 :                 min_removed = min(min_removed, segno);
     541            0 :                 max_removed = max(max_removed, segno);
     542            0 :                 REMOVED_WAL_SEGMENTS.inc();
     543            0 :             }
     544            0 :         }
     545              :     }
     546              : 
     547            0 :     if n_removed > 0 {
     548            0 :         info!(
     549            0 :             "removed {} WAL segments [{}; {}]",
     550              :             n_removed, min_removed, max_removed
     551              :         );
     552            0 :     }
     553            0 :     Ok(())
     554            0 : }
     555              : 
     556              : pub struct WalReader { // sequential reader over a timeline's WAL; see `read` for the per-call contract
     557              :     remote_path: RemotePath, // remote prefix; the segment file name is joined onto it for remote reads
     558              :     timeline_dir: Utf8PathBuf, // directory searched for local segment files
     559              :     wal_seg_size: usize,
     560              :     pos: Lsn, // position of the next byte `read` will return
     561              :     wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>, // open segment reader, kept between `read` calls while inside one segment
     562              : 
     563              :     // S3 will be used to read WAL if LSN is not available locally
     564              :     enable_remote_read: bool,
     565              : 
     566              :     // We don't have WAL locally if LSN is less than local_start_lsn
     567              :     local_start_lsn: Lsn,
     568              :     // We will respond with zero-ed bytes before this Lsn as long as
     569              :     // pos is in the same segment as timeline_start_lsn.
     570              :     timeline_start_lsn: Lsn,
     571              :     // integer version number of PostgreSQL, e.g. 14; 15; 16
     572              :     pg_version: u32,
     573              :     system_id: SystemId,
     574              :     timeline_start_segment: Option<Bytes>, // lazily generated first segment, cached while `pos` is still before timeline_start_lsn
     575              : }
     576              : 
     577              : impl WalReader {
     578            0 :     pub fn new( // Build a reader positioned at `start_pos`; errors if `state` is uninitialized or `start_pos` is before the timeline's first segment
     579            0 :         ttid: &TenantTimelineId,
     580            0 :         timeline_dir: Utf8PathBuf,
     581            0 :         state: &TimelinePersistentState,
     582            0 :         start_pos: Lsn,
     583            0 :         enable_remote_read: bool,
     584            0 :     ) -> Result<Self> {
     585            0 :         if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
     586            0 :             bail!("state uninitialized, no data to read");
     587            0 :         }
     588            0 : 
     589            0 :         // TODO: Upgrade to bail!() once we know this couldn't possibly happen
     590            0 :         if state.timeline_start_lsn == Lsn(0) {
     591            0 :             warn!("timeline_start_lsn uninitialized before initializing wal reader");
     592            0 :         }
     593              : 
     594            0 :         if start_pos
     595            0 :             < state
     596            0 :                 .timeline_start_lsn
     597            0 :                 .segment_lsn(state.server.wal_seg_size as usize) // presumably the LSN at which the segment holding timeline_start_lsn begins — TODO confirm
     598              :         {
     599            0 :             bail!(
     600            0 :                 "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
     601            0 :                 start_pos,
     602            0 :                 state.timeline_start_lsn
     603            0 :             );
     604            0 :         }
     605            0 : 
     606            0 :         Ok(Self {
     607            0 :             remote_path: remote_timeline_path(ttid)?,
     608            0 :             timeline_dir,
     609            0 :             wal_seg_size: state.server.wal_seg_size as usize,
     610            0 :             pos: start_pos,
     611            0 :             wal_segment: None, // opened lazily by the first `read` that needs a file
     612            0 :             enable_remote_read,
     613            0 :             local_start_lsn: state.local_start_lsn,
     614            0 :             timeline_start_lsn: state.timeline_start_lsn,
     615            0 :             pg_version: state.server.pg_version / 10000, // reduce to the major version; assumes the stored value is major * 10000 + minor — TODO confirm
     616            0 :             system_id: state.server.system_id,
     617            0 :             timeline_start_segment: None, // generated on demand in `read`
     618              :         })
     619            0 :     }
     620              : 
     621              :     /// Read WAL at current position into provided buf, returns number of bytes
     622              :     /// read. It can be smaller than buf size only if a segment boundary (or,
     623              :     /// while padding the first segment, timeline_start_lsn) is reached.
     624            0 :     pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
     625            0 :         // If this timeline is new, we may not have a full segment yet, so
     626            0 :         // we pad the first bytes of the timeline's first WAL segment with 0s
     627            0 :         if self.pos < self.timeline_start_lsn {
     628            0 :             debug_assert_eq!(
     629            0 :                 self.pos.segment_number(self.wal_seg_size),
     630            0 :                 self.timeline_start_lsn.segment_number(self.wal_seg_size)
     631              :             );
     632              : 
     633              :             // All bytes after timeline_start_lsn are in WAL, but those before
     634              :             // are not, so we manually construct an empty segment for the bytes
     635              :             // not available in this timeline.
     636            0 :             if self.timeline_start_segment.is_none() {
     637            0 :                 let it = postgres_ffi::generate_wal_segment(
     638            0 :                     self.timeline_start_lsn.segment_number(self.wal_seg_size),
     639            0 :                     self.system_id,
     640            0 :                     self.pg_version,
     641            0 :                     self.timeline_start_lsn,
     642            0 :                 )?;
     643            0 :                 self.timeline_start_segment = Some(it);
     644            0 :             }
     645              : 
     646            0 :             assert!(self.timeline_start_segment.is_some());
     647            0 :             let segment = self.timeline_start_segment.take().unwrap(); // moved out of self; put back below if padding is not finished
     648            0 : 
     649            0 :             let seg_bytes = &segment[..];
     650            0 : 
     651            0 :             // How much of the current segment have we already consumed?
     652            0 :             let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
     653            0 : 
     654            0 :             // How many bytes may we consume in total?
     655            0 :             let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
     656            0 : 
     657            0 :             debug_assert!(seg_bytes.len() > pos_seg_offset);
     658            0 :             debug_assert!(seg_bytes.len() > tl_start_seg_offset);
     659              : 
     660              :             // Copy as many bytes as possible into the buffer
     661            0 :             let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
     662            0 :             buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
     663            0 : 
     664            0 :             self.pos += len as u64;
     665            0 : 
     666            0 :             // If we're done with the segment, we can release its memory.
     667            0 :             // However, if we're not yet done, store it so that we don't have to
     668            0 :             // construct the segment the next time this function is called.
     669            0 :             if self.pos < self.timeline_start_lsn {
     670            0 :                 self.timeline_start_segment = Some(segment);
     671            0 :             }
     672              : 
     673            0 :             return Ok(len);
     674            0 :         }
     675              : 
     676            0 :         let mut wal_segment = match self.wal_segment.take() {
     677            0 :             Some(reader) => reader,
     678            0 :             None => self.open_segment().await?,
     679              :         };
     680              : 
     681              :         // How much to read and send in message? We cannot cross the WAL file
     682              :         // boundary, and we don't want to send more than the provided buffer holds.
     683            0 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     684            0 :         let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
     685            0 : 
     686            0 :         // Read some data from the file.
     687            0 :         let buf = &mut buf[0..send_size];
     688            0 :         let send_size = wal_segment.read_exact(buf).await?; // fills the whole slice or fails; on failure the reader taken above is dropped, not stored back
     689            0 :         self.pos += send_size as u64;
     690            0 : 
     691            0 :         // Decide whether to reuse this file. If we don't set wal_segment here
     692            0 :         // a new reader will be opened next time.
     693            0 :         if self.pos.segment_offset(self.wal_seg_size) != 0 {
     694            0 :             self.wal_segment = Some(wal_segment);
     695            0 :         }
     696              : 
     697            0 :         Ok(send_size)
     698            0 :     }
     699              : 
     700              :     /// Open WAL segment at the current position of the reader: local file first, then remote storage if enabled.
     701            0 :     async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
     702            0 :         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
     703            0 :         let segno = self.pos.segment_number(self.wal_seg_size);
     704            0 :         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
     705            0 : 
     706            0 :         // Try to open local file, if we may have WAL locally
     707            0 :         if self.pos >= self.local_start_lsn {
     708            0 :             let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
     709            0 :             match res {
     710            0 :                 Ok((mut file, _)) => {
     711            0 :                     file.seek(SeekFrom::Start(xlogoff as u64)).await?; // position the file at the reader's offset within the segment
     712            0 :                     return Ok(Box::pin(file));
     713              :                 }
     714            0 :                 Err(e) => {
     715            0 :                     let is_not_found = e.chain().any(|e| { // look through the whole error chain, since context wraps the io::Error
     716            0 :                         if let Some(e) = e.downcast_ref::<io::Error>() {
     717            0 :                             e.kind() == io::ErrorKind::NotFound
     718              :                         } else {
     719            0 :                             false
     720              :                         }
     721            0 :                     });
     722            0 :                     if !is_not_found {
     723            0 :                         return Err(e);
     724            0 :                     }
     725              :                     // NotFound is expected, fall through to remote read
     726              :                 }
     727              :             };
     728            0 :         }
     729              : 
     730              :         // Try to open remote file, if remote reads are enabled
     731            0 :         if self.enable_remote_read {
     732            0 :             let remote_wal_file_path = self.remote_path.join(&wal_file_name);
     733            0 :             return read_object(&remote_wal_file_path, xlogoff as u64).await; // presumably streams the object starting at byte xlogoff — TODO confirm
     734            0 :         }
     735            0 : 
     736            0 :         bail!("WAL segment is not found")
     737            0 :     }
     738              : }
     739              : 
     740              : /// Block of `XLOG_BLCKSZ` zero bytes; the unit `write_zeroes` writes when zero-filling WAL segments.
     741              : const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
     742              : 
     743              : /// Write `count` zero bytes to `file` at its current position, in `XLOG_BLCKSZ`-sized chunks, then flush the handle.
     744            0 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
     745            0 :     fail::fail_point!("sk-write-zeroes", |_| { // test hook: when this failpoint is enabled, return an error without writing
     746            0 :         info!("write_zeroes hit failpoint");
     747            0 :         Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
     748            0 :     });
     749              : 
     750            0 :     while count >= XLOG_BLCKSZ {
     751            0 :         file.write_all(ZERO_BLOCK).await?;
     752            0 :         count -= XLOG_BLCKSZ;
     753              :     }
     754            0 :     file.write_all(&ZERO_BLOCK[0..count]).await?; // tail shorter than one block (empty when count was a multiple of XLOG_BLCKSZ)
     755            0 :     file.flush().await?; // flush only; callers that need durability sync the file themselves
     756            0 :     Ok(())
     757            0 : }
     758              : 
     759              : /// Helper function for opening WAL segment `segno` in `timeline_dir`. Returns file and
     760              : /// whether it is .partial. The `.partial` name is tried first.
     761            0 : pub(crate) async fn open_wal_file(
     762            0 :     timeline_dir: &Utf8Path,
     763            0 :     segno: XLogSegNo,
     764            0 :     wal_seg_size: usize,
     765            0 : ) -> Result<(tokio::fs::File, bool)> {
     766            0 :     let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size);
     767            0 : 
     768            0 :     // First try to open the .partial file.
     769            0 :     let mut partial_path = wal_file_path.to_owned(); // NOTE(review): `partial_path` is built but never used; the open below uses `wal_file_partial_path`
     770            0 :     partial_path.set_extension("partial");
     771            0 :     if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await { // NOTE(review): any error here (not only NotFound) is discarded and we fall through
     772            0 :         return Ok((opened_file, true));
     773            0 :     }
     774              : 
     775              :     // If that failed, try it without the .partial extension.
     776            0 :     let pf = tokio::fs::File::open(&wal_file_path)
     777            0 :         .await
     778            0 :         .with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
     779            0 :         .map_err(|e| { // log at warn level, then propagate the same error
     780            0 :             warn!("{}", e);
     781            0 :             e
     782            0 :         })?;
     783              : 
     784            0 :     Ok((pf, false))
     785            0 : }
     786              : 
     787              : /// Build the two candidate paths for segment `segno` inside `timeline_dir`: `(complete segment path, same name with ".partial" appended)`.
     788            0 : pub fn wal_file_paths(
     789            0 :     timeline_dir: &Utf8Path,
     790            0 :     segno: XLogSegNo,
     791            0 :     wal_seg_size: usize,
     792            0 : ) -> (Utf8PathBuf, Utf8PathBuf) {
     793            0 :     let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     794            0 :     let wal_file_path = timeline_dir.join(wal_file_name.clone());
     795            0 :     let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
     796            0 :     (wal_file_path, wal_file_partial_path)
     797            0 : }
        

Generated by: LCOV version 2.1-beta