LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 62.2 % 127 79
Test Date: 2024-11-25 17:48:16 Functions: 41.7 % 12 5

            Line data    Source code
       1              : use super::storage_layer::LayerName;
       2              : use super::storage_layer::ResidentLayer;
       3              : use crate::tenant::metadata::TimelineMetadata;
       4              : use crate::tenant::remote_timeline_client::index::IndexPart;
       5              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       6              : use std::collections::HashSet;
       7              : use std::collections::{HashMap, VecDeque};
       8              : use std::fmt::Debug;
       9              : 
      10              : use chrono::NaiveDateTime;
      11              : use std::sync::Arc;
      12              : use tracing::info;
      13              : use utils::lsn::AtomicLsn;
      14              : 
      15              : use std::sync::atomic::AtomicU32;
      16              : use utils::lsn::Lsn;
      17              : 
      18              : use utils::generation::Generation;
      19              : 
      20              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      21              : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      22              : // that many upload queues in a running pageserver, and most of them are initialized
      23              : // anyway.
      24              : #[allow(clippy::large_enum_variant)]
      25              : pub(super) enum UploadQueue {
      26              :     Uninitialized,
      27              :     Initialized(UploadQueueInitialized),
      28              :     Stopped(UploadQueueStopped),
      29              : }
      30              : 
      31              : impl UploadQueue {
      32            0 :     pub fn as_str(&self) -> &'static str {
      33            0 :         match self {
      34            0 :             UploadQueue::Uninitialized => "Uninitialized",
      35            0 :             UploadQueue::Initialized(_) => "Initialized",
      36            0 :             UploadQueue::Stopped(_) => "Stopped",
      37              :         }
      38            0 :     }
      39              : }
      40              : 
      41              : #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
      42              : pub(crate) enum OpType {
      43              :     MayReorder,
      44              :     FlushDeletion,
      45              : }
      46              : 
      47              : /// This keeps track of queued and in-progress tasks.
      48              : pub(crate) struct UploadQueueInitialized {
      49              :     /// Counter to assign task IDs
      50              :     pub(crate) task_counter: u64,
      51              : 
      52              :     /// The next uploaded index_part.json; assumed to be dirty.
      53              :     ///
      54              :     /// Should not be read directly, except for layer file updates. Instead, you should add a
      55              :     /// projected field.
      56              :     pub(crate) dirty: IndexPart,
      57              : 
      58              :     /// The latest remote persisted IndexPart.
      59              :     ///
      60              :     /// Each completed metadata upload will update this. The second item is the task_id which last
      61              :     /// updated the value, used to ensure we never store an older value over a newer one.
      62              :     pub(crate) clean: (IndexPart, Option<u64>),
      63              : 
      64              :     /// How many file uploads or deletions have been scheduled since the
      65              :     /// last (scheduling of) metadata index upload?
      66              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      67              : 
      68              :     /// The Lsn is only updated after our generation has been validated with
      69              :     /// the control plane (unless a timeline's generation is None, in which case
      70              :     /// we skip validation)
      71              :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      72              : 
      73              :     // Breakdown of different kinds of tasks currently in-progress
      74              :     pub(crate) num_inprogress_layer_uploads: usize,
      75              :     pub(crate) num_inprogress_metadata_uploads: usize,
      76              :     pub(crate) num_inprogress_deletions: usize,
      77              : 
      78              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      79              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      80              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      81              :     /// be waiting for retry in `exponential_backoff`.
      82              :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      83              : 
      84              :     /// Queued operations that have not been launched yet. They might depend on previous
      85              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      86              :     /// preceding layer file uploads have completed.
      87              :     pub(crate) queued_operations: VecDeque<UploadOp>,
      88              : 
      89              :     /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept around
      90              :     /// for error logging.
      91              :     ///
      92              :     /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
      93              :     /// bug causing leaks, then it's better to not leave this enabled for production builds.
      94              :     #[cfg(feature = "testing")]
      95              :     pub(crate) dangling_files: HashMap<LayerName, Generation>,
      96              : 
      97              :     /// Ensure we order file operations correctly.
      98              :     pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
      99              : 
     100              :     /// Deletions that are blocked by the tenant configuration
     101              :     pub(crate) blocked_deletions: Vec<Delete>,
     102              : 
     103              :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
     104              :     pub(crate) shutting_down: bool,
     105              : 
     106              :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
     107              :     /// wait until one of them stops the queue. The semaphore is closed when
     108              :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     109              :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     110              : }
     111              : 
     112              : impl UploadQueueInitialized {
     113            8 :     pub(super) fn no_pending_work(&self) -> bool {
     114            8 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     115            8 :     }
     116              : 
     117            0 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     118            0 :         self.visible_remote_consistent_lsn.load()
     119            0 :     }
     120              : 
     121            0 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     122            0 :         let lsn = self.clean.0.metadata.disk_consistent_lsn();
     123            0 :         self.clean.1.map(|_| lsn)
     124            0 :     }
     125              : }
     126              : 
     127              : #[derive(Clone, Copy)]
     128              : pub(super) enum SetDeletedFlagProgress {
     129              :     NotRunning,
     130              :     InProgress(NaiveDateTime),
     131              :     Successful(NaiveDateTime),
     132              : }
     133              : 
     134              : pub(super) struct UploadQueueStoppedDeletable {
     135              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     136              :     pub(super) deleted_at: SetDeletedFlagProgress,
     137              : }
     138              : 
     139              : pub(super) enum UploadQueueStopped {
     140              :     Deletable(UploadQueueStoppedDeletable),
     141              :     Uninitialized,
     142              : }
     143              : 
     144            0 : #[derive(thiserror::Error, Debug)]
     145              : pub enum NotInitialized {
     146              :     #[error("queue is in state Uninitialized")]
     147              :     Uninitialized,
     148              :     #[error("queue is in state Stopped")]
     149              :     Stopped,
     150              :     #[error("queue is shutting down")]
     151              :     ShuttingDown,
     152              : }
     153              : 
     154              : impl NotInitialized {
     155            0 :     pub(crate) fn is_stopping(&self) -> bool {
     156              :         use NotInitialized::*;
     157            0 :         match self {
     158            0 :             Uninitialized => false,
     159            0 :             Stopped => true,
     160            0 :             ShuttingDown => true,
     161              :         }
     162            0 :     }
     163              : }
     164              : 
     165              : impl UploadQueue {
     166          412 :     pub(crate) fn initialize_empty_remote(
     167          412 :         &mut self,
     168          412 :         metadata: &TimelineMetadata,
     169          412 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     170          412 :         match self {
     171          412 :             UploadQueue::Uninitialized => (),
     172              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     173            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     174              :             }
     175              :         }
     176              : 
     177          412 :         info!("initializing upload queue for empty remote");
     178              : 
     179          412 :         let index_part = IndexPart::empty(metadata.clone());
     180          412 : 
     181          412 :         let state = UploadQueueInitialized {
     182          412 :             dirty: index_part.clone(),
     183          412 :             clean: (index_part, None),
     184          412 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     185          412 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     186          412 :             // what follows are boring default initializations
     187          412 :             task_counter: 0,
     188          412 :             num_inprogress_layer_uploads: 0,
     189          412 :             num_inprogress_metadata_uploads: 0,
     190          412 :             num_inprogress_deletions: 0,
     191          412 :             inprogress_tasks: HashMap::new(),
     192          412 :             queued_operations: VecDeque::new(),
     193          412 :             #[cfg(feature = "testing")]
     194          412 :             dangling_files: HashMap::new(),
     195          412 :             recently_deleted: HashSet::new(),
     196          412 :             blocked_deletions: Vec::new(),
     197          412 :             shutting_down: false,
     198          412 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     199          412 :         };
     200          412 : 
     201          412 :         *self = UploadQueue::Initialized(state);
     202          412 :         Ok(self.initialized_mut().expect("we just set it"))
     203          412 :     }
     204              : 
     205            6 :     pub(crate) fn initialize_with_current_remote_index_part(
     206            6 :         &mut self,
     207            6 :         index_part: &IndexPart,
     208            6 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     209            6 :         match self {
     210            6 :             UploadQueue::Uninitialized => (),
     211              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     212            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     213              :             }
     214              :         }
     215              : 
     216            6 :         info!(
     217            0 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     218            0 :             index_part.metadata.disk_consistent_lsn()
     219              :         );
     220              : 
     221            6 :         let state = UploadQueueInitialized {
     222            6 :             dirty: index_part.clone(),
     223            6 :             clean: (index_part.clone(), None),
     224            6 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     225            6 :             visible_remote_consistent_lsn: Arc::new(
     226            6 :                 index_part.metadata.disk_consistent_lsn().into(),
     227            6 :             ),
     228            6 :             // what follows are boring default initializations
     229            6 :             task_counter: 0,
     230            6 :             num_inprogress_layer_uploads: 0,
     231            6 :             num_inprogress_metadata_uploads: 0,
     232            6 :             num_inprogress_deletions: 0,
     233            6 :             inprogress_tasks: HashMap::new(),
     234            6 :             queued_operations: VecDeque::new(),
     235            6 :             #[cfg(feature = "testing")]
     236            6 :             dangling_files: HashMap::new(),
     237            6 :             recently_deleted: HashSet::new(),
     238            6 :             blocked_deletions: Vec::new(),
     239            6 :             shutting_down: false,
     240            6 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     241            6 :         };
     242            6 : 
     243            6 :         *self = UploadQueue::Initialized(state);
     244            6 :         Ok(self.initialized_mut().expect("we just set it"))
     245            6 :     }
     246              : 
     247         5257 :     pub(crate) fn initialized_mut(
     248         5257 :         &mut self,
     249         5257 :     ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
     250              :         use UploadQueue::*;
     251         5257 :         match self {
     252            0 :             Uninitialized => Err(NotInitialized::Uninitialized),
     253         5257 :             Initialized(x) => {
     254         5257 :                 if x.shutting_down {
     255            0 :                     Err(NotInitialized::ShuttingDown)
     256              :                 } else {
     257         5257 :                     Ok(x)
     258              :                 }
     259              :             }
     260            0 :             Stopped(_) => Err(NotInitialized::Stopped),
     261              :         }
     262         5257 :     }
     263              : 
     264            2 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
     265            2 :         match self {
     266              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     267            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     268              :             }
     269              :             UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
     270            0 :                 anyhow::bail!("queue is in state Stopped(Uninitialized)")
     271              :             }
     272            2 :             UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
     273              :         }
     274            2 :     }
     275              : }
     276              : 
     277              : /// An in-progress upload or delete task.
     278              : #[derive(Debug)]
     279              : pub(crate) struct UploadTask {
     280              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     281              :     pub(crate) task_id: u64,
     282              :     pub(crate) retries: AtomicU32,
     283              : 
     284              :     pub(crate) op: UploadOp,
     285              : }
     286              : 
     287              : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     288              : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     289              : #[derive(Debug, Clone)]
     290              : pub(crate) struct Delete {
     291              :     pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
     292              : }
     293              : 
     294              : #[derive(Debug)]
     295              : pub(crate) enum UploadOp {
     296              :     /// Upload a layer file. The last field indicates the last operation for this file.
     297              :     UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
     298              : 
     299              :     /// Upload an index_part.json file
     300              :     UploadMetadata {
     301              :         /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
     302              :         uploaded: Box<IndexPart>,
     303              :     },
     304              : 
     305              :     /// Delete layer files
     306              :     Delete(Delete),
     307              : 
     308              :     /// Barrier. When the barrier operation is reached, the channel is closed.
     309              :     Barrier(tokio::sync::watch::Sender<()>),
     310              : 
     311              :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     312              :     /// this is the same as a Barrier.
     313              :     Shutdown,
     314              : }
     315              : 
     316              : impl std::fmt::Display for UploadOp {
     317            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     318            0 :         match self {
     319            0 :             UploadOp::UploadLayer(layer, metadata, mode) => {
     320            0 :                 write!(
     321            0 :                     f,
     322            0 :                     "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
     323            0 :                     layer, metadata.file_size, metadata.generation, mode
     324            0 :                 )
     325              :             }
     326            0 :             UploadOp::UploadMetadata { uploaded, .. } => {
     327            0 :                 write!(
     328            0 :                     f,
     329            0 :                     "UploadMetadata(lsn: {})",
     330            0 :                     uploaded.metadata.disk_consistent_lsn()
     331            0 :                 )
     332              :             }
     333            0 :             UploadOp::Delete(delete) => {
     334            0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     335              :             }
     336            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     337            0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     338              :         }
     339            0 :     }
     340              : }
        

Generated by: LCOV version 2.1-beta