LCOV - code coverage report
Current view: top level - pageserver/src/tenant - gc_block.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 20.5 % 122 25
Test Date: 2025-03-12 00:01:28 Functions: 26.7 % 15 4

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::sync::Arc;
       3              : 
       4              : use utils::id::TimelineId;
       5              : 
       6              : use super::remote_timeline_client::index::GcBlockingReason;
       7              : 
       8              : type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
       9              : 
      10              : /// GcBlock provides persistent (per-timeline) gc blocking.
      11              : #[derive(Default)]
      12              : pub(crate) struct GcBlock {
      13              :     /// The timelines which have current reasons to block gc.
      14              :     ///
      15              :     /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
      16              :     /// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
      17              :     reasons: std::sync::Mutex<Storage>,
      18              : 
      19              :     /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
      20              :     ///
      21              :     /// Do not add any more features taking and forbidding taking this lock. It should be
      22              :     /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
      23              :     /// synchronizes with gc attempts by locking and unlocking this mutex.
      24              :     blocking: Arc<tokio::sync::Mutex<()>>,
      25              : }
      26              : 
      27              : impl GcBlock {
      28              :     /// Start another gc iteration.
      29              :     ///
      30              :     /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
      31              :     /// it's ending, or if not currently possible, a value describing the reasons why not.
      32              :     ///
      33              :     /// Cancellation safe.
      34            8 :     pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
      35            8 :         let reasons = {
      36            8 :             let g = self.reasons.lock().unwrap();
      37            8 : 
      38            8 :             // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
      39            8 :             // tests, we use everything. we should warn if the gc has been consecutively blocked
      40            8 :             // for more than 1h (within single tenant session?).
      41            8 :             BlockingReasons::clean_and_summarize(g)
      42              :         };
      43              : 
      44            8 :         if let Some(reasons) = reasons {
      45            0 :             Err(reasons)
      46              :         } else {
      47              :             Ok(Guard {
      48            8 :                 _inner: self.blocking.clone().lock_owned().await,
      49              :             })
      50              :         }
      51            8 :     }
      52              : 
      53              :     /// Describe the current gc blocking reasons.
      54              :     ///
      55              :     /// TODO: make this json serializable.
      56            0 :     pub(crate) fn summary(&self) -> Option<BlockingReasons> {
      57            0 :         let g = self.reasons.lock().unwrap();
      58            0 : 
      59            0 :         BlockingReasons::summarize(&g)
      60            0 :     }
      61              : 
      62              :     /// Start blocking gc for this one timeline for the given reason.
      63              :     ///
      64              :     /// This is not a guard based API but instead it mimics set API. The returned future will not
      65              :     /// resolve until an existing gc round has completed.
      66              :     ///
      67              :     /// Returns true if this block was new, false if gc was already blocked for this reason.
      68              :     ///
      69              :     /// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
      70              :     /// keep the gc blocking reason.
      71            0 :     pub(crate) async fn insert(
      72            0 :         &self,
      73            0 :         timeline: &super::Timeline,
      74            0 :         reason: GcBlockingReason,
      75            0 :     ) -> anyhow::Result<bool> {
      76            0 :         let (added, uploaded) = {
      77            0 :             let mut g = self.reasons.lock().unwrap();
      78            0 :             let set = g.entry(timeline.timeline_id).or_default();
      79            0 :             let added = set.insert(reason);
      80              : 
      81              :             // LOCK ORDER: intentionally hold the lock, see self.reasons.
      82            0 :             let uploaded = timeline
      83            0 :                 .remote_client
      84            0 :                 .schedule_insert_gc_block_reason(reason)?;
      85              : 
      86            0 :             (added, uploaded)
      87            0 :         };
      88            0 : 
      89            0 :         uploaded.await?;
      90              : 
      91              :         // ensure that any ongoing gc iteration has completed
      92            0 :         drop(self.blocking.lock().await);
      93              : 
      94            0 :         Ok(added)
      95            0 :     }
      96              : 
      97              :     /// Remove blocking gc for this one timeline and the given reason.
      98            0 :     pub(crate) async fn remove(
      99            0 :         &self,
     100            0 :         timeline: &super::Timeline,
     101            0 :         reason: GcBlockingReason,
     102            0 :     ) -> anyhow::Result<()> {
     103              :         use std::collections::hash_map::Entry;
     104              : 
     105            0 :         super::span::debug_assert_current_span_has_tenant_and_timeline_id();
     106              : 
     107            0 :         let (remaining_blocks, uploaded) = {
     108            0 :             let mut g = self.reasons.lock().unwrap();
     109            0 :             match g.entry(timeline.timeline_id) {
     110            0 :                 Entry::Occupied(mut oe) => {
     111            0 :                     let set = oe.get_mut();
     112            0 :                     set.remove(reason);
     113            0 :                     if set.is_empty() {
     114            0 :                         oe.remove();
     115            0 :                     }
     116              :                 }
     117            0 :                 Entry::Vacant(_) => {
     118            0 :                     // we must still do the index_part.json update regardless, in case we had earlier
     119            0 :                     // been cancelled
     120            0 :                 }
     121              :             }
     122              : 
     123            0 :             let remaining_blocks = g.len();
     124              : 
     125              :             // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
     126            0 :             let uploaded = timeline
     127            0 :                 .remote_client
     128            0 :                 .schedule_remove_gc_block_reason(reason)?;
     129              : 
     130            0 :             (remaining_blocks, uploaded)
     131            0 :         };
     132            0 :         uploaded.await?;
     133              : 
     134              :         // no need to synchronize with gc iteration again
     135              : 
     136            0 :         if remaining_blocks > 0 {
     137            0 :             tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
     138              :         } else {
     139            0 :             tracing::info!("gc is now unblocked for the tenant");
     140              :         }
     141              : 
     142            0 :         Ok(())
     143            0 :     }
     144              : 
     145            0 :     pub(crate) fn before_delete(&self, timeline_id: &super::TimelineId) {
     146            0 :         let unblocked = {
     147            0 :             let mut g = self.reasons.lock().unwrap();
     148            0 :             if g.is_empty() {
     149            0 :                 return;
     150            0 :             }
     151            0 : 
     152            0 :             g.remove(timeline_id);
     153            0 : 
     154            0 :             BlockingReasons::clean_and_summarize(g).is_none()
     155            0 :         };
     156            0 : 
     157            0 :         if unblocked {
     158            0 :             tracing::info!("gc is now unblocked following deletion");
     159            0 :         }
     160            0 :     }
     161              : 
     162              :     /// Initialize with the non-deleted timelines of this tenant.
     163          452 :     pub(crate) fn set_scanned(&self, scanned: Storage) {
     164          452 :         let mut g = self.reasons.lock().unwrap();
     165          452 :         assert!(g.is_empty());
     166          452 :         g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
     167              : 
     168          452 :         if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
     169            0 :             tracing::info!(summary=?reasons, "initialized with gc blocked");
     170          452 :         }
     171          452 :     }
     172              : }
     173              : 
     174              : pub(crate) struct Guard {
     175              :     _inner: tokio::sync::OwnedMutexGuard<()>,
     176              : }
     177              : 
     178              : #[derive(Debug)]
     179              : pub(crate) struct BlockingReasons {
     180              :     timelines: usize,
     181              :     reasons: enumset::EnumSet<GcBlockingReason>,
     182              : }
     183              : 
     184              : impl std::fmt::Display for BlockingReasons {
     185            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     186            0 :         write!(
     187            0 :             f,
     188            0 :             "{} timelines block for {:?}",
     189            0 :             self.timelines, self.reasons
     190            0 :         )
     191            0 :     }
     192              : }
     193              : 
     194              : impl BlockingReasons {
     195          460 :     fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
     196          460 :         let mut reasons = enumset::EnumSet::empty();
     197          460 :         g.retain(|_key, value| {
     198            0 :             reasons = reasons.union(*value);
     199            0 :             !value.is_empty()
     200          460 :         });
     201          460 :         if !g.is_empty() {
     202            0 :             Some(BlockingReasons {
     203            0 :                 timelines: g.len(),
     204            0 :                 reasons,
     205            0 :             })
     206              :         } else {
     207          460 :             None
     208              :         }
     209          460 :     }
     210              : 
     211            0 :     fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
     212            0 :         if g.is_empty() {
     213            0 :             None
     214              :         } else {
     215            0 :             let reasons = g
     216            0 :                 .values()
     217            0 :                 .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
     218            0 :             Some(BlockingReasons {
     219            0 :                 timelines: g.len(),
     220            0 :                 reasons,
     221            0 :             })
     222              :         }
     223            0 :     }
     224              : }
        

Generated by: LCOV version 2.1-beta