Line data Source code
1 : use std::collections::{HashMap, HashSet, VecDeque};
2 : use std::fmt::Debug;
3 : use std::sync::Arc;
4 : use std::sync::atomic::AtomicU32;
5 :
6 : use chrono::NaiveDateTime;
7 : use once_cell::sync::Lazy;
8 : use tracing::info;
9 : use utils::generation::Generation;
10 : use utils::lsn::{AtomicLsn, Lsn};
11 :
12 : use super::remote_timeline_client::is_same_remote_layer_path;
13 : use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
14 : use crate::tenant::metadata::TimelineMetadata;
15 : use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
16 :
17 : /// Kill switch for upload queue reordering in case it causes problems.
18 : /// TODO: remove this once we have confidence in it.
19 : static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
20 393 : Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
21 :
22 : /// Kill switch for index upload coalescing in case it causes problems.
23 : /// TODO: remove this once we have confidence in it.
24 : static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
25 15 : Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));
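// A minimal usage sketch for these kill switches: the env vars are read once,
// on first dereference of the Lazy, so they must be set before process startup
// (e.g. `DISABLE_UPLOAD_QUEUE_REORDERING=true`). Callers then just branch on
// the cached value, as `next_ready` does below:
//
//     if *DISABLE_UPLOAD_QUEUE_REORDERING {
//         return None; // fall back to strict FIFO scheduling
//     }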
26 :
27 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
28 : // memory for Uninitialized variants. This doesn't matter in practice: there are not
29 : // that many upload queues in a running pageserver, and most of them are initialized
30 : // anyway.
31 : #[allow(clippy::large_enum_variant)]
32 : pub enum UploadQueue {
33 : Uninitialized,
34 : Initialized(UploadQueueInitialized),
35 : Stopped(UploadQueueStopped),
36 : }
37 :
38 : impl UploadQueue {
39 0 : pub fn as_str(&self) -> &'static str {
40 0 : match self {
41 0 : UploadQueue::Uninitialized => "Uninitialized",
42 0 : UploadQueue::Initialized(_) => "Initialized",
43 0 : UploadQueue::Stopped(_) => "Stopped",
44 : }
45 0 : }
46 : }
47 :
48 : #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
49 : pub enum OpType {
50 : MayReorder,
51 : FlushDeletion,
52 : }
53 :
54 : /// This keeps track of queued and in-progress tasks.
55 : pub struct UploadQueueInitialized {
56 : /// Maximum number of in-progress tasks to schedule. 0 means no limit.
57 : pub(crate) inprogress_limit: usize,
58 :
59 : /// Counter to assign task IDs
60 : pub(crate) task_counter: u64,
61 :
62 : /// The next uploaded index_part.json; assumed to be dirty.
63 : ///
64 : /// Should not be read directly, except for layer file updates. Instead, you should add a
65 : /// projected field.
66 : pub(crate) dirty: IndexPart,
67 :
68 : /// The latest remote persisted IndexPart.
69 : ///
70 : /// Each completed metadata upload will update this. The second item is the task_id which last
71 : /// updated the value, used to ensure we never store an older value over a newer one.
72 : pub(crate) clean: (IndexPart, Option<u64>),
73 :
74 : /// How many file uploads or deletions have been scheduled since the
75 : /// last (scheduling of a) metadata index upload?
76 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
77 :
78 : /// The Lsn is only updated after our generation has been validated with
79 : /// the control plane (unless a timeline's generation is None, in which case
80 : /// we skip validation)
81 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
82 :
83 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
84 : /// has been launched for it. An in-progress task can be busy uploading, but it can
85 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
86 : /// be waiting for retry in `exponential_backoff`.
87 : pub inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
88 :
89 : /// Queued operations that have not been launched yet. They might depend on previous
90 : /// tasks to finish. For example, metadata upload cannot be performed before all
91 : /// preceding layer file uploads have completed.
92 : pub queued_operations: VecDeque<UploadOp>,
93 :
94 : /// Files which have been unlinked, but whose deletion has not yet been scheduled. Only kept around
95 : /// for error logging.
96 : ///
97 : /// This is kept behind a testing feature to catch problems in tests; assuming we could have a
98 : /// bug causing leaks, it's better not to leave it enabled in production builds.
99 : #[cfg(feature = "testing")]
100 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
101 :
102 : /// Ensure we order file operations correctly.
103 : pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
104 :
105 : /// Deletions that are blocked by the tenant configuration
106 : pub(crate) blocked_deletions: Vec<Delete>,
107 :
108 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
109 : pub(crate) shutting_down: bool,
110 :
111 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
112 : /// wait until one of them stops the queue. The semaphore is closed when
113 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
114 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
115 : }
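// A minimal sketch of the dirty/clean protocol documented on the fields above.
// The call sites are hypothetical (the real driver is RemoteTimelineClient):
//
//     // 1. Mutate the projected (dirty) index; it is the next index_part.json
//     //    to be uploaded.
//     // 2. Snapshot it into an index upload operation:
//     queue.queued_operations.push_back(UploadOp::UploadMetadata {
//         uploaded: Box::new(queue.dirty.clone()),
//     });
//     // 3. When that task completes, `clean` is set to the snapshot, guarded
//     //    by task_id so an older index never overwrites a newer one (see the
//     //    `complete` test helper below).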
116 :
117 : impl UploadQueueInitialized {
118 16 : pub(super) fn no_pending_work(&self) -> bool {
119 16 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
120 16 : }
121 :
122 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
123 0 : self.visible_remote_consistent_lsn.load()
124 0 : }
125 :
126 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
127 0 : let lsn = self.clean.0.metadata.disk_consistent_lsn();
128 0 : self.clean.1.map(|_| lsn)
129 0 : }
130 :
131 : /// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
132 : /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
133 : /// the queue if it doesn't conflict with operations ahead of it.
134 : ///
135 : /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
136 : ///
137 : /// None may be returned even if the queue isn't empty, if no operations are ready yet.
138 : ///
139 : /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
140 26786 : pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
141 26786 : // If inprogress_tasks is already at limit, don't schedule anything more.
142 26786 : if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
143 12 : return None;
144 26774 : }
145 :
146 44480 : for (i, candidate) in self.queued_operations.iter().enumerate() {
147 : // If this candidate is ready, go for it. Otherwise, try the next one.
148 44480 : if self.is_ready(i) {
149 : // Shutdown operations are left at the head of the queue, to prevent further
150 : // operations from starting. Signal that we're ready to shut down.
151 10164 : if matches!(candidate, UploadOp::Shutdown) {
152 20 : assert!(self.inprogress_tasks.is_empty(), "shutdown with tasks");
153 20 : assert_eq!(i, 0, "shutdown not at head of queue");
154 20 : self.shutdown_ready.close();
155 20 : return None;
156 10144 : }
157 10144 :
158 10144 : let mut op = self.queued_operations.remove(i).expect("i can't disappear");
159 10144 :
160 10144 : // Coalesce any back-to-back index uploads by only uploading the newest one that's
161 10144 : // ready. This typically happens with layer/index/layer/index/... sequences, where
162 10144 : // the layers bypass the indexes, leaving the indexes queued.
163 10144 : //
164 10144 : // If other operations are interleaved between index uploads we don't try to
165 10144 : // coalesce them, since we may as well update the index concurrently with them.
166 10144 : // This keeps the index fresh and avoids starvation.
167 10144 : //
168 10144 : // NB: we assume that all uploaded indexes have the same remote path. This
169 10144 : // is true at the time of writing: the path only depends on the tenant,
170 10144 : // timeline and generation, all of which are static for a timeline instance.
171 10144 : // Otherwise, we must be careful not to coalesce different paths.
172 10144 : let mut coalesced_ops = Vec::new();
173 10144 : if matches!(op, UploadOp::UploadMetadata { .. }) {
174 3085 : while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
175 : {
176 28 : if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
177 0 : break;
178 28 : }
179 28 : if !self.is_ready(i) {
180 10 : break;
181 18 : }
182 18 : coalesced_ops.push(op);
183 18 : op = self.queued_operations.remove(i).expect("i can't disappear");
184 : }
185 7077 : }
186 :
187 10144 : return Some((op, coalesced_ops));
188 34316 : }
189 :
190 : // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
191 34316 : if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
192 4805 : return None;
193 29511 : }
194 29511 :
195 29511 : // If upload queue reordering is disabled, bail out after the first operation.
196 29511 : if *DISABLE_UPLOAD_QUEUE_REORDERING {
197 0 : return None;
198 29511 : }
199 : }
200 11805 : None
201 26786 : }
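// A minimal sketch of the intended consumption loop for `next_ready`. The
// `spawn_upload_task` name is hypothetical; in production the loop lives in
// `RemoteTimelineClient::launch_queued_tasks`, in tests in `schedule_ready`:
//
//     while let Some((op, coalesced_ops)) = queue.next_ready() {
//         spawn_upload_task(op, coalesced_ops);
//     }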
202 :
203 : /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
204 : /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
205 : /// allowed to skip the queue when it's safe to do so, to increase parallelism.
206 : ///
207 : /// The position must be valid for the queue size.
208 44508 : fn is_ready(&self, pos: usize) -> bool {
209 44508 : let candidate = self.queued_operations.get(pos).expect("invalid position");
210 44508 : self
211 44508 : // Look at in-progress operations, in random order.
212 44508 : .inprogress_tasks
213 44508 : .values()
214 1911496 : .map(|task| &task.op)
215 44508 : // Then queued operations ahead of the candidate, front-to-back.
216 44508 : .chain(self.queued_operations.iter().take(pos))
217 44508 : // Keep track of the active index ahead of each operation. This is used to ensure that
218 44508 : // an upload doesn't skip the queue too far, such that it modifies a layer that's
219 44508 : // referenced by an active index.
220 44508 : //
221 44508 : // It's okay that in-progress operations are emitted in random order above, since at
222 44508 : // most one of them can be an index upload (enforced by can_bypass).
223 1932952 : .scan(&self.clean.0, |next_active_index, op| {
224 1932952 : let active_index = *next_active_index;
225 1932952 : if let UploadOp::UploadMetadata { uploaded } = op {
226 25572 : *next_active_index = uploaded; // stash index for next operation after this
227 1907380 : }
228 1932952 : Some((op, active_index))
229 1932952 : })
230 44508 : // Check if the candidate can bypass all of them.
231 1932952 : .all(|(op, active_index)| candidate.can_bypass(op, active_index))
232 44508 : }
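// A worked example of the scan above, for an assumed queue state: with
// in-progress [UploadLayer(L1)] and queued [UploadMetadata(I1), UploadLayer(L2)],
// the candidate UploadLayer(L2) at position 1 is ready only if it can bypass
// both UploadLayer(L1), checked against the clean index, and UploadMetadata(I1),
// after which I1 becomes the active index stashed by the scan.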
233 :
234 : /// Returns the number of in-progress deletion operations.
235 : #[cfg(test)]
236 4 : pub(crate) fn num_inprogress_deletions(&self) -> usize {
237 4 : self.inprogress_tasks
238 4 : .iter()
239 4 : .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
240 4 : .count()
241 4 : }
242 :
243 : /// Returns the number of in-progress layer uploads.
244 : #[cfg(test)]
245 8 : pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
246 8 : self.inprogress_tasks
247 8 : .iter()
248 12 : .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
249 8 : .count()
250 8 : }
251 :
252 : /// Test helper that schedules all ready operations into inprogress_tasks, and returns
253 : /// references to them.
254 : ///
255 : /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
256 : /// UploadQueue, so we can use the same code path.
257 : #[cfg(test)]
258 156 : fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
259 156 : let mut tasks = Vec::new();
260 : // NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
261 344 : while let Some((op, coalesced_ops)) = self.next_ready() {
262 188 : self.task_counter += 1;
263 188 : let task = Arc::new(UploadTask {
264 188 : task_id: self.task_counter,
265 188 : op,
266 188 : coalesced_ops,
267 188 : retries: 0.into(),
268 188 : });
269 188 : self.inprogress_tasks.insert(task.task_id, task.clone());
270 188 : tasks.push(task);
271 188 : }
272 156 : tasks
273 156 : }
274 :
275 : /// Test helper that marks an operation as completed, removing it from inprogress_tasks.
276 : ///
277 : /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
278 : /// UploadQueue, so we can use the same code path.
279 : #[cfg(test)]
280 116 : fn complete(&mut self, task_id: u64) {
281 116 : let Some(task) = self.inprogress_tasks.remove(&task_id) else {
282 0 : return;
283 : };
284 : // Update the clean index on uploads.
285 116 : if let UploadOp::UploadMetadata { ref uploaded } = task.op {
286 32 : if task.task_id > self.clean.1.unwrap_or_default() {
287 32 : self.clean = (*uploaded.clone(), Some(task.task_id));
288 32 : }
289 84 : }
290 116 : }
291 : }
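// A minimal sketch of why `clean.1` carries a task_id: index uploads can
// complete out of order, and the guard in `complete` keeps the newest index
// (task IDs here are assumed; both tasks upload an index):
//
//     queue.complete(2); // clean = (index_2, Some(2))
//     queue.complete(1); // no-op for `clean`: task 1 is older than task 2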
292 :
293 : #[derive(Clone, Copy)]
294 : pub(super) enum SetDeletedFlagProgress {
295 : NotRunning,
296 : InProgress(NaiveDateTime),
297 : Successful(NaiveDateTime),
298 : }
299 :
300 : pub struct UploadQueueStoppedDeletable {
301 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
302 : pub(super) deleted_at: SetDeletedFlagProgress,
303 : }
304 :
305 : pub enum UploadQueueStopped {
306 : Deletable(UploadQueueStoppedDeletable),
307 : Uninitialized,
308 : }
309 :
310 : #[derive(thiserror::Error, Debug)]
311 : pub enum NotInitialized {
312 : #[error("queue is in state Uninitialized")]
313 : Uninitialized,
314 : #[error("queue is in state Stopped")]
315 : Stopped,
316 : #[error("queue is shutting down")]
317 : ShuttingDown,
318 : }
319 :
320 : impl NotInitialized {
321 0 : pub(crate) fn is_stopping(&self) -> bool {
322 : use NotInitialized::*;
323 0 : match self {
324 0 : Uninitialized => false,
325 0 : Stopped => true,
326 0 : ShuttingDown => true,
327 : }
328 0 : }
329 : }
330 :
331 : impl UploadQueue {
332 908 : pub fn initialize_empty_remote(
333 908 : &mut self,
334 908 : metadata: &TimelineMetadata,
335 908 : inprogress_limit: usize,
336 908 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
337 908 : match self {
338 908 : UploadQueue::Uninitialized => (),
339 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
340 0 : anyhow::bail!("already initialized, state {}", self.as_str())
341 : }
342 : }
343 :
344 908 : info!("initializing upload queue for empty remote");
345 :
346 908 : let index_part = IndexPart::empty(metadata.clone());
347 908 :
348 908 : let state = UploadQueueInitialized {
349 908 : inprogress_limit,
350 908 : dirty: index_part.clone(),
351 908 : clean: (index_part, None),
352 908 : latest_files_changes_since_metadata_upload_scheduled: 0,
353 908 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
354 908 : // what follows are boring default initializations
355 908 : task_counter: 0,
356 908 : inprogress_tasks: HashMap::new(),
357 908 : queued_operations: VecDeque::new(),
358 908 : #[cfg(feature = "testing")]
359 908 : dangling_files: HashMap::new(),
360 908 : recently_deleted: HashSet::new(),
361 908 : blocked_deletions: Vec::new(),
362 908 : shutting_down: false,
363 908 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
364 908 : };
365 908 :
366 908 : *self = UploadQueue::Initialized(state);
367 908 : Ok(self.initialized_mut().expect("we just set it"))
368 908 : }
369 :
370 44 : pub fn initialize_with_current_remote_index_part(
371 44 : &mut self,
372 44 : index_part: &IndexPart,
373 44 : inprogress_limit: usize,
374 44 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
375 44 : match self {
376 44 : UploadQueue::Uninitialized => (),
377 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
378 0 : anyhow::bail!("already initialized, state {}", self.as_str())
379 : }
380 : }
381 :
382 44 : info!(
383 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
384 0 : index_part.metadata.disk_consistent_lsn()
385 : );
386 :
387 44 : let state = UploadQueueInitialized {
388 44 : inprogress_limit,
389 44 : dirty: index_part.clone(),
390 44 : clean: (index_part.clone(), None),
391 44 : latest_files_changes_since_metadata_upload_scheduled: 0,
392 44 : visible_remote_consistent_lsn: Arc::new(
393 44 : index_part.metadata.disk_consistent_lsn().into(),
394 44 : ),
395 44 : // what follows are boring default initializations
396 44 : task_counter: 0,
397 44 : inprogress_tasks: HashMap::new(),
398 44 : queued_operations: VecDeque::new(),
399 44 : #[cfg(feature = "testing")]
400 44 : dangling_files: HashMap::new(),
401 44 : recently_deleted: HashSet::new(),
402 44 : blocked_deletions: Vec::new(),
403 44 : shutting_down: false,
404 44 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
405 44 : };
406 44 :
407 44 : *self = UploadQueue::Initialized(state);
408 44 : Ok(self.initialized_mut().expect("we just set it"))
409 44 : }
410 :
411 18402 : pub fn initialized_mut(&mut self) -> Result<&mut UploadQueueInitialized, NotInitialized> {
412 : use UploadQueue::*;
413 18402 : match self {
414 0 : Uninitialized => Err(NotInitialized::Uninitialized),
415 18402 : Initialized(x) => {
416 18402 : if x.shutting_down {
417 0 : Err(NotInitialized::ShuttingDown)
418 : } else {
419 18402 : Ok(x)
420 : }
421 : }
422 0 : Stopped(_) => Err(NotInitialized::Stopped),
423 : }
424 18402 : }
425 :
426 4 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
427 4 : match self {
428 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
429 0 : anyhow::bail!("queue is in state {}", self.as_str())
430 : }
431 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
432 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
433 : }
434 4 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
435 : }
436 4 : }
437 : }
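// A minimal initialization sketch, mirroring the tests below (the example
// metadata is a placeholder):
//
//     let mut queue = UploadQueue::Uninitialized;
//     let initialized = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
//     assert!(initialized.no_pending_work());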
438 :
439 : /// An in-progress upload or delete task.
440 : #[derive(Debug)]
441 : pub struct UploadTask {
442 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
443 : pub task_id: u64,
444 : /// Number of task retries.
445 : pub retries: AtomicU32,
446 : /// The upload operation.
447 : pub op: UploadOp,
448 : /// Any upload operations that were coalesced into this operation. This typically happens with
449 : /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
450 : pub coalesced_ops: Vec<UploadOp>,
451 : }
452 :
453 : /// A deletion of some layers within the lifetime of a timeline. This is not used
454 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
455 : #[derive(Debug, Clone)]
456 : pub struct Delete {
457 : pub layers: Vec<(LayerName, LayerFileMetadata)>,
458 : }
459 :
460 : #[derive(Clone, Debug)]
461 : pub enum UploadOp {
462 : /// Upload a layer file. The last field indicates the last operation for this file.
463 : UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
464 :
465 : /// Upload an index_part.json file
466 : UploadMetadata {
467 : /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
468 : uploaded: Box<IndexPart>,
469 : },
470 :
471 : /// Delete layer files
472 : Delete(Delete),
473 :
474 : /// Barrier. When the barrier operation is reached, the channel is closed.
475 : Barrier(tokio::sync::watch::Sender<()>),
476 :
477 : /// Shutdown; upon encountering this operation no new operations will be spawned;
478 : /// otherwise this behaves the same as a Barrier.
479 : Shutdown,
480 : }
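// A minimal barrier sketch: the watch channel's sender is dropped/closed once
// the barrier executes, so a waiter can await drainage of everything queued
// before it (wiring is hypothetical; production callers go through
// RemoteTimelineClient):
//
//     let (tx, mut rx) = tokio::sync::watch::channel(());
//     queue.queued_operations.push_back(UploadOp::Barrier(tx));
//     // rx.changed().await returns Err once the sender is gone, i.e. once all
//     // operations ahead of the barrier have completed.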
481 :
482 : impl std::fmt::Display for UploadOp {
483 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
484 0 : match self {
485 0 : UploadOp::UploadLayer(layer, metadata, mode) => {
486 0 : write!(
487 0 : f,
488 0 : "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
489 0 : layer, metadata.file_size, metadata.generation, mode
490 0 : )
491 : }
492 0 : UploadOp::UploadMetadata { uploaded, .. } => {
493 0 : write!(
494 0 : f,
495 0 : "UploadMetadata(lsn: {})",
496 0 : uploaded.metadata.disk_consistent_lsn()
497 0 : )
498 : }
499 0 : UploadOp::Delete(delete) => {
500 0 : write!(f, "Delete({} layers)", delete.layers.len())
501 : }
502 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
503 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
504 : }
505 0 : }
506 : }
507 :
508 : impl UploadOp {
509 : /// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the
510 : /// active index when other would be uploaded -- if we allow self to bypass other, this would
511 : /// be the active index when self is uploaded.
512 1933048 : pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
513 1933048 : match (self, other) {
514 : // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
515 4821 : (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
516 16 : (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
517 :
518 : // Uploads and deletes can bypass each other unless they're for the same file.
519 40048 : (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
520 40048 : let aname = &a.layer_desc().layer_name();
521 40048 : let bname = &b.layer_desc().layer_name();
522 40048 : !is_same_remote_layer_path(aname, ameta, bname, bmeta)
523 : }
524 81 : (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
525 1854024 : | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
526 1854133 : d.layers.iter().all(|(dname, dmeta)| {
527 1854133 : !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
528 1854133 : })
529 : }
530 :
531 : // Deletes are idempotent and can always bypass each other.
532 4256 : (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
533 :
534 : // Uploads and deletes can bypass an index upload as long as neither the uploaded index
535 : // nor the active index below it references the file. A layer can't be modified or
536 : // deleted while referenced by an index.
537 : //
538 : // Similarly, index uploads can bypass uploads and deletes as long as neither the
539 : // uploaded index nor the active index references the file (the latter would be
540 : // incorrect use by the caller).
541 172 : (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
542 6559 : | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
543 6731 : let uname = u.layer_desc().layer_name();
544 6731 : !i.references(&uname, umeta) && !index.references(&uname, umeta)
545 : }
546 22699 : (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
547 31 : | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
548 22730 : d.layers.iter().all(|(dname, dmeta)| {
549 22730 : !i.references(dname, dmeta) && !index.references(dname, dmeta)
550 22730 : })
551 : }
552 :
553 : // Indexes can never bypass each other. They can coalesce though, and
554 : // `UploadQueue::next_ready()` currently does this when possible.
555 341 : (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
556 : }
557 1933048 : }
558 : }
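// Worked examples of the bypass rules above (hypothetical ops over distinct
// layers `a` and `b`; `index` is the active index at that queue position):
//
//     // Uploads of different layers don't conflict and may run in parallel:
//     assert!(upload_a.can_bypass(&upload_b, &index));
//     // A delete can't bypass an index upload that still references the layer:
//     assert!(!delete_a.can_bypass(&index_referencing_a, &index));
//     // Indexes never bypass each other; they coalesce in next_ready instead:
//     assert!(!index_a.can_bypass(&index_b, &index));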
559 :
560 : #[cfg(test)]
561 : mod tests {
562 : use std::str::FromStr as _;
563 :
564 : use itertools::Itertools as _;
565 : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
566 :
567 : use super::*;
568 : use crate::DEFAULT_PG_VERSION;
569 : use crate::tenant::Timeline;
570 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
571 : use crate::tenant::storage_layer::Layer;
572 : use crate::tenant::storage_layer::layer::local_layer_path;
573 :
574 : /// Test helper which asserts that two operations are the same, in lieu of UploadOp PartialEq.
575 : #[track_caller]
576 196 : fn assert_same_op(a: &UploadOp, b: &UploadOp) {
577 : use UploadOp::*;
578 196 : match (a, b) {
579 88 : (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
580 88 : assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
581 88 : assert_eq!(ameta, bmeta);
582 88 : assert_eq!(atype, btype);
583 : }
584 44 : (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers),
585 56 : (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b),
586 8 : (Barrier(_), Barrier(_)) => {}
587 0 : (Shutdown, Shutdown) => {}
588 0 : (a, b) => panic!("{a:?} != {b:?}"),
589 : }
590 196 : }
591 :
592 : /// Test helper which asserts that two sets of operations are the same.
593 : #[track_caller]
594 44 : fn assert_same_ops<'a>(
595 44 : a: impl IntoIterator<Item = &'a UploadOp>,
596 44 : b: impl IntoIterator<Item = &'a UploadOp>,
597 44 : ) {
598 44 : a.into_iter()
599 44 : .zip_eq(b)
600 116 : .for_each(|(a, b)| assert_same_op(a, b))
601 44 : }
602 :
603 : /// Test helper to construct a test timeline.
604 : ///
605 : /// TODO: it really shouldn't be necessary to construct an entire tenant and timeline just to
606 : /// test the upload queue -- decouple ResidentLayer from Timeline.
607 : ///
608 : /// TODO: the upload queue uses TimelineMetadata::example() instead, because there's no way to
609 : /// obtain a TimelineMetadata from a Timeline.
610 48 : fn make_timeline() -> Arc<Timeline> {
611 48 : // Grab the current test name from the current thread name.
612 48 : // TODO: TenantHarness shouldn't take a &'static str, but just leak the test name for now.
613 48 : let test_name = std::thread::current().name().unwrap().to_string();
614 48 : let test_name = Box::leak(test_name.into_boxed_str());
615 48 :
616 48 : let runtime = tokio::runtime::Builder::new_current_thread()
617 48 : .enable_all()
618 48 : .build()
619 48 : .expect("failed to create runtime");
620 48 :
621 48 : runtime
622 48 : .block_on(async {
623 48 : let harness = TenantHarness::create(test_name).await?;
624 48 : let (tenant, ctx) = harness.load().await;
625 48 : tenant
626 48 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
627 48 : .await
628 48 : })
629 48 : .expect("failed to create timeline")
630 48 : }
631 :
632 : /// Test helper to construct an (empty) resident layer.
633 120 : fn make_layer(timeline: &Arc<Timeline>, name: &str) -> ResidentLayer {
634 120 : make_layer_with_size(timeline, name, 0)
635 120 : }
636 :
637 : /// Test helper to construct a resident layer with the given size.
638 132 : fn make_layer_with_size(timeline: &Arc<Timeline>, name: &str, size: usize) -> ResidentLayer {
639 132 : let metadata = LayerFileMetadata {
640 132 : generation: timeline.generation,
641 132 : shard: timeline.get_shard_index(),
642 132 : file_size: size as u64,
643 132 : };
644 132 : make_layer_with_metadata(timeline, name, metadata)
645 132 : }
646 :
647 : /// Test helper to construct a layer with the given metadata.
648 196 : fn make_layer_with_metadata(
649 196 : timeline: &Arc<Timeline>,
650 196 : name: &str,
651 196 : metadata: LayerFileMetadata,
652 196 : ) -> ResidentLayer {
653 196 : let name = LayerName::from_str(name).expect("invalid name");
654 196 : let local_path = local_layer_path(
655 196 : timeline.conf,
656 196 : &timeline.tenant_shard_id,
657 196 : &timeline.timeline_id,
658 196 : &name,
659 196 : &metadata.generation,
660 196 : );
661 196 : std::fs::write(&local_path, vec![0; metadata.file_size as usize])
662 196 : .expect("failed to write file");
663 196 : Layer::for_resident(timeline.conf, timeline, local_path, name, metadata)
664 196 : }
665 :
666 : /// Test helper to add a layer to an index and return a new index.
667 24 : fn index_with(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
668 24 : let mut index = index.clone();
669 24 : index
670 24 : .layer_metadata
671 24 : .insert(layer.layer_desc().layer_name(), layer.metadata());
672 24 : Box::new(index)
673 24 : }
674 :
675 : /// Test helper to remove a layer from an index and return a new index.
676 8 : fn index_without(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
677 8 : let mut index = index.clone();
678 8 : index
679 8 : .layer_metadata
680 8 : .remove(&layer.layer_desc().layer_name());
681 8 : Box::new(index)
682 8 : }
683 :
684 : /// Nothing can bypass a barrier, and it can't bypass inprogress tasks.
685 : #[test]
686 4 : fn schedule_barrier() -> anyhow::Result<()> {
687 4 : let mut queue = UploadQueue::Uninitialized;
688 4 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
689 4 : let tli = make_timeline();
690 4 :
691 4 : let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
692 4 : let layer0 = make_layer(
693 4 : &tli,
694 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
695 4 : );
696 4 : let layer1 = make_layer(
697 4 : &tli,
698 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
699 4 : );
700 4 : let layer2 = make_layer(
701 4 : &tli,
702 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
703 4 : );
704 4 : let layer3 = make_layer(
705 4 : &tli,
706 4 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
707 4 : );
708 4 : let (barrier, _) = tokio::sync::watch::channel(());
709 4 :
710 4 : // Enqueue non-conflicting upload, delete, and index before and after a barrier.
711 4 : let ops = [
712 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
713 4 : UploadOp::Delete(Delete {
714 4 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
715 4 : }),
716 4 : UploadOp::UploadMetadata {
717 4 : uploaded: index.clone(),
718 4 : },
719 4 : UploadOp::Barrier(barrier),
720 4 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
721 4 : UploadOp::Delete(Delete {
722 4 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
723 4 : }),
724 4 : UploadOp::UploadMetadata {
725 4 : uploaded: index.clone(),
726 4 : },
727 4 : ];
728 4 :
729 4 : queue.queued_operations.extend(ops.clone());
730 4 :
731 4 : // Schedule the initial operations ahead of the barrier.
732 4 : let tasks = queue.schedule_ready();
733 4 :
734 12 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
735 4 : assert!(matches!(
736 4 : queue.queued_operations.front(),
737 : Some(&UploadOp::Barrier(_))
738 : ));
739 :
740 : // Complete the initial operations. The barrier isn't scheduled while they're pending.
741 16 : for task in tasks {
742 12 : assert!(queue.schedule_ready().is_empty());
743 12 : queue.complete(task.task_id);
744 : }
745 :
746 : // Schedule the barrier. The later tasks won't schedule until it completes.
747 4 : let tasks = queue.schedule_ready();
748 4 :
749 4 : assert_eq!(tasks.len(), 1);
750 4 : assert!(matches!(tasks[0].op, UploadOp::Barrier(_)));
751 4 : assert_eq!(queue.queued_operations.len(), 3);
752 :
753 : // Complete the barrier. The rest of the tasks schedule immediately.
754 4 : queue.complete(tasks[0].task_id);
755 4 :
756 4 : let tasks = queue.schedule_ready();
757 12 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[4..]);
758 4 : assert!(queue.queued_operations.is_empty());
759 :
760 4 : Ok(())
761 4 : }
762 :
763 : /// Deletes can be scheduled in parallel, even if they're for the same file.
764 : #[test]
765 4 : fn schedule_delete_parallel() -> anyhow::Result<()> {
766 4 : let mut queue = UploadQueue::Uninitialized;
767 4 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
768 4 : let tli = make_timeline();
769 4 :
770 4 : // Enqueue a bunch of deletes, some with conflicting names.
771 4 : let layer0 = make_layer(
772 4 : &tli,
773 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
774 4 : );
775 4 : let layer1 = make_layer(
776 4 : &tli,
777 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
778 4 : );
779 4 : let layer2 = make_layer(
780 4 : &tli,
781 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
782 4 : );
783 4 : let layer3 = make_layer(
784 4 : &tli,
785 4 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
786 4 : );
787 4 :
788 4 : let ops = [
789 4 : UploadOp::Delete(Delete {
790 4 : layers: vec![(layer0.layer_desc().layer_name(), layer0.metadata())],
791 4 : }),
792 4 : UploadOp::Delete(Delete {
793 4 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
794 4 : }),
795 4 : UploadOp::Delete(Delete {
796 4 : layers: vec![
797 4 : (layer1.layer_desc().layer_name(), layer1.metadata()),
798 4 : (layer2.layer_desc().layer_name(), layer2.metadata()),
799 4 : ],
800 4 : }),
801 4 : UploadOp::Delete(Delete {
802 4 : layers: vec![(layer2.layer_desc().layer_name(), layer2.metadata())],
803 4 : }),
804 4 : UploadOp::Delete(Delete {
805 4 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
806 4 : }),
807 4 : ];
808 4 :
809 4 : queue.queued_operations.extend(ops.clone());
810 4 :
811 4 : // Schedule all ready operations. Since deletes don't conflict, they're all scheduled.
812 4 : let tasks = queue.schedule_ready();
813 4 :
814 20 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
815 4 : assert!(queue.queued_operations.is_empty());
816 :
817 4 : Ok(())
818 4 : }
819 :
820 : /// Conflicting uploads are serialized.
821 : #[test]
822 4 : fn schedule_upload_conflicts() -> anyhow::Result<()> {
823 4 : let mut queue = UploadQueue::Uninitialized;
824 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
825 4 : let tli = make_timeline();
826 4 :
827 4 : // Enqueue three versions of the same layer, with different file sizes.
828 4 : let layer0a = make_layer_with_size(
829 4 : &tli,
830 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
831 4 : 1,
832 4 : );
833 4 : let layer0b = make_layer_with_size(
834 4 : &tli,
835 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
836 4 : 2,
837 4 : );
838 4 : let layer0c = make_layer_with_size(
839 4 : &tli,
840 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
841 4 : 3,
842 4 : );
843 4 :
844 4 : let ops = [
845 4 : UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
846 4 : UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
847 4 : UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
848 4 : ];
849 4 :
850 4 : queue.queued_operations.extend(ops.clone());
851 :
852 : // Only one version should be scheduled and uploaded at a time.
853 16 : for op in ops {
854 12 : let tasks = queue.schedule_ready();
855 12 : assert_eq!(tasks.len(), 1);
856 12 : assert_same_op(&tasks[0].op, &op);
857 12 : queue.complete(tasks[0].task_id);
858 : }
859 4 : assert!(queue.schedule_ready().is_empty());
860 4 : assert!(queue.queued_operations.is_empty());
861 :
862 4 : Ok(())
863 4 : }
864 :
865 : /// Conflicting uploads and deletes are serialized.
866 : #[test]
867 4 : fn schedule_upload_delete_conflicts() -> anyhow::Result<()> {
868 4 : let mut queue = UploadQueue::Uninitialized;
869 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
870 4 : let tli = make_timeline();
871 4 :
872 4 : // Enqueue two layer uploads, with a delete of both layers in between them. These should be
873 4 : // scheduled one at a time, since deletes can't bypass uploads and vice versa.
874 4 : let layer0 = make_layer(
875 4 : &tli,
876 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
877 4 : );
878 4 : let layer1 = make_layer(
879 4 : &tli,
880 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
881 4 : );
882 4 :
883 4 : let ops = [
884 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
885 4 : UploadOp::Delete(Delete {
886 4 : layers: vec![
887 4 : (layer0.layer_desc().layer_name(), layer0.metadata()),
888 4 : (layer1.layer_desc().layer_name(), layer1.metadata()),
889 4 : ],
890 4 : }),
891 4 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
892 4 : ];
893 4 :
894 4 : queue.queued_operations.extend(ops.clone());
895 :
896 : // Only one version should be scheduled and uploaded at a time.
897 16 : for op in ops {
898 12 : let tasks = queue.schedule_ready();
899 12 : assert_eq!(tasks.len(), 1);
900 12 : assert_same_op(&tasks[0].op, &op);
901 12 : queue.complete(tasks[0].task_id);
902 : }
903 4 : assert!(queue.schedule_ready().is_empty());
904 4 : assert!(queue.queued_operations.is_empty());
905 :
906 4 : Ok(())
907 4 : }
908 :
909 : /// Non-conflicting uploads and deletes can bypass the queue, avoiding the conflicting
910 : /// delete/upload operations at the head of the queue.
911 : #[test]
912 4 : fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> {
913 4 : let mut queue = UploadQueue::Uninitialized;
914 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
915 4 : let tli = make_timeline();
916 4 :
917 4 : // Enqueue two layer uploads, with a delete of both layers in between them. These should be
918 4 : // scheduled one at a time, since deletes can't bypass uploads and vice versa.
919 4 : //
920 4 : // Also enqueue non-conflicting uploads and deletes at the end. These can bypass the queue
921 4 : // and run immediately.
922 4 : let layer0 = make_layer(
923 4 : &tli,
924 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
925 4 : );
926 4 : let layer1 = make_layer(
927 4 : &tli,
928 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
929 4 : );
930 4 : let layer2 = make_layer(
931 4 : &tli,
932 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
933 4 : );
934 4 : let layer3 = make_layer(
935 4 : &tli,
936 4 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
937 4 : );
938 4 :
939 4 : let ops = [
940 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
941 4 : UploadOp::Delete(Delete {
942 4 : layers: vec![
943 4 : (layer0.layer_desc().layer_name(), layer0.metadata()),
944 4 : (layer1.layer_desc().layer_name(), layer1.metadata()),
945 4 : ],
946 4 : }),
947 4 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
948 4 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
949 4 : UploadOp::Delete(Delete {
950 4 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
951 4 : }),
952 4 : ];
953 4 :
954 4 : queue.queued_operations.extend(ops.clone());
955 4 :
956 4 : // Operations 0, 3, and 4 are scheduled immediately.
957 4 : let tasks = queue.schedule_ready();
958 12 : assert_same_ops(tasks.iter().map(|t| &t.op), [&ops[0], &ops[3], &ops[4]]);
959 4 : assert_eq!(queue.queued_operations.len(), 2);
960 :
961 4 : Ok(())
962 4 : }
963 :
964 : /// Non-conflicting uploads are parallelized.
965 : #[test]
966 4 : fn schedule_upload_parallel() -> anyhow::Result<()> {
967 4 : let mut queue = UploadQueue::Uninitialized;
968 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
969 4 : let tli = make_timeline();
970 4 :
971 4 : // Enqueue three different layer uploads.
972 4 : let layer0 = make_layer(
973 4 : &tli,
974 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
975 4 : );
976 4 : let layer1 = make_layer(
977 4 : &tli,
978 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
979 4 : );
980 4 : let layer2 = make_layer(
981 4 : &tli,
982 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
983 4 : );
984 4 :
985 4 : let ops = [
986 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
987 4 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
988 4 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
989 4 : ];
990 4 :
991 4 : queue.queued_operations.extend(ops.clone());
992 4 :
993 4 : // All uploads should be scheduled concurrently.
994 4 : let tasks = queue.schedule_ready();
995 4 :
996 12 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
997 4 : assert!(queue.queued_operations.is_empty());
998 :
999 4 : Ok(())
1000 4 : }
1001 :
1002 : /// Index uploads are coalesced.
1003 : #[test]
1004 4 : fn schedule_index_coalesce() -> anyhow::Result<()> {
1005 4 : let mut queue = UploadQueue::Uninitialized;
1006 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1007 :
1008 : // Enqueue three uploads of the current empty index.
1009 4 : let index = Box::new(queue.clean.0.clone());
1010 4 :
1011 4 : let ops = [
1012 4 : UploadOp::UploadMetadata {
1013 4 : uploaded: index.clone(),
1014 4 : },
1015 4 : UploadOp::UploadMetadata {
1016 4 : uploaded: index.clone(),
1017 4 : },
1018 4 : UploadOp::UploadMetadata {
1019 4 : uploaded: index.clone(),
1020 4 : },
1021 4 : ];
1022 4 :
1023 4 : queue.queued_operations.extend(ops.clone());
1024 4 :
1025 4 : // The index uploads are coalesced into a single operation.
1026 4 : let tasks = queue.schedule_ready();
1027 4 : assert_eq!(tasks.len(), 1);
1028 4 : assert_same_op(&tasks[0].op, &ops[2]);
1029 4 : assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);
1030 4 :
1031 4 : assert!(queue.queued_operations.is_empty());
1032 :
1033 4 : Ok(())
1034 4 : }
1035 :
1036 : /// Chains of upload/index operations lead to parallel layer uploads and serial index uploads.
1037 : /// This is the common case with layer flushes.
1038 : #[test]
1039 4 : fn schedule_index_upload_chain() -> anyhow::Result<()> {
1040 4 : let mut queue = UploadQueue::Uninitialized;
1041 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1042 4 : let tli = make_timeline();
1043 4 :
1044 4 : // Enqueue a chain of layer uploads, each followed by an index upload referencing it.
1045 4 : let index = Box::new(queue.clean.0.clone());
1046 4 : let layer0 = make_layer(
1047 4 : &tli,
1048 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1049 4 : );
1050 4 : let index0 = index_with(&index, &layer0);
1051 4 : let layer1 = make_layer(
1052 4 : &tli,
1053 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1054 4 : );
1055 4 : let index1 = index_with(&index0, &layer1);
1056 4 : let layer2 = make_layer(
1057 4 : &tli,
1058 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1059 4 : );
1060 4 : let index2 = index_with(&index1, &layer2);
1061 4 :
1062 4 : let ops = [
1063 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1064 4 : UploadOp::UploadMetadata {
1065 4 : uploaded: index0.clone(),
1066 4 : },
1067 4 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
1068 4 : UploadOp::UploadMetadata {
1069 4 : uploaded: index1.clone(),
1070 4 : },
1071 4 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1072 4 : UploadOp::UploadMetadata {
1073 4 : uploaded: index2.clone(),
1074 4 : },
1075 4 : ];
1076 4 :
1077 4 : queue.queued_operations.extend(ops.clone());
1078 4 :
1079 4 : // The layer uploads should be scheduled immediately. The indexes must wait.
1080 4 : let upload_tasks = queue.schedule_ready();
1081 4 : assert_same_ops(
1082 12 : upload_tasks.iter().map(|t| &t.op),
1083 4 : [&ops[0], &ops[2], &ops[4]],
1084 4 : );
1085 4 :
1086 4 : // layer2 completes first. None of the indexes can upload yet.
1087 4 : queue.complete(upload_tasks[2].task_id);
1088 4 : assert!(queue.schedule_ready().is_empty());
1089 :
1090 : // layer0 completes. index0 can upload. It completes.
1091 4 : queue.complete(upload_tasks[0].task_id);
1092 4 : let index_tasks = queue.schedule_ready();
1093 4 : assert_eq!(index_tasks.len(), 1);
1094 4 : assert_same_op(&index_tasks[0].op, &ops[1]);
1095 4 : queue.complete(index_tasks[0].task_id);
1096 4 :
1097 4 : // layer 1 completes. This unblocks index 1 and 2, which coalesce into
1098 4 : // a single upload for index 2.
1099 4 : queue.complete(upload_tasks[1].task_id);
1100 4 :
1101 4 : let index_tasks = queue.schedule_ready();
1102 4 : assert_eq!(index_tasks.len(), 1);
1103 4 : assert_same_op(&index_tasks[0].op, &ops[5]);
1104 4 : assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);
1105 4 :
1106 4 : assert!(queue.queued_operations.is_empty());
1107 :
1108 4 : Ok(())
1109 4 : }
1110 :
1111 : /// A delete can't bypass an index upload if an index ahead of it still references it.
1112 : #[test]
1113 4 : fn schedule_index_delete_dereferenced() -> anyhow::Result<()> {
1114 4 : let mut queue = UploadQueue::Uninitialized;
1115 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1116 4 : let tli = make_timeline();
1117 4 :
1118 4 : // Create a layer to upload.
1119 4 : let layer = make_layer(
1120 4 : &tli,
1121 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1122 4 : );
1123 4 : let index_upload = index_with(&queue.clean.0, &layer);
1124 4 :
1125 4 : // Remove the layer reference in a new index, then delete the layer.
1126 4 : let index_deref = index_without(&index_upload, &layer);
1127 4 :
1128 4 : let ops = [
1129 4 : // Initial upload, with a barrier to prevent index coalescing.
1130 4 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1131 4 : UploadOp::UploadMetadata {
1132 4 : uploaded: index_upload.clone(),
1133 4 : },
1134 4 : UploadOp::Barrier(tokio::sync::watch::channel(()).0),
1135 4 : // Dereference the layer and delete it.
1136 4 : UploadOp::UploadMetadata {
1137 4 : uploaded: index_deref.clone(),
1138 4 : },
1139 4 : UploadOp::Delete(Delete {
1140 4 : layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
1141 4 : }),
1142 4 : ];
1143 4 :
1144 4 : queue.queued_operations.extend(ops.clone());
1145 :
1146 : // Operations are serialized.
1147 24 : for op in ops {
1148 20 : let tasks = queue.schedule_ready();
1149 20 : assert_eq!(tasks.len(), 1);
1150 20 : assert_same_op(&tasks[0].op, &op);
1151 20 : queue.complete(tasks[0].task_id);
1152 : }
1153 4 : assert!(queue.queued_operations.is_empty());
1154 :
1155 4 : Ok(())
1156 4 : }
1157 :
1158 : /// An upload with a reused layer name doesn't clobber the previous layer. Specifically, a
1159 : /// dereference/upload/reference cycle can't allow the upload to bypass the reference.
1160 : #[test]
1161 4 : fn schedule_index_upload_dereferenced() -> anyhow::Result<()> {
1162 4 : let mut queue = UploadQueue::Uninitialized;
1163 4 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1164 4 : let tli = make_timeline();
1165 4 :
1166 4 : // Create a layer to upload.
1167 4 : let layer = make_layer(
1168 4 : &tli,
1169 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1170 4 : );
1171 4 :
1172 4 : // Upload the layer. Then dereference the layer, and upload/reference it again.
1173 4 : let index_upload = index_with(&queue.clean.0, &layer);
1174 4 : let index_deref = index_without(&index_upload, &layer);
1175 4 : let index_ref = index_with(&index_deref, &layer);
1176 4 :
1177 4 : let ops = [
1178 4 : // Initial upload, with a barrier to prevent index coalescing.
1179 4 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1180 4 : UploadOp::UploadMetadata {
1181 4 : uploaded: index_upload.clone(),
1182 4 : },
1183 4 : UploadOp::Barrier(tokio::sync::watch::channel(()).0),
1184 4 : // Dereference the layer.
1185 4 : UploadOp::UploadMetadata {
1186 4 : uploaded: index_deref.clone(),
1187 4 : },
1188 4 : // Replace and reference the layer.
1189 4 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1190 4 : UploadOp::UploadMetadata {
1191 4 : uploaded: index_ref.clone(),
1192 4 : },
1193 4 : ];
1194 4 :
1195 4 : queue.queued_operations.extend(ops.clone());
1196 :
1197 : // Operations are serialized.
1198 28 : for op in ops {
1199 24 : let tasks = queue.schedule_ready();
1200 24 : assert_eq!(tasks.len(), 1);
1201 24 : assert_same_op(&tasks[0].op, &op);
1202 24 : queue.complete(tasks[0].task_id);
1203 : }
1204 4 : assert!(queue.queued_operations.is_empty());
1205 :
1206 4 : Ok(())
1207 4 : }
1208 :
1209 : /// Nothing can bypass a shutdown, and it waits for inprogress tasks. It's never returned from
1210 : /// next_ready(), but is left at the head of the queue.
1211 : #[test]
1212 4 : fn schedule_shutdown() -> anyhow::Result<()> {
1213 4 : let mut queue = UploadQueue::Uninitialized;
1214 4 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
1215 4 : let tli = make_timeline();
1216 4 :
1217 4 : let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
1218 4 : let layer0 = make_layer(
1219 4 : &tli,
1220 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1221 4 : );
1222 4 : let layer1 = make_layer(
1223 4 : &tli,
1224 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1225 4 : );
1226 4 : let layer2 = make_layer(
1227 4 : &tli,
1228 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1229 4 : );
1230 4 : let layer3 = make_layer(
1231 4 : &tli,
1232 4 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1233 4 : );
1234 4 :
1235 4 : // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
1236 4 : let ops = [
1237 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1238 4 : UploadOp::Delete(Delete {
1239 4 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
1240 4 : }),
1241 4 : UploadOp::UploadMetadata {
1242 4 : uploaded: index.clone(),
1243 4 : },
1244 4 : UploadOp::Shutdown,
1245 4 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1246 4 : UploadOp::Delete(Delete {
1247 4 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
1248 4 : }),
1249 4 : UploadOp::UploadMetadata {
1250 4 : uploaded: index.clone(),
1251 4 : },
1252 4 : ];
1253 4 :
1254 4 : queue.queued_operations.extend(ops.clone());
1255 4 :
1256 4 : // Schedule the initial operations ahead of the shutdown.
1257 4 : let tasks = queue.schedule_ready();
1258 4 :
1259 12 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
1260 4 : assert!(matches!(
1261 4 : queue.queued_operations.front(),
1262 : Some(&UploadOp::Shutdown)
1263 : ));
1264 :
1265 : // Complete the initial operations. The shutdown isn't triggered while they're pending.
1266 16 : for task in tasks {
1267 12 : assert!(queue.schedule_ready().is_empty());
1268 12 : queue.complete(task.task_id);
1269 : }
1270 :
1271 : // The shutdown is triggered the next time we try to pull an operation. It isn't returned,
1272 : // but is left in the queue.
1273 4 : assert!(!queue.shutdown_ready.is_closed());
1274 4 : assert!(queue.next_ready().is_none());
1275 4 : assert!(queue.shutdown_ready.is_closed());
1276 :
1277 4 : Ok(())
1278 4 : }
1279 :
1280 : /// Scheduling respects inprogress_limit.
1281 : #[test]
1282 4 : fn schedule_inprogress_limit() -> anyhow::Result<()> {
1283 4 : // Create a queue with inprogress_limit=2.
1284 4 : let mut queue = UploadQueue::Uninitialized;
1285 4 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?;
1286 4 : let tli = make_timeline();
1287 4 :
1288 4 : // Enqueue a bunch of uploads.
1289 4 : let layer0 = make_layer(
1290 4 : &tli,
1291 4 : "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1292 4 : );
1293 4 : let layer1 = make_layer(
1294 4 : &tli,
1295 4 : "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1296 4 : );
1297 4 : let layer2 = make_layer(
1298 4 : &tli,
1299 4 : "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1300 4 : );
1301 4 : let layer3 = make_layer(
1302 4 : &tli,
1303 4 : "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
1304 4 : );
1305 4 :
1306 4 : let ops = [
1307 4 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1308 4 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
1309 4 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1310 4 : UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
1311 4 : ];
1312 4 :
1313 4 : queue.queued_operations.extend(ops.clone());
1314 4 :
1315 4 : // Schedule all ready operations. Only 2 are scheduled.
1316 4 : let tasks = queue.schedule_ready();
1317 8 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]);
1318 4 : assert!(queue.next_ready().is_none());
1319 :
1320 : // When one completes, another is scheduled.
1321 4 : queue.complete(tasks[0].task_id);
1322 4 : let tasks = queue.schedule_ready();
1323 4 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]);
1324 4 :
1325 4 : Ok(())
1326 4 : }
1327 :
1328 : /// Tests that can_bypass takes name, generation and shard index into account for all operations.
1329 : #[test]
1330 4 : fn can_bypass_path() -> anyhow::Result<()> {
1331 4 : let tli = make_timeline();
1332 4 :
1333 4 : let name0 = &"000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
1334 4 : let name1 = &"100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
1335 :
1336 : // Asserts that layers a and b either can or can't bypass each other, for all combinations
1337 : // of operations (except Delete and UploadMetadata which are special-cased).
1338 : #[track_caller]
1339 32 : fn assert_can_bypass(a: ResidentLayer, b: ResidentLayer, can_bypass: bool) {
1340 32 : let index = IndexPart::empty(TimelineMetadata::example());
1341 96 : for (a, b) in make_ops(a).into_iter().zip(make_ops(b)) {
1342 96 : match (&a, &b) {
1343 : // Deletes can always bypass each other.
1344 32 : (UploadOp::Delete(_), UploadOp::Delete(_)) => assert!(a.can_bypass(&b, &index)),
1345 : // Indexes can never bypass each other.
1346 : (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => {
1347 32 : assert!(!a.can_bypass(&b, &index))
1348 : }
1349 : // For other operations, assert as requested.
1350 32 : (a, b) => assert_eq!(a.can_bypass(b, &index), can_bypass),
1351 : }
1352 : }
1353 32 : }
1354 :
1355 64 : fn make_ops(layer: ResidentLayer) -> Vec<UploadOp> {
1356 64 : let mut index = IndexPart::empty(TimelineMetadata::example());
1357 64 : index
1358 64 : .layer_metadata
1359 64 : .insert(layer.layer_desc().layer_name(), layer.metadata());
1360 64 : vec![
1361 64 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1362 64 : UploadOp::Delete(Delete {
1363 64 : layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
1364 64 : }),
1365 64 : UploadOp::UploadMetadata {
1366 64 : uploaded: Box::new(index),
1367 64 : },
1368 64 : ]
1369 64 : }
1370 :
1371 : // Makes a ResidentLayer.
1372 64 : let layer = |name: &'static str, shard: Option<u8>, generation: u32| -> ResidentLayer {
1373 64 : let shard = shard
1374 64 : .map(|n| ShardIndex::new(ShardNumber(n), ShardCount(8)))
1375 64 : .unwrap_or(ShardIndex::unsharded());
1376 64 : let metadata = LayerFileMetadata {
1377 64 : shard,
1378 64 : generation: Generation::Valid(generation),
1379 64 : file_size: 0,
1380 64 : };
1381 64 : make_layer_with_metadata(&tli, name, metadata)
1382 64 : };
1383 :
1384 : // Same name and metadata can't bypass. This holds for unsharded and sharded layers
1385 : // alike, and for generation 0 as well as generations > 0.
1386 4 : assert_can_bypass(layer(name0, None, 0), layer(name0, None, 0), false);
1387 4 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(0), 0), false);
1388 4 : assert_can_bypass(layer(name0, None, 1), layer(name0, None, 1), false);
1389 4 :
1390 4 : // Different names can bypass.
1391 4 : assert_can_bypass(layer(name0, None, 0), layer(name1, None, 0), true);
1392 4 :
1393 4 : // Different shards can bypass. Shard 0 is different from unsharded.
1394 4 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(1), 0), true);
1395 4 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, None, 0), true);
1396 4 :
1397 4 : // Different generations can bypass, both sharded and unsharded.
1398 4 : assert_can_bypass(layer(name0, None, 0), layer(name0, None, 1), true);
1399 4 : assert_can_bypass(layer(name0, Some(1), 0), layer(name0, Some(1), 1), true);
1400 4 :
1401 4 : Ok(())
1402 4 : }
1403 : }
|