LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 88.5 % 122 108 14 108
Current Date: 2024-01-09 02:06:09 Functions: 73.3 % 15 11 4 11
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use super::storage_layer::LayerFileName;
       2                 : use super::storage_layer::ResidentLayer;
       3                 : use crate::tenant::metadata::TimelineMetadata;
       4                 : use crate::tenant::remote_timeline_client::index::IndexPart;
       5                 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       6                 : use std::collections::{HashMap, VecDeque};
       7                 : use std::fmt::Debug;
       8                 : 
       9                 : use chrono::NaiveDateTime;
      10                 : use std::sync::Arc;
      11                 : use tracing::info;
      12                 : use utils::lsn::AtomicLsn;
      13                 : 
      14                 : use std::sync::atomic::AtomicU32;
      15                 : use utils::lsn::Lsn;
      16                 : 
      17                 : #[cfg(feature = "testing")]
      18                 : use utils::generation::Generation;
      19                 : 
      20                 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      21                 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      22                 : // that many upload queues in a running pageserver, and most of them are initialized
      23                 : // anyway.
      24                 : #[allow(clippy::large_enum_variant)]
      25                 : pub(super) enum UploadQueue {
      26                 :     Uninitialized,
      27                 :     Initialized(UploadQueueInitialized),
      28                 :     Stopped(UploadQueueStopped),
      29                 : }
      30                 : 
      31                 : impl UploadQueue {
      32 CBC           7 :     pub fn as_str(&self) -> &'static str {
      33               7 :         match self {
      34 UBC           0 :             UploadQueue::Uninitialized => "Uninitialized",
      35               0 :             UploadQueue::Initialized(_) => "Initialized",
      36 CBC           7 :             UploadQueue::Stopped(_) => "Stopped",
      37                 :         }
      38               7 :     }
      39                 : }
      40                 : 
      41                 : /// This keeps track of queued and in-progress tasks.
      42                 : pub(crate) struct UploadQueueInitialized {
      43                 :     /// Counter to assign task IDs
      44                 :     pub(crate) task_counter: u64,
      45                 : 
      46                 :     /// All layer files stored in the remote storage, taking into account all
      47                 :     /// in-progress and queued operations
      48                 :     pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
      49                 : 
      50                 :     /// How many file uploads or deletions have been scheduled since the
      51                 :     /// last (scheduling of) metadata index upload?
      52                 :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      53                 : 
      54                 :     /// Metadata stored in the remote storage, taking into account all
      55                 :     /// in-progress and queued operations.
      56                 :     /// DANGER: do not return to outside world, e.g., safekeepers.
      57                 :     pub(crate) latest_metadata: TimelineMetadata,
      58                 : 
      59                 :     /// `disk_consistent_lsn` from the last metadata file that was successfully
      60                 :     /// uploaded. `Lsn(0)` if nothing was uploaded yet.
      61                 :     /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
      62                 :     /// Safekeeper can rely on it to make decisions for WAL storage.
      63                 :     ///
      64                 :     /// visible_remote_consistent_lsn is only updated after our generation has been validated with
      65                 :     /// the control plane (unless a timeline's generation is None, in which case
      66                 :     /// we skip validation)
      67                 :     pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
      68                 :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      69                 : 
      70                 :     // Breakdown of different kinds of tasks currently in-progress
      71                 :     pub(crate) num_inprogress_layer_uploads: usize,
      72                 :     pub(crate) num_inprogress_metadata_uploads: usize,
      73                 :     pub(crate) num_inprogress_deletions: usize,
      74                 : 
      75                 :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      76                 :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      77                 :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      78                 :     /// be waiting for retry in `exponential_backoff`.
      79                 :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      80                 : 
      81                 :     /// Queued operations that have not been launched yet. They might depend on previous
      82                 :     /// tasks to finish. For example, metadata upload cannot be performed before all
      83                 :     /// preceding layer file uploads have completed.
      84                 :     pub(crate) queued_operations: VecDeque<UploadOp>,
      85                 : 
      86                 :     /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept around
      87                 :     /// for error logging.
      88                 :     ///
      89                 :     /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
      90                 :     /// bug causing leaks, then it's better to not leave this enabled for production builds.
      91                 :     #[cfg(feature = "testing")]
      92                 :     pub(crate) dangling_files: HashMap<LayerFileName, Generation>,
      93                 : 
      94                 :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
      95                 :     pub(crate) shutting_down: bool,
      96                 : 
      97                 :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
      98                 :     /// wait until one of them stops the queue. The semaphore is closed when
      99                 :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     100                 :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     101                 : }
     102                 : 
     103                 : impl UploadQueueInitialized {
     104             171 :     pub(super) fn no_pending_work(&self) -> bool {
     105             171 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     106             171 :     }
     107                 : 
     108          568864 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     109          568864 :         self.visible_remote_consistent_lsn.load()
     110          568864 :     }
     111                 : 
     112            2993 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     113            2993 :         self.projected_remote_consistent_lsn
     114            2993 :     }
     115                 : }
     116                 : 
     117 UBC           0 : #[derive(Clone, Copy)]
     118                 : pub(super) enum SetDeletedFlagProgress {
     119                 :     NotRunning,
     120                 :     InProgress(NaiveDateTime),
     121                 :     Successful(NaiveDateTime),
     122                 : }
     123                 : 
     124                 : pub(super) struct UploadQueueStopped {
     125                 :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     126                 :     pub(super) deleted_at: SetDeletedFlagProgress,
     127                 : }
     128                 : 
     129                 : impl UploadQueue {
     130 CBC         921 :     pub(crate) fn initialize_empty_remote(
     131             921 :         &mut self,
     132             921 :         metadata: &TimelineMetadata,
     133             921 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     134             921 :         match self {
     135             921 :             UploadQueue::Uninitialized => (),
     136                 :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     137 UBC           0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     138                 :             }
     139                 :         }
     140                 : 
     141 CBC         921 :         info!("initializing upload queue for empty remote");
     142                 : 
     143             921 :         let state = UploadQueueInitialized {
     144             921 :             // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
     145             921 :             latest_files: HashMap::new(),
     146             921 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     147             921 :             latest_metadata: metadata.clone(),
     148             921 :             projected_remote_consistent_lsn: None,
     149             921 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     150             921 :             // what follows are boring default initializations
     151             921 :             task_counter: 0,
     152             921 :             num_inprogress_layer_uploads: 0,
     153             921 :             num_inprogress_metadata_uploads: 0,
     154             921 :             num_inprogress_deletions: 0,
     155             921 :             inprogress_tasks: HashMap::new(),
     156             921 :             queued_operations: VecDeque::new(),
     157             921 :             #[cfg(feature = "testing")]
     158             921 :             dangling_files: HashMap::new(),
     159             921 :             shutting_down: false,
     160             921 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     161             921 :         };
     162             921 : 
     163             921 :         *self = UploadQueue::Initialized(state);
     164             921 :         Ok(self.initialized_mut().expect("we just set it"))
     165             921 :     }
     166                 : 
     167             369 :     pub(crate) fn initialize_with_current_remote_index_part(
     168             369 :         &mut self,
     169             369 :         index_part: &IndexPart,
     170             369 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     171             369 :         match self {
     172             369 :             UploadQueue::Uninitialized => (),
     173                 :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     174 UBC           0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     175                 :             }
     176                 :         }
     177                 : 
     178 CBC         369 :         let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
     179           58550 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     180           58181 :             files.insert(
     181           58181 :                 layer_name.to_owned(),
     182           58181 :                 LayerFileMetadata::from(layer_metadata),
     183           58181 :             );
     184           58181 :         }
     185                 : 
     186             369 :         info!(
     187             369 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     188             369 :             index_part.metadata.disk_consistent_lsn()
     189             369 :         );
     190                 : 
     191             369 :         let state = UploadQueueInitialized {
     192             369 :             latest_files: files,
     193             369 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     194             369 :             latest_metadata: index_part.metadata.clone(),
     195             369 :             projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
     196             369 :             visible_remote_consistent_lsn: Arc::new(
     197             369 :                 index_part.metadata.disk_consistent_lsn().into(),
     198             369 :             ),
     199             369 :             // what follows are boring default initializations
     200             369 :             task_counter: 0,
     201             369 :             num_inprogress_layer_uploads: 0,
     202             369 :             num_inprogress_metadata_uploads: 0,
     203             369 :             num_inprogress_deletions: 0,
     204             369 :             inprogress_tasks: HashMap::new(),
     205             369 :             queued_operations: VecDeque::new(),
     206             369 :             #[cfg(feature = "testing")]
     207             369 :             dangling_files: HashMap::new(),
     208             369 :             shutting_down: false,
     209             369 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     210             369 :         };
     211             369 : 
     212             369 :         *self = UploadQueue::Initialized(state);
     213             369 :         Ok(self.initialized_mut().expect("we just set it"))
     214             369 :     }
     215                 : 
     216           24314 :     pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
     217           24314 :         match self {
     218                 :             UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
     219               7 :                 anyhow::bail!("queue is in state {}", self.as_str())
     220                 :             }
     221           24307 :             UploadQueue::Initialized(x) => {
     222           24307 :                 if !x.shutting_down {
     223           24307 :                     Ok(x)
     224                 :                 } else {
     225 UBC           0 :                     anyhow::bail!("queue is shutting down")
     226                 :                 }
     227                 :             }
     228                 :         }
     229 CBC       24314 :     }
     230                 : 
     231             515 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
     232             515 :         match self {
     233                 :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     234 UBC           0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     235                 :             }
     236 CBC         515 :             UploadQueue::Stopped(stopped) => Ok(stopped),
     237                 :         }
     238             515 :     }
     239                 : }
     240                 : 
     241                 : /// An in-progress upload or delete task.
     242 UBC           0 : #[derive(Debug)]
     243                 : pub(crate) struct UploadTask {
     244                 :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     245                 :     pub(crate) task_id: u64,
     246                 :     pub(crate) retries: AtomicU32,
     247                 : 
     248                 :     pub(crate) op: UploadOp,
     249                 : }
     250                 : 
     251                 : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     252                 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     253               0 : #[derive(Debug)]
     254                 : pub(crate) struct Delete {
     255                 :     pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
     256                 : }
     257                 : 
     258               0 : #[derive(Debug)]
     259                 : pub(crate) enum UploadOp {
     260                 :     /// Upload a layer file
     261                 :     UploadLayer(ResidentLayer, LayerFileMetadata),
     262                 : 
     263                 :     /// Upload the metadata file
     264                 :     UploadMetadata(IndexPart, Lsn),
     265                 : 
     266                 :     /// Delete layer files
     267                 :     Delete(Delete),
     268                 : 
     269                 :     /// Barrier. When the barrier operation is reached, the sender is presumably used to notify waiters (TODO confirm).
     270                 :     Barrier(tokio::sync::watch::Sender<()>),
     271                 : 
     272                 :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     273                 :     /// this is the same as a Barrier.
     274                 :     Shutdown,
     275                 : }
     276                 : 
     277                 : impl std::fmt::Display for UploadOp {
     278 CBC        4676 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     279            4676 :         match self {
     280            3424 :             UploadOp::UploadLayer(layer, metadata) => {
     281            3424 :                 write!(
     282            3424 :                     f,
     283            3424 :                     "UploadLayer({}, size={:?}, gen={:?})",
     284            3424 :                     layer,
     285            3424 :                     metadata.file_size(),
     286            3424 :                     metadata.generation
     287            3424 :                 )
     288                 :             }
     289            1252 :             UploadOp::UploadMetadata(_, lsn) => {
     290            1252 :                 write!(f, "UploadMetadata(lsn: {})", lsn)
     291                 :             }
     292 UBC           0 :             UploadOp::Delete(delete) => {
     293               0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     294                 :             }
     295               0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     296               0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     297                 :         }
     298 CBC        4676 :     }
     299                 : }
        

Generated by: LCOV version 2.1-beta