LCOV - code coverage report
Current view: top level - pageserver/src/tenant - gc_block.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 20.2 % 124 25
Test Date: 2024-08-21 17:32:46 Functions: 26.7 % 15 4

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use utils::id::TimelineId;
       4              : 
       5              : use super::remote_timeline_client::index::GcBlockingReason;
       6              : 
       7              : type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
       8              : 
       9              : #[derive(Default)]
      10              : pub(crate) struct GcBlock {
      11              :     /// The timelines which have current reasons to block gc.
      12              :     ///
      13              :     /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
      14              :     /// to keep this field up to date with RemoteTimelineClient `upload_queue.dirty`.
      15              :     reasons: std::sync::Mutex<Storage>,
      16              :     blocking: tokio::sync::Mutex<()>,
      17              : }
      18              : 
      19              : impl GcBlock {
      20              :     /// Start another gc iteration.
      21              :     ///
      22              :     /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
      23              :     /// its ending, or if not currently possible, a value describing the reasons why not.
      24              :     ///
      25              :     /// Cancellation safe.
      26          754 :     pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
      27          754 :         let reasons = {
      28          754 :             let g = self.reasons.lock().unwrap();
      29          754 : 
      30          754 :             // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
      31          754 :             // tests, we use everything. we should warn if the gc has been consecutively blocked
      32          754 :             // for more than 1h (within single tenant session?).
      33          754 :             BlockingReasons::clean_and_summarize(g)
      34              :         };
      35              : 
      36          754 :         if let Some(reasons) = reasons {
      37            0 :             Err(reasons)
      38              :         } else {
      39              :             Ok(Guard {
      40          754 :                 _inner: self.blocking.lock().await,
      41              :             })
      42              :         }
      43          754 :     }
      44              : 
      45            0 :     pub(crate) fn summary(&self) -> Option<BlockingReasons> {
      46            0 :         let g = self.reasons.lock().unwrap();
      47            0 : 
      48            0 :         BlockingReasons::summarize(&g)
      49            0 :     }
      50              : 
      51              :     /// Start blocking gc for this one timeline for the given reason.
      52              :     ///
      53              :     /// This is not a guard-based API; instead it mimics a set API. The returned future will not
      54              :     /// resolve until an existing gc round has completed.
      55              :     ///
      56              :     /// Returns true if this block was new, false if gc was already blocked for this reason.
      57              :     ///
      58              :     /// Cancellation safe: cancelling after the first poll keeps the gc blocking reason and its
      59              :     /// scheduled index_part update, but skips waiting for the upload and any ongoing gc round.
      60            0 :     pub(crate) async fn insert(
      61            0 :         &self,
      62            0 :         timeline: &super::Timeline,
      63            0 :         reason: GcBlockingReason,
      64            0 :     ) -> anyhow::Result<bool> {
      65            0 :         let (added, uploaded) = {
      66            0 :             let mut g = self.reasons.lock().unwrap();
      67            0 :             let set = g.entry(timeline.timeline_id).or_default();
      68            0 :             let added = set.insert(reason);
      69              : 
      70              :             // LOCK ORDER: intentionally hold the lock, see self.reasons.
      71            0 :             let uploaded = timeline
      72            0 :                 .remote_client
      73            0 :                 .schedule_insert_gc_block_reason(reason)?;
      74              : 
      75            0 :             (added, uploaded)
      76            0 :         };
      77            0 : 
      78            0 :         uploaded.await?;
      79              : 
      80              :         // ensure that any ongoing gc iteration has completed
      81            0 :         drop(self.blocking.lock().await);
      82              : 
      83            0 :         Ok(added)
      84            0 :     }
      85              : 
      86              :     /// Remove blocking gc for this one timeline and the given reason.
      87            0 :     pub(crate) async fn remove(
      88            0 :         &self,
      89            0 :         timeline: &super::Timeline,
      90            0 :         reason: GcBlockingReason,
      91            0 :     ) -> anyhow::Result<()> {
      92            0 :         use std::collections::hash_map::Entry;
      93            0 : 
      94            0 :         super::span::debug_assert_current_span_has_tenant_and_timeline_id();
      95              : 
      96            0 :         let (remaining_blocks, uploaded) = {
      97            0 :             let mut g = self.reasons.lock().unwrap();
      98            0 :             match g.entry(timeline.timeline_id) {
      99            0 :                 Entry::Occupied(mut oe) => {
     100            0 :                     let set = oe.get_mut();
     101            0 :                     set.remove(reason);
     102            0 :                     if set.is_empty() {
     103            0 :                         oe.remove();
     104            0 :                     }
     105              :                 }
     106            0 :                 Entry::Vacant(_) => {
     107            0 :                     // we must still do the index_part.json update regardless, in case we had earlier
     108            0 :                     // been cancelled
     109            0 :                 }
     110              :             }
     111              : 
     112            0 :             let remaining_blocks = g.len();
     113              : 
     114              :             // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
     115            0 :             let uploaded = timeline
     116            0 :                 .remote_client
     117            0 :                 .schedule_remove_gc_block_reason(reason)?;
     118              : 
     119            0 :             (remaining_blocks, uploaded)
     120            0 :         };
     121            0 :         uploaded.await?;
     122              : 
     123              :         // no need to synchronize with gc iteration again
     124              : 
     125            0 :         if remaining_blocks > 0 {
     126            0 :             tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
     127              :         } else {
     128            0 :             tracing::info!("gc is now unblocked for the tenant");
     129              :         }
     130              : 
     131            0 :         Ok(())
     132            0 :     }
     133              : 
     134            0 :     pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
     135            0 :         let unblocked = {
     136            0 :             let mut g = self.reasons.lock().unwrap();
     137            0 :             if g.is_empty() {
     138            0 :                 return;
     139            0 :             }
     140            0 : 
     141            0 :             g.remove(&timeline.timeline_id);
     142            0 : 
     143            0 :             BlockingReasons::clean_and_summarize(g).is_none()
     144            0 :         };
     145            0 : 
     146            0 :         if unblocked {
     147            0 :             tracing::info!("gc is now unblocked following deletion");
     148            0 :         }
     149            0 :     }
     150              : 
     151              :     /// Initialize with the non-deleted timelines of this tenant.
     152          182 :     pub(crate) fn set_scanned(&self, scanned: Storage) {
     153          182 :         let mut g = self.reasons.lock().unwrap();
     154          182 :         assert!(g.is_empty());
     155          182 :         g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
     156              : 
     157          182 :         if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
     158            0 :             tracing::info!(summary=?reasons, "initialized with gc blocked");
     159          182 :         }
     160          182 :     }
     161              : }
     162              : 
     163              : pub(super) struct Guard<'a> {
     164              :     _inner: tokio::sync::MutexGuard<'a, ()>,
     165              : }
     166              : 
     167              : #[derive(Debug)]
     168              : pub(crate) struct BlockingReasons {
     169              :     timelines: usize,
     170              :     reasons: enumset::EnumSet<GcBlockingReason>,
     171              : }
     172              : 
     173              : impl std::fmt::Display for BlockingReasons {
     174            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     175            0 :         write!(
     176            0 :             f,
     177            0 :             "{} timelines block for {:?}",
     178            0 :             self.timelines, self.reasons
     179            0 :         )
     180            0 :     }
     181              : }
     182              : 
     183              : impl BlockingReasons {
     184          936 :     fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
     185          936 :         let mut reasons = enumset::EnumSet::empty();
     186          936 :         g.retain(|_key, value| {
     187            0 :             reasons = reasons.union(*value);
     188            0 :             !value.is_empty()
     189          936 :         });
     190          936 :         if !g.is_empty() {
     191            0 :             Some(BlockingReasons {
     192            0 :                 timelines: g.len(),
     193            0 :                 reasons,
     194            0 :             })
     195              :         } else {
     196          936 :             None
     197              :         }
     198          936 :     }
     199              : 
     200            0 :     fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
     201            0 :         if g.is_empty() {
     202            0 :             None
     203              :         } else {
     204            0 :             let reasons = g
     205            0 :                 .values()
     206            0 :                 .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
     207            0 :             Some(BlockingReasons {
     208            0 :                 timelines: g.len(),
     209            0 :                 reasons,
     210            0 :             })
     211              :         }
     212            0 :     }
     213              : }
        

Generated by: LCOV version 2.1-beta