LCOV - code coverage report
Current view: top level - pageserver/src/tenant - gc_block.rs (source / functions) Coverage Total Hit
Test: b9d67f908f91f00e353a27440ba89f642a869959.info Lines: 20.5 % 122 25
Test Date: 2024-11-19 21:44:13 Functions: 26.7 % 15 4

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use utils::id::TimelineId;
       4              : 
       5              : use super::remote_timeline_client::index::GcBlockingReason;
       6              : 
       7              : type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
       8              : 
       9              : /// GcBlock provides persistent (per-timeline) gc blocking.
      10              : #[derive(Default)]
      11              : pub(crate) struct GcBlock {
      12              :     /// The timelines which have current reasons to block gc.
      13              :     ///
      14              :     /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
      15              :     /// to keep this field up to date with RemoteTimelineClient `upload_queue.dirty`.
      16              :     reasons: std::sync::Mutex<Storage>,
      17              : 
      18              :     /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
      19              :     ///
      20              :     /// Do not add any more features taking and forbidding taking this lock. It should be
      21              :     /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
      22              :     /// synchronizes with gc attempts by locking and unlocking this mutex.
      23              :     blocking: tokio::sync::Mutex<()>,
      24              : }
      25              : 
      26              : impl GcBlock {
      27              :     /// Start another gc iteration.
      28              :     ///
      29              :     /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
      30              :     /// its ending, or if not currently possible, a value describing the reasons why not.
      31              :     ///
      32              :     /// Cancellation safe.
      33            4 :     pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
      34            4 :         let reasons = {
      35            4 :             let g = self.reasons.lock().unwrap();
      36            4 : 
      37            4 :             // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
      38            4 :             // tests, we use everything. we should warn if the gc has been consecutively blocked
      39            4 :             // for more than 1h (within single tenant session?).
      40            4 :             BlockingReasons::clean_and_summarize(g)
      41              :         };
      42              : 
      43            4 :         if let Some(reasons) = reasons {
      44            0 :             Err(reasons)
      45              :         } else {
      46              :             Ok(Guard {
      47            4 :                 _inner: self.blocking.lock().await,
      48              :             })
      49              :         }
      50            4 :     }
      51              : 
      52              :     /// Describe the current gc blocking reasons.
      53              :     ///
      54              :     /// TODO: make this json serializable.
      55            0 :     pub(crate) fn summary(&self) -> Option<BlockingReasons> {
      56            0 :         let g = self.reasons.lock().unwrap();
      57            0 : 
      58            0 :         BlockingReasons::summarize(&g)
      59            0 :     }
      60              : 
      61              :     /// Start blocking gc for this one timeline for the given reason.
      62              :     ///
      63              :     /// This is not a guard-based API; instead it mimics a set API. The returned future will not
      64              :     /// resolve until an existing gc round has completed.
      65              :     ///
      66              :     /// Returns true if this block was new, false if gc was already blocked for this reason.
      67              :     ///
      68              :     /// Cancellation safe: cancelling after the first poll keeps the gc blocking reason, but the
      69              :     /// caller then no longer waits for the scheduled upload or an ongoing gc round to complete.
      70            0 :     pub(crate) async fn insert(
      71            0 :         &self,
      72            0 :         timeline: &super::Timeline,
      73            0 :         reason: GcBlockingReason,
      74            0 :     ) -> anyhow::Result<bool> {
      75            0 :         let (added, uploaded) = {
      76            0 :             let mut g = self.reasons.lock().unwrap();
      77            0 :             let set = g.entry(timeline.timeline_id).or_default();
      78            0 :             let added = set.insert(reason);
      79              : 
      80              :             // LOCK ORDER: intentionally hold the lock, see self.reasons.
      81            0 :             let uploaded = timeline
      82            0 :                 .remote_client
      83            0 :                 .schedule_insert_gc_block_reason(reason)?;
      84              : 
      85            0 :             (added, uploaded)
      86            0 :         };
      87            0 : 
      88            0 :         uploaded.await?;
      89              : 
      90              :         // ensure that any ongoing gc iteration has completed
      91            0 :         drop(self.blocking.lock().await);
      92              : 
      93            0 :         Ok(added)
      94            0 :     }
      95              : 
      96              :     /// Remove blocking gc for this one timeline and the given reason.
      97            0 :     pub(crate) async fn remove(
      98            0 :         &self,
      99            0 :         timeline: &super::Timeline,
     100            0 :         reason: GcBlockingReason,
     101            0 :     ) -> anyhow::Result<()> {
     102              :         use std::collections::hash_map::Entry;
     103              : 
     104            0 :         super::span::debug_assert_current_span_has_tenant_and_timeline_id();
     105              : 
     106            0 :         let (remaining_blocks, uploaded) = {
     107            0 :             let mut g = self.reasons.lock().unwrap();
     108            0 :             match g.entry(timeline.timeline_id) {
     109            0 :                 Entry::Occupied(mut oe) => {
     110            0 :                     let set = oe.get_mut();
     111            0 :                     set.remove(reason);
     112            0 :                     if set.is_empty() {
     113            0 :                         oe.remove();
     114            0 :                     }
     115              :                 }
     116            0 :                 Entry::Vacant(_) => {
     117            0 :                     // we must still do the index_part.json update regardless, in case we had earlier
     118            0 :                     // been cancelled
     119            0 :                 }
     120              :             }
     121              : 
     122            0 :             let remaining_blocks = g.len();
     123              : 
     124              :             // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
     125            0 :             let uploaded = timeline
     126            0 :                 .remote_client
     127            0 :                 .schedule_remove_gc_block_reason(reason)?;
     128              : 
     129            0 :             (remaining_blocks, uploaded)
     130            0 :         };
     131            0 :         uploaded.await?;
     132              : 
     133              :         // no need to synchronize with gc iteration again
     134              : 
     135            0 :         if remaining_blocks > 0 {
     136            0 :             tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
     137              :         } else {
     138            0 :             tracing::info!("gc is now unblocked for the tenant");
     139              :         }
     140              : 
     141            0 :         Ok(())
     142            0 :     }
     143              : 
     144            0 :     pub(crate) fn before_delete(&self, timeline_id: &super::TimelineId) {
     145            0 :         let unblocked = {
     146            0 :             let mut g = self.reasons.lock().unwrap();
     147            0 :             if g.is_empty() {
     148            0 :                 return;
     149            0 :             }
     150            0 : 
     151            0 :             g.remove(timeline_id);
     152            0 : 
     153            0 :             BlockingReasons::clean_and_summarize(g).is_none()
     154            0 :         };
     155            0 : 
     156            0 :         if unblocked {
     157            0 :             tracing::info!("gc is now unblocked following deletion");
     158            0 :         }
     159            0 :     }
     160              : 
     161              :     /// Initialize with the non-deleted timelines of this tenant.
     162          192 :     pub(crate) fn set_scanned(&self, scanned: Storage) {
     163          192 :         let mut g = self.reasons.lock().unwrap();
     164          192 :         assert!(g.is_empty());
     165          192 :         g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
     166              : 
     167          192 :         if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
     168            0 :             tracing::info!(summary=?reasons, "initialized with gc blocked");
     169          192 :         }
     170          192 :     }
     171              : }
     172              : 
     173              : pub(super) struct Guard<'a> {
     174              :     _inner: tokio::sync::MutexGuard<'a, ()>,
     175              : }
     176              : 
     177              : #[derive(Debug)]
     178              : pub(crate) struct BlockingReasons {
     179              :     timelines: usize,
     180              :     reasons: enumset::EnumSet<GcBlockingReason>,
     181              : }
     182              : 
     183              : impl std::fmt::Display for BlockingReasons {
     184            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     185            0 :         write!(
     186            0 :             f,
     187            0 :             "{} timelines block for {:?}",
     188            0 :             self.timelines, self.reasons
     189            0 :         )
     190            0 :     }
     191              : }
     192              : 
     193              : impl BlockingReasons {
     194          196 :     fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
     195          196 :         let mut reasons = enumset::EnumSet::empty();
     196          196 :         g.retain(|_key, value| {
     197            0 :             reasons = reasons.union(*value);
     198            0 :             !value.is_empty()
     199          196 :         });
     200          196 :         if !g.is_empty() {
     201            0 :             Some(BlockingReasons {
     202            0 :                 timelines: g.len(),
     203            0 :                 reasons,
     204            0 :             })
     205              :         } else {
     206          196 :             None
     207              :         }
     208          196 :     }
     209              : 
     210            0 :     fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
     211            0 :         if g.is_empty() {
     212            0 :             None
     213              :         } else {
     214            0 :             let reasons = g
     215            0 :                 .values()
     216            0 :                 .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
     217            0 :             Some(BlockingReasons {
     218            0 :                 timelines: g.len(),
     219            0 :                 reasons,
     220            0 :             })
     221              :         }
     222            0 :     }
     223              : }
        

Generated by: LCOV version 2.1-beta