LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit
Test: 52d9d4a58355424a48c56cb9ba9670a073f618b9.info Lines: 61.6 % 125 77
Test Date: 2024-11-21 08:31:22 Functions: 41.7 % 12 5

            Line data    Source code
       1              : use super::storage_layer::LayerName;
       2              : use super::storage_layer::ResidentLayer;
       3              : use crate::tenant::metadata::TimelineMetadata;
       4              : use crate::tenant::remote_timeline_client::index::IndexPart;
       5              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
       6              : use std::collections::{HashMap, VecDeque};
       7              : use std::fmt::Debug;
       8              : 
       9              : use chrono::NaiveDateTime;
      10              : use std::sync::Arc;
      11              : use tracing::info;
      12              : use utils::lsn::AtomicLsn;
      13              : 
      14              : use std::sync::atomic::AtomicU32;
      15              : use utils::lsn::Lsn;
      16              : 
      17              : #[cfg(feature = "testing")]
      18              : use utils::generation::Generation;
      19              : 
      20              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      21              : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      22              : // that many upload queues in a running pageserver, and most of them are initialized
      23              : // anyway.
      24              : #[allow(clippy::large_enum_variant)]
      25              : pub(super) enum UploadQueue {
      26              :     Uninitialized,
      27              :     Initialized(UploadQueueInitialized),
      28              :     Stopped(UploadQueueStopped),
      29              : }
      30              : 
      31              : impl UploadQueue {
      32            0 :     pub fn as_str(&self) -> &'static str {
      33            0 :         match self {
      34            0 :             UploadQueue::Uninitialized => "Uninitialized",
      35            0 :             UploadQueue::Initialized(_) => "Initialized",
      36            0 :             UploadQueue::Stopped(_) => "Stopped",
      37              :         }
      38            0 :     }
      39              : }
      40              : 
      41              : /// This keeps track of queued and in-progress tasks.
      42              : pub(crate) struct UploadQueueInitialized {
      43              :     /// Counter to assign task IDs
      44              :     pub(crate) task_counter: u64,
      45              : 
      46              :     /// The next uploaded index_part.json; assumed to be dirty.
      47              :     ///
      48              :     /// Should not be read directly, except for layer file updates. Instead you should add a
      49              :     /// projected field.
      50              :     pub(crate) dirty: IndexPart,
      51              : 
      52              :     /// The latest remote persisted IndexPart.
      53              :     ///
      54              :     /// Each completed metadata upload will update this. The second item is the task_id which last
      55              :     /// updated the value, used to ensure we never store an older value over a newer one.
      56              :     pub(crate) clean: (IndexPart, Option<u64>),
      57              : 
      58              :     /// How many file uploads or deletions have been scheduled since the
      59              :     /// last (scheduling of) metadata index upload?
      60              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      61              : 
      62              :     /// The Lsn is only updated after our generation has been validated with
      63              :     /// the control plane (unless a timeline's generation is None, in which case
      64              :     /// we skip validation)
      65              :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      66              : 
      67              :     // Breakdown of different kinds of tasks currently in-progress
      68              :     pub(crate) num_inprogress_layer_uploads: usize,
      69              :     pub(crate) num_inprogress_metadata_uploads: usize,
      70              :     pub(crate) num_inprogress_deletions: usize,
      71              : 
      72              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      73              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      74              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      75              :     /// be waiting for retry in `exponential_backoff`.
      76              :     pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      77              : 
      78              :     /// Queued operations that have not been launched yet. They might depend on previous
      79              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      80              :     /// preceding layer file uploads have completed.
      81              :     pub(crate) queued_operations: VecDeque<UploadOp>,
      82              : 
      83              :     /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept around
      84              :     /// for error logging.
      85              :     ///
      86              :     /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
      87              :     /// bug causing leaks, then it's better to not leave this enabled for production builds.
      88              :     #[cfg(feature = "testing")]
      89              :     pub(crate) dangling_files: HashMap<LayerName, Generation>,
      90              : 
      91              :     /// Deletions that are blocked by the tenant configuration
      92              :     pub(crate) blocked_deletions: Vec<Delete>,
      93              : 
      94              :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
      95              :     pub(crate) shutting_down: bool,
      96              : 
      97              :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
      98              :     /// wait until one of them stops the queue. The semaphore is closed when
      99              :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     100              :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     101              : }
     102              : 
     103              : impl UploadQueueInitialized {
     104            8 :     pub(super) fn no_pending_work(&self) -> bool {
     105            8 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     106            8 :     }
     107              : 
     108            0 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     109            0 :         self.visible_remote_consistent_lsn.load()
     110            0 :     }
     111              : 
     112            0 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     113            0 :         let lsn = self.clean.0.metadata.disk_consistent_lsn();
     114            0 :         self.clean.1.map(|_| lsn)
     115            0 :     }
     116              : }
     117              : 
     118              : #[derive(Clone, Copy)]
     119              : pub(super) enum SetDeletedFlagProgress {
     120              :     NotRunning,
     121              :     InProgress(NaiveDateTime),
     122              :     Successful(NaiveDateTime),
     123              : }
     124              : 
     125              : pub(super) struct UploadQueueStoppedDeletable {
     126              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     127              :     pub(super) deleted_at: SetDeletedFlagProgress,
     128              : }
     129              : 
     130              : pub(super) enum UploadQueueStopped {
     131              :     Deletable(UploadQueueStoppedDeletable),
     132              :     Uninitialized,
     133              : }
     134              : 
     135            0 : #[derive(thiserror::Error, Debug)]
     136              : pub enum NotInitialized {
     137              :     #[error("queue is in state Uninitialized")]
     138              :     Uninitialized,
     139              :     #[error("queue is in state Stopped")]
     140              :     Stopped,
     141              :     #[error("queue is shutting down")]
     142              :     ShuttingDown,
     143              : }
     144              : 
     145              : impl NotInitialized {
     146            0 :     pub(crate) fn is_stopping(&self) -> bool {
     147              :         use NotInitialized::*;
     148            0 :         match self {
     149            0 :             Uninitialized => false,
     150            0 :             Stopped => true,
     151            0 :             ShuttingDown => true,
     152              :         }
     153            0 :     }
     154              : }
     155              : 
     156              : impl UploadQueue {
     157          412 :     pub(crate) fn initialize_empty_remote(
     158          412 :         &mut self,
     159          412 :         metadata: &TimelineMetadata,
     160          412 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     161          412 :         match self {
     162          412 :             UploadQueue::Uninitialized => (),
     163              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     164            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     165              :             }
     166              :         }
     167              : 
     168          412 :         info!("initializing upload queue for empty remote");
     169              : 
     170          412 :         let index_part = IndexPart::empty(metadata.clone());
     171          412 : 
     172          412 :         let state = UploadQueueInitialized {
     173          412 :             dirty: index_part.clone(),
     174          412 :             clean: (index_part, None),
     175          412 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     176          412 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     177          412 :             // what follows are boring default initializations
     178          412 :             task_counter: 0,
     179          412 :             num_inprogress_layer_uploads: 0,
     180          412 :             num_inprogress_metadata_uploads: 0,
     181          412 :             num_inprogress_deletions: 0,
     182          412 :             inprogress_tasks: HashMap::new(),
     183          412 :             queued_operations: VecDeque::new(),
     184          412 :             #[cfg(feature = "testing")]
     185          412 :             dangling_files: HashMap::new(),
     186          412 :             blocked_deletions: Vec::new(),
     187          412 :             shutting_down: false,
     188          412 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     189          412 :         };
     190          412 : 
     191          412 :         *self = UploadQueue::Initialized(state);
     192          412 :         Ok(self.initialized_mut().expect("we just set it"))
     193          412 :     }
     194              : 
     195            6 :     pub(crate) fn initialize_with_current_remote_index_part(
     196            6 :         &mut self,
     197            6 :         index_part: &IndexPart,
     198            6 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     199            6 :         match self {
     200            6 :             UploadQueue::Uninitialized => (),
     201              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     202            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     203              :             }
     204              :         }
     205              : 
     206            6 :         info!(
     207            0 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     208            0 :             index_part.metadata.disk_consistent_lsn()
     209              :         );
     210              : 
     211            6 :         let state = UploadQueueInitialized {
     212            6 :             dirty: index_part.clone(),
     213            6 :             clean: (index_part.clone(), None),
     214            6 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     215            6 :             visible_remote_consistent_lsn: Arc::new(
     216            6 :                 index_part.metadata.disk_consistent_lsn().into(),
     217            6 :             ),
     218            6 :             // what follows are boring default initializations
     219            6 :             task_counter: 0,
     220            6 :             num_inprogress_layer_uploads: 0,
     221            6 :             num_inprogress_metadata_uploads: 0,
     222            6 :             num_inprogress_deletions: 0,
     223            6 :             inprogress_tasks: HashMap::new(),
     224            6 :             queued_operations: VecDeque::new(),
     225            6 :             #[cfg(feature = "testing")]
     226            6 :             dangling_files: HashMap::new(),
     227            6 :             blocked_deletions: Vec::new(),
     228            6 :             shutting_down: false,
     229            6 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     230            6 :         };
     231            6 : 
     232            6 :         *self = UploadQueue::Initialized(state);
     233            6 :         Ok(self.initialized_mut().expect("we just set it"))
     234            6 :     }
     235              : 
     236         5254 :     pub(crate) fn initialized_mut(
     237         5254 :         &mut self,
     238         5254 :     ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
     239              :         use UploadQueue::*;
     240         5254 :         match self {
     241            0 :             Uninitialized => Err(NotInitialized::Uninitialized),
     242         5254 :             Initialized(x) => {
     243         5254 :                 if x.shutting_down {
     244            0 :                     Err(NotInitialized::ShuttingDown)
     245              :                 } else {
     246         5254 :                     Ok(x)
     247              :                 }
     248              :             }
     249            0 :             Stopped(_) => Err(NotInitialized::Stopped),
     250              :         }
     251         5254 :     }
     252              : 
     253            2 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
     254            2 :         match self {
     255              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     256            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     257              :             }
     258              :             UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
     259            0 :                 anyhow::bail!("queue is in state Stopped(Uninitialized)")
     260              :             }
     261            2 :             UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
     262              :         }
     263            2 :     }
     264              : }
     265              : 
     266              : /// An in-progress upload or delete task.
     267              : #[derive(Debug)]
     268              : pub(crate) struct UploadTask {
     269              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     270              :     pub(crate) task_id: u64,
     271              :     pub(crate) retries: AtomicU32,
     272              : 
     273              :     pub(crate) op: UploadOp,
     274              : }
     275              : 
     276              : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     277              : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     278              : #[derive(Debug, Clone)]
     279              : pub(crate) struct Delete {
     280              :     pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
     281              : }
     282              : 
     283              : #[derive(Debug)]
     284              : pub(crate) enum UploadOp {
     285              :     /// Upload a layer file
     286              :     UploadLayer(ResidentLayer, LayerFileMetadata),
     287              : 
     288              :     /// Upload an index_part.json file
     289              :     UploadMetadata {
     290              :         /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
     291              :         uploaded: Box<IndexPart>,
     292              :     },
     293              : 
     294              :     /// Delete layer files
     295              :     Delete(Delete),
     296              : 
     297              :     /// Barrier. When the barrier operation is reached, the channel is closed.
     298              :     Barrier(tokio::sync::watch::Sender<()>),
     299              : 
     300              :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     301              :     /// this is the same as a Barrier.
     302              :     Shutdown,
     303              : }
     304              : 
     305              : impl std::fmt::Display for UploadOp {
     306            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     307            0 :         match self {
     308            0 :             UploadOp::UploadLayer(layer, metadata) => {
     309            0 :                 write!(
     310            0 :                     f,
     311            0 :                     "UploadLayer({}, size={:?}, gen={:?})",
     312            0 :                     layer, metadata.file_size, metadata.generation
     313            0 :                 )
     314              :             }
     315            0 :             UploadOp::UploadMetadata { uploaded, .. } => {
     316            0 :                 write!(
     317            0 :                     f,
     318            0 :                     "UploadMetadata(lsn: {})",
     319            0 :                     uploaded.metadata.disk_consistent_lsn()
     320            0 :                 )
     321              :             }
     322            0 :             UploadOp::Delete(delete) => {
     323            0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     324              :             }
     325            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     326            0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     327              :         }
     328            0 :     }
     329              : }
        

Generated by: LCOV version 2.1-beta