Line data Source code
1 : use std::collections::{HashMap, HashSet, VecDeque};
2 : use std::fmt::Debug;
3 : use std::sync::atomic::AtomicU32;
4 : use std::sync::Arc;
5 :
6 : use super::remote_timeline_client::is_same_remote_layer_path;
7 : use super::storage_layer::AsLayerDesc as _;
8 : use super::storage_layer::LayerName;
9 : use super::storage_layer::ResidentLayer;
10 : use crate::tenant::metadata::TimelineMetadata;
11 : use crate::tenant::remote_timeline_client::index::IndexPart;
12 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
13 : use utils::generation::Generation;
14 : use utils::lsn::{AtomicLsn, Lsn};
15 :
16 : use chrono::NaiveDateTime;
17 : use once_cell::sync::Lazy;
18 : use tracing::info;
19 :
20 : /// Kill switch for upload queue reordering in case it causes problems.
21 : /// TODO: remove this once we have confidence in it.
22 : static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
23 192 : Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
24 :
25 : /// Kill switch for index upload coalescing in case it causes problems.
26 : /// TODO: remove this once we have confidence in it.
27 : static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
28 8 : Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));
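// NB: both switches are sampled once per process via `Lazy`, so they must be
// set in the environment before pageserver startup; changing them at runtime
// has no effect. A sketch of enabling one (hypothetical invocation):
//
//   DISABLE_UPLOAD_QUEUE_REORDERING=true pageserver ...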
29 :
30 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
31 : // memory for Uninitialized variants. This doesn't matter in practice: there are not
32 : // that many upload queues in a running pageserver, and most of them are initialized
33 : // anyway.
34 : #[allow(clippy::large_enum_variant)]
35 : pub enum UploadQueue {
36 : Uninitialized,
37 : Initialized(UploadQueueInitialized),
38 : Stopped(UploadQueueStopped),
39 : }
40 :
41 : impl UploadQueue {
42 0 : pub fn as_str(&self) -> &'static str {
43 0 : match self {
44 0 : UploadQueue::Uninitialized => "Uninitialized",
45 0 : UploadQueue::Initialized(_) => "Initialized",
46 0 : UploadQueue::Stopped(_) => "Stopped",
47 : }
48 0 : }
49 : }
50 :
51 : #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
52 : pub enum OpType {
53 : MayReorder,
54 : FlushDeletion,
55 : }
56 :
57 : /// This keeps track of queued and in-progress tasks.
58 : pub struct UploadQueueInitialized {
59 : /// Maximum number of inprogress tasks to schedule. 0 is no limit.
60 : pub(crate) inprogress_limit: usize,
61 :
62 : /// Counter to assign task IDs
63 : pub(crate) task_counter: u64,
64 :
65 : /// The next uploaded index_part.json; assumed to be dirty.
66 : ///
67 : /// Should not be read, directly except for layer file updates. Instead you should add a
68 : /// projected field.
69 : pub(crate) dirty: IndexPart,
70 :
71 : /// The latest remote persisted IndexPart.
72 : ///
73 : /// Each completed metadata upload will update this. The second item is the task_id which last
74 : /// updated the value, used to ensure we never store an older value over a newer one.
75 : pub(crate) clean: (IndexPart, Option<u64>),
76 :
77 : /// How many file uploads or deletions been scheduled, since the
78 : /// last (scheduling of) metadata index upload?
79 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
80 :
81 : /// The Lsn is only updated after our generation has been validated with
82 : /// the control plane (unless a timeline's generation is None, in which case
83 : /// we skip validation)
84 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
85 :
86 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
87 : /// has been launched for it. An in-progress task can be busy uploading, but it can
88 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
89 : /// be waiting for retry in `exponential_backoff`.
90 : pub inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
91 :
92 : /// Queued operations that have not been launched yet. They might depend on previous
93 : /// tasks to finish. For example, metadata upload cannot be performed before all
94 : /// preceding layer file uploads have completed.
95 : pub queued_operations: VecDeque<UploadOp>,
96 :
97 : /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept around
98 : /// for error logging.
99 : ///
100 : /// This is kept behind a testing feature to catch problems in tests; on the assumption that a
101 : /// bug could cause leaks, it's better not to leave it enabled in production builds.
102 : #[cfg(feature = "testing")]
103 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
104 :
105 : /// Ensure we order file operations correctly.
106 : pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
107 :
108 : /// Deletions that are blocked by the tenant configuration
109 : pub(crate) blocked_deletions: Vec<Delete>,
110 :
111 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
112 : pub(crate) shutting_down: bool,
113 :
114 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
115 : /// wait until one of them stops the queue. The semaphore is closed when
116 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
117 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
118 : }
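
// A minimal sketch (test-only, not part of the production API) of how `dirty`
// and `clean` relate: `dirty` is the projected next index, `clean.0` the last
// index confirmed remote, so the index still needs (re)uploading while the two
// differ. Production code tracks this more cheaply via
// `latest_files_changes_since_metadata_upload_scheduled`.
#[cfg(test)]
#[allow(dead_code)]
fn index_upload_outstanding(queue: &UploadQueueInitialized) -> bool {
    queue.dirty != queue.clean.0
}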
119 :
120 : impl UploadQueueInitialized {
121 8 : pub(super) fn no_pending_work(&self) -> bool {
122 8 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
123 8 : }
124 :
125 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
126 0 : self.visible_remote_consistent_lsn.load()
127 0 : }
128 :
129 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
130 0 : let lsn = self.clean.0.metadata.disk_consistent_lsn();
131 0 : self.clean.1.map(|_| lsn)
132 0 : }
133 :
134 : /// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
135 : /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
136 : /// the queue if it doesn't conflict with operations ahead of it.
137 : ///
138 : /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
139 : ///
140 : /// None may be returned even if the queue isn't empty, if no operations are ready yet.
141 : ///
142 : /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
143 12964 : pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
144 12964 : // If inprogress_tasks is already at limit, don't schedule anything more.
145 12964 : if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
146 6 : return None;
147 12958 : }
148 :
149 25429 : for (i, candidate) in self.queued_operations.iter().enumerate() {
150 : // If this candidate is ready, go for it. Otherwise, try the next one.
151 25429 : if self.is_ready(i) {
152 : // Shutdown operations are left at the head of the queue, to prevent further
153 : // operations from starting. Signal that we're ready to shut down.
154 4924 : if matches!(candidate, UploadOp::Shutdown) {
155 10 : assert!(self.inprogress_tasks.is_empty(), "shutdown with tasks");
156 10 : assert_eq!(i, 0, "shutdown not at head of queue");
157 10 : self.shutdown_ready.close();
158 10 : return None;
159 4914 : }
160 4914 :
161 4914 : let mut op = self.queued_operations.remove(i).expect("i can't disappear");
162 4914 :
163 4914 : // Coalesce any back-to-back index uploads by only uploading the newest one that's
164 4914 : // ready. This typically happens with layer/index/layer/index/... sequences, where
165 4914 : // the layers bypass the indexes, leaving the indexes queued.
166 4914 : //
167 4914 : // If other operations are interleaved between index uploads we don't try to
168 4914 : // coalesce them, since we may as well update the index concurrently with them.
169 4914 : // This keeps the index fresh and avoids starvation.
170 4914 : //
171 4914 : // NB: we assume that all uploaded indexes have the same remote path. This
172 4914 : // is true at the time of writing: the path only depends on the tenant,
173 4914 : // timeline and generation, all of which are static for a timeline instance.
174 4914 : // Otherwise, we must be careful not to coalesce different paths.
175 4914 : let mut coalesced_ops = Vec::new();
176 4914 : if matches!(op, UploadOp::UploadMetadata { .. }) {
177 1533 : while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
178 : {
179 16 : if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
180 0 : break;
181 16 : }
182 16 : if !self.is_ready(i) {
183 6 : break;
184 10 : }
185 10 : coalesced_ops.push(op);
186 10 : op = self.queued_operations.remove(i).expect("i can't disappear");
187 : }
188 3391 : }
189 :
190 4914 : return Some((op, coalesced_ops));
191 20505 : }
192 :
193 : // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
194 20505 : if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
195 2392 : return None;
196 18113 : }
197 18113 :
198 18113 : // If upload queue reordering is disabled, bail out after the first operation.
199 18113 : if *DISABLE_UPLOAD_QUEUE_REORDERING {
200 0 : return None;
201 18113 : }
202 : }
203 5642 : None
204 12964 : }
205 :
206 : /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
207 : /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
208 : /// allowed to skip the queue when it's safe to do so, to increase parallelism.
209 : ///
210 : /// The position must be valid for the queue size.
211 25445 : fn is_ready(&self, pos: usize) -> bool {
212 25445 : let candidate = self.queued_operations.get(pos).expect("invalid position");
213 25445 : self
214 25445 : // Look at in-progress operations, in random order.
215 25445 : .inprogress_tasks
216 25445 : .values()
217 1167033 : .map(|task| &task.op)
218 25445 : // Then queued operations ahead of the candidate, front-to-back.
219 25445 : .chain(self.queued_operations.iter().take(pos))
220 25445 : // Keep track of the active index ahead of each operation. This is used to ensure that
221 25445 : // an upload doesn't skip the queue too far, such that it modifies a layer that's
222 25445 : // referenced by an active index.
223 25445 : //
224 25445 : // It's okay that in-progress operations are emitted in random order above, since at
225 25445 : // most one of them can be an index upload (enforced by can_bypass).
226 1181313 : .scan(&self.clean.0, |next_active_index, op| {
227 1181313 : let active_index = *next_active_index;
228 1181313 : if let UploadOp::UploadMetadata { ref uploaded } = op {
229 16182 : *next_active_index = uploaded; // stash index for next operation after this
230 1165131 : }
231 1181313 : Some((op, active_index))
232 1181313 : })
233 25445 : // Check if the candidate can bypass all of them.
234 1181313 : .all(|(op, active_index)| candidate.can_bypass(op, active_index))
235 25445 : }
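
    /// Debug sketch (test-only, hypothetical helper): the scan in `is_ready`
    /// visits in-progress tasks in arbitrary HashMap order, which is only sound
    /// because at most one index upload can be in flight at a time; two index
    /// uploads never bypass each other in `can_bypass`.
    #[cfg(test)]
    #[allow(dead_code)]
    fn assert_at_most_one_inprogress_index(&self) {
        let index_uploads = self
            .inprogress_tasks
            .values()
            .filter(|task| matches!(task.op, UploadOp::UploadMetadata { .. }))
            .count();
        assert!(index_uploads <= 1, "multiple index uploads in flight");
    }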
236 :
237 : /// Returns the number of in-progress deletion operations.
238 : #[cfg(test)]
239 2 : pub(crate) fn num_inprogress_deletions(&self) -> usize {
240 2 : self.inprogress_tasks
241 2 : .iter()
242 2 : .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
243 2 : .count()
244 2 : }
245 :
246 : /// Returns the number of in-progress layer uploads.
247 : #[cfg(test)]
248 4 : pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
249 4 : self.inprogress_tasks
250 4 : .iter()
251 6 : .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
252 4 : .count()
253 4 : }
254 :
255 : /// Test helper that schedules all ready operations into inprogress_tasks, and returns
256 : /// references to them.
257 : ///
258 : /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
259 : /// UploadQueue, so we can use the same code path.
260 : #[cfg(test)]
261 78 : fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
262 78 : let mut tasks = Vec::new();
263 : // NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
264 172 : while let Some((op, coalesced_ops)) = self.next_ready() {
265 94 : self.task_counter += 1;
266 94 : let task = Arc::new(UploadTask {
267 94 : task_id: self.task_counter,
268 94 : op,
269 94 : coalesced_ops,
270 94 : retries: 0.into(),
271 94 : });
272 94 : self.inprogress_tasks.insert(task.task_id, task.clone());
273 94 : tasks.push(task);
274 94 : }
275 78 : tasks
276 78 : }
277 :
278 : /// Test helper that marks an operation as completed, removing it from inprogress_tasks.
279 : ///
280 : /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
281 : /// UploadQueue, so we can use the same code path.
282 : #[cfg(test)]
283 58 : fn complete(&mut self, task_id: u64) {
284 58 : let Some(task) = self.inprogress_tasks.remove(&task_id) else {
285 0 : return;
286 : };
287 : // Update the clean index on uploads.
288 58 : if let UploadOp::UploadMetadata { ref uploaded } = task.op {
289 16 : if task.task_id > self.clean.1.unwrap_or_default() {
290 16 : self.clean = (*uploaded.clone(), Some(task.task_id));
291 16 : }
292 42 : }
293 58 : }
294 : }
295 :
296 : #[derive(Clone, Copy)]
297 : pub(super) enum SetDeletedFlagProgress {
298 : NotRunning,
299 : InProgress(NaiveDateTime),
300 : Successful(NaiveDateTime),
301 : }
302 :
303 : pub struct UploadQueueStoppedDeletable {
304 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
305 : pub(super) deleted_at: SetDeletedFlagProgress,
306 : }
307 :
308 : pub enum UploadQueueStopped {
309 : Deletable(UploadQueueStoppedDeletable),
310 : Uninitialized,
311 : }
312 :
313 : #[derive(thiserror::Error, Debug)]
314 : pub enum NotInitialized {
315 : #[error("queue is in state Uninitialized")]
316 : Uninitialized,
317 : #[error("queue is in state Stopped")]
318 : Stopped,
319 : #[error("queue is shutting down")]
320 : ShuttingDown,
321 : }
322 :
323 : impl NotInitialized {
324 0 : pub(crate) fn is_stopping(&self) -> bool {
325 : use NotInitialized::*;
326 0 : match self {
327 0 : Uninitialized => false,
328 0 : Stopped => true,
329 0 : ShuttingDown => true,
330 : }
331 0 : }
332 : }
333 :
334 : impl UploadQueue {
335 448 : pub fn initialize_empty_remote(
336 448 : &mut self,
337 448 : metadata: &TimelineMetadata,
338 448 : inprogress_limit: usize,
339 448 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
340 448 : match self {
341 448 : UploadQueue::Uninitialized => (),
342 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
343 0 : anyhow::bail!("already initialized, state {}", self.as_str())
344 : }
345 : }
346 :
347 448 : info!("initializing upload queue for empty remote");
348 :
349 448 : let index_part = IndexPart::empty(metadata.clone());
350 448 :
351 448 : let state = UploadQueueInitialized {
352 448 : inprogress_limit,
353 448 : dirty: index_part.clone(),
354 448 : clean: (index_part, None),
355 448 : latest_files_changes_since_metadata_upload_scheduled: 0,
356 448 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
357 448 : // what follows are boring default initializations
358 448 : task_counter: 0,
359 448 : inprogress_tasks: HashMap::new(),
360 448 : queued_operations: VecDeque::new(),
361 448 : #[cfg(feature = "testing")]
362 448 : dangling_files: HashMap::new(),
363 448 : recently_deleted: HashSet::new(),
364 448 : blocked_deletions: Vec::new(),
365 448 : shutting_down: false,
366 448 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
367 448 : };
368 448 :
369 448 : *self = UploadQueue::Initialized(state);
370 448 : Ok(self.initialized_mut().expect("we just set it"))
371 448 : }
372 :
373 22 : pub fn initialize_with_current_remote_index_part(
374 22 : &mut self,
375 22 : index_part: &IndexPart,
376 22 : inprogress_limit: usize,
377 22 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
378 22 : match self {
379 22 : UploadQueue::Uninitialized => (),
380 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
381 0 : anyhow::bail!("already initialized, state {}", self.as_str())
382 : }
383 : }
384 :
385 22 : info!(
386 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
387 0 : index_part.metadata.disk_consistent_lsn()
388 : );
389 :
390 22 : let state = UploadQueueInitialized {
391 22 : inprogress_limit,
392 22 : dirty: index_part.clone(),
393 22 : clean: (index_part.clone(), None),
394 22 : latest_files_changes_since_metadata_upload_scheduled: 0,
395 22 : visible_remote_consistent_lsn: Arc::new(
396 22 : index_part.metadata.disk_consistent_lsn().into(),
397 22 : ),
398 22 : // what follows are boring default initializations
399 22 : task_counter: 0,
400 22 : inprogress_tasks: HashMap::new(),
401 22 : queued_operations: VecDeque::new(),
402 22 : #[cfg(feature = "testing")]
403 22 : dangling_files: HashMap::new(),
404 22 : recently_deleted: HashSet::new(),
405 22 : blocked_deletions: Vec::new(),
406 22 : shutting_down: false,
407 22 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
408 22 : };
409 22 :
410 22 : *self = UploadQueue::Initialized(state);
411 22 : Ok(self.initialized_mut().expect("we just set it"))
412 22 : }
413 :
414 8818 : pub fn initialized_mut(&mut self) -> Result<&mut UploadQueueInitialized, NotInitialized> {
415 : use UploadQueue::*;
416 8818 : match self {
417 0 : Uninitialized => Err(NotInitialized::Uninitialized),
418 8818 : Initialized(x) => {
419 8818 : if x.shutting_down {
420 0 : Err(NotInitialized::ShuttingDown)
421 : } else {
422 8818 : Ok(x)
423 : }
424 : }
425 0 : Stopped(_) => Err(NotInitialized::Stopped),
426 : }
427 8818 : }
428 :
429 2 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
430 2 : match self {
431 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
432 0 : anyhow::bail!("queue is in state {}", self.as_str())
433 : }
434 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
435 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
436 : }
437 2 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
438 : }
439 2 : }
440 : }
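
// Usage sketch (test-only; mirrors what the unit tests below do): a queue
// starts out Uninitialized and is initialized either empty, for a brand-new
// remote, or from a downloaded index_part on attach/restart.
#[cfg(test)]
#[allow(dead_code)]
fn initialize_example(metadata: &TimelineMetadata) -> anyhow::Result<()> {
    let mut queue = UploadQueue::Uninitialized;
    let initialized = queue.initialize_empty_remote(metadata, 0)?;
    assert!(initialized.no_pending_work());
    Ok(())
}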
441 :
442 : /// An in-progress upload or delete task.
443 : #[derive(Debug)]
444 : pub struct UploadTask {
445 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
446 : pub task_id: u64,
447 : /// Number of task retries.
448 : pub retries: AtomicU32,
449 : /// The upload operation.
450 : pub op: UploadOp,
451 : /// Any upload operations that were coalesced into this operation. This typically happens with
452 : /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
453 : pub coalesced_ops: Vec<UploadOp>,
454 : }
455 :
456 : /// A deletion of some layers within the lifetime of a timeline. This is not used
457 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
458 : #[derive(Debug, Clone)]
459 : pub struct Delete {
460 : pub layers: Vec<(LayerName, LayerFileMetadata)>,
461 : }
462 :
463 : #[derive(Clone, Debug)]
464 : pub enum UploadOp {
465 : /// Upload a layer file. The last field indicates the last operation for this file.
466 : UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
467 :
468 : /// Upload an index_part.json file
469 : UploadMetadata {
470 : /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
471 : uploaded: Box<IndexPart>,
472 : },
473 :
474 : /// Delete layer files
475 : Delete(Delete),
476 :
477 : /// Barrier. When the barrier operation is reached, the channel is closed.
478 : Barrier(tokio::sync::watch::Sender<()>),
479 :
480 : /// Shutdown; upon encountering this operation, no new operations will be spawned; otherwise
481 : /// it behaves the same as a Barrier.
482 : Shutdown,
483 : }
484 :
485 : impl std::fmt::Display for UploadOp {
486 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
487 0 : match self {
488 0 : UploadOp::UploadLayer(layer, metadata, mode) => {
489 0 : write!(
490 0 : f,
491 0 : "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
492 0 : layer, metadata.file_size, metadata.generation, mode
493 0 : )
494 : }
495 0 : UploadOp::UploadMetadata { uploaded, .. } => {
496 0 : write!(
497 0 : f,
498 0 : "UploadMetadata(lsn: {})",
499 0 : uploaded.metadata.disk_consistent_lsn()
500 0 : )
501 : }
502 0 : UploadOp::Delete(delete) => {
503 0 : write!(f, "Delete({} layers)", delete.layers.len())
504 : }
505 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
506 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
507 : }
508 0 : }
509 : }
510 :
511 : impl UploadOp {
512 : /// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the
513 : /// active index when other would be uploaded -- if we allow self to bypass other, this would
514 : /// be the active index when self is uploaded.
515 1181361 : pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
516 1181361 : match (self, other) {
517 : // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
518 2400 : (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
519 8 : (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
520 :
521 : // Uploads and deletes can bypass each other unless they're for the same file.
522 19877 : (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
523 19877 : let aname = &a.layer_desc().layer_name();
524 19877 : let bname = &b.layer_desc().layer_name();
525 19877 : !is_same_remote_layer_path(aname, ameta, bname, bmeta)
526 : }
527 27 : (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
528 1137462 : | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
529 1137503 : d.layers.iter().all(|(dname, dmeta)| {
530 1137503 : !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
531 1137503 : })
532 : }
533 :
534 : // Deletes are idempotent and can always bypass each other.
535 3029 : (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
536 :
537 : // Uploads and deletes can bypass an index upload as long as neither the uploaded index
538 : // nor the active index below it references the file. A layer can't be modified or
539 : // deleted while referenced by an index.
540 : //
541 : // Similarly, index uploads can bypass uploads and deletes as long as neither the
542 : // uploaded index nor the active index references the file (the latter would be
543 : // incorrect use by the caller).
544 78 : (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
545 3378 : | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
546 3456 : let uname = u.layer_desc().layer_name();
547 3456 : !i.references(&uname, umeta) && !index.references(&uname, umeta)
548 : }
549 14730 : (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
550 170 : | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
551 14900 : d.layers.iter().all(|(dname, dmeta)| {
552 14900 : !i.references(dname, dmeta) && !index.references(dname, dmeta)
553 14900 : })
554 : }
555 :
556 : // Indexes can never bypass each other. They can coalesce though, and
557 : // `UploadQueue::next_ready()` currently does this when possible.
558 202 : (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
559 : }
560 1181361 : }
561 : }
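
// Worked sketch of the conflict rules (test-only): barriers conflict with
// everything in both directions, so a Barrier and a Shutdown can never bypass
// one another; the active index is irrelevant for these variants.
#[cfg(test)]
#[allow(dead_code)]
fn barrier_shutdown_never_bypass() {
    let index = IndexPart::empty(TimelineMetadata::example());
    let (tx, _rx) = tokio::sync::watch::channel(());
    let barrier = UploadOp::Barrier(tx);
    assert!(!barrier.can_bypass(&UploadOp::Shutdown, &index));
    assert!(!UploadOp::Shutdown.can_bypass(&barrier, &index));
}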
562 :
563 : #[cfg(test)]
564 : mod tests {
565 : use super::*;
566 : use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
567 : use crate::tenant::storage_layer::layer::local_layer_path;
568 : use crate::tenant::storage_layer::Layer;
569 : use crate::tenant::Timeline;
570 : use crate::DEFAULT_PG_VERSION;
571 : use itertools::Itertools as _;
572 : use std::str::FromStr as _;
573 : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
574 :
575 : /// Test helper which asserts that two operations are the same, in lieu of UploadOp PartialEq.
576 : #[track_caller]
577 98 : fn assert_same_op(a: &UploadOp, b: &UploadOp) {
578 : use UploadOp::*;
579 98 : match (a, b) {
580 44 : (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
581 44 : assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
582 44 : assert_eq!(ameta, bmeta);
583 44 : assert_eq!(atype, btype);
584 : }
585 22 : (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers),
586 28 : (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b),
587 4 : (Barrier(_), Barrier(_)) => {}
588 0 : (Shutdown, Shutdown) => {}
589 0 : (a, b) => panic!("{a:?} != {b:?}"),
590 : }
591 98 : }
592 :
593 : /// Test helper which asserts that two sets of operations are the same.
594 : #[track_caller]
595 22 : fn assert_same_ops<'a>(
596 22 : a: impl IntoIterator<Item = &'a UploadOp>,
597 22 : b: impl IntoIterator<Item = &'a UploadOp>,
598 22 : ) {
599 22 : a.into_iter()
600 22 : .zip_eq(b)
601 58 : .for_each(|(a, b)| assert_same_op(a, b))
602 22 : }
603 :
604 : /// Test helper to construct a test timeline.
605 : ///
606 : /// TODO: it really shouldn't be necessary to construct an entire tenant and timeline just to
607 : /// test the upload queue -- decouple ResidentLayer from Timeline.
608 : ///
609 : /// TODO: the upload queue uses TimelineMetadata::example() instead, because there's no way to
610 : /// obtain a TimelineMetadata from a Timeline.
611 24 : fn make_timeline() -> Arc<Timeline> {
612 24 : // Grab the current test name from the current thread name.
613 24 : // TODO: TenantHarness shouldn't take a &'static str, but just leak the test name for now.
614 24 : let test_name = std::thread::current().name().unwrap().to_string();
615 24 : let test_name = Box::leak(test_name.into_boxed_str());
616 24 :
617 24 : let runtime = tokio::runtime::Builder::new_current_thread()
618 24 : .enable_all()
619 24 : .build()
620 24 : .expect("failed to create runtime");
621 24 :
622 24 : runtime
623 24 : .block_on(async {
624 24 : let harness = TenantHarness::create(test_name).await?;
625 24 : let (tenant, ctx) = harness.load().await;
626 24 : tenant
627 24 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
628 24 : .await
629 24 : })
630 24 : .expect("failed to create timeline")
631 24 : }
632 :
633 : /// Test helper to construct an (empty) resident layer.
634 60 : fn make_layer(timeline: &Arc<Timeline>, name: &str) -> ResidentLayer {
635 60 : make_layer_with_size(timeline, name, 0)
636 60 : }
637 :
638 : /// Test helper to construct a resident layer with the given size.
639 66 : fn make_layer_with_size(timeline: &Arc<Timeline>, name: &str, size: usize) -> ResidentLayer {
640 66 : let metadata = LayerFileMetadata {
641 66 : generation: timeline.generation,
642 66 : shard: timeline.get_shard_index(),
643 66 : file_size: size as u64,
644 66 : };
645 66 : make_layer_with_metadata(timeline, name, metadata)
646 66 : }
647 :
648 : /// Test helper to construct a layer with the given metadata.
649 98 : fn make_layer_with_metadata(
650 98 : timeline: &Arc<Timeline>,
651 98 : name: &str,
652 98 : metadata: LayerFileMetadata,
653 98 : ) -> ResidentLayer {
654 98 : let name = LayerName::from_str(name).expect("invalid name");
655 98 : let local_path = local_layer_path(
656 98 : timeline.conf,
657 98 : &timeline.tenant_shard_id,
658 98 : &timeline.timeline_id,
659 98 : &name,
660 98 : &metadata.generation,
661 98 : );
662 98 : std::fs::write(&local_path, vec![0; metadata.file_size as usize])
663 98 : .expect("failed to write file");
664 98 : Layer::for_resident(timeline.conf, timeline, local_path, name, metadata)
665 98 : }
666 :
667 : /// Test helper to add a layer to an index and return a new index.
668 12 : fn index_with(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
669 12 : let mut index = index.clone();
670 12 : index
671 12 : .layer_metadata
672 12 : .insert(layer.layer_desc().layer_name(), layer.metadata());
673 12 : Box::new(index)
674 12 : }
675 :
676 : /// Test helper to remove a layer from an index and return a new index.
677 4 : fn index_without(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
678 4 : let mut index = index.clone();
679 4 : index
680 4 : .layer_metadata
681 4 : .remove(&layer.layer_desc().layer_name());
682 4 : Box::new(index)
683 4 : }
684 :
685 : /// Nothing can bypass a barrier, and it can't bypass inprogress tasks.
686 : #[test]
687 2 : fn schedule_barrier() -> anyhow::Result<()> {
688 2 : let mut queue = UploadQueue::Uninitialized;
689 2 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
690 2 : let tli = make_timeline();
691 2 :
692 2 : let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
693 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
694 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
695 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
696 2 : let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
697 2 : let (barrier, _) = tokio::sync::watch::channel(());
698 2 :
699 2 : // Enqueue non-conflicting upload, delete, and index before and after a barrier.
700 2 : let ops = [
701 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
702 2 : UploadOp::Delete(Delete {
703 2 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
704 2 : }),
705 2 : UploadOp::UploadMetadata {
706 2 : uploaded: index.clone(),
707 2 : },
708 2 : UploadOp::Barrier(barrier),
709 2 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
710 2 : UploadOp::Delete(Delete {
711 2 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
712 2 : }),
713 2 : UploadOp::UploadMetadata {
714 2 : uploaded: index.clone(),
715 2 : },
716 2 : ];
717 2 :
718 2 : queue.queued_operations.extend(ops.clone());
719 2 :
720 2 : // Schedule the initial operations ahead of the barrier.
721 2 : let tasks = queue.schedule_ready();
722 2 :
723 6 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
724 2 : assert!(matches!(
725 2 : queue.queued_operations.front(),
726 : Some(&UploadOp::Barrier(_))
727 : ));
728 :
729 : // Complete the initial operations. The barrier isn't scheduled while they're pending.
730 8 : for task in tasks {
731 6 : assert!(queue.schedule_ready().is_empty());
732 6 : queue.complete(task.task_id);
733 : }
734 :
735 : // Schedule the barrier. The later tasks won't schedule until it completes.
736 2 : let tasks = queue.schedule_ready();
737 2 :
738 2 : assert_eq!(tasks.len(), 1);
739 2 : assert!(matches!(tasks[0].op, UploadOp::Barrier(_)));
740 2 : assert_eq!(queue.queued_operations.len(), 3);
741 :
742 : // Complete the barrier. The rest of the tasks schedule immediately.
743 2 : queue.complete(tasks[0].task_id);
744 2 :
745 2 : let tasks = queue.schedule_ready();
746 6 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[4..]);
747 2 : assert!(queue.queued_operations.is_empty());
748 :
749 2 : Ok(())
750 2 : }
751 :
752 : /// Deletes can be scheduled in parallel, even if they're for the same file.
753 : #[test]
754 2 : fn schedule_delete_parallel() -> anyhow::Result<()> {
755 2 : let mut queue = UploadQueue::Uninitialized;
756 2 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
757 2 : let tli = make_timeline();
758 2 :
759 2 : // Enqueue a bunch of deletes, some with conflicting names.
760 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
761 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
762 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
763 2 : let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
764 2 :
765 2 : let ops = [
766 2 : UploadOp::Delete(Delete {
767 2 : layers: vec![(layer0.layer_desc().layer_name(), layer0.metadata())],
768 2 : }),
769 2 : UploadOp::Delete(Delete {
770 2 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
771 2 : }),
772 2 : UploadOp::Delete(Delete {
773 2 : layers: vec![
774 2 : (layer1.layer_desc().layer_name(), layer1.metadata()),
775 2 : (layer2.layer_desc().layer_name(), layer2.metadata()),
776 2 : ],
777 2 : }),
778 2 : UploadOp::Delete(Delete {
779 2 : layers: vec![(layer2.layer_desc().layer_name(), layer2.metadata())],
780 2 : }),
781 2 : UploadOp::Delete(Delete {
782 2 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
783 2 : }),
784 2 : ];
785 2 :
786 2 : queue.queued_operations.extend(ops.clone());
787 2 :
788 2 : // Schedule all ready operations. Since deletes don't conflict, they're all scheduled.
789 2 : let tasks = queue.schedule_ready();
790 2 :
791 10 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
792 2 : assert!(queue.queued_operations.is_empty());
793 :
794 2 : Ok(())
795 2 : }
796 :
797 : /// Conflicting uploads are serialized.
798 : #[test]
799 2 : fn schedule_upload_conflicts() -> anyhow::Result<()> {
800 2 : let mut queue = UploadQueue::Uninitialized;
801 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
802 2 : let tli = make_timeline();
803 2 :
804 2 : // Enqueue three versions of the same layer, with different file sizes.
805 2 : let layer0a = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 1);
806 2 : let layer0b = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 2);
807 2 : let layer0c = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 3);
808 2 :
809 2 : let ops = [
810 2 : UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
811 2 : UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
812 2 : UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
813 2 : ];
814 2 :
815 2 : queue.queued_operations.extend(ops.clone());
816 :
817 : // Only one version should be scheduled and uploaded at a time.
818 8 : for op in ops {
819 6 : let tasks = queue.schedule_ready();
820 6 : assert_eq!(tasks.len(), 1);
821 6 : assert_same_op(&tasks[0].op, &op);
822 6 : queue.complete(tasks[0].task_id);
823 : }
824 2 : assert!(queue.schedule_ready().is_empty());
825 2 : assert!(queue.queued_operations.is_empty());
826 :
827 2 : Ok(())
828 2 : }
829 :
830 : /// Conflicting uploads and deletes are serialized.
831 : #[test]
832 2 : fn schedule_upload_delete_conflicts() -> anyhow::Result<()> {
833 2 : let mut queue = UploadQueue::Uninitialized;
834 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
835 2 : let tli = make_timeline();
836 2 :
837 2 : // Enqueue two layer uploads, with a delete of both layers in between them. These should be
838 2 : // scheduled one at a time, since deletes can't bypass uploads and vice versa.
839 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
840 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
841 2 :
842 2 : let ops = [
843 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
844 2 : UploadOp::Delete(Delete {
845 2 : layers: vec![
846 2 : (layer0.layer_desc().layer_name(), layer0.metadata()),
847 2 : (layer1.layer_desc().layer_name(), layer1.metadata()),
848 2 : ],
849 2 : }),
850 2 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
851 2 : ];
852 2 :
853 2 : queue.queued_operations.extend(ops.clone());
854 :
855 : // Only one version should be scheduled and uploaded at a time.
856 8 : for op in ops {
857 6 : let tasks = queue.schedule_ready();
858 6 : assert_eq!(tasks.len(), 1);
859 6 : assert_same_op(&tasks[0].op, &op);
860 6 : queue.complete(tasks[0].task_id);
861 : }
862 2 : assert!(queue.schedule_ready().is_empty());
863 2 : assert!(queue.queued_operations.is_empty());
864 :
865 2 : Ok(())
866 2 : }
867 :
868 : /// Non-conflicting uploads and deletes can bypass the queue, avoiding the conflicting
869 : /// delete/upload operations at the head of the queue.
870 : #[test]
871 2 : fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> {
872 2 : let mut queue = UploadQueue::Uninitialized;
873 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
874 2 : let tli = make_timeline();
875 2 :
876 2 : // Enqueue two layer uploads, with a delete of both layers in between them. These should be
877 2 : // scheduled one at a time, since deletes can't bypass uploads and vice versa.
878 2 : //
879 2 : // Also enqueue non-conflicting uploads and deletes at the end. These can bypass the queue
880 2 : // and run immediately.
881 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
882 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
883 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
884 2 : let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
885 2 :
886 2 : let ops = [
887 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
888 2 : UploadOp::Delete(Delete {
889 2 : layers: vec![
890 2 : (layer0.layer_desc().layer_name(), layer0.metadata()),
891 2 : (layer1.layer_desc().layer_name(), layer1.metadata()),
892 2 : ],
893 2 : }),
894 2 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
895 2 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
896 2 : UploadOp::Delete(Delete {
897 2 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
898 2 : }),
899 2 : ];
900 2 :
901 2 : queue.queued_operations.extend(ops.clone());
902 2 :
903 2 : // Operations 0, 3, and 4 are scheduled immediately.
904 2 : let tasks = queue.schedule_ready();
905 6 : assert_same_ops(tasks.iter().map(|t| &t.op), [&ops[0], &ops[3], &ops[4]]);
906 2 : assert_eq!(queue.queued_operations.len(), 2);
907 :
908 2 : Ok(())
909 2 : }
910 :
911 : /// Non-conflicting uploads are parallelized.
912 : #[test]
913 2 : fn schedule_upload_parallel() -> anyhow::Result<()> {
914 2 : let mut queue = UploadQueue::Uninitialized;
915 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
916 2 : let tli = make_timeline();
917 2 :
918 2 : // Enqueue three different layer uploads.
919 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
920 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
921 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
922 2 :
923 2 : let ops = [
924 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
925 2 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
926 2 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
927 2 : ];
928 2 :
929 2 : queue.queued_operations.extend(ops.clone());
930 2 :
931 2 : // All uploads should be scheduled concurrently.
932 2 : let tasks = queue.schedule_ready();
933 2 :
934 6 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
935 2 : assert!(queue.queued_operations.is_empty());
936 :
937 2 : Ok(())
938 2 : }
939 :
940 : /// Index uploads are coalesced.
941 : #[test]
942 2 : fn schedule_index_coalesce() -> anyhow::Result<()> {
943 2 : let mut queue = UploadQueue::Uninitialized;
944 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
945 :
946 : // Enqueue three uploads of the current empty index.
947 2 : let index = Box::new(queue.clean.0.clone());
948 2 :
949 2 : let ops = [
950 2 : UploadOp::UploadMetadata {
951 2 : uploaded: index.clone(),
952 2 : },
953 2 : UploadOp::UploadMetadata {
954 2 : uploaded: index.clone(),
955 2 : },
956 2 : UploadOp::UploadMetadata {
957 2 : uploaded: index.clone(),
958 2 : },
959 2 : ];
960 2 :
961 2 : queue.queued_operations.extend(ops.clone());
962 2 :
963 2 : // The index uploads are coalesced into a single operation.
964 2 : let tasks = queue.schedule_ready();
965 2 : assert_eq!(tasks.len(), 1);
966 2 : assert_same_op(&tasks[0].op, &ops[2]);
967 2 : assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);
968 2 :
969 2 : assert!(queue.queued_operations.is_empty());
970 :
971 2 : Ok(())
972 2 : }
973 :
974 : /// Chains of upload/index operations lead to parallel layer uploads and serial index uploads.
975 : /// This is the common case with layer flushes.
976 : #[test]
977 2 : fn schedule_index_upload_chain() -> anyhow::Result<()> {
978 2 : let mut queue = UploadQueue::Uninitialized;
979 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
980 2 : let tli = make_timeline();
981 2 :
982 2 : // Enqueue three layer uploads, each followed by an upload of the updated index.
983 2 : let index = Box::new(queue.clean.0.clone());
984 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
985 2 : let index0 = index_with(&index, &layer0);
986 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
987 2 : let index1 = index_with(&index0, &layer1);
988 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
989 2 : let index2 = index_with(&index1, &layer2);
990 2 :
991 2 : let ops = [
992 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
993 2 : UploadOp::UploadMetadata {
994 2 : uploaded: index0.clone(),
995 2 : },
996 2 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
997 2 : UploadOp::UploadMetadata {
998 2 : uploaded: index1.clone(),
999 2 : },
1000 2 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1001 2 : UploadOp::UploadMetadata {
1002 2 : uploaded: index2.clone(),
1003 2 : },
1004 2 : ];
1005 2 :
1006 2 : queue.queued_operations.extend(ops.clone());
1007 2 :
1008 2 : // The layer uploads should be scheduled immediately. The indexes must wait.
1009 2 : let upload_tasks = queue.schedule_ready();
1010 2 : assert_same_ops(
1011 6 : upload_tasks.iter().map(|t| &t.op),
1012 2 : [&ops[0], &ops[2], &ops[4]],
1013 2 : );
1014 2 :
1015 2 : // layer2 completes first. None of the indexes can upload yet.
1016 2 : queue.complete(upload_tasks[2].task_id);
1017 2 : assert!(queue.schedule_ready().is_empty());
1018 :
1019 : // layer0 completes. index0 can upload. It completes.
1020 2 : queue.complete(upload_tasks[0].task_id);
1021 2 : let index_tasks = queue.schedule_ready();
1022 2 : assert_eq!(index_tasks.len(), 1);
1023 2 : assert_same_op(&index_tasks[0].op, &ops[1]);
1024 2 : queue.complete(index_tasks[0].task_id);
1025 2 :
1026 2 : // layer 1 completes. This unblocks index 1 and 2, which coalesce into
1027 2 : // a single upload for index 2.
1028 2 : queue.complete(upload_tasks[1].task_id);
1029 2 :
1030 2 : let index_tasks = queue.schedule_ready();
1031 2 : assert_eq!(index_tasks.len(), 1);
1032 2 : assert_same_op(&index_tasks[0].op, &ops[5]);
1033 2 : assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);
1034 2 :
1035 2 : assert!(queue.queued_operations.is_empty());
1036 :
1037 2 : Ok(())
1038 2 : }
1039 :
1040 : /// A delete can't bypass an index upload if an index ahead of it still references it.
1041 : #[test]
1042 2 : fn schedule_index_delete_dereferenced() -> anyhow::Result<()> {
1043 2 : let mut queue = UploadQueue::Uninitialized;
1044 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1045 2 : let tli = make_timeline();
1046 2 :
1047 2 : // Create a layer to upload.
1048 2 : let layer = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1049 2 : let index_upload = index_with(&queue.clean.0, &layer);
1050 2 :
1051 2 : // Remove the layer reference in a new index, then delete the layer.
1052 2 : let index_deref = index_without(&index_upload, &layer);
1053 2 :
1054 2 : let ops = [
1055 2 : // Initial upload, with a barrier to prevent index coalescing.
1056 2 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1057 2 : UploadOp::UploadMetadata {
1058 2 : uploaded: index_upload.clone(),
1059 2 : },
1060 2 : UploadOp::Barrier(tokio::sync::watch::channel(()).0),
1061 2 : // Dereference the layer and delete it.
1062 2 : UploadOp::UploadMetadata {
1063 2 : uploaded: index_deref.clone(),
1064 2 : },
1065 2 : UploadOp::Delete(Delete {
1066 2 : layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
1067 2 : }),
1068 2 : ];
1069 2 :
1070 2 : queue.queued_operations.extend(ops.clone());
1071 :
1072 : // Operations are serialized.
1073 12 : for op in ops {
1074 10 : let tasks = queue.schedule_ready();
1075 10 : assert_eq!(tasks.len(), 1);
1076 10 : assert_same_op(&tasks[0].op, &op);
1077 10 : queue.complete(tasks[0].task_id);
1078 : }
1079 2 : assert!(queue.queued_operations.is_empty());
1080 :
1081 2 : Ok(())
1082 2 : }
1083 :
1084 : /// An upload with a reused layer name doesn't clobber the previous layer. Specifically, a
1085 : /// dereference/upload/reference cycle can't allow the upload to bypass the reference.
1086 : #[test]
1087 2 : fn schedule_index_upload_dereferenced() -> anyhow::Result<()> {
1088 2 : let mut queue = UploadQueue::Uninitialized;
1089 2 : let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
1090 2 : let tli = make_timeline();
1091 2 :
1092 2 : // Create a layer to upload.
1093 2 : let layer = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1094 2 :
1095 2 : // Upload the layer. Then dereference the layer, and upload/reference it again.
1096 2 : let index_upload = index_with(&queue.clean.0, &layer);
1097 2 : let index_deref = index_without(&index_upload, &layer);
1098 2 : let index_ref = index_with(&index_deref, &layer);
1099 2 :
1100 2 : let ops = [
1101 2 : // Initial upload, with a barrier to prevent index coalescing.
1102 2 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1103 2 : UploadOp::UploadMetadata {
1104 2 : uploaded: index_upload.clone(),
1105 2 : },
1106 2 : UploadOp::Barrier(tokio::sync::watch::channel(()).0),
1107 2 : // Dereference the layer.
1108 2 : UploadOp::UploadMetadata {
1109 2 : uploaded: index_deref.clone(),
1110 2 : },
1111 2 : // Replace and reference the layer.
1112 2 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1113 2 : UploadOp::UploadMetadata {
1114 2 : uploaded: index_ref.clone(),
1115 2 : },
1116 2 : ];
1117 2 :
1118 2 : queue.queued_operations.extend(ops.clone());
1119 :
1120 : // Operations are serialized.
1121 14 : for op in ops {
1122 12 : let tasks = queue.schedule_ready();
1123 12 : assert_eq!(tasks.len(), 1);
1124 12 : assert_same_op(&tasks[0].op, &op);
1125 12 : queue.complete(tasks[0].task_id);
1126 : }
1127 2 : assert!(queue.queued_operations.is_empty());
1128 :
1129 2 : Ok(())
1130 2 : }
1131 :
1132 : /// Nothing can bypass a shutdown, and it waits for inprogress tasks. It's never returned from
1133 : /// next_ready(), but is left at the head of the queue.
1134 : #[test]
1135 2 : fn schedule_shutdown() -> anyhow::Result<()> {
1136 2 : let mut queue = UploadQueue::Uninitialized;
1137 2 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
1138 2 : let tli = make_timeline();
1139 2 :
1140 2 : let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
1141 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1142 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1143 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1144 2 : let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1145 2 :
1146 2 : // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
1147 2 : let ops = [
1148 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1149 2 : UploadOp::Delete(Delete {
1150 2 : layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
1151 2 : }),
1152 2 : UploadOp::UploadMetadata {
1153 2 : uploaded: index.clone(),
1154 2 : },
1155 2 : UploadOp::Shutdown,
1156 2 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1157 2 : UploadOp::Delete(Delete {
1158 2 : layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
1159 2 : }),
1160 2 : UploadOp::UploadMetadata {
1161 2 : uploaded: index.clone(),
1162 2 : },
1163 2 : ];
1164 2 :
1165 2 : queue.queued_operations.extend(ops.clone());
1166 2 :
1167 2 : // Schedule the initial operations ahead of the shutdown.
1168 2 : let tasks = queue.schedule_ready();
1169 2 :
1170 6 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
1171 2 : assert!(matches!(
1172 2 : queue.queued_operations.front(),
1173 : Some(&UploadOp::Shutdown)
1174 : ));
1175 :
1176 : // Complete the initial operations. The shutdown isn't triggered while they're pending.
1177 8 : for task in tasks {
1178 6 : assert!(queue.schedule_ready().is_empty());
1179 6 : queue.complete(task.task_id);
1180 : }
1181 :
1182 : // The shutdown is triggered the next time we try to pull an operation. It isn't returned,
1183 : // but is left in the queue.
1184 2 : assert!(!queue.shutdown_ready.is_closed());
1185 2 : assert!(queue.next_ready().is_none());
1186 2 : assert!(queue.shutdown_ready.is_closed());
1187 :
1188 2 : Ok(())
1189 2 : }
1190 :
1191 : /// Scheduling respects inprogress_limit.
1192 : #[test]
1193 2 : fn schedule_inprogress_limit() -> anyhow::Result<()> {
1194 2 : // Create a queue with inprogress_limit=2.
1195 2 : let mut queue = UploadQueue::Uninitialized;
1196 2 : let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?;
1197 2 : let tli = make_timeline();
1198 2 :
1199 2 : // Enqueue a bunch of uploads.
1200 2 : let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1201 2 : let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1202 2 : let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1203 2 : let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
1204 2 :
1205 2 : let ops = [
1206 2 : UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
1207 2 : UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
1208 2 : UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
1209 2 : UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
1210 2 : ];
1211 2 :
1212 2 : queue.queued_operations.extend(ops.clone());
1213 2 :
1214 2 : // Schedule all ready operations. Only 2 are scheduled.
1215 2 : let tasks = queue.schedule_ready();
1216 4 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]);
1217 2 : assert!(queue.next_ready().is_none());
1218 :
1219 : // When one completes, another is scheduled.
1220 2 : queue.complete(tasks[0].task_id);
1221 2 : let tasks = queue.schedule_ready();
1222 2 : assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]);
1223 2 :
1224 2 : Ok(())
1225 2 : }
1226 :
1227 : /// Tests that can_bypass takes name, generation and shard index into account for all operations.
1228 : #[test]
1229 2 : fn can_bypass_path() -> anyhow::Result<()> {
1230 2 : let tli = make_timeline();
1231 2 :
1232 2 : let name0 = &"000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
1233 2 : let name1 = &"100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
1234 :
1235 : // Asserts that layers a and b either can or can't bypass each other, for all combinations
1236 : // of operations (except Delete and UploadMetadata which are special-cased).
1237 : #[track_caller]
1238 16 : fn assert_can_bypass(a: ResidentLayer, b: ResidentLayer, can_bypass: bool) {
1239 16 : let index = IndexPart::empty(TimelineMetadata::example());
1240 48 : for (a, b) in make_ops(a).into_iter().zip(make_ops(b)) {
1241 48 : match (&a, &b) {
1242 : // Deletes can always bypass each other.
1243 16 : (UploadOp::Delete(_), UploadOp::Delete(_)) => assert!(a.can_bypass(&b, &index)),
1244 : // Indexes can never bypass each other.
1245 : (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => {
1246 16 : assert!(!a.can_bypass(&b, &index))
1247 : }
1248 : // For other operations, assert as requested.
1249 16 : (a, b) => assert_eq!(a.can_bypass(b, &index), can_bypass),
1250 : }
1251 : }
1252 16 : }
1253 :
1254 32 : fn make_ops(layer: ResidentLayer) -> Vec<UploadOp> {
1255 32 : let mut index = IndexPart::empty(TimelineMetadata::example());
1256 32 : index
1257 32 : .layer_metadata
1258 32 : .insert(layer.layer_desc().layer_name(), layer.metadata());
1259 32 : vec![
1260 32 : UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
1261 32 : UploadOp::Delete(Delete {
1262 32 : layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
1263 32 : }),
1264 32 : UploadOp::UploadMetadata {
1265 32 : uploaded: Box::new(index),
1266 32 : },
1267 32 : ]
1268 32 : }
1269 :
1270 : // Makes a ResidentLayer.
1271 32 : let layer = |name: &'static str, shard: Option<u8>, generation: u32| -> ResidentLayer {
1272 32 : let shard = shard
1273 32 : .map(|n| ShardIndex::new(ShardNumber(n), ShardCount(8)))
1274 32 : .unwrap_or(ShardIndex::unsharded());
1275 32 : let metadata = LayerFileMetadata {
1276 32 : shard,
1277 32 : generation: Generation::Valid(generation),
1278 32 : file_size: 0,
1279 32 : };
1280 32 : make_layer_with_metadata(&tli, name, metadata)
1281 32 : };
1282 :
1283 : // Same name and metadata can't bypass. This goes both for unsharded and sharded, as well as
1284 : // 0 or >0 generation.
1285 2 : assert_can_bypass(layer(name0, None, 0), layer(name0, None, 0), false);
1286 2 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(0), 0), false);
1287 2 : assert_can_bypass(layer(name0, None, 1), layer(name0, None, 1), false);
1288 2 :
1289 2 : // Different names can bypass.
1290 2 : assert_can_bypass(layer(name0, None, 0), layer(name1, None, 0), true);
1291 2 :
1292 2 : // Different shards can bypass. Shard 0 is different from unsharded.
1293 2 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(1), 0), true);
1294 2 : assert_can_bypass(layer(name0, Some(0), 0), layer(name0, None, 0), true);
1295 2 :
1296 2 : // Different generations can bypass, both sharded and unsharded.
1297 2 : assert_can_bypass(layer(name0, None, 0), layer(name0, None, 1), true);
1298 2 : assert_can_bypass(layer(name0, Some(1), 0), layer(name0, Some(1), 1), true);
1299 2 :
1300 2 : Ok(())
1301 2 : }
1302 : }