LCOV - code coverage report
Current view: top level - safekeeper/src - timeline_eviction.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 208 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 29 0

            Line data    Source code
       1              : //! Code related to evicting WAL files to remote storage.
       2              : //!
       3              : //! The actual upload is done by the partial WAL backup code. This file has
       4              : //! code to delete and re-download WAL files, cross-validate with partial WAL
       5              : //! backup if local file is still present.
       6              : 
       7              : use anyhow::Context;
       8              : use camino::Utf8PathBuf;
       9              : use remote_storage::{GenericRemoteStorage, RemotePath};
      10              : use tokio::fs::File;
      11              : use tokio::io::{AsyncRead, AsyncWriteExt};
      12              : use tracing::{debug, info, instrument, warn};
      13              : use utils::crashsafe::durable_rename;
      14              : 
      15              : use crate::metrics::{
      16              :     EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, EvictionEvent, NUM_EVICTED_TIMELINES,
      17              : };
      18              : use crate::rate_limit::rand_duration;
      19              : use crate::timeline_manager::{Manager, StateSnapshot};
      20              : use crate::wal_backup;
      21              : use crate::wal_backup_partial::{self, PartialRemoteSegment};
      22              : use crate::wal_storage::wal_file_paths;
      23              : 
      24              : impl Manager {
      25              :     /// Returns true if the timeline is ready for eviction.
      26              :     /// Current criteria:
      27              :     /// - no active tasks
      28              :     /// - control file is flushed (no next event scheduled)
      29              :     /// - no WAL residence guards
      30              :     /// - no pushes to the broker
      31              :     /// - last partial WAL segment is uploaded
      32              :     /// - all local segments before the uploaded partial are committed and uploaded
                            ///
                            /// The conditions are joined with `&&`, so they are checked in the order written
                            /// and evaluation stops at the first one that fails.
      33            0 :     pub(crate) fn ready_for_eviction(
      34            0 :         &self,
      35            0 :         next_event: &Option<tokio::time::Instant>,
      36            0 :         state: &StateSnapshot,
      37            0 :     ) -> bool {
      38            0 :         self.backup_task.is_none()
      39            0 :             && self.recovery_task.is_none()
      40            0 :             && self.wal_removal_task.is_none()
      41            0 :             && self.partial_backup_task.is_none()
      42            0 :             && next_event.is_none()
      43            0 :             && self.access_service.is_empty()
      44            0 :             && !self.tli_broker_active.get()
      45              :             // Partial segment of current flush_lsn is uploaded up to this flush_lsn.
      46            0 :             && !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
      47              :             // And it is the next one after the last removed. Given that local
      48              :             // WAL is removed only after it is uploaded to s3 (and pageserver
      49              :             // advancing remote_consistent_lsn) which happens only after WAL is
      50              :             // committed, true means all this is done.
      51              :             //
      52              :             // This also works for the first segment despite last_removed_segno
      53              :             // being 0 on init because this 0 triggers run of wal_removal_task
      54              :             // on success of which manager updates the horizon.
      55              :             //
      56              :             // **Note** pull_timeline functionality assumes that evicted timelines always have
      57              :             // a partial segment: if we ever change this condition, must also update that code.
                                    //
                                    // NOTE(review): the `unwrap()` below presumably cannot panic because
                                    // `needs_uploading` is expected to report true for `None`, which would
                                    // short-circuit the chain before reaching it -- confirm against
                                    // wal_backup_partial.
      58            0 :             && self
      59            0 :                 .partial_backup_uploaded
      60            0 :                 .as_ref()
      61            0 :                 .unwrap()
      62            0 :                 .flush_lsn
      63            0 :                 .segment_number(self.wal_seg_size)
      64            0 :                 == self.last_removed_segno + 1
      65            0 :     }
      66              : 
      67              :     /// Evict the timeline to remote storage. Returns whether the eviction was successful.
                            ///
                            /// Returns `false` early, after only logging a warning, when no remote storage is
                            /// configured or no partial backup has been uploaded. Panics if the timeline is
                            /// already offloaded.
      68              :     #[instrument(name = "evict_timeline", skip_all)]
      69              :     pub(crate) async fn evict_timeline(&mut self) -> bool {
      70              :         assert!(!self.is_offloaded);
      71              :         let Some(storage) = self.wal_backup.get_storage() else {
                                    // NOTE(review): this is the eviction path, yet the message below says
                                    // "uneviction" -- likely a copy-paste slip.
      72              :             warn!("no remote storage configured, skipping uneviction");
      73              :             return false;
      74              :         };
      75              :         let partial_backup_uploaded = match &self.partial_backup_uploaded {
      76              :             Some(p) => p.clone(),
      77              :             None => {
      78              :                 warn!("no partial backup uploaded, skipping eviction");
      79              :                 return false;
      80              :             }
      81              :         };
      82              : 
      83              :         info!("starting eviction, using {:?}", partial_backup_uploaded);
      84              : 
      85              :         EVICTION_EVENTS_STARTED
      86              :             .with_label_values(&[EvictionEvent::Evict.into()])
      87              :             .inc();
                                // The guard's closure runs when `_guard` is dropped, so the "completed"
                                // counter is incremented on every exit path below, success or failure.
      88            0 :         let _guard = scopeguard::guard((), |_| {
      89            0 :             EVICTION_EVENTS_COMPLETED
      90            0 :                 .with_label_values(&[EvictionEvent::Evict.into()])
      91            0 :                 .inc();
      92            0 :         });
      93              : 
      94              :         if let Err(e) = do_eviction(self, &partial_backup_uploaded, &storage).await {
      95              :             warn!("failed to evict timeline: {:?}", e);
      96              :             return false;
      97              :         }
      98              : 
      99              :         info!("successfully evicted timeline");
                                // Incremented only when `do_eviction` returned Ok.
     100              :         NUM_EVICTED_TIMELINES.inc();
     101              :         true
     102              :     }
     103              : 
     104              :     /// Attempt to restore evicted timeline from remote storage; it must be
     105              :     /// offloaded.
                            ///
                            /// Panics if the timeline is not offloaded. Failures are only logged; nothing is
                            /// returned to the caller.
     106              :     #[instrument(name = "unevict_timeline", skip_all)]
     107              :     pub(crate) async fn unevict_timeline(&mut self) {
     108              :         assert!(self.is_offloaded);
     109              :         let Some(storage) = self.wal_backup.get_storage() else {
     110              :             warn!("no remote storage configured, skipping uneviction");
     111              :             return;
     112              :         };
     113              :         let partial_backup_uploaded = match &self.partial_backup_uploaded {
     114              :             Some(p) => p.clone(),
     115              :             None => {
     116              :                 warn!("no partial backup uploaded, cannot unevict");
     117              :                 return;
     118              :             }
     119              :         };
     120              : 
     121              :         info!("starting uneviction, using {:?}", partial_backup_uploaded);
     122              : 
     123              :         EVICTION_EVENTS_STARTED
     124              :             .with_label_values(&[EvictionEvent::Restore.into()])
     125              :             .inc();
                                // As in `evict_timeline`: the "completed" counter is incremented when `_guard`
                                // is dropped, whichever way the function exits from here on.
     126            0 :         let _guard = scopeguard::guard((), |_| {
     127            0 :             EVICTION_EVENTS_COMPLETED
     128            0 :                 .with_label_values(&[EvictionEvent::Restore.into()])
     129            0 :                 .inc();
     130            0 :         });
     131              : 
     132              :         if let Err(e) = do_uneviction(self, &partial_backup_uploaded, &storage).await {
     133              :             warn!("failed to unevict timeline: {:?}", e);
     134              :             return;
     135              :         }
     136              : 
                                // New `evict_not_before`: now plus a `rand_duration` derived from
                                // `conf.eviction_min_resident`.
     137              :         self.evict_not_before =
     138              :             tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
     139              : 
     140              :         info!("successfully restored evicted timeline");
                                // Counterpart of the `inc()` performed by a successful `evict_timeline`.
     141              :         NUM_EVICTED_TIMELINES.dec();
     142              :     }
     143              : }
     144              : 
     145              : /// Ensure that content matches the remote partial backup, if local segment exists.
     146              : /// Then change state in control file and in-memory. If `delete_offloaded_wal` is set,
     147              : /// delete the local segment.
                        ///
                        /// The steps run in the order written; the first error is returned and the
                        /// remaining steps are skipped.
     148            0 : async fn do_eviction(
     149            0 :     mgr: &mut Manager,
     150            0 :     partial: &PartialRemoteSegment,
     151            0 :     storage: &GenericRemoteStorage,
     152            0 : ) -> anyhow::Result<()> {
     153            0 :     compare_local_segment_with_remote(mgr, partial, storage).await?;
     154              : 
     155            0 :     mgr.tli.switch_to_offloaded(partial).await?;
     156              :     // switch manager state as soon as possible
     157            0 :     mgr.is_offloaded = true;
     158              : 
                            // NOTE(review): if the deletion below fails, an error is returned although the
                            // timeline has already been switched to offloaded above.
     159            0 :     if mgr.conf.delete_offloaded_wal {
     160            0 :         delete_local_segment(mgr, partial).await?;
     161            0 :     }
     162              : 
     163            0 :     Ok(())
     164            0 : }
     165              : 
     166              : /// Ensure that content matches the remote partial backup, if local segment exists.
     167              : /// Then download segment to local disk and change state in control file and in-memory.
                        ///
                        /// Any failing step returns its error immediately; `mgr.is_offloaded` is cleared
                        /// only after validation, download and `switch_to_present` have all succeeded.
     168            0 : async fn do_uneviction(
     169            0 :     mgr: &mut Manager,
     170            0 :     partial: &PartialRemoteSegment,
     171            0 :     storage: &GenericRemoteStorage,
     172            0 : ) -> anyhow::Result<()> {
     173              :     // if the local segment is present, validate it
     174            0 :     compare_local_segment_with_remote(mgr, partial, storage).await?;
     175              : 
     176              :     // atomically download the partial segment
     177            0 :     redownload_partial_segment(mgr, partial, storage).await?;
     178              : 
     179            0 :     mgr.tli.switch_to_present().await?;
     180              :     // switch manager state as soon as possible
     181            0 :     mgr.is_offloaded = false;
     182              : 
     183            0 :     Ok(())
     184            0 : }
     185              : 
     186              : /// Delete local WAL segment.
                        ///
                        /// The path is computed by `local_segment_path` from `partial.flush_lsn`. Any error
                        /// from `remove_file` is returned to the caller.
     187            0 : async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
     188            0 :     let local_path = local_segment_path(mgr, partial);
     189              : 
     190            0 :     info!("deleting WAL file to evict: {}", local_path);
     191            0 :     tokio::fs::remove_file(&local_path).await?;
     192            0 :     Ok(())
     193            0 : }
     194              : 
     195              : /// Redownload partial segment from remote storage.
     196              : /// The segment is downloaded to a temporary file and then renamed to the final path.
                        ///
                        /// The number of downloaded bytes must equal
                        /// `partial.flush_lsn.segment_offset(mgr.wal_seg_size)`, otherwise an error is
                        /// returned. The temporary file is then resized to `mgr.wal_seg_size` with `set_len`
                        /// before being renamed to the local segment path.
     197            0 : async fn redownload_partial_segment(
     198            0 :     mgr: &Manager,
     199            0 :     partial: &PartialRemoteSegment,
     200            0 :     storage: &GenericRemoteStorage,
     201            0 : ) -> anyhow::Result<()> {
     202            0 :     let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
     203            0 :     let remote_segfile = remote_segment_path(mgr, partial);
     204              : 
     205            0 :     debug!(
     206            0 :         "redownloading partial segment: {} -> {}",
     207              :         remote_segfile, tmp_file
     208              :     );
     209              : 
                            // NOTE(review): if any step between here and the rename fails, the temporary
                            // file is left in place; nothing on those error paths removes it.
     210            0 :     let mut reader = wal_backup::read_object(storage, &remote_segfile, 0).await?;
     211            0 :     let mut file = File::create(&tmp_file).await?;
     212              : 
     213            0 :     let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
     214            0 :     let expected_len = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
     215              : 
     216            0 :     if actual_len != expected_len as u64 {
     217            0 :         anyhow::bail!(
     218            0 :             "partial downloaded {} bytes, expected {}",
     219              :             actual_len,
     220              :             expected_len
     221              :         );
     222            0 :     }
     223              : 
                            // NOTE(review): `actual_len` equals `expected_len` at this point, so this check
                            // can only fire if `expected_len` itself exceeds `wal_seg_size`.
     224            0 :     if actual_len > mgr.wal_seg_size as u64 {
     225            0 :         anyhow::bail!(
     226            0 :             "remote segment is too long: {} bytes, expected {}",
     227              :             actual_len,
     228              :             mgr.wal_seg_size
     229              :         );
     230            0 :     }
                            // Resize the file to the full segment size before it is renamed into place.
     231            0 :     file.set_len(mgr.wal_seg_size as u64).await?;
     232            0 :     file.flush().await?;
     233              : 
     234            0 :     let final_path = local_segment_path(mgr, partial);
     235            0 :     info!("downloaded {actual_len} bytes, renaming to {final_path}");
     236            0 :     if let Err(e) = durable_rename(&tmp_file, &final_path, !mgr.conf.no_sync).await {
     237              :         // Probably rename succeeded, but fsync of it failed. Remove
     238              :         // the file then to avoid using it.
                                // NOTE(review): the path removed here is `tmp_file`, not `final_path`; a
                                // missing file is tolerated via `ignore_not_found`. Confirm this matches
                                // the intent stated above.
     239            0 :         tokio::fs::remove_file(tmp_file)
     240            0 :             .await
     241            0 :             .or_else(utils::fs_ext::ignore_not_found)?;
     242            0 :         return Err(e.into());
     243            0 :     }
     244              : 
     245            0 :     Ok(())
     246            0 : }
     247              : 
     248              : /// Compare local WAL segment with partial WAL backup in remote storage.
     249              : /// If the local segment is not present, the function does nothing.
     250              : /// If the local segment is present, it compares the local segment with the remote one.
                        ///
                        /// A failed comparison is returned as an error carrying the context
                        /// "validation failed".
     251            0 : async fn compare_local_segment_with_remote(
     252            0 :     mgr: &Manager,
     253            0 :     partial: &PartialRemoteSegment,
     254            0 :     storage: &GenericRemoteStorage,
     255            0 : ) -> anyhow::Result<()> {
     256            0 :     let local_path = local_segment_path(mgr, partial);
     257              : 
     258            0 :     match File::open(&local_path).await {
     259            0 :         Ok(mut local_file) => {
     260            0 :             do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial, storage)
     261            0 :                 .await
     262            0 :                 .context("validation failed")
     263              :         }
                                // NOTE(review): every open error lands here, not only "file not found", so
                                // e.g. a permission error is also reported as "not present" and skips
                                // validation.
     264              :         Err(_) => {
     265            0 :             info!(
     266            0 :                 "local WAL file {} is not present, skipping validation",
     267              :                 local_path
     268              :             );
     269            0 :             Ok(())
     270              :         }
     271              :     }
     272            0 : }
     273              : 
     274              : /// Compare opened local WAL segment with partial WAL backup in remote storage.
     275              : /// Validate full content of both files.
                        ///
                        /// Checks, in order: the local file is exactly `wal_seg_size` bytes long; its first
                        /// `partial.flush_lsn.segment_offset(..)` bytes equal the remote object; the remote
                        /// object ends there; the remainder of the local file is all zeroes and the file
                        /// ends after it. The first failed check is returned as an error.
                        ///
                        /// NOTE(review): the size check uses the `wal_seg_size` argument while the offset
                        /// arithmetic uses `mgr.wal_seg_size`; the only caller visible here passes
                        /// `mgr.wal_seg_size`, so the two agree.
     276            0 : async fn do_validation(
     277            0 :     mgr: &Manager,
     278            0 :     file: &mut File,
     279            0 :     wal_seg_size: usize,
     280            0 :     partial: &PartialRemoteSegment,
     281            0 :     storage: &GenericRemoteStorage,
     282            0 : ) -> anyhow::Result<()> {
     283            0 :     let local_size = file.metadata().await?.len() as usize;
     284            0 :     if local_size != wal_seg_size {
     285            0 :         anyhow::bail!(
     286            0 :             "local segment size is invalid: found {}, expected {}",
     287              :             local_size,
     288              :             wal_seg_size
     289              :         );
     290            0 :     }
     291              : 
     292            0 :     let remote_segfile = remote_segment_path(mgr, partial);
     293            0 :     let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
     294            0 :         wal_backup::read_object(storage, &remote_segfile, 0).await?;
     295              : 
     296              :     // remote segment should have bytes exactly up to `flush_lsn`
     297            0 :     let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
     298              :     // let's compare the first `expected_remote_size` bytes
     299            0 :     compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?;
     300              :     // and check that the remote segment ends here
     301            0 :     check_end(&mut remote_reader).await?;
     302              : 
     303              :     // if local segment is longer, the rest should be zeroes
     304            0 :     read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?;
     305              :     // and check that the local segment ends here
     306            0 :     check_end(file).await?;
     307              : 
     308            0 :     Ok(())
     309            0 : }
     310              : 
     311            0 : fn local_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> Utf8PathBuf {
                            // Builds the local path of the segment that contains `partial.flush_lsn`,
                            // inside the timeline directory.
     312            0 :     let flush_lsn = partial.flush_lsn;
     313            0 :     let segno = flush_lsn.segment_number(mgr.wal_seg_size);
                            // Only the second path returned by `wal_file_paths` is used (presumably the
                            // name of the partial segment file -- confirm against wal_storage).
     314            0 :     let (_, local_partial_segfile) =
     315            0 :         wal_file_paths(mgr.tli.timeline_dir(), segno, mgr.wal_seg_size);
     316            0 :     local_partial_segfile
     317            0 : }
     318              : 
     319            0 : fn remote_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> RemotePath {
                            // Delegates to `partial.remote_path`, passing the timeline's `remote_path`.
     320            0 :     partial.remote_path(&mgr.tli.remote_path)
     321            0 : }
     322              : 
     323              : /// Compare first `n` bytes of two readers. If the bytes differ, return an error.
     324              : /// If the readers are shorter than `n`, return an error.
                        ///
                        /// `reader1` is read in chunks of at most 32 KiB; for each chunk, exactly the same
                        /// number of bytes is then read from `reader2` with `read_exact`. A mismatch error
                        /// reports the offset of the first differing byte.
     325            0 : async fn compare_n_bytes<R1, R2>(reader1: &mut R1, reader2: &mut R2, n: usize) -> anyhow::Result<()>
     326            0 : where
     327            0 :     R1: AsyncRead + Unpin,
     328            0 :     R2: AsyncRead + Unpin,
     329            0 : {
     330              :     use tokio::io::AsyncReadExt;
     331              : 
     332              :     const BUF_SIZE: usize = 32 * 1024;
     333              : 
     334            0 :     let mut buffer1 = vec![0u8; BUF_SIZE];
     335            0 :     let mut buffer2 = vec![0u8; BUF_SIZE];
     336              : 
     337            0 :     let mut offset = 0;
     338              : 
     339            0 :     while offset < n {
                                // Never request more than what is left of the `n` bytes.
     340            0 :         let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
     341              : 
                                // `read` may return fewer bytes than requested; the count it returns decides
                                // how much is read from `reader2` below. Zero bytes means EOF.
     342            0 :         let bytes_read1 = reader1
     343            0 :             .read(&mut buffer1[..bytes_to_read])
     344            0 :             .await
     345            0 :             .with_context(|| format!("failed to read from reader1 at offset {offset}"))?;
     346            0 :         if bytes_read1 == 0 {
     347            0 :             anyhow::bail!("unexpected EOF from reader1 at offset {}", offset);
     348            0 :         }
     349              : 
                                // `read_exact` either fills all `bytes_read1` bytes or fails, so a short
                                // `reader2` surfaces as the read error below.
     350            0 :         let bytes_read2 = reader2
     351            0 :             .read_exact(&mut buffer2[..bytes_read1])
     352            0 :             .await
     353            0 :             .with_context(|| {
     354            0 :                 format!("failed to read {bytes_read1} bytes from reader2 at offset {offset}")
     355            0 :             })?;
     356            0 :         assert!(bytes_read2 == bytes_read1);
     357              : 
     358            0 :         if buffer1[..bytes_read1] != buffer2[..bytes_read2] {
                                    // Locate the first differing byte so the error can name its offset.
     359            0 :             let diff_offset = buffer1[..bytes_read1]
     360            0 :                 .iter()
     361            0 :                 .zip(buffer2[..bytes_read2].iter())
     362            0 :                 .position(|(a, b)| a != b)
     363            0 :                 .expect("mismatched buffers, but no difference found");
     364            0 :             anyhow::bail!("mismatch at offset {}", offset + diff_offset);
     365            0 :         }
     366              : 
     367            0 :         offset += bytes_read1;
     368              :     }
     369              : 
     370            0 :     Ok(())
     371            0 : }
     372              : 
     373            0 : async fn check_end<R>(mut reader: R) -> anyhow::Result<()>
     374            0 : where
     375            0 :     R: AsyncRead + Unpin,
     376            0 : {
     377              :     use tokio::io::AsyncReadExt;
     378              : 
                            // Verifies that the reader is at EOF: a read into a 1-byte buffer must return
                            // 0 bytes. If a byte is available it is consumed and an error is returned; a
                            // read error is propagated as is.
     379            0 :     let mut buffer = [0u8; 1];
     380            0 :     let bytes_read = reader.read(&mut buffer).await?;
     381            0 :     if bytes_read != 0 {
     382            0 :         anyhow::bail!("expected EOF, found bytes");
     383            0 :     }
     384            0 :     Ok(())
     385            0 : }
     386              : 
     387            0 : async fn read_n_zeroes<R>(reader: &mut R, n: usize) -> anyhow::Result<()>
     388            0 : where
     389            0 :     R: AsyncRead + Unpin,
     390            0 : {
     391              :     use tokio::io::AsyncReadExt;
     392              : 
                            // Consumes `n` bytes from `reader` in chunks of at most 32 KiB and requires each
                            // of them to be zero. Fails on a read error, on EOF before `n` bytes were seen,
                            // or on the first chunk that contains a non-zero byte.
     393              :     const BUF_SIZE: usize = 32 * 1024;
     394            0 :     let mut buffer = vec![0u8; BUF_SIZE];
     395            0 :     let mut offset = 0;
     396              : 
     397            0 :     while offset < n {
                                // Never request more than what is left of the `n` bytes.
     398            0 :         let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
     399              : 
     400            0 :         let bytes_read = reader
     401            0 :             .read(&mut buffer[..bytes_to_read])
     402            0 :             .await
     403            0 :             .context("expected zeroes, got read error")?;
     404            0 :         if bytes_read == 0 {
     405            0 :             anyhow::bail!("expected zeroes, got EOF");
     406            0 :         }
     407              : 
                                // Only the bytes actually read in this iteration are inspected.
     408            0 :         if buffer[..bytes_read].iter().all(|&b| b == 0) {
     409            0 :             offset += bytes_read;
     410            0 :         } else {
     411            0 :             anyhow::bail!("non-zero byte found");
     412              :         }
     413              :     }
     414              : 
     415            0 :     Ok(())
     416            0 : }
        

Generated by: LCOV version 2.1-beta