LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 84.7 % 118 100 18 100
Current Date: 2023-10-19 02:04:12 Functions: 68.8 % 16 11 5 11
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use super::storage_layer::LayerFileName;
       2                 : use super::Generation;
       3                 : use crate::tenant::metadata::TimelineMetadata;
       4                 : use crate::tenant::remote_timeline_client::index::IndexPart;
       5                 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       6                 : use std::collections::{HashMap, VecDeque};
       7                 : use std::fmt::Debug;
       8                 : 
       9                 : use chrono::NaiveDateTime;
      10                 : use std::sync::Arc;
      11                 : use tracing::info;
      12                 : use utils::lsn::AtomicLsn;
      13                 : 
      14                 : use std::sync::atomic::AtomicU32;
      15                 : use utils::lsn::Lsn;
      16                 : 
      17                 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      18                 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      19                 : // that many upload queues in a running pageserver, and most of them are initialized
      20                 : // anyway.
      21                 : #[allow(clippy::large_enum_variant)]
      22                 : pub(super) enum UploadQueue {
      23                 :     Uninitialized,
      24                 :     Initialized(UploadQueueInitialized),
      25                 :     Stopped(UploadQueueStopped),
      26                 : }
      27                 : 
      28                 : impl UploadQueue {
      29 UBC           0 :     pub fn as_str(&self) -> &'static str {
      30               0 :         match self {
      31               0 :             UploadQueue::Uninitialized => "Uninitialized",
      32               0 :             UploadQueue::Initialized(_) => "Initialized",
      33               0 :             UploadQueue::Stopped(_) => "Stopped",
      34                 :         }
      35               0 :     }
      36                 : }
      37                 : 
      38                 : /// This keeps track of queued and in-progress tasks.
      39                 : pub(crate) struct UploadQueueInitialized {
      40                 :     /// Counter to assign task IDs
      41                 :     pub(crate) task_counter: u64,
      42                 : 
      43                 :     /// All layer files stored in the remote storage, taking into account all
      44                 :     /// in-progress and queued operations
      45                 :     pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
      46                 : 
      47                 :     /// How many file uploads or deletions have been scheduled since the
      48                 :     /// last (scheduling of) metadata index upload?
      49                 :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      50                 : 
      51                 :     /// Metadata stored in the remote storage, taking into account all
      52                 :     /// in-progress and queued operations.
      53                 :     /// DANGER: do not return to outside world, e.g., safekeepers.
      54                 :     pub(crate) latest_metadata: TimelineMetadata,
      55                 : 
      56                 :     /// `disk_consistent_lsn` from the last metadata file that was successfully
      57                 :     /// uploaded. `Lsn(0)` if nothing was uploaded yet.
      58                 :     /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
      59                 :     /// Safekeeper can rely on it to make decisions for WAL storage.
      60                 :     ///
      61                 :     /// visible_remote_consistent_lsn is only updated after our generation has been validated with
      62                 :     /// the control plane (unless a timeline's generation is None, in which case
      63                 :     /// we skip validation)
      64                 :     pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
      65                 :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      66                 : 
      67                 :     // Breakdown of different kinds of tasks currently in-progress
      68                 :     pub(crate) num_inprogress_layer_uploads: usize,
      69                 :     pub(crate) num_inprogress_metadata_uploads: usize,
      70                 :     pub(crate) num_inprogress_deletions: usize,
      71                 : 
      72                 :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      73                 :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      74                 :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      75                 :     /// be waiting for retry in `exponential_backoff`.
      76                 :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      77                 : 
      78                 :     /// Queued operations that have not been launched yet. They might depend on previous
      79                 :     /// tasks to finish. For example, metadata upload cannot be performed before all
      80                 :     /// preceding layer file uploads have completed.
      81                 :     pub(crate) queued_operations: VecDeque<UploadOp>,
      82                 : }
      83                 : 
      84                 : impl UploadQueueInitialized {
      85 CBC         214 :     pub(super) fn no_pending_work(&self) -> bool {
      86             214 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
      87             214 :     }
      88                 : 
      89          779573 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
      90          779573 :         self.visible_remote_consistent_lsn.load()
      91          779573 :     }
      92                 : 
      93            2671 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
      94            2671 :         self.projected_remote_consistent_lsn
      95            2671 :     }
      96                 : }
      97                 : 
      98 UBC           0 : #[derive(Clone, Copy)]
      99                 : pub(super) enum SetDeletedFlagProgress {
     100                 :     NotRunning,
     101                 :     InProgress(NaiveDateTime),
     102                 :     Successful(NaiveDateTime),
     103                 : }
     104                 : 
     105                 : pub(super) struct UploadQueueStopped {
     106                 :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     107                 :     pub(super) deleted_at: SetDeletedFlagProgress,
     108                 : }
     109                 : 
     110                 : impl UploadQueue {
     111 CBC         967 :     pub(crate) fn initialize_empty_remote(
     112             967 :         &mut self,
     113             967 :         metadata: &TimelineMetadata,
     114             967 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     115             967 :         match self {
     116             967 :             UploadQueue::Uninitialized => (),
     117                 :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     118 UBC           0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     119                 :             }
     120                 :         }
     121                 : 
     122 CBC         967 :         info!("initializing upload queue for empty remote");
     123                 : 
     124             967 :         let state = UploadQueueInitialized {
     125             967 :             // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
     126             967 :             latest_files: HashMap::new(),
     127             967 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     128             967 :             latest_metadata: metadata.clone(),
     129             967 :             projected_remote_consistent_lsn: None,
     130             967 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     131             967 :             // what follows are boring default initializations
     132             967 :             task_counter: 0,
     133             967 :             num_inprogress_layer_uploads: 0,
     134             967 :             num_inprogress_metadata_uploads: 0,
     135             967 :             num_inprogress_deletions: 0,
     136             967 :             inprogress_tasks: HashMap::new(),
     137             967 :             queued_operations: VecDeque::new(),
     138             967 :         };
     139             967 : 
     140             967 :         *self = UploadQueue::Initialized(state);
     141             967 :         Ok(self.initialized_mut().expect("we just set it"))
     142             967 :     }
     143                 : 
     144             335 :     pub(crate) fn initialize_with_current_remote_index_part(
     145             335 :         &mut self,
     146             335 :         index_part: &IndexPart,
     147             335 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     148             335 :         match self {
     149             335 :             UploadQueue::Uninitialized => (),
     150                 :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     151 UBC           0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     152                 :             }
     153                 :         }
     154                 : 
     155 CBC         335 :         let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
     156            9113 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     157            8778 :             files.insert(
     158            8778 :                 layer_name.to_owned(),
     159            8778 :                 LayerFileMetadata::from(layer_metadata),
     160            8778 :             );
     161            8778 :         }
     162                 : 
     163             335 :         info!(
     164             335 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     165             335 :             index_part.metadata.disk_consistent_lsn()
     166             335 :         );
     167                 : 
     168             335 :         let state = UploadQueueInitialized {
     169             335 :             latest_files: files,
     170             335 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     171             335 :             latest_metadata: index_part.metadata.clone(),
     172             335 :             projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
     173             335 :             visible_remote_consistent_lsn: Arc::new(
     174             335 :                 index_part.metadata.disk_consistent_lsn().into(),
     175             335 :             ),
     176             335 :             // what follows are boring default initializations
     177             335 :             task_counter: 0,
     178             335 :             num_inprogress_layer_uploads: 0,
     179             335 :             num_inprogress_metadata_uploads: 0,
     180             335 :             num_inprogress_deletions: 0,
     181             335 :             inprogress_tasks: HashMap::new(),
     182             335 :             queued_operations: VecDeque::new(),
     183             335 :         };
     184             335 : 
     185             335 :         *self = UploadQueue::Initialized(state);
     186             335 :         Ok(self.initialized_mut().expect("we just set it"))
     187             335 :     }
     188                 : 
     189           30195 :     pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
     190           30195 :         match self {
     191                 :             UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
     192 UBC           0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     193                 :             }
     194 CBC       30195 :             UploadQueue::Initialized(x) => Ok(x),
     195                 :         }
     196           30195 :     }
     197                 : 
     198             647 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
     199             647 :         match self {
     200                 :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     201 UBC           0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     202                 :             }
     203 CBC         647 :             UploadQueue::Stopped(stopped) => Ok(stopped),
     204                 :         }
     205             647 :     }
     206                 : 
     207             591 :     pub(crate) fn get_layer_metadata(
     208             591 :         &self,
     209             591 :         name: &LayerFileName,
     210             591 :     ) -> anyhow::Result<Option<LayerFileMetadata>> {
     211             591 :         match self {
     212                 :             UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
     213 UBC           0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     214                 :             }
     215 CBC         591 :             UploadQueue::Initialized(inner) => Ok(inner.latest_files.get(name).cloned()),
     216                 :         }
     217             591 :     }
     218                 : }
     219                 : 
     220                 : /// An in-progress upload or delete task.
     221 UBC           0 : #[derive(Debug)]
     222                 : pub(crate) struct UploadTask {
     223                 :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     224                 :     pub(crate) task_id: u64,
     225                 :     pub(crate) retries: AtomicU32,
     226                 : 
     227                 :     pub(crate) op: UploadOp,
     228                 : }
     229                 : 
     230                 : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     231                 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     232               0 : #[derive(Debug)]
     233                 : pub(crate) struct Delete {
     234                 :     pub(crate) layers: Vec<(LayerFileName, Generation)>,
     235                 : }
     236                 : 
     237               0 : #[derive(Debug)]
     238                 : pub(crate) enum UploadOp {
     239                 :     /// Upload a layer file
     240                 :     UploadLayer(LayerFileName, LayerFileMetadata),
     241                 : 
     242                 :     /// Upload the metadata file
     243                 :     UploadMetadata(IndexPart, Lsn),
     244                 : 
     245                 :     /// Delete layer files
     246                 :     Delete(Delete),
     247                 : 
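     248                 :     /// Barrier. When the barrier operation is reached, holders of the matching watch receiver are presumably notified (TODO confirm against the queue scheduler).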
     249                 :     Barrier(tokio::sync::watch::Sender<()>),
     250                 : }
     251                 : 
     252                 : impl std::fmt::Display for UploadOp {
     253 CBC        5146 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     254            5146 :         match self {
     255            3701 :             UploadOp::UploadLayer(path, metadata) => {
     256            3701 :                 write!(
     257            3701 :                     f,
     258            3701 :                     "UploadLayer({}, size={:?}, gen={:?})",
     259            3701 :                     path.file_name(),
     260            3701 :                     metadata.file_size(),
     261            3701 :                     metadata.generation,
     262            3701 :                 )
     263                 :             }
     264            1445 :             UploadOp::UploadMetadata(_, lsn) => {
     265            1445 :                 write!(f, "UploadMetadata(lsn: {})", lsn)
     266                 :             }
     267 UBC           0 :             UploadOp::Delete(delete) => {
     268               0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     269                 :             }
     270               0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     271                 :         }
     272 CBC        5146 :     }
     273                 : }
        

Generated by: LCOV version 2.1-beta