LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit
Test: 496e96cdfff2df79370229591d6427cda12fde29.info Lines: 59.8 % 132 79
Test Date: 2024-05-21 18:28:29 Functions: 27.3 % 11 3

            Line data    Source code
       1              : use super::storage_layer::LayerName;
       2              : use super::storage_layer::ResidentLayer;
       3              : use crate::tenant::metadata::TimelineMetadata;
       4              : use crate::tenant::remote_timeline_client::index::IndexPart;
       5              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       6              : use crate::tenant::remote_timeline_client::index::Lineage;
       7              : use std::collections::{HashMap, VecDeque};
       8              : use std::fmt::Debug;
       9              : 
      10              : use chrono::NaiveDateTime;
      11              : use pageserver_api::models::AuxFilePolicy;
      12              : use std::sync::Arc;
      13              : use tracing::info;
      14              : use utils::lsn::AtomicLsn;
      15              : 
      16              : use std::sync::atomic::AtomicU32;
      17              : use utils::lsn::Lsn;
      18              : 
      19              : #[cfg(feature = "testing")]
      20              : use utils::generation::Generation;
      21              : 
      22              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      23              : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      24              : // that many upload queues in a running pageserver, and most of them are initialized
      25              : // anyway.
      26              : #[allow(clippy::large_enum_variant)]
      27              : pub(super) enum UploadQueue {
      28              :     Uninitialized,
      29              :     Initialized(UploadQueueInitialized),
      30              :     Stopped(UploadQueueStopped),
      31              : }
      32              : 
      33              : impl UploadQueue {
      34            0 :     pub fn as_str(&self) -> &'static str {
      35            0 :         match self {
      36            0 :             UploadQueue::Uninitialized => "Uninitialized",
      37            0 :             UploadQueue::Initialized(_) => "Initialized",
      38            0 :             UploadQueue::Stopped(_) => "Stopped",
      39              :         }
      40            0 :     }
      41              : }
      42              : 
      43              : /// This keeps track of queued and in-progress tasks.
      44              : pub(crate) struct UploadQueueInitialized {
      45              :     /// Counter to assign task IDs
      46              :     pub(crate) task_counter: u64,
      47              : 
      48              :     /// All layer files stored in the remote storage, taking into account all
      49              :     /// in-progress and queued operations
      50              :     pub(crate) latest_files: HashMap<LayerName, LayerFileMetadata>,
      51              : 
      52              :     /// How many file uploads or deletions have been scheduled since the
      53              :     /// last (scheduling of) metadata index upload?
      54              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      55              : 
      56              :     /// Metadata stored in the remote storage, taking into account all
      57              :     /// in-progress and queued operations.
      58              :     /// DANGER: do not return to outside world, e.g., safekeepers.
      59              :     pub(crate) latest_metadata: TimelineMetadata,
      60              : 
      61              :     /// Part of the flattened "next" `index_part.json`.
      62              :     pub(crate) latest_lineage: Lineage,
      63              : 
      64              :     /// The last aux file policy used on this timeline.
      65              :     pub(crate) last_aux_file_policy: Option<AuxFilePolicy>,
      66              : 
      67              :     /// `disk_consistent_lsn` from the last metadata file that was successfully
      68              :     /// uploaded. `Lsn(0)` if nothing was uploaded yet.
      69              :     /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
      70              :     /// Safekeeper can rely on it to make decisions for WAL storage.
      71              :     ///
      72              :     /// visible_remote_consistent_lsn is only updated after our generation has been validated with
      73              :     /// the control plane (unless a timeline's generation is None, in which case
      74              :     /// we skip validation)
      75              :     pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
      76              :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      77              : 
      78              :     // Breakdown of different kinds of tasks currently in-progress
      79              :     pub(crate) num_inprogress_layer_uploads: usize,
      80              :     pub(crate) num_inprogress_metadata_uploads: usize,
      81              :     pub(crate) num_inprogress_deletions: usize,
      82              : 
      83              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      84              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      85              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      86              :     /// be waiting for retry in `exponential_backoff`.
      87              :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      88              : 
      89              :     /// Queued operations that have not been launched yet. They might depend on previous
      90              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      91              :     /// preceding layer file uploads have completed.
      92              :     pub(crate) queued_operations: VecDeque<UploadOp>,
      93              : 
      94              :     /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept around
      95              :     /// for error logging.
      96              :     ///
      97              :     /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
      98              :     /// bug causing leaks, then it's better to not leave this enabled for production builds.
      99              :     #[cfg(feature = "testing")]
     100              :     pub(crate) dangling_files: HashMap<LayerName, Generation>,
     101              : 
     102              :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
     103              :     pub(crate) shutting_down: bool,
     104              : 
     105              :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
     106              :     /// wait until one of them stops the queue. The semaphore is closed when
     107              :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     108              :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     109              : }
     110              : 
     111              : impl UploadQueueInitialized {
     112            0 :     pub(super) fn no_pending_work(&self) -> bool {
     113            0 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     114            0 :     }
     115              : 
     116            0 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     117            0 :         self.visible_remote_consistent_lsn.load()
     118            0 :     }
     119              : 
     120            0 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     121            0 :         self.projected_remote_consistent_lsn
     122            0 :     }
     123              : }
     124              : 
     125              : #[derive(Clone, Copy)]
     126              : pub(super) enum SetDeletedFlagProgress {
     127              :     NotRunning,
     128              :     InProgress(NaiveDateTime),
     129              :     Successful(NaiveDateTime),
     130              : }
     131              : 
     132              : pub(super) struct UploadQueueStoppedDeletable {
     133              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     134              :     pub(super) deleted_at: SetDeletedFlagProgress,
     135              : }
     136              : 
     137              : pub(super) enum UploadQueueStopped {
     138              :     Deletable(UploadQueueStoppedDeletable),
     139              :     Uninitialized,
     140              : }
     141              : 
     142            0 : #[derive(thiserror::Error, Debug)]
     143              : pub(crate) enum NotInitialized {
     144              :     #[error("queue is in state Uninitialized")]
     145              :     Uninitialized,
     146              :     #[error("queue is in state Stopped")]
     147              :     Stopped,
     148              :     #[error("queue is shutting down")]
     149              :     ShuttingDown,
     150              : }
     151              : 
     152              : impl NotInitialized {
     153            0 :     pub(crate) fn is_stopping(&self) -> bool {
     154            0 :         use NotInitialized::*;
     155            0 :         match self {
     156            0 :             Uninitialized => false,
     157            0 :             Stopped => true,
     158            0 :             ShuttingDown => true,
     159              :         }
     160            0 :     }
     161              : }
     162              : 
     163              : impl UploadQueue {
     164          346 :     pub(crate) fn initialize_empty_remote(
     165          346 :         &mut self,
     166          346 :         metadata: &TimelineMetadata,
     167          346 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     168          346 :         match self {
     169          346 :             UploadQueue::Uninitialized => (),
     170              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     171            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     172              :             }
     173              :         }
     174              : 
     175          346 :         info!("initializing upload queue for empty remote");
     176              : 
     177          346 :         let state = UploadQueueInitialized {
     178          346 :             // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
     179          346 :             latest_files: HashMap::new(),
     180          346 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     181          346 :             latest_metadata: metadata.clone(),
     182          346 :             latest_lineage: Lineage::default(),
     183          346 :             projected_remote_consistent_lsn: None,
     184          346 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     185          346 :             // what follows are boring default initializations
     186          346 :             task_counter: 0,
     187          346 :             num_inprogress_layer_uploads: 0,
     188          346 :             num_inprogress_metadata_uploads: 0,
     189          346 :             num_inprogress_deletions: 0,
     190          346 :             inprogress_tasks: HashMap::new(),
     191          346 :             queued_operations: VecDeque::new(),
     192          346 :             #[cfg(feature = "testing")]
     193          346 :             dangling_files: HashMap::new(),
     194          346 :             shutting_down: false,
     195          346 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     196          346 :             last_aux_file_policy: Default::default(),
     197          346 :         };
     198          346 : 
     199          346 :         *self = UploadQueue::Initialized(state);
     200          346 :         Ok(self.initialized_mut().expect("we just set it"))
     201          346 :     }
     202              : 
     203            6 :     pub(crate) fn initialize_with_current_remote_index_part(
     204            6 :         &mut self,
     205            6 :         index_part: &IndexPart,
     206            6 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     207            6 :         match self {
     208            6 :             UploadQueue::Uninitialized => (),
     209              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     210            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     211              :             }
     212              :         }
     213              : 
     214            6 :         let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
     215           22 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     216           16 :             files.insert(
     217           16 :                 layer_name.to_owned(),
     218           16 :                 LayerFileMetadata::from(layer_metadata),
     219           16 :             );
     220           16 :         }
     221              : 
     222            6 :         info!(
     223            0 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     224            0 :             index_part.metadata.disk_consistent_lsn()
     225              :         );
     226              : 
     227            6 :         let state = UploadQueueInitialized {
     228            6 :             latest_files: files,
     229            6 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     230            6 :             latest_metadata: index_part.metadata.clone(),
     231            6 :             latest_lineage: index_part.lineage.clone(),
     232            6 :             projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
     233            6 :             visible_remote_consistent_lsn: Arc::new(
     234            6 :                 index_part.metadata.disk_consistent_lsn().into(),
     235            6 :             ),
     236            6 :             // what follows are boring default initializations
     237            6 :             task_counter: 0,
     238            6 :             num_inprogress_layer_uploads: 0,
     239            6 :             num_inprogress_metadata_uploads: 0,
     240            6 :             num_inprogress_deletions: 0,
     241            6 :             inprogress_tasks: HashMap::new(),
     242            6 :             queued_operations: VecDeque::new(),
     243            6 :             #[cfg(feature = "testing")]
     244            6 :             dangling_files: HashMap::new(),
     245            6 :             shutting_down: false,
     246            6 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     247            6 :             last_aux_file_policy: index_part.last_aux_file_policy(),
     248            6 :         };
     249            6 : 
     250            6 :         *self = UploadQueue::Initialized(state);
     251            6 :         Ok(self.initialized_mut().expect("we just set it"))
     252            6 :     }
     253              : 
     254         3549 :     pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
     255         3549 :         use UploadQueue::*;
     256         3549 :         match self {
     257            0 :             Uninitialized => Err(NotInitialized::Uninitialized.into()),
     258         3549 :             Initialized(x) => {
     259         3549 :                 if x.shutting_down {
     260            0 :                     Err(NotInitialized::ShuttingDown.into())
     261              :                 } else {
     262         3549 :                     Ok(x)
     263              :                 }
     264              :             }
     265            0 :             Stopped(_) => Err(NotInitialized::Stopped.into()),
     266              :         }
     267         3549 :     }
     268              : 
     269            0 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
     270            0 :         match self {
     271              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     272            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     273              :             }
     274              :             UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
     275            0 :                 anyhow::bail!("queue is in state Stopped(Uninitialized)")
     276              :             }
     277            0 :             UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
     278              :         }
     279            0 :     }
     280              : }
     281              : 
     282              : /// An in-progress upload or delete task.
     283              : #[derive(Debug)]
     284              : pub(crate) struct UploadTask {
     285              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     286              :     pub(crate) task_id: u64,
     287              :     pub(crate) retries: AtomicU32,
     288              : 
     289              :     pub(crate) op: UploadOp,
     290              : }
     291              : 
     292              : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     293              : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     294              : #[derive(Debug)]
     295              : pub(crate) struct Delete {
     296              :     pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
     297              : }
     298              : 
     299              : #[derive(Debug)]
     300              : pub(crate) enum UploadOp {
     301              :     /// Upload a layer file
     302              :     UploadLayer(ResidentLayer, LayerFileMetadata),
     303              : 
     304              :     /// Upload the metadata file
     305              :     UploadMetadata(Box<IndexPart>, Lsn),
     306              : 
     307              :     /// Delete layer files
     308              :     Delete(Delete),
     309              : 
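     310              :     /// Barrier. When the barrier operation is reached, waiters on the watch channel are presumably released (TODO: confirm how the sender is signalled or dropped).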
     311              :     Barrier(tokio::sync::watch::Sender<()>),
     312              : 
     313              :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     314              :     /// this is the same as a Barrier.
     315              :     Shutdown,
     316              : }
     317              : 
     318              : impl std::fmt::Display for UploadOp {
     319            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     320            0 :         match self {
     321            0 :             UploadOp::UploadLayer(layer, metadata) => {
     322            0 :                 write!(
     323            0 :                     f,
     324            0 :                     "UploadLayer({}, size={:?}, gen={:?})",
     325            0 :                     layer,
     326            0 :                     metadata.file_size(),
     327            0 :                     metadata.generation
     328            0 :                 )
     329              :             }
     330            0 :             UploadOp::UploadMetadata(_, lsn) => {
     331            0 :                 write!(f, "UploadMetadata(lsn: {})", lsn)
     332              :             }
     333            0 :             UploadOp::Delete(delete) => {
     334            0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     335              :             }
     336            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     337            0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     338              :         }
     339            0 :     }
     340              : }
        

Generated by: LCOV version 2.1-beta