LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions)
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01
             Coverage   Total   Hit
Lines:         86.0 %     107    92
Functions:     61.5 %      13     8

            Line data    Source code
       1              : use crate::metrics::RemoteOpFileKind;
       2              : 
       3              : use super::storage_layer::LayerFileName;
       4              : use super::Generation;
       5              : use crate::tenant::metadata::TimelineMetadata;
       6              : use crate::tenant::remote_timeline_client::index::IndexPart;
       7              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       8              : use std::collections::{HashMap, VecDeque};
       9              : use std::fmt::Debug;
      10              : 
      11              : use chrono::NaiveDateTime;
      12              : use std::sync::Arc;
      13              : use tracing::info;
      14              : 
      15              : use std::sync::atomic::AtomicU32;
      16              : use utils::lsn::Lsn;
      17              : 
      18              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      19              : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      20              : // that many upload queues in a running pageserver, and most of them are initialized
      21              : // anyway.
      22              : #[allow(clippy::large_enum_variant)]
      23              : pub(super) enum UploadQueue {
      24              :     Uninitialized,
      25              :     Initialized(UploadQueueInitialized),
      26              :     Stopped(UploadQueueStopped),
      27              : }
      28              : 
      29              : impl UploadQueue {
      30            0 :     pub fn as_str(&self) -> &'static str {
      31            0 :         match self {
      32            0 :             UploadQueue::Uninitialized => "Uninitialized",
      33            0 :             UploadQueue::Initialized(_) => "Initialized",
      34            0 :             UploadQueue::Stopped(_) => "Stopped",
      35              :         }
      36            0 :     }
      37              : }
      38              : 
      39              : /// This keeps track of queued and in-progress tasks.
      40              : pub(crate) struct UploadQueueInitialized {
      41              :     /// Counter to assign task IDs
      42              :     pub(crate) task_counter: u64,
      43              : 
      44              :     /// All layer files stored in the remote storage, taking into account all
      45              :     /// in-progress and queued operations
      46              :     pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
      47              : 
      48              :     /// How many file uploads or deletions have been scheduled since the
      49              :     /// last (scheduling of a) metadata index upload?
      50              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      51              : 
      52              :     /// Metadata stored in the remote storage, taking into account all
      53              :     /// in-progress and queued operations.
      54              :     /// DANGER: do not return to outside world, e.g., safekeepers.
      55              :     pub(crate) latest_metadata: TimelineMetadata,
      56              : 
      57              :     /// `disk_consistent_lsn` from the last metadata file that was successfully
      58              :     /// uploaded. `Lsn(0)` if nothing was uploaded yet.
      59              :     /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
      60              :     /// Safekeeper can rely on it to make decisions for WAL storage.
      61              :     pub(crate) last_uploaded_consistent_lsn: Lsn,
      62              : 
      63              :     // Breakdown of different kinds of tasks currently in-progress
      64              :     pub(crate) num_inprogress_layer_uploads: usize,
      65              :     pub(crate) num_inprogress_metadata_uploads: usize,
      66              :     pub(crate) num_inprogress_deletions: usize,
      67              : 
      68              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      69              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      70              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      71              :     /// be waiting for retry in `exponential_backoff`.
      72              :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      73              : 
      74              :     /// Queued operations that have not been launched yet. They might depend on previous
      75              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      76              :     /// preceding layer file uploads have completed.
      77              :     pub(crate) queued_operations: VecDeque<UploadOp>,
      78              : }
      79              : 
      80              : impl UploadQueueInitialized {
      81          203 :     pub(super) fn no_pending_work(&self) -> bool {
      82          203 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
      83          203 :     }
      84              : }
      85              : 
      86            0 : #[derive(Clone, Copy)]
      87              : pub(super) enum SetDeletedFlagProgress {
      88              :     NotRunning,
      89              :     InProgress(NaiveDateTime),
      90              :     Successful(NaiveDateTime),
      91              : }
      92              : 
      93              : pub(super) struct UploadQueueStopped {
      94              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
      95              :     pub(super) deleted_at: SetDeletedFlagProgress,
      96              : }
      97              : 
      98              : impl UploadQueue {
      99          564 :     pub(crate) fn initialize_empty_remote(
     100          564 :         &mut self,
     101          564 :         metadata: &TimelineMetadata,
     102          564 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     103          564 :         match self {
     104          564 :             UploadQueue::Uninitialized => (),
     105              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     106            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     107              :             }
     108              :         }
     109              : 
     110          564 :         info!("initializing upload queue for empty remote");
     111              : 
     112          564 :         let state = UploadQueueInitialized {
     113          564 :             // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
     114          564 :             latest_files: HashMap::new(),
     115          564 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     116          564 :             latest_metadata: metadata.clone(),
     117          564 :             // We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent
     118          564 :             // safekeepers from garbage-collecting anything.
     119          564 :             last_uploaded_consistent_lsn: Lsn(0),
     120          564 :             // what follows are boring default initializations
     121          564 :             task_counter: 0,
     122          564 :             num_inprogress_layer_uploads: 0,
     123          564 :             num_inprogress_metadata_uploads: 0,
     124          564 :             num_inprogress_deletions: 0,
     125          564 :             inprogress_tasks: HashMap::new(),
     126          564 :             queued_operations: VecDeque::new(),
     127          564 :         };
     128          564 : 
     129          564 :         *self = UploadQueue::Initialized(state);
     130          564 :         Ok(self.initialized_mut().expect("we just set it"))
     131          564 :     }
     132              : 
     133          199 :     pub(crate) fn initialize_with_current_remote_index_part(
     134          199 :         &mut self,
     135          199 :         index_part: &IndexPart,
     136          199 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     137          199 :         match self {
     138          199 :             UploadQueue::Uninitialized => (),
     139              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     140            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     141              :             }
     142              :         }
     143              : 
     144          199 :         let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
     145         6481 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     146         6282 :             files.insert(
     147         6282 :                 layer_name.to_owned(),
     148         6282 :                 LayerFileMetadata::from(layer_metadata),
     149         6282 :             );
     150         6282 :         }
     151              : 
     152          199 :         info!(
     153          199 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     154          199 :             index_part.metadata.disk_consistent_lsn()
     155          199 :         );
     156              : 
     157          199 :         let state = UploadQueueInitialized {
     158          199 :             latest_files: files,
     159          199 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     160          199 :             latest_metadata: index_part.metadata.clone(),
     161          199 :             last_uploaded_consistent_lsn: index_part.metadata.disk_consistent_lsn(),
     162          199 :             // what follows are boring default initializations
     163          199 :             task_counter: 0,
     164          199 :             num_inprogress_layer_uploads: 0,
     165          199 :             num_inprogress_metadata_uploads: 0,
     166          199 :             num_inprogress_deletions: 0,
     167          199 :             inprogress_tasks: HashMap::new(),
     168          199 :             queued_operations: VecDeque::new(),
     169          199 :         };
     170          199 : 
     171          199 :         *self = UploadQueue::Initialized(state);
     172          199 :         Ok(self.initialized_mut().expect("we just set it"))
     173          199 :     }
     174              : 
     175        19325 :     pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
     176        19325 :         match self {
     177              :             UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
     178            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     179              :             }
     180        19325 :             UploadQueue::Initialized(x) => Ok(x),
     181              :         }
     182        19325 :     }
     183              : 
     184          614 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
     185          614 :         match self {
     186              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     187            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     188              :             }
     189          614 :             UploadQueue::Stopped(stopped) => Ok(stopped),
     190              :         }
     191          614 :     }
     192              : }
     193              : 
     194              : /// An in-progress upload or delete task.
     195            0 : #[derive(Debug)]
     196              : pub(crate) struct UploadTask {
     197              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     198              :     pub(crate) task_id: u64,
     199              :     pub(crate) retries: AtomicU32,
     200              : 
     201              :     pub(crate) op: UploadOp,
     202              : }
     203              : 
     204            0 : #[derive(Debug)]
     205              : pub(crate) struct Delete {
     206              :     pub(crate) file_kind: RemoteOpFileKind,
     207              :     pub(crate) layer_file_name: LayerFileName,
     208              :     pub(crate) scheduled_from_timeline_delete: bool,
     209              :     pub(crate) generation: Generation,
     210              : }
     211              : 
     212            0 : #[derive(Debug)]
     213              : pub(crate) enum UploadOp {
     214              :     /// Upload a layer file
     215              :     UploadLayer(LayerFileName, LayerFileMetadata),
     216              : 
     217              :     /// Upload the metadata file
     218              :     UploadMetadata(IndexPart, Lsn),
     219              : 
     220              :     /// Delete a layer file
     221              :     Delete(Delete),
     222              : 
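     223              :     /// Barrier. When the barrier operation is reached, waiters are presumably notified through the attached watch `Sender` (original sentence was truncated; TODO confirm).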
     224              :     Barrier(tokio::sync::watch::Sender<()>),
     225              : }
     226              : 
     227              : impl std::fmt::Display for UploadOp {
     228         9152 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     229         9152 :         match self {
     230         3818 :             UploadOp::UploadLayer(path, metadata) => {
     231         3818 :                 write!(
     232         3818 :                     f,
     233         3818 :                     "UploadLayer({}, size={:?}, gen={:?})",
     234         3818 :                     path.file_name(),
     235         3818 :                     metadata.file_size(),
     236         3818 :                     metadata.generation,
     237         3818 :                 )
     238              :             }
     239         1440 :             UploadOp::UploadMetadata(_, lsn) => {
     240         1440 :                 write!(f, "UploadMetadata(lsn: {})", lsn)
     241              :             }
     242         3894 :             UploadOp::Delete(delete) => write!(
     243         3894 :                 f,
     244         3894 :                 "Delete(path: {}, scheduled_from_timeline_delete: {}, gen: {:?})",
     245         3894 :                 delete.layer_file_name.file_name(),
     246         3894 :                 delete.scheduled_from_timeline_delete,
     247         3894 :                 delete.generation
     248         3894 :             ),
     249            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     250              :         }
     251         9152 :     }
     252              : }
        

Generated by: LCOV version 2.1-beta