LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 80.3 % 132 106
Test Date: 2024-02-07 07:37:29 Functions: 55.6 % 18 10

            Line data    Source code
       1              : use super::storage_layer::LayerFileName;
       2              : use super::storage_layer::ResidentLayer;
       3              : use crate::tenant::metadata::TimelineMetadata;
       4              : use crate::tenant::remote_timeline_client::index::IndexPart;
       5              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       6              : use std::collections::{HashMap, VecDeque};
       7              : use std::fmt::Debug;
       8              : 
       9              : use chrono::NaiveDateTime;
      10              : use std::sync::Arc;
      11              : use tracing::info;
      12              : use utils::lsn::AtomicLsn;
      13              : 
      14              : use std::sync::atomic::AtomicU32;
      15              : use utils::lsn::Lsn;
      16              : 
      17              : #[cfg(feature = "testing")]
      18              : use utils::generation::Generation;
      19              : 
      20              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      21              : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      22              : // that many upload queues in a running pageserver, and most of them are initialized
      23              : // anyway.
      24              : #[allow(clippy::large_enum_variant)]
      25              : pub(super) enum UploadQueue {
      26              :     Uninitialized,
      27              :     Initialized(UploadQueueInitialized),
      28              :     Stopped(UploadQueueStopped),
      29              : }
      30              : 
      31              : impl UploadQueue {
      32            0 :     pub fn as_str(&self) -> &'static str {
      33            0 :         match self {
      34            0 :             UploadQueue::Uninitialized => "Uninitialized",
      35            0 :             UploadQueue::Initialized(_) => "Initialized",
      36            0 :             UploadQueue::Stopped(_) => "Stopped",
      37              :         }
      38            0 :     }
      39              : }
      40              : 
      41              : /// This keeps track of queued and in-progress tasks.
      42              : pub(crate) struct UploadQueueInitialized {
      43              :     /// Counter to assign task IDs
      44              :     pub(crate) task_counter: u64,
      45              : 
      46              :     /// All layer files stored in the remote storage, taking into account all
      47              :     /// in-progress and queued operations
      48              :     pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
      49              : 
      50              :     /// How many file uploads or deletions have been scheduled since the
      51              :     /// last (scheduling of) metadata index upload?
      52              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      53              : 
      54              :     /// Metadata stored in the remote storage, taking into account all
      55              :     /// in-progress and queued operations.
      56              :     /// DANGER: do not return to outside world, e.g., safekeepers.
      57              :     pub(crate) latest_metadata: TimelineMetadata,
      58              : 
      59              :     /// `disk_consistent_lsn` from the last metadata file that was successfully
      60              :     /// uploaded. `Lsn(0)` if nothing was uploaded yet.
      61              :     /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
      62              :     /// Safekeeper can rely on it to make decisions for WAL storage.
      63              :     ///
      64              :     /// visible_remote_consistent_lsn is only updated after our generation has been validated with
      65              :     /// the control plane (unless a timeline's generation is None, in which case
      66              :     /// we skip validation)
      67              :     pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
      68              :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      69              : 
      70              :     // Breakdown of different kinds of tasks currently in-progress
      71              :     pub(crate) num_inprogress_layer_uploads: usize,
      72              :     pub(crate) num_inprogress_metadata_uploads: usize,
      73              :     pub(crate) num_inprogress_deletions: usize,
      74              : 
      75              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      76              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      77              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      78              :     /// be waiting for retry in `exponential_backoff`.
      79              :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      80              : 
      81              :     /// Queued operations that have not been launched yet. They might depend on previous
      82              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      83              :     /// preceding layer file uploads have completed.
      84              :     pub(crate) queued_operations: VecDeque<UploadOp>,
      85              : 
      86              :     /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept
      87              :     /// around for error logging.
      88              :     ///
      89              :     /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
      90              :     /// bug causing leaks, then it's better to not leave this enabled for production builds.
      91              :     #[cfg(feature = "testing")]
      92              :     pub(crate) dangling_files: HashMap<LayerFileName, Generation>,
      93              : 
      94              :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
      95              :     pub(crate) shutting_down: bool,
      96              : 
      97              :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
      98              :     /// wait until one of them stops the queue. The semaphore is closed when
      99              :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     100              :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     101              : }
     102              : 
     103              : impl UploadQueueInitialized {
     104          189 :     pub(super) fn no_pending_work(&self) -> bool {
     105          189 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     106          189 :     }
     107              : 
     108       798512 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     109       798512 :         self.visible_remote_consistent_lsn.load()
     110       798512 :     }
     111              : 
     112         3042 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     113         3042 :         self.projected_remote_consistent_lsn
     114         3042 :     }
     115              : }
     116              : 
     117            0 : #[derive(Clone, Copy)]
     118              : pub(super) enum SetDeletedFlagProgress {
     119              :     NotRunning,
     120              :     InProgress(NaiveDateTime),
     121              :     Successful(NaiveDateTime),
     122              : }
     123              : 
     124              : pub(super) struct UploadQueueStopped {
     125              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     126              :     pub(super) deleted_at: SetDeletedFlagProgress,
     127              : }
     128              : 
     129           14 : #[derive(thiserror::Error, Debug)]
     130              : pub(crate) enum NotInitialized {
     131              :     #[error("queue is in state Uninitialized")]
     132              :     Uninitialized,
     133              :     #[error("queue is in state Stopping")]
     134              :     Stopped,
     135              :     #[error("queue is shutting down")]
     136              :     ShuttingDown,
     137              : }
     138              : 
     139              : impl NotInitialized {
     140            0 :     pub(crate) fn is_stopping(&self) -> bool {
     141            0 :         use NotInitialized::*;
     142            0 :         match self {
     143            0 :             Uninitialized => false,
     144            0 :             Stopped => true,
     145            0 :             ShuttingDown => true,
     146              :         }
     147            0 :     }
     148              : }
     149              : 
     150              : impl UploadQueue {
     151         1144 :     pub(crate) fn initialize_empty_remote(
     152         1144 :         &mut self,
     153         1144 :         metadata: &TimelineMetadata,
     154         1144 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     155         1144 :         match self {
     156         1144 :             UploadQueue::Uninitialized => (),
     157              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     158            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     159              :             }
     160              :         }
     161              : 
     162         1144 :         info!("initializing upload queue for empty remote");
     163              : 
     164         1144 :         let state = UploadQueueInitialized {
     165         1144 :             // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
     166         1144 :             latest_files: HashMap::new(),
     167         1144 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     168         1144 :             latest_metadata: metadata.clone(),
     169         1144 :             projected_remote_consistent_lsn: None,
     170         1144 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     171         1144 :             // what follows are boring default initializations
     172         1144 :             task_counter: 0,
     173         1144 :             num_inprogress_layer_uploads: 0,
     174         1144 :             num_inprogress_metadata_uploads: 0,
     175         1144 :             num_inprogress_deletions: 0,
     176         1144 :             inprogress_tasks: HashMap::new(),
     177         1144 :             queued_operations: VecDeque::new(),
     178         1144 :             #[cfg(feature = "testing")]
     179         1144 :             dangling_files: HashMap::new(),
     180         1144 :             shutting_down: false,
     181         1144 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     182         1144 :         };
     183         1144 : 
     184         1144 :         *self = UploadQueue::Initialized(state);
     185         1144 :         Ok(self.initialized_mut().expect("we just set it"))
     186         1144 :     }
     187              : 
     188          424 :     pub(crate) fn initialize_with_current_remote_index_part(
     189          424 :         &mut self,
     190          424 :         index_part: &IndexPart,
     191          424 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     192          424 :         match self {
     193          424 :             UploadQueue::Uninitialized => (),
     194              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     195            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     196              :             }
     197              :         }
     198              : 
     199          424 :         let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
     200        57586 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     201        57162 :             files.insert(
     202        57162 :                 layer_name.to_owned(),
     203        57162 :                 LayerFileMetadata::from(layer_metadata),
     204        57162 :             );
     205        57162 :         }
     206              : 
     207          424 :         info!(
     208          424 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     209          424 :             index_part.metadata.disk_consistent_lsn()
     210          424 :         );
     211              : 
     212          424 :         let state = UploadQueueInitialized {
     213          424 :             latest_files: files,
     214          424 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     215          424 :             latest_metadata: index_part.metadata.clone(),
     216          424 :             projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
     217          424 :             visible_remote_consistent_lsn: Arc::new(
     218          424 :                 index_part.metadata.disk_consistent_lsn().into(),
     219          424 :             ),
     220          424 :             // what follows are boring default initializations
     221          424 :             task_counter: 0,
     222          424 :             num_inprogress_layer_uploads: 0,
     223          424 :             num_inprogress_metadata_uploads: 0,
     224          424 :             num_inprogress_deletions: 0,
     225          424 :             inprogress_tasks: HashMap::new(),
     226          424 :             queued_operations: VecDeque::new(),
     227          424 :             #[cfg(feature = "testing")]
     228          424 :             dangling_files: HashMap::new(),
     229          424 :             shutting_down: false,
     230          424 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     231          424 :         };
     232          424 : 
     233          424 :         *self = UploadQueue::Initialized(state);
     234          424 :         Ok(self.initialized_mut().expect("we just set it"))
     235          424 :     }
     236              : 
     237        28221 :     pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
     238        28221 :         use UploadQueue::*;
     239        28221 :         match self {
     240            0 :             Uninitialized => Err(NotInitialized::Uninitialized.into()),
     241        28207 :             Initialized(x) => {
     242        28207 :                 if x.shutting_down {
     243            0 :                     Err(NotInitialized::ShuttingDown.into())
     244              :                 } else {
     245        28207 :                     Ok(x)
     246              :                 }
     247              :             }
     248           14 :             Stopped(_) => Err(NotInitialized::Stopped.into()),
     249              :         }
     250        28221 :     }
     251              : 
     252          569 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
     253          569 :         match self {
     254              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     255            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     256              :             }
     257          569 :             UploadQueue::Stopped(stopped) => Ok(stopped),
     258              :         }
     259          569 :     }
     260              : }
     261              : 
     262              : /// An in-progress upload or delete task.
     263            0 : #[derive(Debug)]
     264              : pub(crate) struct UploadTask {
     265              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     266              :     pub(crate) task_id: u64,
     267              :     pub(crate) retries: AtomicU32,
     268              : 
     269              :     pub(crate) op: UploadOp,
     270              : }
     271              : 
     272              : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     273              : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     274            0 : #[derive(Debug)]
     275              : pub(crate) struct Delete {
     276              :     pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
     277              : }
     278              : 
     279            0 : #[derive(Debug)]
     280              : pub(crate) enum UploadOp {
     281              :     /// Upload a layer file
     282              :     UploadLayer(ResidentLayer, LayerFileMetadata),
     283              : 
     284              :     /// Upload the metadata file
     285              :     UploadMetadata(IndexPart, Lsn),
     286              : 
     287              :     /// Delete layer files
     288              :     Delete(Delete),
     289              : 
     290              :     /// Barrier. When the barrier operation is reached,
     291              :     Barrier(tokio::sync::watch::Sender<()>),
     292              : 
     293              :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     294              :     /// this is the same as a Barrier.
     295              :     Shutdown,
     296              : }
     297              : 
     298              : impl std::fmt::Display for UploadOp {
     299         4945 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     300         4945 :         match self {
     301         3641 :             UploadOp::UploadLayer(layer, metadata) => {
     302         3641 :                 write!(
     303         3641 :                     f,
     304         3641 :                     "UploadLayer({}, size={:?}, gen={:?})",
     305         3641 :                     layer,
     306         3641 :                     metadata.file_size(),
     307         3641 :                     metadata.generation
     308         3641 :                 )
     309              :             }
     310         1304 :             UploadOp::UploadMetadata(_, lsn) => {
     311         1304 :                 write!(f, "UploadMetadata(lsn: {})", lsn)
     312              :             }
     313            0 :             UploadOp::Delete(delete) => {
     314            0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     315              :             }
     316            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     317            0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     318              :         }
     319         4945 :     }
     320              : }
        

Generated by: LCOV version 2.1-beta