Line data Source code
1 : use std::collections::{HashMap, HashSet, VecDeque};
2 : use std::fmt::Debug;
3 : use std::sync::Arc;
4 : use std::sync::atomic::AtomicU32;
5 :
6 : use chrono::NaiveDateTime;
7 : use once_cell::sync::Lazy;
8 : use tracing::info;
9 : use utils::generation::Generation;
10 : use utils::lsn::{AtomicLsn, Lsn};
11 :
12 : use super::remote_timeline_client::is_same_remote_layer_path;
13 : use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
14 : use crate::tenant::metadata::TimelineMetadata;
15 : use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
16 :
17 : /// Kill switch for upload queue reordering in case it causes problems.
18 : /// TODO: remove this once we have confidence in it.
19 : static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
20 1229 : Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
21 :
22 : /// Kill switch for index upload coalescing in case it causes problems.
23 : /// TODO: remove this once we have confidence in it.
24 : static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
25 72 : Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));
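// Operational note (a sketch, not code from this crate): both kill switches
// are read once per process via `Lazy`, so they must be set in the
// pageserver's environment before startup, e.g.
// `DISABLE_UPLOAD_QUEUE_REORDERING=true`. Changing the variable at runtime
// has no effect after the first evaluation.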
26 :
27 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
28 : // memory for Uninitialized variants. This doesn't matter in practice: there are not
29 : // that many upload queues in a running pageserver, and most of them are initialized
30 : // anyway.
31 : #[allow(clippy::large_enum_variant)]
32 : pub enum UploadQueue {
33 : Uninitialized,
34 : Initialized(UploadQueueInitialized),
35 : Stopped(UploadQueueStopped),
36 : }
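// State transitions, summarizing the methods below: `Uninitialized` becomes
// `Initialized` via `initialize_empty_remote()` or
// `initialize_with_current_remote_index_part()`; the transition to `Stopped`
// is driven by `RemoteTimelineClient` (outside this file) when the queue is
// shut down.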
37 :
38 : impl UploadQueue {
39 0 : pub fn as_str(&self) -> &'static str {
40 0 : match self {
41 0 : UploadQueue::Uninitialized => "Uninitialized",
42 0 : UploadQueue::Initialized(_) => "Initialized",
43 0 : UploadQueue::Stopped(_) => "Stopped",
44 : }
45 0 : }
46 : }
47 :
48 : #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
49 : pub enum OpType {
50 : MayReorder,
51 : FlushDeletion,
52 : }
53 :
54 : /// This keeps track of queued and in-progress tasks.
55 : pub struct UploadQueueInitialized {
56 : /// Maximum number of inprogress tasks to schedule. 0 is no limit.
57 : pub(crate) inprogress_limit: usize,
58 :
59 : /// Counter to assign task IDs
60 : pub(crate) task_counter: u64,
61 :
62 : /// The next uploaded index_part.json; assumed to be dirty.
63 : ///
64 : /// Should not be read directly, except for layer file updates. Instead you should add a
65 : /// projected field.
66 : pub(crate) dirty: IndexPart,
67 :
68 : /// The latest remote persisted IndexPart.
69 : ///
70 : /// Each completed metadata upload will update this. The second item is the task_id which last
71 : /// updated the value, used to ensure we never store an older value over a newer one.
72 : pub(crate) clean: (IndexPart, Option<u64>),
73 :
74 : /// How many file uploads or deletions have been scheduled since the
75 : /// last (scheduling of) metadata index upload?
76 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
77 :
78 : /// The Lsn is only updated after our generation has been validated with
79 : /// the control plane (unless a timeline's generation is None, in which case
80 : /// we skip validation)
81 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
82 :
83 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
84 : /// has been launched for it. An in-progress task can be busy uploading, but it can
85 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
86 : /// be waiting for retry in `exponential_backoff`.
87 : pub inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
88 :
89 : /// Queued operations that have not been launched yet. They might depend on previous
90 : /// tasks to finish. For example, metadata upload cannot be performed before all
91 : /// preceding layer file uploads have completed.
92 : pub queued_operations: VecDeque<UploadOp>,
93 :
94 : /// Files which have been unlinked, but for which a deletion has not yet been scheduled.
95 : /// Only kept around for error logging.
96 : ///
97 : /// This is behind a testing feature to catch problems in tests, but assuming we could have a
98 : /// bug causing leaks, it's better not to leave this enabled in production builds.
99 : #[cfg(feature = "testing")]
100 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
101 :
102 : /// Recently deleted layers, tracked to ensure we order file operations correctly.
103 : pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
104 :
105 : /// Deletions that are blocked by the tenant configuration
106 : pub(crate) blocked_deletions: Vec<Delete>,
107 :
108 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
109 : pub(crate) shutting_down: bool,
110 :
111 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
112 : /// wait until one of them stops the queue. The semaphore is closed when
113 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
114 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
115 : }
116 :
117 : impl UploadQueueInitialized {
118 48 : pub(super) fn no_pending_work(&self) -> bool {
119 48 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
120 48 : }
121 :
122 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
123 0 : self.visible_remote_consistent_lsn.load()
124 0 : }
125 :
126 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
127 0 : let lsn = self.clean.0.metadata.disk_consistent_lsn();
128 0 : self.clean.1.map(|_| lsn)
129 0 : }
130 :
131 : /// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
132 : /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
133 : /// the queue if it doesn't conflict with operations ahead of it.
134 : ///
135 : /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
136 : ///
137 : /// None may be returned even if the queue isn't empty, if no operations are ready yet.
138 : ///
139 : /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
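///
/// A hypothetical illustration (not a real trace): given the queue
/// `[UploadLayer(A), UploadMetadata(i1), UploadLayer(B)]` where `i1`
/// references `A`, the first call returns `UploadLayer(A)`. While `A` is in
/// flight, `i1` isn't ready, so the next call lets `UploadLayer(B)` jump the
/// queue, assuming it doesn't conflict with `A` or `i1`.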
140 66934 : pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
141 66934 : // If inprogress_tasks is already at limit, don't schedule anything more.
142 66934 : if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
143 36 : return None;
144 66898 : }
145 :
146 131619 : for (i, candidate) in self.queued_operations.iter().enumerate() {
147 : // If this candidate is ready, go for it. Otherwise, try the next one.
148 131619 : if self.is_ready(i) {
149 : // Shutdown operations are left at the head of the queue, to prevent further
150 : // operations from starting. Signal that we're ready to shut down.
151 23540 : if matches!(candidate, UploadOp::Shutdown) {
152 60 : assert!(self.inprogress_tasks.is_empty(), "shutdown with tasks");
153 60 : assert_eq!(i, 0, "shutdown not at head of queue");
154 60 : self.shutdown_ready.close();
155 60 : return None;
156 23480 : }
157 23480 :
158 23480 : let mut op = self.queued_operations.remove(i).expect("i can't disappear");
159 23480 :
160 23480 : // Coalesce any back-to-back index uploads by only uploading the newest one that's
161 23480 : // ready. This typically happens with layer/index/layer/index/... sequences, where
162 23480 : // the layers bypass the indexes, leaving the indexes queued.
163 23480 : //
164 23480 : // If other operations are interleaved between index uploads we don't try to
165 23480 : // coalesce them, since we may as well update the index concurrently with them.
166 23480 : // This keeps the index fresh and avoids starvation.
167 23480 : //
168 23480 : // NB: we assume that all uploaded indexes have the same remote path. This
169 23480 : // is true at the time of writing: the path only depends on the tenant,
170 23480 : // timeline and generation, all of which are static for a timeline instance.
171 23480 : // Otherwise, we must be careful not to coalesce different paths.
172 23480 : let mut coalesced_ops = Vec::new();
173 23480 : if matches!(op, UploadOp::UploadMetadata { .. }) {
174 9235 : while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
175 : {
176 208 : if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
177 0 : break;
178 208 : }
179 208 : if !self.is_ready(i) {
180 142 : break;
181 66 : }
182 66 : coalesced_ops.push(op);
183 66 : op = self.queued_operations.remove(i).expect("i can't disappear");
184 : }
185 14311 : }
186 :
187 23480 : return Some((op, coalesced_ops));
188 108079 : }
189 :
190 : // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
191 108079 : if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
192 2728 : return None;
193 105351 : }
194 105351 :
195 105351 : // If upload queue reordering is disabled, bail out after the first operation.
196 105351 : if *DISABLE_UPLOAD_QUEUE_REORDERING {
197 0 : return None;
198 105351 : }
199 : }
200 40630 : None
201 66934 : }
202 :
203 : /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
204 : /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
205 : /// allowed to skip the queue when it's safe to do so, to increase parallelism.
206 : ///
207 : /// The position must be valid for the queue size.
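///
/// Sketch of the check (a restatement of the code below, not extra
/// behavior): walk all in-progress operations, then the queued operations
/// ahead of `pos`, tracking which index would be active once each one runs;
/// the candidate is ready only if `can_bypass` holds against every
/// (operation, active index) pair.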
208 131827 : fn is_ready(&self, pos: usize) -> bool {
209 131827 : let candidate = self.queued_operations.get(pos).expect("invalid position");
210 131827 : self
211 131827 : // Look at in-progress operations, in random order.
212 131827 : .inprogress_tasks
213 131827 : .values()
214 6987629 : .map(|task| &task.op)
215 131827 : // Then queued operations ahead of the candidate, front-to-back.
216 131827 : .chain(self.queued_operations.iter().take(pos))
217 131827 : // Keep track of the active index ahead of each operation. This is used to ensure that
218 131827 : // an upload doesn't skip the queue too far, such that it modifies a layer that's
219 131827 : // referenced by an active index.
220 131827 : //
221 131827 : // It's okay that in-progress operations are emitted in random order above, since at
222 131827 : // most one of them can be an index upload (enforced by can_bypass).
223 7072965 : .scan(&self.clean.0, |next_active_index, op| {
224 7072965 : let active_index = *next_active_index;
225 7072965 : if let UploadOp::UploadMetadata { uploaded } = op {
226 93073 : *next_active_index = uploaded; // stash index for next operation after this
227 6979892 : }
228 7072965 : Some((op, active_index))
229 7072965 : })
230 131827 : // Check if the candidate can bypass all of them.
231 7072965 : .all(|(op, active_index)| candidate.can_bypass(op, active_index))
232 131827 : }
233 :
234 : /// Returns the number of in-progress deletion operations.
235 : #[cfg(test)]
236 12 : pub(crate) fn num_inprogress_deletions(&self) -> usize {
237 12 : self.inprogress_tasks
238 12 : .iter()
239 12 : .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
240 12 : .count()
241 12 : }
242 :
243 : /// Returns the number of in-progress layer uploads.
244 : #[cfg(test)]
245 24 : pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
246 24 : self.inprogress_tasks
247 24 : .iter()
248 36 : .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
249 24 : .count()
250 24 : }
251 :
252 : /// Test helper that schedules all ready operations into inprogress_tasks, and returns
253 : /// references to them.
254 : ///
255 : /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
256 : /// UploadQueue, so we can use the same code path.
257 : #[cfg(test)]
258 468 : fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
259 468 : let mut tasks = Vec::new();
260 : // NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
261 1032 : while let Some((op, coalesced_ops)) = self.next_ready() {
262 564 : self.task_counter += 1;
263 564 : let task = Arc::new(UploadTask {
264 564 : task_id: self.task_counter,
265 564 : op,
266 564 : coalesced_ops,
267 564 : retries: 0.into(),
268 564 : });
269 564 : self.inprogress_tasks.insert(task.task_id, task.clone());
270 564 : tasks.push(task);
271 564 : }
272 468 : tasks
273 468 : }
274 :
275 : /// Test helper that marks an operation as completed, removing it from inprogress_tasks.
276 : ///
277 : /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
278 : /// UploadQueue, so we can use the same code path.
279 : #[cfg(test)]
280 348 : fn complete(&mut self, task_id: u64) {
281 348 : let Some(task) = self.inprogress_tasks.remove(&task_id) else {
282 0 : return;
283 : };
284 : // Update the clean index on uploads.
285 348 : if let UploadOp::UploadMetadata { ref uploaded } = task.op {
286 96 : if task.task_id > self.clean.1.unwrap_or_default() {
287 96 : self.clean = (*uploaded.clone(), Some(task.task_id));
288 96 : }
289 252 : }
290 348 : }
291 : }
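// A minimal driver sketch using the test helpers above (assumes an
// Initialized queue; production drives this loop from RemoteTimelineClient):
//
//   let tasks = queue.schedule_ready();   // launch all ready operations
//   for task in tasks {
//       /* ...perform task.op remotely... */
//       queue.complete(task.task_id);     // may unblock dependent operations
//   }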
292 :
293 : #[derive(Clone, Copy)]
294 : pub(super) enum SetDeletedFlagProgress {
295 : NotRunning,
296 : InProgress(NaiveDateTime),
297 : Successful(NaiveDateTime),
298 : }
299 :
300 : pub struct UploadQueueStoppedDeletable {
301 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
302 : pub(super) deleted_at: SetDeletedFlagProgress,
303 : }
304 :
305 : #[allow(clippy::large_enum_variant, reason = "TODO")]
306 : pub enum UploadQueueStopped {
307 : Deletable(UploadQueueStoppedDeletable),
308 : Uninitialized,
309 : }
310 :
311 : #[derive(thiserror::Error, Debug)]
312 : pub enum NotInitialized {
313 : #[error("queue is in state Uninitialized")]
314 : Uninitialized,
315 : #[error("queue is in state Stopped")]
316 : Stopped,
317 : #[error("queue is shutting down")]
318 : ShuttingDown,
319 : }
320 :
321 : impl NotInitialized {
322 0 : pub(crate) fn is_stopping(&self) -> bool {
323 : use NotInitialized::*;
324 0 : match self {
325 0 : Uninitialized => false,
326 0 : Stopped => true,
327 0 : ShuttingDown => true,
328 : }
329 0 : }
330 : }
331 :
332 : impl UploadQueue {
333 2796 : pub fn initialize_empty_remote(
334 2796 : &mut self,
335 2796 : metadata: &TimelineMetadata,
336 2796 : inprogress_limit: usize,
337 2796 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
338 2796 : match self {
339 2796 : UploadQueue::Uninitialized => (),
340 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
341 0 : anyhow::bail!("already initialized, state {}", self.as_str())
342 : }
343 : }
344 :
345 2796 : info!("initializing upload queue for empty remote");
346 :
347 2796 : let index_part = IndexPart::empty(metadata.clone());
348 2796 :
349 2796 : let state = UploadQueueInitialized {
350 2796 : inprogress_limit,
351 2796 : dirty: index_part.clone(),
352 2796 : clean: (index_part, None),
353 2796 : latest_files_changes_since_metadata_upload_scheduled: 0,
354 2796 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
355 2796 : // what follows are boring default initializations
356 2796 : task_counter: 0,
357 2796 : inprogress_tasks: HashMap::new(),
358 2796 : queued_operations: VecDeque::new(),
359 2796 : #[cfg(feature = "testing")]
360 2796 : dangling_files: HashMap::new(),
361 2796 : recently_deleted: HashSet::new(),
362 2796 : blocked_deletions: Vec::new(),
363 2796 : shutting_down: false,
364 2796 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
365 2796 : };
366 2796 :
367 2796 : *self = UploadQueue::Initialized(state);
368 2796 : Ok(self.initialized_mut().expect("we just set it"))
369 2796 : }
370 :
371 132 : pub fn initialize_with_current_remote_index_part(
372 132 : &mut self,
373 132 : index_part: &IndexPart,
374 132 : inprogress_limit: usize,
375 132 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
376 132 : match self {
377 132 : UploadQueue::Uninitialized => (),
378 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
379 0 : anyhow::bail!("already initialized, state {}", self.as_str())
380 : }
381 : }
382 :
383 132 : info!(
384 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
385 0 : index_part.metadata.disk_consistent_lsn()
386 : );
387 :
388 132 : let state = UploadQueueInitialized {
389 132 : inprogress_limit,
390 132 : dirty: index_part.clone(),
391 132 : clean: (index_part.clone(), None),
392 132 : latest_files_changes_since_metadata_upload_scheduled: 0,
393 132 : visible_remote_consistent_lsn: Arc::new(
394 132 : index_part.metadata.disk_consistent_lsn().into(),
395 132 : ),
396 132 : // what follows are boring default initializations
397 132 : task_counter: 0,
398 132 : inprogress_tasks: HashMap::new(),
399 132 : queued_operations: VecDeque::new(),
400 132 : #[cfg(feature = "testing")]
401 132 : dangling_files: HashMap::new(),
402 132 : recently_deleted: HashSet::new(),
403 132 : blocked_deletions: Vec::new(),
404 132 : shutting_down: false,
405 132 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
406 132 : };
407 132 :
408 132 : *self = UploadQueue::Initialized(state);
409 132 : Ok(self.initialized_mut().expect("we just set it"))
410 132 : }
411 :
412 48745 : pub fn initialized_mut(&mut self) -> Result<&mut UploadQueueInitialized, NotInitialized> {
413 : use UploadQueue::*;
414 48745 : match self {
415 0 : Uninitialized => Err(NotInitialized::Uninitialized),
416 48745 : Initialized(x) => {
417 48745 : if x.shutting_down {
418 24 : Err(NotInitialized::ShuttingDown)
419 : } else {
420 48721 : Ok(x)
421 : }
422 : }
423 0 : Stopped(_) => Err(NotInitialized::Stopped),
424 : }
425 48745 : }
426 :
427 12 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
428 12 : match self {
429 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
430 0 : anyhow::bail!("queue is in state {}", self.as_str())
431 : }
432 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
433 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
434 : }
435 12 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
436 : }
437 12 : }
438 : }
439 :
440 : /// An in-progress upload or delete task.
441 : #[derive(Debug)]
442 : pub struct UploadTask {
443 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
444 : pub task_id: u64,
445 : /// Number of task retries.
446 : pub retries: AtomicU32,
447 : /// The upload operation.
448 : pub op: UploadOp,
449 : /// Any upload operations that were coalesced into this operation. This typically happens with
450 : /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
451 : pub coalesced_ops: Vec<UploadOp>,
452 : }
453 :
454 : /// A deletion of some layers within the lifetime of a timeline. This is not used
455 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
456 : #[derive(Debug, Clone)]
457 : pub struct Delete {
458 : pub layers: Vec<(LayerName, LayerFileMetadata)>,
459 : }
460 :
461 : #[derive(Clone, Debug)]
462 : pub enum UploadOp {
463 : /// Upload a layer file. The last field indicates the last operation for this file.
464 : UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
465 :
466 : /// Upload an index_part.json file
467 : UploadMetadata {
468 : /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
469 : uploaded: Box<IndexPart>,
470 : },
471 :
472 : /// Delete layer files
473 : Delete(Delete),
474 :
475 : /// Barrier. When the barrier operation is reached, the channel is closed.
476 : Barrier(tokio::sync::watch::Sender<()>),
477 :
478 : /// Shutdown; upon encountering this operation, no new operations will be spawned. Otherwise
479 : /// this is the same as a Barrier.
480 : Shutdown,
481 : }
482 :
483 : impl std::fmt::Display for UploadOp {
484 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
485 0 : match self {
486 0 : UploadOp::UploadLayer(layer, metadata, mode) => {
487 0 : write!(
488 0 : f,
489 0 : "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
490 0 : layer, metadata.file_size, metadata.generation, mode
491 0 : )
492 : }
493 0 : UploadOp::UploadMetadata { uploaded, .. } => {
494 0 : write!(
495 0 : f,
496 0 : "UploadMetadata(lsn: {})",
497 0 : uploaded.metadata.disk_consistent_lsn()
498 0 : )
499 : }
500 0 : UploadOp::Delete(delete) => {
501 0 : write!(f, "Delete({} layers)", delete.layers.len())
502 : }
503 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
504 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
505 : }
506 0 : }
507 : }
508 :
509 : impl UploadOp {
510 : /// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the
511 : /// active index when other would be uploaded -- if we allow self to bypass other, this would
512 : /// be the active index when self is uploaded.
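///
/// For example (hypothetical layers, per the rules below): a `Delete` of
/// layer `A` can bypass an `UploadMetadata` only if neither the uploaded
/// index nor the active `index` references `A`: a layer can't be deleted
/// while an index still references it.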
513 7073253 : pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
514 7073253 : match (self, other) {
515 : // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
516 2728 : (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
517 96 : (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
518 :
519 : // Uploads and deletes can bypass each other unless they're for the same file.
520 119868 : (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
521 119868 : let aname = &a.layer_desc().layer_name();
522 119868 : let bname = &b.layer_desc().layer_name();
523 119868 : !is_same_remote_layer_path(aname, ameta, bname, bmeta)
524 : }
525 311 : (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
526 6826282 : | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
527 6826669 : d.layers.iter().all(|(dname, dmeta)| {
528 6826669 : !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
529 6826669 : })
530 : }
531 :
532 : // Deletes are idempotent and can always bypass each other.
533 16905 : (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
534 :
535 : // Uploads and deletes can bypass an index upload as long as neither the uploaded index
536 : // nor the active index below it references the file. A layer can't be modified or
537 : // deleted while referenced by an index.
538 : //
539 : // Similarly, index uploads can bypass uploads and deletes as long as neither the
540 : // uploaded index nor the active index references the file (the latter would be
541 : // incorrect use by the caller).
542 830 : (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
543 15137 : | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
544 15967 : let uname = u.layer_desc().layer_name();
545 15967 : !i.references(&uname, umeta) && !index.references(&uname, umeta)
546 : }
547 89722 : (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
548 129 : | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
549 89851 : d.layers.iter().all(|(dname, dmeta)| {
550 89851 : !i.references(dname, dmeta) && !index.references(dname, dmeta)
551 89851 : })
552 : }
553 :
554 : // Indexes can never bypass each other. They can coalesce though, and
555 : // `UploadQueue::next_ready()` currently does this when possible.
556 1245 : (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
557 : }
558 7073253 : }
559 : }
560 :
561 : #[cfg(test)]
562 : mod tests {
563 : use std::str::FromStr as _;
564 :
565 : use itertools::Itertools as _;
566 : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
567 :
568 : use super::*;
569 : use crate::DEFAULT_PG_VERSION;
570 : use crate::tenant::Timeline;
571 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
572 : use crate::tenant::storage_layer::Layer;
573 : use crate::tenant::storage_layer::layer::local_layer_path;
574 :
575 : /// Test helper which asserts that two operations are the same, in lieu of UploadOp PartialEq.
576 : #[track_caller]
577 588 : fn assert_same_op(a: &UploadOp, b: &UploadOp) {
578 : use UploadOp::*;
579 588 : match (a, b) {
580 264 : (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
581 264 : assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
582 264 : assert_eq!(ameta, bmeta);
583 264 : assert_eq!(atype, btype);
584 : }
585 132 : (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers),
586 168 : (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b),
587 24 : (Barrier(_), Barrier(_)) => {}
588 0 : (Shutdown, Shutdown) => {}
589 0 : (a, b) => panic!("{a:?} != {b:?}"),
590 : }
591 588 : }
592 :
593 : /// Test helper which asserts that two sets of operations are the same.
594 : #[track_caller]
595 132 : fn assert_same_ops<'a>(
596 132 : a: impl IntoIterator<Item = &'a UploadOp>,
597 132 : b: impl IntoIterator<Item = &'a UploadOp>,
598 132 : ) {
599 132 : a.into_iter()
600 132 : .zip_eq(b)
601 348 : .for_each(|(a, b)| assert_same_op(a, b))
602 132 : }
603 :
604 : /// Test helper to construct a test timeline.
605 : ///
606 : /// TODO: it really shouldn't be necessary to construct an entire tenant and timeline just to
607 : /// test the upload queue -- decouple ResidentLayer from Timeline.
608 : ///
609 : /// TODO: the upload queue uses TimelineMetadata::example() instead, because there's no way to
610 : /// obtain a TimelineMetadata from a Timeline.
611 144 : fn make_timeline() -> Arc<Timeline> {
612 144 : // Grab the current test name from the current thread name.
613 144 : // TODO: TenantHarness shouldn't take a &'static str, but just leak the test name for now.
614 144 : let test_name = std::thread::current().name().unwrap().to_string();
615 144 : let test_name = Box::leak(test_name.into_boxed_str());
616 144 :
617 144 : let runtime = tokio::runtime::Builder::new_current_thread()
618 144 : .enable_all()
619 144 : .build()
620 144 : .expect("failed to create runtime");
621 144 :
622 144 : runtime
623 144 : .block_on(async {
624 144 : let harness = TenantHarness::create(test_name).await?;
625 144 : let (tenant, ctx) = harness.load().await;
626 144 : tenant
627 144 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
628 144 : .await
629 144 : })
630 144 : .expect("failed to create timeline")
631 144 : }
632 :
633 : /// Test helper to construct an (empty) resident layer.
634 360 : fn make_layer(timeline: &Arc<Timeline>, name: &str) -> ResidentLayer {
635 360 : make_layer_with_size(timeline, name, 0)
636 360 : }
637 :
638 : /// Test helper to construct a resident layer with the given size.
639 396 : fn make_layer_with_size(timeline: &Arc<Timeline>, name: &str, size: usize) -> ResidentLayer {
640 396 : let metadata = LayerFileMetadata {
641 396 : generation: timeline.generation,
642 396 : shard: timeline.get_shard_index(),
643 396 : file_size: size as u64,
644 396 : };
645 396 : make_layer_with_metadata(timeline, name, metadata)
646 396 : }
647 :
648 : /// Test helper to construct a layer with the given metadata.
649 588 : fn make_layer_with_metadata(
650 588 : timeline: &Arc<Timeline>,
651 588 : name: &str,
652 588 : metadata: LayerFileMetadata,
653 588 : ) -> ResidentLayer {
654 588 : let name = LayerName::from_str(name).expect("invalid name");
655 588 : let local_path = local_layer_path(
656 588 : timeline.conf,
657 588 : &timeline.tenant_shard_id,
658 588 : &timeline.timeline_id,
659 588 : &name,
660 588 : &metadata.generation,
661 588 : );
662 588 : std::fs::write(&local_path, vec![0; metadata.file_size as usize])
663 588 : .expect("failed to write file");
664 588 : Layer::for_resident(timeline.conf, timeline, local_path, name, metadata)
665 588 : }
666 :
667 : /// Test helper to add a layer to an index and return a new index.
668 72 : fn index_with(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
669 72 : let mut index = index.clone();
670 72 : index
671 72 : .layer_metadata
672 72 : .insert(layer.layer_desc().layer_name(), layer.metadata());
673 72 : Box::new(index)
674 72 : }
675 :
676 : /// Test helper to remove a layer from an index and return a new index.
677 24 : fn index_without(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
678 24 : let mut index = index.clone();
679 24 : index
680 24 : .layer_metadata
681 24 : .remove(&layer.layer_desc().layer_name());
682 24 : Box::new(index)
683 24 : }
684 :
685 : /// Nothing can bypass a barrier, and it can't bypass inprogress tasks.
686 : #[test]
687 12 : fn schedule_barrier() -> anyhow::Result<()> {
688 12 : let mut queue = UploadQueue::Uninitialized;
689 12 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
690 12 : let tli = make_timeline();
691 12 :
692 12 : let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
693 12 : let layer0 = make_layer(
694 12 : &tli,
695 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
696 12 : );
697 12 : let layer1 = make_layer(
698 12 : &tli,
699 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
700 12 : );
701 12 : let layer2 = make_layer(
702 12 : &tli,
703 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
704 12 : );
705 12 : let layer3 = make_layer(
706 12 : &tli,
707 12 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
708 12 : );
709 12 : let (barrier, _) = tokio::sync::watch::channel(());
710 12 :
711 12 : // Enqueue non-conflicting upload, delete, and index before and after a barrier.
712 12 : let ops = [
713 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
714 12 : UploadOp::Delete(Delete {
715 12 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
716 12 : }),
717 12 : UploadOp::UploadMetadata {
718 12 : uploaded: index.clone(),
719 12 : },
720 12 : UploadOp::Barrier(barrier),
721 12 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
722 12 : UploadOp::Delete(Delete {
723 12 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
724 12 : }),
725 12 : UploadOp::UploadMetadata {
726 12 : uploaded: index.clone(),
727 12 : },
728 12 : ];
729 12 :
730 12 : queue.queued_operations.extend(ops.clone());
731 12 :
732 12 : // Schedule the initial operations ahead of the barrier.
733 12 : let tasks = queue.schedule_ready();
734 12 :
735 36 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
736 12 : assert!(matches!(
737 12 : queue.queued_operations.front(),
738 : Some(&UploadOp::Barrier(_))
739 : ));
740 :
741 : // Complete the initial operations. The barrier isn't scheduled while they're pending.
742 48 : for task in tasks {
743 36 : assert!(queue.schedule_ready().is_empty());
744 36 : queue.complete(task.task_id);
745 : }
746 :
747 : // Schedule the barrier. The later tasks won't schedule until it completes.
748 12 : let tasks = queue.schedule_ready();
749 12 :
750 12 : assert_eq!(tasks.len(), 1);
751 12 : assert!(matches!(tasks[0].op, UploadOp::Barrier(_)));
752 12 : assert_eq!(queue.queued_operations.len(), 3);
753 :
754 : // Complete the barrier. The rest of the tasks schedule immediately.
755 12 : queue.complete(tasks[0].task_id);
756 12 :
757 12 : let tasks = queue.schedule_ready();
758 36 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[4..]);
759 12 : assert!(queue.queued_operations.is_empty());
760 :
761 12 : Ok(())
762 12 : }
763 :
764 : /// Deletes can be scheduled in parallel, even if they're for the same file.
765 : #[test]
766 12 : fn schedule_delete_parallel() -> anyhow::Result<()> {
767 12 : let mut queue = UploadQueue::Uninitialized;
768 12 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
769 12 : let tli = make_timeline();
770 12 :
771 12 : // Enqueue a bunch of deletes, some with conflicting names.
772 12 : let layer0 = make_layer(
773 12 : &tli,
774 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
775 12 : );
776 12 : let layer1 = make_layer(
777 12 : &tli,
778 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
779 12 : );
780 12 : let layer2 = make_layer(
781 12 : &tli,
782 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
783 12 : );
784 12 : let layer3 = make_layer(
785 12 : &tli,
786 12 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
787 12 : );
788 12 :
789 12 : let ops = [
790 12 : UploadOp::Delete(Delete {
791 12 : layers: vec![(layer0.layer_desc().layer_name(), layer0.metadata())],
792 12 : }),
793 12 : UploadOp::Delete(Delete {
794 12 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
795 12 : }),
796 12 : UploadOp::Delete(Delete {
797 12 : layers: vec![
798 12 : (layer1.layer_desc().layer_name(), layer1.metadata()),
799 12 : (layer2.layer_desc().layer_name(), layer2.metadata()),
800 12 : ],
801 12 : }),
802 12 : UploadOp::Delete(Delete {
803 12 : layers: vec![(layer2.layer_desc().layer_name(), layer2.metadata())],
804 12 : }),
805 12 : UploadOp::Delete(Delete {
806 12 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
807 12 : }),
808 12 : ];
809 12 :
810 12 : queue.queued_operations.extend(ops.clone());
811 12 :
812 12 : // Schedule all ready operations. Since deletes don't conflict, they're all scheduled.
813 12 : let tasks = queue.schedule_ready();
814 12 :
815 60 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
816 12 : assert!(queue.queued_operations.is_empty());
817 :
818 12 : Ok(())
819 12 : }
820 :
821 : /// Conflicting uploads are serialized.
822 : #[test]
823 12 : fn schedule_upload_conflicts() -> anyhow::Result<()> {
824 12 : let mut queue = UploadQueue::Uninitialized;
825 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
826 12 : let tli = make_timeline();
827 12 :
828 12 : // Enqueue three versions of the same layer, with different file sizes.
829 12 : let layer0a = make_layer_with_size(
830 12 : &tli,
831 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
832 12 : 1,
833 12 : );
834 12 : let layer0b = make_layer_with_size(
835 12 : &tli,
836 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
837 12 : 2,
838 12 : );
839 12 : let layer0c = make_layer_with_size(
840 12 : &tli,
841 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
842 12 : 3,
843 12 : );
844 12 :
845 12 : let ops = [
846 12 : UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
847 12 : UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
848 12 : UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
849 12 : ];
850 12 :
851 12 : queue.queued_operations.extend(ops.clone());
852 :
853 : // Only one version should be scheduled and uploaded at a time.
854 48 : for op in ops {
855 36 : let tasks = queue.schedule_ready();
856 36 : assert_eq!(tasks.len(), 1);
857 36 : assert_same_op(&tasks[0].op, &op);
858 36 : queue.complete(tasks[0].task_id);
859 : }
860 12 : assert!(queue.schedule_ready().is_empty());
861 12 : assert!(queue.queued_operations.is_empty());
862 :
863 12 : Ok(())
864 12 : }
865 :
866 : /// Conflicting uploads and deletes are serialized.
867 : #[test]
868 12 : fn schedule_upload_delete_conflicts() -> anyhow::Result<()> {
869 12 : let mut queue = UploadQueue::Uninitialized;
870 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
871 12 : let tli = make_timeline();
872 12 :
873 12 : // Enqueue two layer uploads, with a delete of both layers in between them. These should be
874 12 : // scheduled one at a time, since deletes can't bypass uploads and vice versa.
875 12 : let layer0 = make_layer(
876 12 : &tli,
877 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
878 12 : );
879 12 : let layer1 = make_layer(
880 12 : &tli,
881 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
882 12 : );
883 12 :
884 12 : let ops = [
885 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
886 12 : UploadOp::Delete(Delete {
887 12 : layers: vec![
888 12 : (layer0.layer_desc().layer_name(), layer0.metadata()),
889 12 : (layer1.layer_desc().layer_name(), layer1.metadata()),
890 12 : ],
891 12 : }),
892 12 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
893 12 : ];
894 12 :
895 12 : queue.queued_operations.extend(ops.clone());
896 :
897 : // Only one version should be scheduled and uploaded at a time.
898 48 : for op in ops {
899 36 : let tasks = queue.schedule_ready();
900 36 : assert_eq!(tasks.len(), 1);
901 36 : assert_same_op(&tasks[0].op, &op);
902 36 : queue.complete(tasks[0].task_id);
903 : }
904 12 : assert!(queue.schedule_ready().is_empty());
905 12 : assert!(queue.queued_operations.is_empty());
906 :
907 12 : Ok(())
908 12 : }
909 :
910 : /// Non-conflicting uploads and deletes can bypass the queue, avoiding the conflicting
911 : /// delete/upload operations at the head of the queue.
912 : #[test]
913 12 : fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> {
914 12 : let mut queue = UploadQueue::Uninitialized;
915 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
916 12 : let tli = make_timeline();
917 12 :
918 12 : // Enqueue two layer uploads, with a delete of both layers in between them. These should be
919 12 : // scheduled one at a time, since deletes can't bypass uploads and vice versa.
920 12 : //
921 12 : // Also enqueue non-conflicting uploads and deletes at the end. These can bypass the queue
922 12 : // and run immediately.
923 12 : let layer0 = make_layer(
924 12 : &tli,
925 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
926 12 : );
927 12 : let layer1 = make_layer(
928 12 : &tli,
929 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
930 12 : );
931 12 : let layer2 = make_layer(
932 12 : &tli,
933 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
934 12 : );
935 12 : let layer3 = make_layer(
936 12 : &tli,
937 12 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
938 12 : );
939 12 :
940 12 : let ops = [
941 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
942 12 : UploadOp::Delete(Delete {
943 12 : layers: vec![
944 12 : (layer0.layer_desc().layer_name(), layer0.metadata()),
945 12 : (layer1.layer_desc().layer_name(), layer1.metadata()),
946 12 : ],
947 12 : }),
948 12 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
949 12 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
950 12 : UploadOp::Delete(Delete {
951 12 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
952 12 : }),
953 12 : ];
954 12 :
955 12 : queue.queued_operations.extend(ops.clone());
956 12 :
957 12 : // Operations 0, 3, and 4 are scheduled immediately.
958 12 : let tasks = queue.schedule_ready();
959 36 : assert_same_ops(tasks.iter().map(|t| &t.op), [&ops[0], &ops[3], &ops[4]]);
960 12 : assert_eq!(queue.queued_operations.len(), 2);
961 :
962 12 : Ok(())
963 12 : }
964 :
965 : /// Non-conflicting uploads are parallelized.
966 : #[test]
967 12 : fn schedule_upload_parallel() -> anyhow::Result<()> {
968 12 : let mut queue = UploadQueue::Uninitialized;
969 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
970 12 : let tli = make_timeline();
971 12 :
972 12 : // Enqueue three different layer uploads.
973 12 : let layer0 = make_layer(
974 12 : &tli,
975 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
976 12 : );
977 12 : let layer1 = make_layer(
978 12 : &tli,
979 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
980 12 : );
981 12 : let layer2 = make_layer(
982 12 : &tli,
983 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
984 12 : );
985 12 :
986 12 : let ops = [
987 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
988 12 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
989 12 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
990 12 : ];
991 12 :
992 12 : queue.queued_operations.extend(ops.clone());
993 12 :
994 12 : // All uploads should be scheduled concurrently.
995 12 : let tasks = queue.schedule_ready();
996 12 :
997 36 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
998 12 : assert!(queue.queued_operations.is_empty());
999 :
1000 12 : Ok(())
1001 12 : }
1002 :
1003 : /// Index uploads are coalesced.
1004 : #[test]
1005 12 : fn schedule_index_coalesce() -> anyhow::Result<()> {
1006 12 : let mut queue = UploadQueue::Uninitialized;
1007 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1008 :
1009 : // Enqueue three uploads of the current empty index.
1010 12 : let index = Box::new(queue.clean.0.clone());
1011 12 :
1012 12 : let ops = [
1013 12 : UploadOp::UploadMetadata {
1014 12 : uploaded: index.clone(),
1015 12 : },
1016 12 : UploadOp::UploadMetadata {
1017 12 : uploaded: index.clone(),
1018 12 : },
1019 12 : UploadOp::UploadMetadata {
1020 12 : uploaded: index.clone(),
1021 12 : },
1022 12 : ];
1023 12 :
1024 12 : queue.queued_operations.extend(ops.clone());
1025 12 :
1026 12 : // The index uploads are coalesced into a single operation.
1027 12 : let tasks = queue.schedule_ready();
1028 12 : assert_eq!(tasks.len(), 1);
1029 12 : assert_same_op(&tasks[0].op, &ops[2]);
1030 12 : assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);
1031 12 :
1032 12 : assert!(queue.queued_operations.is_empty());
1033 :
1034 12 : Ok(())
1035 12 : }
1036 :
1037 : /// Chains of upload/index operations lead to parallel layer uploads and serial index uploads.
1038 : /// This is the common case with layer flushes.
1039 : #[test]
1040 12 : fn schedule_index_upload_chain() -> anyhow::Result<()> {
1041 12 : let mut queue = UploadQueue::Uninitialized;
1042 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1043 12 : let tli = make_timeline();
1044 12 :
1045 12 : // Enqueue a chain of layer uploads interleaved with index uploads that reference them.
1046 12 : let index = Box::new(queue.clean.0.clone());
1047 12 : let layer0 = make_layer(
1048 12 : &tli,
1049 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1050 12 : );
1051 12 : let index0 = index_with(&index, &layer0);
1052 12 : let layer1 = make_layer(
1053 12 : &tli,
1054 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1055 12 : );
1056 12 : let index1 = index_with(&index0, &layer1);
1057 12 : let layer2 = make_layer(
1058 12 : &tli,
1059 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1060 12 : );
1061 12 : let index2 = index_with(&index1, &layer2);
1062 12 :
1063 12 : let ops = [
1064 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1065 12 : UploadOp::UploadMetadata {
1066 12 : uploaded: index0.clone(),
1067 12 : },
1068 12 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
1069 12 : UploadOp::UploadMetadata {
1070 12 : uploaded: index1.clone(),
1071 12 : },
1072 12 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1073 12 : UploadOp::UploadMetadata {
1074 12 : uploaded: index2.clone(),
1075 12 : },
1076 12 : ];
1077 12 :
1078 12 : queue.queued_operations.extend(ops.clone());
1079 12 :
1080 12 : // The layer uploads should be scheduled immediately. The indexes must wait.
1081 12 : let upload_tasks = queue.schedule_ready();
1082 12 : assert_same_ops(
1083 36 : upload_tasks.iter().map(|t| &t.op),
1084 12 : [&ops[0], &ops[2], &ops[4]],
1085 12 : );
1086 12 :
1087 12 : // layer2 completes first. None of the indexes can upload yet.
1088 12 : queue.complete(upload_tasks[2].task_id);
1089 12 : assert!(queue.schedule_ready().is_empty());
1090 :
1091 : // layer0 completes. index0 can upload. It completes.
1092 12 : queue.complete(upload_tasks[0].task_id);
1093 12 : let index_tasks = queue.schedule_ready();
1094 12 : assert_eq!(index_tasks.len(), 1);
1095 12 : assert_same_op(&index_tasks[0].op, &ops[1]);
1096 12 : queue.complete(index_tasks[0].task_id);
1097 12 :
1098 12 : // layer 1 completes. This unblocks index 1 and 2, which coalesce into
1099 12 : // a single upload for index 2.
1100 12 : queue.complete(upload_tasks[1].task_id);
1101 12 :
1102 12 : let index_tasks = queue.schedule_ready();
1103 12 : assert_eq!(index_tasks.len(), 1);
1104 12 : assert_same_op(&index_tasks[0].op, &ops[5]);
1105 12 : assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);
1106 12 :
1107 12 : assert!(queue.queued_operations.is_empty());
1108 :
1109 12 : Ok(())
1110 12 : }
1111 :
1112 : /// A delete can't bypass an index upload if an index ahead of it still references it.
1113 : #[test]
1114 12 : fn schedule_index_delete_dereferenced() -> anyhow::Result<()> {
1115 12 : let mut queue = UploadQueue::Uninitialized;
1116 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1117 12 : let tli = make_timeline();
1118 12 :
1119 12 : // Create a layer to upload.
1120 12 : let layer = make_layer(
1121 12 : &tli,
1122 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1123 12 : );
1124 12 : let index_upload = index_with(&queue.clean.0, &layer);
1125 12 :
1126 12 : // Remove the layer reference in a new index, then delete the layer.
1127 12 : let index_deref = index_without(&index_upload, &layer);
1128 12 :
1129 12 : let ops = [
1130 12 : // Initial upload, with a barrier to prevent index coalescing.
1131 12 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1132 12 : UploadOp::UploadMetadata {
1133 12 : uploaded: index_upload.clone(),
1134 12 : },
1135 12 : UploadOp::Barrier(tokio::sync::watch::channel(()).0),
1136 12 : // Dereference the layer and delete it.
1137 12 : UploadOp::UploadMetadata {
1138 12 : uploaded: index_deref.clone(),
1139 12 : },
1140 12 : UploadOp::Delete(Delete {
1141 12 : layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
1142 12 : }),
1143 12 : ];
1144 12 :
1145 12 : queue.queued_operations.extend(ops.clone());
1146 :
1147 : // Operations are serialized.
1148 72 : for op in ops {
1149 60 : let tasks = queue.schedule_ready();
1150 60 : assert_eq!(tasks.len(), 1);
1151 60 : assert_same_op(&tasks[0].op, &op);
1152 60 : queue.complete(tasks[0].task_id);
1153 : }
1154 12 : assert!(queue.queued_operations.is_empty());
1155 :
1156 12 : Ok(())
1157 12 : }
1158 :
1159 : /// An upload with a reused layer name doesn't clobber the previous layer. Specifically, a
1160 : /// dereference/upload/reference cycle can't allow the upload to bypass the reference.
1161 : #[test]
1162 12 : fn schedule_index_upload_dereferenced() -> anyhow::Result<()> {
1163 12 : let mut queue = UploadQueue::Uninitialized;
1164 12 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1165 12 : let tli = make_timeline();
1166 12 :
1167 12 : // Create a layer to upload.
1168 12 : let layer = make_layer(
1169 12 : &tli,
1170 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1171 12 : );
1172 12 :
1173 12 : // Upload the layer. Then dereference the layer, and upload/reference it again.
1174 12 : let index_upload = index_with(&queue.clean.0, &layer);
1175 12 : let index_deref = index_without(&index_upload, &layer);
1176 12 : let index_ref = index_with(&index_deref, &layer);
1177 12 :
1178 12 : let ops = [
1179 12 : // Initial upload, with a barrier to prevent index coalescing.
1180 12 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1181 12 : UploadOp::UploadMetadata {
1182 12 : uploaded: index_upload.clone(),
1183 12 : },
1184 12 : UploadOp::Barrier(tokio::sync::watch::channel(()).0),
1185 12 : // Dereference the layer.
1186 12 : UploadOp::UploadMetadata {
1187 12 : uploaded: index_deref.clone(),
1188 12 : },
1189 12 : // Replace and reference the layer.
1190 12 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1191 12 : UploadOp::UploadMetadata {
1192 12 : uploaded: index_ref.clone(),
1193 12 : },
1194 12 : ];
1195 12 :
1196 12 : queue.queued_operations.extend(ops.clone());
1197 :
1198 : // Operations are serialized.
1199 84 : for op in ops {
1200 72 : let tasks = queue.schedule_ready();
1201 72 : assert_eq!(tasks.len(), 1);
1202 72 : assert_same_op(&tasks[0].op, &op);
1203 72 : queue.complete(tasks[0].task_id);
1204 : }
1205 12 : assert!(queue.queued_operations.is_empty());
1206 :
1207 12 : Ok(())
1208 12 : }
1209 :
1210 : /// Nothing can bypass a shutdown, and it waits for inprogress tasks. It's never returned from
1211 : /// next_ready(), but is left at the head of the queue.
1212 : #[test]
1213 12 : fn schedule_shutdown() -> anyhow::Result<()> {
1214 12 : let mut queue = UploadQueue::Uninitialized;
1215 12 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
1216 12 : let tli = make_timeline();
1217 12 :
1218 12 : let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
1219 12 : let layer0 = make_layer(
1220 12 : &tli,
1221 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1222 12 : );
1223 12 : let layer1 = make_layer(
1224 12 : &tli,
1225 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1226 12 : );
1227 12 : let layer2 = make_layer(
1228 12 : &tli,
1229 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1230 12 : );
1231 12 : let layer3 = make_layer(
1232 12 : &tli,
1233 12 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1234 12 : );
1235 12 :
1236 12 : // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
1237 12 : let ops = [
1238 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1239 12 : UploadOp::Delete(Delete {
1240 12 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
1241 12 : }),
1242 12 : UploadOp::UploadMetadata {
1243 12 : uploaded: index.clone(),
1244 12 : },
1245 12 : UploadOp::Shutdown,
1246 12 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1247 12 : UploadOp::Delete(Delete {
1248 12 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
1249 12 : }),
1250 12 : UploadOp::UploadMetadata {
1251 12 : uploaded: index.clone(),
1252 12 : },
1253 12 : ];
1254 12 :
1255 12 : queue.queued_operations.extend(ops.clone());
1256 12 :
1257 12 : // Schedule the initial operations ahead of the shutdown.
1258 12 : let tasks = queue.schedule_ready();
1259 12 :
1260 36 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
1261 12 : assert!(matches!(
1262 12 : queue.queued_operations.front(),
1263 : Some(&UploadOp::Shutdown)
1264 : ));
1265 :
1266 : // Complete the initial operations. The shutdown isn't triggered while they're pending.
1267 48 : for task in tasks {
1268 36 : assert!(queue.schedule_ready().is_empty());
1269 36 : queue.complete(task.task_id);
1270 : }
1271 :
1272 : // The shutdown is triggered the next time we try to pull an operation. It isn't returned,
1273 : // but is left in the queue.
1274 12 : assert!(!queue.shutdown_ready.is_closed());
1275 12 : assert!(queue.next_ready().is_none());
1276 12 : assert!(queue.shutdown_ready.is_closed());
1277 :
1278 12 : Ok(())
1279 12 : }
1280 :
1281 : /// Scheduling respects inprogress_limit.
1282 : #[test]
1283 12 : fn schedule_inprogress_limit() -> anyhow::Result<()> {
1284 12 : // Create a queue with inprogress_limit=2.
1285 12 : let mut queue = UploadQueue::Uninitialized;
1286 12 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?;
1287 12 : let tli = make_timeline();
1288 12 :
1289 12 : // Enqueue a bunch of uploads.
1290 12 : let layer0 = make_layer(
1291 12 : &tli,
1292 12 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1293 12 : );
1294 12 : let layer1 = make_layer(
1295 12 : &tli,
1296 12 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1297 12 : );
1298 12 : let layer2 = make_layer(
1299 12 : &tli,
1300 12 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1301 12 : );
1302 12 : let layer3 = make_layer(
1303 12 : &tli,
1304 12 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1305 12 : );
1306 12 :
1307 12 : let ops = [
1308 12 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1309 12 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
1310 12 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1311 12 : UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
1312 12 : ];
1313 12 :
1314 12 : queue.queued_operations.extend(ops.clone());
1315 12 :
1316 12 : // Schedule all ready operations. Only 2 are scheduled.
1317 12 : let tasks = queue.schedule_ready();
1318 24 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]);
1319 12 : assert!(queue.next_ready().is_none());
1320 :
1321 : // When one completes, another is scheduled.
1322 12 : queue.complete(tasks[0].task_id);
1323 12 : let tasks = queue.schedule_ready();
1324 12 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]);
1325 12 :
1326 12 : Ok(())
1327 12 : }
1328 :
1329 : /// Tests that can_bypass takes name, generation and shard index into account for all operations.
1330 : #[test]
1331 12 : fn can_bypass_path() -> anyhow::Result<()> {
1332 12 : let tli = make_timeline();
1333 12 :
1334 12 : let name0 = &"000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
1335 12 : let name1 = &"100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
1336 :
1337 : // Asserts that layers a and b either can or can't bypass each other, for all combinations
1338 : // of operations (except Delete and UploadMetadata, which are special-cased).
1339 : #[track_caller]
1340 96 : fn assert_can_bypass(a: ResidentLayer, b: ResidentLayer, can_bypass: bool) {
1341 96 : let index = IndexPart::empty(TimelineMetadata::example());
1342 288 : for (a, b) in make_ops(a).into_iter().zip(make_ops(b)) {
1343 288 : match (&a, &b) {
1344 : // Deletes can always bypass each other.
1345 96 : (UploadOp::Delete(_), UploadOp::Delete(_)) => assert!(a.can_bypass(&b, &index)),
1346 : // Indexes can never bypass each other.
1347 : (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => {
1348 96 : assert!(!a.can_bypass(&b, &index))
1349 : }
1350 : // For other operations, assert as requested.
1351 96 : (a, b) => assert_eq!(a.can_bypass(b, &index), can_bypass),
1352 : }
1353 : }
1354 96 : }
1355 :
1356 192 : fn make_ops(layer: ResidentLayer) -> Vec<UploadOp> {
1357 192 : let mut index = IndexPart::empty(TimelineMetadata::example());
1358 192 : index
1359 192 : .layer_metadata
1360 192 : .insert(layer.layer_desc().layer_name(), layer.metadata());
1361 192 : vec![
1362 192 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1363 192 : UploadOp::Delete(Delete {
1364 192 : layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
1365 192 : }),
1366 192 : UploadOp::UploadMetadata {
1367 192 : uploaded: Box::new(index),
1368 192 : },
1369 192 : ]
1370 192 : }
1371 :
1372 : // Makes a ResidentLayer.
1373 192 : let layer = |name: &'static str, shard: Option<u8>, generation: u32| -> ResidentLayer {
1374 192 : let shard = shard
1375 192 : .map(|n| ShardIndex::new(ShardNumber(n), ShardCount(8)))
1376 192 : .unwrap_or(ShardIndex::unsharded());
1377 192 : let metadata = LayerFileMetadata {
1378 192 : shard,
1379 192 : generation: Generation::Valid(generation),
1380 192 : file_size: 0,
1381 192 : };
1382 192 : make_layer_with_metadata(&tli, name, metadata)
1383 192 : };
1384 :
1385 : // Same name and metadata can't bypass. This goes both for unsharded and sharded, as well as
1386 : // 0 or >0 generation.
1387 12 : assert_can_bypass(layer(name0, None, 0), layer(name0, None, 0), false);
1388 12 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(0), 0), false);
1389 12 : assert_can_bypass(layer(name0, None, 1), layer(name0, None, 1), false);
1390 12 :
1391 12 : // Different names can bypass.
1392 12 : assert_can_bypass(layer(name0, None, 0), layer(name1, None, 0), true);
1393 12 :
1394 12 : // Different shards can bypass. Shard 0 is different from unsharded.
1395 12 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(1), 0), true);
1396 12 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, None, 0), true);
1397 12 :
1398 12 : // Different generations can bypass, both sharded and unsharded.
1399 12 : assert_can_bypass(layer(name0, None, 0), layer(name0, None, 1), true);
1400 12 : assert_can_bypass(layer(name0, Some(1), 0), layer(name0, Some(1), 1), true);
1401 12 :
1402 12 : Ok(())
1403 12 : }
1404 : }