Line data Source code
1 : use super::storage_layer::LayerName;
2 : use super::storage_layer::ResidentLayer;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use crate::tenant::remote_timeline_client::index::Lineage;
7 : use std::collections::{HashMap, VecDeque};
8 : use std::fmt::Debug;
9 :
10 : use chrono::NaiveDateTime;
11 : use pageserver_api::models::AuxFilePolicy;
12 : use std::sync::Arc;
13 : use tracing::info;
14 : use utils::lsn::AtomicLsn;
15 :
16 : use std::sync::atomic::AtomicU32;
17 : use utils::lsn::Lsn;
18 :
19 : #[cfg(feature = "testing")]
20 : use utils::generation::Generation;
21 :
22 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
23 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
24 : // that many upload queues in a running pageserver, and most of them are initialized
25 : // anyway.
26 : #[allow(clippy::large_enum_variant)]
27 : pub(super) enum UploadQueue {
28 : Uninitialized,
29 : Initialized(UploadQueueInitialized),
30 : Stopped(UploadQueueStopped),
31 : }
32 :
33 : impl UploadQueue {
34 0 : pub fn as_str(&self) -> &'static str {
35 0 : match self {
36 0 : UploadQueue::Uninitialized => "Uninitialized",
37 0 : UploadQueue::Initialized(_) => "Initialized",
38 0 : UploadQueue::Stopped(_) => "Stopped",
39 : }
40 0 : }
41 : }
42 :
43 : /// This keeps track of queued and in-progress tasks.
44 : pub(crate) struct UploadQueueInitialized {
45 : /// Counter to assign task IDs
46 : pub(crate) task_counter: u64,
47 :
48 : /// All layer files stored in the remote storage, taking into account all
49 : /// in-progress and queued operations
50 : pub(crate) latest_files: HashMap<LayerName, LayerFileMetadata>,
51 :
52 : /// How many file uploads or deletions been scheduled, since the
53 : /// last (scheduling of) metadata index upload?
54 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
55 :
56 : /// Metadata stored in the remote storage, taking into account all
57 : /// in-progress and queued operations.
58 : /// DANGER: do not return to outside world, e.g., safekeepers.
59 : pub(crate) latest_metadata: TimelineMetadata,
60 :
61 : /// Part of the flattened "next" `index_part.json`.
62 : pub(crate) latest_lineage: Lineage,
63 :
64 : /// The last aux file policy used on this timeline.
65 : pub(crate) last_aux_file_policy: Option<AuxFilePolicy>,
66 :
67 : /// `disk_consistent_lsn` from the last metadata file that was successfully
68 : /// uploaded. `Lsn(0)` if nothing was uploaded yet.
69 : /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
70 : /// Safekeeper can rely on it to make decisions for WAL storage.
71 : ///
72 : /// visible_remote_consistent_lsn is only updated after our generation has been validated with
73 : /// the control plane (unlesss a timeline's generation is None, in which case
74 : /// we skip validation)
75 : pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
76 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
77 :
78 : // Breakdown of different kinds of tasks currently in-progress
79 : pub(crate) num_inprogress_layer_uploads: usize,
80 : pub(crate) num_inprogress_metadata_uploads: usize,
81 : pub(crate) num_inprogress_deletions: usize,
82 :
83 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
84 : /// has been launched for it. An in-progress task can be busy uploading, but it can
85 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
86 : /// be waiting for retry in `exponential_backoff`.
87 : pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
88 :
89 : /// Queued operations that have not been launched yet. They might depend on previous
90 : /// tasks to finish. For example, metadata upload cannot be performed before all
91 : /// preceding layer file uploads have completed.
92 : pub(crate) queued_operations: VecDeque<UploadOp>,
93 :
94 : /// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
95 : /// for error logging.
96 : ///
97 : /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
98 : /// bug causing leaks, then it's better to not leave this enabled for production builds.
99 : #[cfg(feature = "testing")]
100 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
101 :
102 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
103 : pub(crate) shutting_down: bool,
104 :
105 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
106 : /// wait on until one of them stops the queue. The semaphore is closed when
107 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
108 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
109 : }
110 :
111 : impl UploadQueueInitialized {
112 0 : pub(super) fn no_pending_work(&self) -> bool {
113 0 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
114 0 : }
115 :
116 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
117 0 : self.visible_remote_consistent_lsn.load()
118 0 : }
119 :
120 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
121 0 : self.projected_remote_consistent_lsn
122 0 : }
123 : }
124 :
125 : #[derive(Clone, Copy)]
126 : pub(super) enum SetDeletedFlagProgress {
127 : NotRunning,
128 : InProgress(NaiveDateTime),
129 : Successful(NaiveDateTime),
130 : }
131 :
132 : pub(super) struct UploadQueueStoppedDeletable {
133 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
134 : pub(super) deleted_at: SetDeletedFlagProgress,
135 : }
136 :
137 : pub(super) enum UploadQueueStopped {
138 : Deletable(UploadQueueStoppedDeletable),
139 : Uninitialized,
140 : }
141 :
142 0 : #[derive(thiserror::Error, Debug)]
143 : pub(crate) enum NotInitialized {
144 : #[error("queue is in state Uninitialized")]
145 : Uninitialized,
146 : #[error("queue is in state Stopped")]
147 : Stopped,
148 : #[error("queue is shutting down")]
149 : ShuttingDown,
150 : }
151 :
152 : impl NotInitialized {
153 0 : pub(crate) fn is_stopping(&self) -> bool {
154 0 : use NotInitialized::*;
155 0 : match self {
156 0 : Uninitialized => false,
157 0 : Stopped => true,
158 0 : ShuttingDown => true,
159 : }
160 0 : }
161 : }
162 :
163 : impl UploadQueue {
164 346 : pub(crate) fn initialize_empty_remote(
165 346 : &mut self,
166 346 : metadata: &TimelineMetadata,
167 346 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
168 346 : match self {
169 346 : UploadQueue::Uninitialized => (),
170 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
171 0 : anyhow::bail!("already initialized, state {}", self.as_str())
172 : }
173 : }
174 :
175 346 : info!("initializing upload queue for empty remote");
176 :
177 346 : let state = UploadQueueInitialized {
178 346 : // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
179 346 : latest_files: HashMap::new(),
180 346 : latest_files_changes_since_metadata_upload_scheduled: 0,
181 346 : latest_metadata: metadata.clone(),
182 346 : latest_lineage: Lineage::default(),
183 346 : projected_remote_consistent_lsn: None,
184 346 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
185 346 : // what follows are boring default initializations
186 346 : task_counter: 0,
187 346 : num_inprogress_layer_uploads: 0,
188 346 : num_inprogress_metadata_uploads: 0,
189 346 : num_inprogress_deletions: 0,
190 346 : inprogress_tasks: HashMap::new(),
191 346 : queued_operations: VecDeque::new(),
192 346 : #[cfg(feature = "testing")]
193 346 : dangling_files: HashMap::new(),
194 346 : shutting_down: false,
195 346 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
196 346 : last_aux_file_policy: Default::default(),
197 346 : };
198 346 :
199 346 : *self = UploadQueue::Initialized(state);
200 346 : Ok(self.initialized_mut().expect("we just set it"))
201 346 : }
202 :
203 6 : pub(crate) fn initialize_with_current_remote_index_part(
204 6 : &mut self,
205 6 : index_part: &IndexPart,
206 6 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
207 6 : match self {
208 6 : UploadQueue::Uninitialized => (),
209 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
210 0 : anyhow::bail!("already initialized, state {}", self.as_str())
211 : }
212 : }
213 :
214 6 : let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
215 22 : for (layer_name, layer_metadata) in &index_part.layer_metadata {
216 16 : files.insert(
217 16 : layer_name.to_owned(),
218 16 : LayerFileMetadata::from(layer_metadata),
219 16 : );
220 16 : }
221 :
222 6 : info!(
223 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
224 0 : index_part.metadata.disk_consistent_lsn()
225 : );
226 :
227 6 : let state = UploadQueueInitialized {
228 6 : latest_files: files,
229 6 : latest_files_changes_since_metadata_upload_scheduled: 0,
230 6 : latest_metadata: index_part.metadata.clone(),
231 6 : latest_lineage: index_part.lineage.clone(),
232 6 : projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
233 6 : visible_remote_consistent_lsn: Arc::new(
234 6 : index_part.metadata.disk_consistent_lsn().into(),
235 6 : ),
236 6 : // what follows are boring default initializations
237 6 : task_counter: 0,
238 6 : num_inprogress_layer_uploads: 0,
239 6 : num_inprogress_metadata_uploads: 0,
240 6 : num_inprogress_deletions: 0,
241 6 : inprogress_tasks: HashMap::new(),
242 6 : queued_operations: VecDeque::new(),
243 6 : #[cfg(feature = "testing")]
244 6 : dangling_files: HashMap::new(),
245 6 : shutting_down: false,
246 6 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
247 6 : last_aux_file_policy: index_part.last_aux_file_policy(),
248 6 : };
249 6 :
250 6 : *self = UploadQueue::Initialized(state);
251 6 : Ok(self.initialized_mut().expect("we just set it"))
252 6 : }
253 :
254 3549 : pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
255 3549 : use UploadQueue::*;
256 3549 : match self {
257 0 : Uninitialized => Err(NotInitialized::Uninitialized.into()),
258 3549 : Initialized(x) => {
259 3549 : if x.shutting_down {
260 0 : Err(NotInitialized::ShuttingDown.into())
261 : } else {
262 3549 : Ok(x)
263 : }
264 : }
265 0 : Stopped(_) => Err(NotInitialized::Stopped.into()),
266 : }
267 3549 : }
268 :
269 0 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
270 0 : match self {
271 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
272 0 : anyhow::bail!("queue is in state {}", self.as_str())
273 : }
274 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
275 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
276 : }
277 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
278 : }
279 0 : }
280 : }
281 :
282 : /// An in-progress upload or delete task.
283 : #[derive(Debug)]
284 : pub(crate) struct UploadTask {
285 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
286 : pub(crate) task_id: u64,
287 : pub(crate) retries: AtomicU32,
288 :
289 : pub(crate) op: UploadOp,
290 : }
291 :
292 : /// A deletion of some layers within the lifetime of a timeline. This is not used
293 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
294 : #[derive(Debug)]
295 : pub(crate) struct Delete {
296 : pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
297 : }
298 :
299 : #[derive(Debug)]
300 : pub(crate) enum UploadOp {
301 : /// Upload a layer file
302 : UploadLayer(ResidentLayer, LayerFileMetadata),
303 :
304 : /// Upload the metadata file
305 : UploadMetadata(Box<IndexPart>, Lsn),
306 :
307 : /// Delete layer files
308 : Delete(Delete),
309 :
310 : /// Barrier. When the barrier operation is reached,
311 : Barrier(tokio::sync::watch::Sender<()>),
312 :
313 : /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
314 : /// this is the same as a Barrier.
315 : Shutdown,
316 : }
317 :
318 : impl std::fmt::Display for UploadOp {
319 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
320 0 : match self {
321 0 : UploadOp::UploadLayer(layer, metadata) => {
322 0 : write!(
323 0 : f,
324 0 : "UploadLayer({}, size={:?}, gen={:?})",
325 0 : layer,
326 0 : metadata.file_size(),
327 0 : metadata.generation
328 0 : )
329 : }
330 0 : UploadOp::UploadMetadata(_, lsn) => {
331 0 : write!(f, "UploadMetadata(lsn: {})", lsn)
332 : }
333 0 : UploadOp::Delete(delete) => {
334 0 : write!(f, "Delete({} layers)", delete.layers.len())
335 : }
336 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
337 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
338 : }
339 0 : }
340 : }
|