Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * Stand-alone function, [`list_remote_timelines`], to get list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`] .
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between client and its user is that the user is responsible of
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that require preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
72 : //!
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! the 'wait_completion' function (more on that later.)
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after write consistency in the remote storage.
81 : //! - Layer files are immutable
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism to detect that in the pageserver, we rely on the control plane to ensure
88 : //! that that doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`].
95 : //! It is initialized based on the [`IndexPart`] that was passed during init
96 : //! and updated with every `schedule_*` function call.
97 : //! All this is necessary necessary to compute the future [`IndexPart`]s
98 : //! when scheduling an operation while other operations that also affect the
99 : //! remote [`IndexPart`] are in flight.
100 : //!
101 : //! # Retries & Error Handling
102 : //!
103 : //! The client retries operations indefinitely, using exponential back-off.
104 : //! There is no way to force a retry, i.e., interrupt the back-off.
105 : //! This could be built easily.
106 : //!
107 : //! # Cancellation
108 : //!
109 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
110 : //! the client's tenant and timeline.
111 : //! Dropping the client will drop queued operations but not executing operations.
112 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
113 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
114 : //!
115 : //! # Completion
116 : //!
117 : //! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately,
118 : //! and submit a request through the DeletionQueue to update
119 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
120 : //! validated that our generation is not stale. It is this visible value
121 : //! that is advertized to safekeepers as a signal that that they can
122 : //! delete the WAL up to that LSN.
123 : //!
124 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
125 : //! for all pending operations to complete. It does not prevent more
126 : //! operations from getting scheduled.
127 : //!
128 : //! # Crash Consistency
129 : //!
130 : //! We do not persist the upload queue state.
131 : //! If we drop the client, or crash, all unfinished operations are lost.
132 : //!
133 : //! To recover, the following steps need to be taken:
134 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
135 : //! consistent remote state, assuming the user scheduled the operations in
136 : //! the correct order.
137 : //! - Initiate upload queue with that [`IndexPart`].
138 : //! - Reschedule all lost operations by comparing the local filesystem state
139 : //! and remote state as per [`IndexPart`]. This is done in
140 : //! [`Tenant::timeline_init_and_sync`].
141 : //!
142 : //! Note that if we crash during file deletion between the index update
143 : //! that removes the file from the list of files, and deleting the remote file,
144 : //! the file is leaked in the remote storage. Similarly, if a new file is created
145 : //! and uploaded, but the pageserver dies permanently before updating the
146 : //! remote index file, the new file is leaked in remote storage. We accept and
147 : //! tolerate that for now.
148 : //! Note further that we cannot easily fix this by scheduling deletes for every
149 : //! file that is present only on the remote, because we cannot distinguish the
150 : //! following two cases:
151 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
152 : //! but crashed before it finished remotely.
153 : //! - (2) We never had the file locally because we haven't on-demand downloaded
154 : //! it yet.
155 : //!
156 : //! # Downloads
157 : //!
158 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
159 : //! downloading files from the remote storage. Downloads are performed immediately
160 : //! against the `RemoteStorage`, independently of the upload queue.
161 : //!
162 : //! When we attach a tenant, we perform the following steps:
163 : //! - create `Tenant` object in `TenantState::Attaching` state
164 : //! - List timelines that are present in remote storage, and for each:
165 : //! - download their remote [`IndexPart`]s
166 : //! - create `Timeline` struct and a `RemoteTimelineClient`
167 : //! - initialize the client's upload queue with its `IndexPart`
168 : //! - schedule uploads for layers that are only present locally.
169 : //! - After the above is done for each timeline, open the tenant for business by
170 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
171 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
172 : //!
173 : //! # Operating Without Remote Storage
174 : //!
175 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
176 : //! not created and the uploads are skipped.
177 : //!
178 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
179 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
180 :
181 : pub(crate) mod download;
182 : pub mod index;
183 : pub(crate) mod upload;
184 :
185 : use anyhow::Context;
186 : use camino::Utf8Path;
187 : use chrono::{NaiveDateTime, Utc};
188 :
189 : pub(crate) use download::download_initdb_tar_zst;
190 : use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
191 : use pageserver_api::shard::{ShardIndex, TenantShardId};
192 : use scopeguard::ScopeGuard;
193 : use tokio_util::sync::CancellationToken;
194 : pub(crate) use upload::upload_initdb_dir;
195 : use utils::backoff::{
196 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
197 : };
198 : use utils::pausable_failpoint;
199 :
200 : use std::collections::{HashMap, VecDeque};
201 : use std::sync::atomic::{AtomicU32, Ordering};
202 : use std::sync::{Arc, Mutex};
203 : use std::time::Duration;
204 :
205 : use remote_storage::{
206 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
207 : };
208 : use std::ops::DerefMut;
209 : use tracing::{debug, error, info, instrument, warn};
210 : use tracing::{info_span, Instrument};
211 : use utils::lsn::Lsn;
212 :
213 : use crate::context::RequestContext;
214 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
215 : use crate::metrics::{
216 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
217 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
218 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
219 : };
220 : use crate::task_mgr::shutdown_token;
221 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
222 : use crate::tenant::remote_timeline_client::download::download_retry;
223 : use crate::tenant::storage_layer::AsLayerDesc;
224 : use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
225 : use crate::tenant::TIMELINES_SEGMENT_NAME;
226 : use crate::{
227 : config::PageServerConf,
228 : task_mgr,
229 : task_mgr::TaskKind,
230 : task_mgr::BACKGROUND_RUNTIME,
231 : tenant::metadata::TimelineMetadata,
232 : tenant::upload_queue::{
233 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
234 : },
235 : TENANT_HEATMAP_BASENAME,
236 : };
237 :
238 : use utils::id::{TenantId, TimelineId};
239 :
240 : use self::index::IndexPart;
241 :
242 : use super::metadata::MetadataUpdate;
243 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
244 : use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
245 : use super::Generation;
246 :
247 : pub(crate) use download::{
248 : download_index_part, is_temp_download_file, list_remote_tenant_shards, list_remote_timelines,
249 : };
250 : pub(crate) use index::LayerFileMetadata;
251 :
252 : // Occasional network issues and such can cause remote operations to fail, and
253 : // that's expected. If a download fails, we log it at info-level, and retry.
254 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
255 : // level instead, as repeated failures can mean a more serious problem. If it
256 : // fails more than FAILED_DOWNLOAD_RETRIES times, we give up
257 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
258 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
259 :
260 : // Similarly log failed uploads and deletions at WARN level, after this many
261 : // retries. Uploads and deletions are retried forever, though.
262 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
263 :
264 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
265 :
266 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
267 :
268 : /// Default buffer size when interfacing with [`tokio::fs::File`].
269 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
270 :
271 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
272 : /// which we warn and skip.
273 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
274 :
275 : pub enum MaybeDeletedIndexPart {
276 : IndexPart(IndexPart),
277 : Deleted(IndexPart),
278 : }
279 :
280 0 : #[derive(Debug, thiserror::Error)]
281 : pub enum PersistIndexPartWithDeletedFlagError {
282 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
283 : AlreadyInProgress(NaiveDateTime),
284 : #[error("the deleted_flag was already set, value is {0:?}")]
285 : AlreadyDeleted(NaiveDateTime),
286 : #[error(transparent)]
287 : Other(#[from] anyhow::Error),
288 : }
289 :
290 0 : #[derive(Debug, thiserror::Error)]
291 : pub enum WaitCompletionError {
292 : #[error(transparent)]
293 : NotInitialized(NotInitialized),
294 : #[error("wait_completion aborted because upload queue was stopped")]
295 : UploadQueueShutDownOrStopped,
296 : }
297 :
298 : /// A client for accessing a timeline's data in remote storage.
299 : ///
300 : /// This takes care of managing the number of connections, and balancing them
301 : /// across tenants. This also handles retries of failed uploads.
302 : ///
303 : /// Upload and delete requests are ordered so that before a deletion is
304 : /// performed, we wait for all preceding uploads to finish. This ensures sure
305 : /// that if you perform a compaction operation that reshuffles data in layer
306 : /// files, we don't have a transient state where the old files have already been
307 : /// deleted, but new files have not yet been uploaded.
308 : ///
309 : /// Similarly, this enforces an order between index-file uploads, and layer
310 : /// uploads. Before an index-file upload is performed, all preceding layer
311 : /// uploads must be finished.
312 : ///
313 : /// This also maintains a list of remote files, and automatically includes that
314 : /// in the index part file, whenever timeline metadata is uploaded.
315 : ///
316 : /// Downloads are not queued, they are performed immediately.
317 : pub struct RemoteTimelineClient {
318 : conf: &'static PageServerConf,
319 :
320 : runtime: tokio::runtime::Handle,
321 :
322 : tenant_shard_id: TenantShardId,
323 : timeline_id: TimelineId,
324 : generation: Generation,
325 :
326 : upload_queue: Mutex<UploadQueue>,
327 :
328 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
329 :
330 : storage_impl: GenericRemoteStorage,
331 :
332 : deletion_queue_client: DeletionQueueClient,
333 :
334 : cancel: CancellationToken,
335 : }
336 :
337 : impl RemoteTimelineClient {
338 : ///
339 : /// Create a remote storage client for given timeline
340 : ///
341 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
342 : /// by calling init_upload_queue.
343 : ///
344 1236 : pub fn new(
345 1236 : remote_storage: GenericRemoteStorage,
346 1236 : deletion_queue_client: DeletionQueueClient,
347 1236 : conf: &'static PageServerConf,
348 1236 : tenant_shard_id: TenantShardId,
349 1236 : timeline_id: TimelineId,
350 1236 : generation: Generation,
351 1236 : ) -> RemoteTimelineClient {
352 1236 : RemoteTimelineClient {
353 1236 : conf,
354 1236 : runtime: if cfg!(test) {
355 : // remote_timeline_client.rs tests rely on current-thread runtime
356 1236 : tokio::runtime::Handle::current()
357 : } else {
358 0 : BACKGROUND_RUNTIME.handle().clone()
359 : },
360 1236 : tenant_shard_id,
361 1236 : timeline_id,
362 1236 : generation,
363 1236 : storage_impl: remote_storage,
364 1236 : deletion_queue_client,
365 1236 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
366 1236 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
367 1236 : &tenant_shard_id,
368 1236 : &timeline_id,
369 1236 : )),
370 1236 : cancel: CancellationToken::new(),
371 1236 : }
372 1236 : }
373 :
374 : /// Initialize the upload queue for a remote storage that already received
375 : /// an index file upload, i.e., it's not empty.
376 : /// The given `index_part` must be the one on the remote.
377 18 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
378 18 : let mut upload_queue = self.upload_queue.lock().unwrap();
379 18 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
380 18 : self.update_remote_physical_size_gauge(Some(index_part));
381 18 : info!(
382 0 : "initialized upload queue from remote index with {} layer files",
383 0 : index_part.layer_metadata.len()
384 : );
385 18 : Ok(())
386 18 : }
387 :
388 : /// Initialize the upload queue for the case where the remote storage is empty,
389 : /// i.e., it doesn't have an `IndexPart`.
390 1218 : pub fn init_upload_queue_for_empty_remote(
391 1218 : &self,
392 1218 : local_metadata: &TimelineMetadata,
393 1218 : ) -> anyhow::Result<()> {
394 1218 : let mut upload_queue = self.upload_queue.lock().unwrap();
395 1218 : upload_queue.initialize_empty_remote(local_metadata)?;
396 1218 : self.update_remote_physical_size_gauge(None);
397 1218 : info!("initialized upload queue as empty");
398 1218 : Ok(())
399 1218 : }
400 :
401 : /// Initialize the queue in stopped state. Used in startup path
402 : /// to continue deletion operation interrupted by pageserver crash or restart.
403 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
404 0 : &self,
405 0 : index_part: &IndexPart,
406 0 : ) -> anyhow::Result<()> {
407 : // FIXME: consider newtype for DeletedIndexPart.
408 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
409 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
410 0 : ))?;
411 :
412 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
413 0 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
414 0 : self.update_remote_physical_size_gauge(Some(index_part));
415 0 : self.stop_impl(&mut upload_queue);
416 0 :
417 0 : upload_queue
418 0 : .stopped_mut()
419 0 : .expect("stopped above")
420 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
421 0 :
422 0 : Ok(())
423 0 : }
424 :
425 : /// Returns `None` if nothing is yet uplodaded, `Some(disk_consistent_lsn)` otherwise.
426 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
427 0 : match &mut *self.upload_queue.lock().unwrap() {
428 0 : UploadQueue::Uninitialized => None,
429 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
430 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
431 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
432 0 : .upload_queue_for_deletion
433 0 : .get_last_remote_consistent_lsn_projected(),
434 : }
435 0 : }
436 :
437 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
438 0 : match &mut *self.upload_queue.lock().unwrap() {
439 0 : UploadQueue::Uninitialized => None,
440 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
441 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
442 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
443 0 : q.upload_queue_for_deletion
444 0 : .get_last_remote_consistent_lsn_visible(),
445 0 : ),
446 : }
447 0 : }
448 :
449 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
450 : /// client is currently initialized.
451 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
452 0 : self.upload_queue
453 0 : .lock()
454 0 : .unwrap()
455 0 : .initialized_mut()
456 0 : .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn))
457 0 : .unwrap_or(false)
458 0 : }
459 :
460 : /// Returns whether the timeline is archived.
461 : /// Return None if the remote index_part hasn't been downloaded yet.
462 0 : pub(crate) fn is_archived(&self) -> Option<bool> {
463 0 : self.upload_queue
464 0 : .lock()
465 0 : .unwrap()
466 0 : .initialized_mut()
467 0 : .map(|q| q.clean.0.archived_at.is_some())
468 0 : .ok()
469 0 : }
470 :
471 5442 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
472 5442 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
473 4224 : current_remote_index_part
474 4224 : .layer_metadata
475 4224 : .values()
476 52547 : .map(|ilmd| ilmd.file_size)
477 4224 : .sum()
478 : } else {
479 1218 : 0
480 : };
481 5442 : self.metrics.remote_physical_size_gauge.set(size);
482 5442 : }
483 :
484 6 : pub fn get_remote_physical_size(&self) -> u64 {
485 6 : self.metrics.remote_physical_size_gauge.get()
486 6 : }
487 :
488 : //
489 : // Download operations.
490 : //
491 : // These don't use the per-timeline queue. They do use the global semaphore in
492 : // S3Bucket, to limit the total number of concurrent operations, though.
493 : //
494 :
495 : /// Download index file
496 60 : pub async fn download_index_file(
497 60 : &self,
498 60 : cancel: &CancellationToken,
499 60 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
500 60 : let _unfinished_gauge_guard = self.metrics.call_begin(
501 60 : &RemoteOpFileKind::Index,
502 60 : &RemoteOpKind::Download,
503 60 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
504 60 : reason: "no need for a downloads gauge",
505 60 : },
506 60 : );
507 :
508 60 : let (index_part, _index_generation) = download::download_index_part(
509 60 : &self.storage_impl,
510 60 : &self.tenant_shard_id,
511 60 : &self.timeline_id,
512 60 : self.generation,
513 60 : cancel,
514 60 : )
515 60 : .measure_remote_op(
516 60 : RemoteOpFileKind::Index,
517 60 : RemoteOpKind::Download,
518 60 : Arc::clone(&self.metrics),
519 60 : )
520 294 : .await?;
521 :
522 60 : if index_part.deleted_at.is_some() {
523 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
524 : } else {
525 60 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
526 : }
527 60 : }
528 :
529 : /// Download a (layer) file from `path`, into local filesystem.
530 : ///
531 : /// 'layer_metadata' is the metadata from the remote index file.
532 : ///
533 : /// On success, returns the size of the downloaded file.
534 18 : pub async fn download_layer_file(
535 18 : &self,
536 18 : layer_file_name: &LayerName,
537 18 : layer_metadata: &LayerFileMetadata,
538 18 : local_path: &Utf8Path,
539 18 : cancel: &CancellationToken,
540 18 : ctx: &RequestContext,
541 18 : ) -> Result<u64, DownloadError> {
542 18 : let downloaded_size = {
543 18 : let _unfinished_gauge_guard = self.metrics.call_begin(
544 18 : &RemoteOpFileKind::Layer,
545 18 : &RemoteOpKind::Download,
546 18 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
547 18 : reason: "no need for a downloads gauge",
548 18 : },
549 18 : );
550 18 : download::download_layer_file(
551 18 : self.conf,
552 18 : &self.storage_impl,
553 18 : self.tenant_shard_id,
554 18 : self.timeline_id,
555 18 : layer_file_name,
556 18 : layer_metadata,
557 18 : local_path,
558 18 : cancel,
559 18 : ctx,
560 18 : )
561 18 : .measure_remote_op(
562 18 : RemoteOpFileKind::Layer,
563 18 : RemoteOpKind::Download,
564 18 : Arc::clone(&self.metrics),
565 18 : )
566 274 : .await?
567 : };
568 :
569 18 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
570 18 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
571 18 :
572 18 : Ok(downloaded_size)
573 18 : }
574 :
575 : //
576 : // Upload operations.
577 : //
578 :
579 : /// Launch an index-file upload operation in the background, with
580 : /// fully updated metadata.
581 : ///
582 : /// This should only be used to upload initial metadata to remote storage.
583 : ///
584 : /// The upload will be added to the queue immediately, but it
585 : /// won't be performed until all previously scheduled layer file
586 : /// upload operations have completed successfully. This is to
587 : /// ensure that when the index file claims that layers X, Y and Z
588 : /// exist in remote storage, they really do. To wait for the upload
589 : /// to complete, use `wait_completion`.
590 : ///
591 : /// If there were any changes to the list of files, i.e. if any
592 : /// layer file uploads were scheduled, since the last index file
593 : /// upload, those will be included too.
594 684 : pub fn schedule_index_upload_for_full_metadata_update(
595 684 : self: &Arc<Self>,
596 684 : metadata: &TimelineMetadata,
597 684 : ) -> anyhow::Result<()> {
598 684 : let mut guard = self.upload_queue.lock().unwrap();
599 684 : let upload_queue = guard.initialized_mut()?;
600 :
601 : // As documented in the struct definition, it's ok for latest_metadata to be
602 : // ahead of what's _actually_ on the remote during index upload.
603 684 : upload_queue.dirty.metadata = metadata.clone();
604 684 :
605 684 : self.schedule_index_upload(upload_queue)?;
606 :
607 684 : Ok(())
608 684 : }
609 :
610 : /// Launch an index-file upload operation in the background, with only parts of the metadata
611 : /// updated.
612 : ///
613 : /// This is the regular way of updating metadata on layer flushes or Gc.
614 : ///
615 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
616 : /// `index_part.json`, while being more clear on what values update regularly.
617 3438 : pub(crate) fn schedule_index_upload_for_metadata_update(
618 3438 : self: &Arc<Self>,
619 3438 : update: &MetadataUpdate,
620 3438 : ) -> anyhow::Result<()> {
621 3438 : let mut guard = self.upload_queue.lock().unwrap();
622 3438 : let upload_queue = guard.initialized_mut()?;
623 :
624 3438 : upload_queue.dirty.metadata.apply(update);
625 3438 :
626 3438 : self.schedule_index_upload(upload_queue)?;
627 :
628 3438 : Ok(())
629 3438 : }
630 :
631 : /// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
632 48 : pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
633 48 : self: &Arc<Self>,
634 48 : last_aux_file_policy: Option<AuxFilePolicy>,
635 48 : ) -> anyhow::Result<()> {
636 48 : let mut guard = self.upload_queue.lock().unwrap();
637 48 : let upload_queue = guard.initialized_mut()?;
638 48 : upload_queue.dirty.last_aux_file_policy = last_aux_file_policy;
639 48 : self.schedule_index_upload(upload_queue)?;
640 48 : Ok(())
641 48 : }
642 :
643 : /// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
644 : ///
645 : /// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,
646 : /// so either if the change is already sitting in the queue, but not commited yet, or the change has not
647 : /// been in the queue yet.
648 0 : pub(crate) fn schedule_index_upload_for_timeline_archival_state(
649 0 : self: &Arc<Self>,
650 0 : state: TimelineArchivalState,
651 0 : ) -> anyhow::Result<bool> {
652 0 : let mut guard = self.upload_queue.lock().unwrap();
653 0 : let upload_queue = guard.initialized_mut()?;
654 :
655 : /// Returns Some(_) if a change is needed, and Some(true) if it's a
656 : /// change needed to set archived_at.
657 0 : fn need_change(
658 0 : archived_at: &Option<NaiveDateTime>,
659 0 : state: TimelineArchivalState,
660 0 : ) -> Option<bool> {
661 0 : match (archived_at, state) {
662 : (Some(_), TimelineArchivalState::Archived)
663 : | (None, TimelineArchivalState::Unarchived) => {
664 : // Nothing to do
665 0 : tracing::info!("intended state matches present state");
666 0 : None
667 : }
668 0 : (None, TimelineArchivalState::Archived) => Some(true),
669 0 : (Some(_), TimelineArchivalState::Unarchived) => Some(false),
670 : }
671 0 : }
672 0 : let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
673 :
674 0 : if let Some(archived_at_set) = need_upload_scheduled {
675 0 : let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
676 0 : upload_queue.dirty.archived_at = intended_archived_at;
677 0 : self.schedule_index_upload(upload_queue)?;
678 0 : }
679 :
680 0 : let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
681 0 : Ok(need_wait)
682 0 : }
683 :
684 : ///
685 : /// Launch an index-file upload operation in the background, if necessary.
686 : ///
687 : /// Use this function to schedule the update of the index file after
688 : /// scheduling file uploads or deletions. If no file uploads or deletions
689 : /// have been scheduled since the last index file upload, this does
690 : /// nothing.
691 : ///
692 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
693 : /// the upload to the upload queue and returns quickly.
694 1110 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
695 1110 : let mut guard = self.upload_queue.lock().unwrap();
696 1110 : let upload_queue = guard.initialized_mut()?;
697 :
698 1110 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
699 42 : self.schedule_index_upload(upload_queue)?;
700 1068 : }
701 :
702 1110 : Ok(())
703 1110 : }
704 :
705 : /// Launch an index-file upload operation in the background (internal function)
706 4362 : fn schedule_index_upload(
707 4362 : self: &Arc<Self>,
708 4362 : upload_queue: &mut UploadQueueInitialized,
709 4362 : ) -> Result<(), NotInitialized> {
710 4362 : let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
711 4362 : // fix up the duplicated field
712 4362 : upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
713 4362 :
714 4362 : // make sure it serializes before doing it in perform_upload_task so that it doesn't
715 4362 : // look like a retryable error
716 4362 : let void = std::io::sink();
717 4362 : serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");
718 4362 :
719 4362 : let index_part = &upload_queue.dirty;
720 4362 :
721 4362 : info!(
722 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
723 0 : index_part.layer_metadata.len(),
724 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
725 : );
726 :
727 4362 : let op = UploadOp::UploadMetadata {
728 4362 : uploaded: Box::new(index_part.clone()),
729 4362 : };
730 4362 : self.metric_begin(&op);
731 4362 : upload_queue.queued_operations.push_back(op);
732 4362 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
733 4362 :
734 4362 : // Launch the task immediately, if possible
735 4362 : self.launch_queued_tasks(upload_queue);
736 4362 : Ok(())
737 4362 : }
738 :
739 : /// Reparent this timeline to a new parent.
740 : ///
741 : /// A retryable step of timeline ancestor detach.
742 0 : pub(crate) async fn schedule_reparenting_and_wait(
743 0 : self: &Arc<Self>,
744 0 : new_parent: &TimelineId,
745 0 : ) -> anyhow::Result<()> {
746 0 : let receiver = {
747 0 : let mut guard = self.upload_queue.lock().unwrap();
748 0 : let upload_queue = guard.initialized_mut()?;
749 :
750 0 : let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else {
751 0 : return Err(anyhow::anyhow!(
752 0 : "cannot reparent without a current ancestor"
753 0 : ));
754 : };
755 :
756 0 : let uploaded = &upload_queue.clean.0.metadata;
757 0 :
758 0 : if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
759 : // nothing to do
760 0 : None
761 : } else {
762 0 : upload_queue.dirty.metadata.reparent(new_parent);
763 0 : upload_queue.dirty.lineage.record_previous_ancestor(&prev);
764 0 :
765 0 : self.schedule_index_upload(upload_queue)?;
766 :
767 0 : Some(self.schedule_barrier0(upload_queue))
768 : }
769 : };
770 :
771 0 : if let Some(receiver) = receiver {
772 0 : Self::wait_completion0(receiver).await?;
773 0 : }
774 0 : Ok(())
775 0 : }
776 :
777 : /// Schedules uploading a new version of `index_part.json` with the given layers added,
778 : /// detaching from ancestor and waits for it to complete.
779 : ///
780 : /// This is used with `Timeline::detach_ancestor` functionality.
781 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
782 0 : self: &Arc<Self>,
783 0 : layers: &[Layer],
784 0 : adopted: (TimelineId, Lsn),
785 0 : ) -> anyhow::Result<()> {
786 0 : let barrier = {
787 0 : let mut guard = self.upload_queue.lock().unwrap();
788 0 : let upload_queue = guard.initialized_mut()?;
789 :
790 0 : if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
791 0 : None
792 : } else {
793 0 : upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
794 0 : upload_queue.dirty.lineage.record_detaching(&adopted);
795 :
796 0 : for layer in layers {
797 0 : let prev = upload_queue
798 0 : .dirty
799 0 : .layer_metadata
800 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
801 0 : assert!(prev.is_none(), "copied layer existed already {layer}");
802 : }
803 :
804 0 : self.schedule_index_upload(upload_queue)?;
805 :
806 0 : Some(self.schedule_barrier0(upload_queue))
807 : }
808 : };
809 :
810 0 : if let Some(barrier) = barrier {
811 0 : Self::wait_completion0(barrier).await?;
812 0 : }
813 0 : Ok(())
814 0 : }
815 :
816 : /// Adds a gc blocking reason for this timeline if one does not exist already.
817 : ///
818 : /// A retryable step of timeline detach ancestor.
819 : ///
820 : /// Returns a future which waits until the completion of the upload.
821 0 : pub(crate) fn schedule_insert_gc_block_reason(
822 0 : self: &Arc<Self>,
823 0 : reason: index::GcBlockingReason,
824 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
825 0 : {
826 0 : let maybe_barrier = {
827 0 : let mut guard = self.upload_queue.lock().unwrap();
828 0 : let upload_queue = guard.initialized_mut()?;
829 :
830 0 : if let index::GcBlockingReason::DetachAncestor = reason {
831 0 : if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
832 0 : drop(guard);
833 0 : panic!("cannot start detach ancestor if there is nothing to detach from");
834 0 : }
835 0 : }
836 :
837 0 : let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
838 :
839 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
840 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
841 0 :
842 0 : match (current, uploaded) {
843 0 : (x, y) if wanted(x) && wanted(y) => None,
844 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
845 : // Usual case: !wanted(x) && !wanted(y)
846 : //
847 : // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
848 : // turn on and off some reason.
849 0 : (x, y) => {
850 0 : if !wanted(x) && wanted(y) {
851 : // this could be avoided by having external in-memory synchronization, like
852 : // timeline detach ancestor
853 0 : warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason");
854 0 : }
855 :
856 : // at this point, the metadata must always show that there is a parent
857 0 : upload_queue.dirty.gc_blocking = current
858 0 : .map(|x| x.with_reason(reason))
859 0 : .or_else(|| Some(index::GcBlocking::started_now_for(reason)));
860 0 : self.schedule_index_upload(upload_queue)?;
861 0 : Some(self.schedule_barrier0(upload_queue))
862 : }
863 : }
864 : };
865 :
866 0 : Ok(async move {
867 0 : if let Some(barrier) = maybe_barrier {
868 0 : Self::wait_completion0(barrier).await?;
869 0 : }
870 0 : Ok(())
871 0 : })
872 0 : }
873 :
874 : /// Removes a gc blocking reason for this timeline if one exists.
875 : ///
876 : /// A retryable step of timeline detach ancestor.
877 : ///
878 : /// Returns a future which waits until the completion of the upload.
879 0 : pub(crate) fn schedule_remove_gc_block_reason(
880 0 : self: &Arc<Self>,
881 0 : reason: index::GcBlockingReason,
882 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
883 0 : {
884 0 : let maybe_barrier = {
885 0 : let mut guard = self.upload_queue.lock().unwrap();
886 0 : let upload_queue = guard.initialized_mut()?;
887 :
888 0 : if let index::GcBlockingReason::DetachAncestor = reason {
889 0 : if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
890 0 : drop(guard);
891 0 : panic!("cannot complete timeline_ancestor_detach while not detached");
892 0 : }
893 0 : }
894 :
895 0 : let wanted = |x: Option<&index::GcBlocking>| {
896 0 : x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
897 0 : };
898 :
899 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
900 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
901 0 :
902 0 : match (current, uploaded) {
903 0 : (x, y) if wanted(x) && wanted(y) => None,
904 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
905 0 : (x, y) => {
906 0 : if !wanted(x) && wanted(y) {
907 0 : warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
908 0 : }
909 :
910 0 : upload_queue.dirty.gc_blocking =
911 0 : current.as_ref().and_then(|x| x.without_reason(reason));
912 0 : assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
913 : // FIXME: bogus ?
914 0 : self.schedule_index_upload(upload_queue)?;
915 0 : Some(self.schedule_barrier0(upload_queue))
916 : }
917 : }
918 : };
919 :
920 0 : Ok(async move {
921 0 : if let Some(barrier) = maybe_barrier {
922 0 : Self::wait_completion0(barrier).await?;
923 0 : }
924 0 : Ok(())
925 0 : })
926 0 : }
927 :
928 : /// Launch an upload operation in the background; the file is added to be included in next
929 : /// `index_part.json` upload.
930 3558 : pub(crate) fn schedule_layer_file_upload(
931 3558 : self: &Arc<Self>,
932 3558 : layer: ResidentLayer,
933 3558 : ) -> Result<(), NotInitialized> {
934 3558 : let mut guard = self.upload_queue.lock().unwrap();
935 3558 : let upload_queue = guard.initialized_mut()?;
936 :
937 3558 : self.schedule_layer_file_upload0(upload_queue, layer);
938 3558 : self.launch_queued_tasks(upload_queue);
939 3558 : Ok(())
940 3558 : }
941 :
942 4548 : fn schedule_layer_file_upload0(
943 4548 : self: &Arc<Self>,
944 4548 : upload_queue: &mut UploadQueueInitialized,
945 4548 : layer: ResidentLayer,
946 4548 : ) {
947 4548 : let metadata = layer.metadata();
948 4548 :
949 4548 : upload_queue
950 4548 : .dirty
951 4548 : .layer_metadata
952 4548 : .insert(layer.layer_desc().layer_name(), metadata.clone());
953 4548 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
954 4548 :
955 4548 : info!(
956 : gen=?metadata.generation,
957 : shard=?metadata.shard,
958 0 : "scheduled layer file upload {layer}",
959 : );
960 :
961 4548 : let op = UploadOp::UploadLayer(layer, metadata);
962 4548 : self.metric_begin(&op);
963 4548 : upload_queue.queued_operations.push_back(op);
964 4548 : }
965 :
966 : /// Launch a delete operation in the background.
967 : ///
968 : /// The operation does not modify local filesystem state.
969 : ///
970 : /// Note: This schedules an index file upload before the deletions. The
971 : /// deletion won't actually be performed, until all previously scheduled
972 : /// upload operations, and the index file upload, have completed
973 : /// successfully.
974 24 : pub fn schedule_layer_file_deletion(
975 24 : self: &Arc<Self>,
976 24 : names: &[LayerName],
977 24 : ) -> anyhow::Result<()> {
978 24 : let mut guard = self.upload_queue.lock().unwrap();
979 24 : let upload_queue = guard.initialized_mut()?;
980 :
981 24 : let with_metadata = self
982 24 : .schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned())?;
983 :
984 24 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
985 24 :
986 24 : // Launch the tasks immediately, if possible
987 24 : self.launch_queued_tasks(upload_queue);
988 24 : Ok(())
989 24 : }
990 :
991 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
992 : /// layer files, leaving them dangling.
993 : ///
994 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
995 : /// is invoked on them.
996 24 : pub(crate) fn schedule_gc_update(
997 24 : self: &Arc<Self>,
998 24 : gc_layers: &[Layer],
999 24 : ) -> Result<(), NotInitialized> {
1000 24 : let mut guard = self.upload_queue.lock().unwrap();
1001 24 : let upload_queue = guard.initialized_mut()?;
1002 :
1003 : // just forget the return value; after uploading the next index_part.json, we can consider
1004 : // the layer files as "dangling". this is fine, at worst case we create work for the
1005 : // scrubber.
1006 :
1007 30 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
1008 24 :
1009 24 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?;
1010 :
1011 24 : self.launch_queued_tasks(upload_queue);
1012 24 :
1013 24 : Ok(())
1014 24 : }
1015 :
1016 : /// Update the remote index file, removing the to-be-deleted files from the index,
1017 : /// allowing scheduling of actual deletions later.
1018 198 : fn schedule_unlinking_of_layers_from_index_part0<I>(
1019 198 : self: &Arc<Self>,
1020 198 : upload_queue: &mut UploadQueueInitialized,
1021 198 : names: I,
1022 198 : ) -> Result<Vec<(LayerName, LayerFileMetadata)>, NotInitialized>
1023 198 : where
1024 198 : I: IntoIterator<Item = LayerName>,
1025 198 : {
1026 198 : // Decorate our list of names with each name's metadata, dropping
1027 198 : // names that are unexpectedly missing from our metadata. This metadata
1028 198 : // is later used when physically deleting layers, to construct key paths.
1029 198 : let with_metadata: Vec<_> = names
1030 198 : .into_iter()
1031 1386 : .filter_map(|name| {
1032 1386 : let meta = upload_queue.dirty.layer_metadata.remove(&name);
1033 :
1034 1386 : if let Some(meta) = meta {
1035 1284 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1036 1284 : Some((name, meta))
1037 : } else {
1038 : // This can only happen if we forgot to to schedule the file upload
1039 : // before scheduling the delete. Log it because it is a rare/strange
1040 : // situation, and in case something is misbehaving, we'd like to know which
1041 : // layers experienced this.
1042 102 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
1043 102 : None
1044 : }
1045 1386 : })
1046 198 : .collect();
1047 :
1048 : #[cfg(feature = "testing")]
1049 1482 : for (name, metadata) in &with_metadata {
1050 1284 : let gen = metadata.generation;
1051 1284 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
1052 0 : if unexpected == gen {
1053 0 : tracing::error!("{name} was unlinked twice with same generation");
1054 : } else {
1055 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
1056 : }
1057 1284 : }
1058 : }
1059 :
1060 : // after unlinking files from the upload_queue.latest_files we must always schedule an
1061 : // index_part update, because that needs to be uploaded before we can actually delete the
1062 : // files.
1063 198 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1064 150 : self.schedule_index_upload(upload_queue)?;
1065 48 : }
1066 :
1067 198 : Ok(with_metadata)
1068 198 : }
1069 :
1070 : /// Schedules deletion for layer files which have previously been unlinked from the
1071 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
1072 1384 : pub(crate) fn schedule_deletion_of_unlinked(
1073 1384 : self: &Arc<Self>,
1074 1384 : layers: Vec<(LayerName, LayerFileMetadata)>,
1075 1384 : ) -> anyhow::Result<()> {
1076 1384 : let mut guard = self.upload_queue.lock().unwrap();
1077 1384 : let upload_queue = guard.initialized_mut()?;
1078 :
1079 1384 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
1080 1384 : self.launch_queued_tasks(upload_queue);
1081 1384 : Ok(())
1082 1384 : }
1083 :
1084 1408 : fn schedule_deletion_of_unlinked0(
1085 1408 : self: &Arc<Self>,
1086 1408 : upload_queue: &mut UploadQueueInitialized,
1087 1408 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
1088 1408 : ) {
1089 1408 : // Filter out any layers which were not created by this tenant shard. These are
1090 1408 : // layers that originate from some ancestor shard after a split, and may still
1091 1408 : // be referenced by other shards. We are free to delete them locally and remove
1092 1408 : // them from our index (and would have already done so when we reach this point
1093 1408 : // in the code), but we may not delete them remotely.
1094 1408 : with_metadata.retain(|(name, meta)| {
1095 1390 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
1096 1390 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
1097 1390 : if !retain {
1098 0 : tracing::debug!(
1099 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
1100 : meta.shard
1101 : );
1102 1390 : }
1103 1390 : retain
1104 1408 : });
1105 :
1106 2798 : for (name, meta) in &with_metadata {
1107 1390 : info!(
1108 0 : "scheduling deletion of layer {}{} (shard {})",
1109 0 : name,
1110 0 : meta.generation.get_suffix(),
1111 : meta.shard
1112 : );
1113 : }
1114 :
1115 : #[cfg(feature = "testing")]
1116 2798 : for (name, meta) in &with_metadata {
1117 1390 : let gen = meta.generation;
1118 1390 : match upload_queue.dangling_files.remove(name) {
1119 1272 : Some(same) if same == gen => { /* expected */ }
1120 0 : Some(other) => {
1121 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
1122 : }
1123 : None => {
1124 118 : tracing::error!("{name} was unlinked but was not dangling");
1125 : }
1126 : }
1127 : }
1128 :
1129 : // schedule the actual deletions
1130 1408 : if with_metadata.is_empty() {
1131 : // avoid scheduling the op & bumping the metric
1132 18 : return;
1133 1390 : }
1134 1390 : let op = UploadOp::Delete(Delete {
1135 1390 : layers: with_metadata,
1136 1390 : });
1137 1390 : self.metric_begin(&op);
1138 1390 : upload_queue.queued_operations.push_back(op);
1139 1408 : }
1140 :
1141 : /// Schedules a compaction update to the remote `index_part.json`.
1142 : ///
1143 : /// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
1144 150 : pub(crate) fn schedule_compaction_update(
1145 150 : self: &Arc<Self>,
1146 150 : compacted_from: &[Layer],
1147 150 : compacted_to: &[ResidentLayer],
1148 150 : ) -> Result<(), NotInitialized> {
1149 150 : let mut guard = self.upload_queue.lock().unwrap();
1150 150 : let upload_queue = guard.initialized_mut()?;
1151 :
1152 1140 : for layer in compacted_to {
1153 990 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
1154 990 : }
1155 :
1156 1350 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
1157 150 :
1158 150 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?;
1159 150 : self.launch_queued_tasks(upload_queue);
1160 150 :
1161 150 : Ok(())
1162 150 : }
1163 :
1164 : /// Wait for all previously scheduled uploads/deletions to complete
1165 3960 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
1166 3960 : let receiver = {
1167 3960 : let mut guard = self.upload_queue.lock().unwrap();
1168 3960 : let upload_queue = guard
1169 3960 : .initialized_mut()
1170 3960 : .map_err(WaitCompletionError::NotInitialized)?;
1171 3960 : self.schedule_barrier0(upload_queue)
1172 3960 : };
1173 3960 :
1174 3960 : Self::wait_completion0(receiver).await
1175 3960 : }
1176 :
1177 3960 : async fn wait_completion0(
1178 3960 : mut receiver: tokio::sync::watch::Receiver<()>,
1179 3960 : ) -> Result<(), WaitCompletionError> {
1180 3960 : if receiver.changed().await.is_err() {
1181 0 : return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
1182 3960 : }
1183 3960 :
1184 3960 : Ok(())
1185 3960 : }
1186 :
1187 18 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
1188 18 : let mut guard = self.upload_queue.lock().unwrap();
1189 18 : let upload_queue = guard.initialized_mut()?;
1190 18 : self.schedule_barrier0(upload_queue);
1191 18 : Ok(())
1192 18 : }
1193 :
1194 3978 : fn schedule_barrier0(
1195 3978 : self: &Arc<Self>,
1196 3978 : upload_queue: &mut UploadQueueInitialized,
1197 3978 : ) -> tokio::sync::watch::Receiver<()> {
1198 3978 : let (sender, receiver) = tokio::sync::watch::channel(());
1199 3978 : let barrier_op = UploadOp::Barrier(sender);
1200 3978 :
1201 3978 : upload_queue.queued_operations.push_back(barrier_op);
1202 3978 : // Don't count this kind of operation!
1203 3978 :
1204 3978 : // Launch the task immediately, if possible
1205 3978 : self.launch_queued_tasks(upload_queue);
1206 3978 :
1207 3978 : receiver
1208 3978 : }
1209 :
1210 : /// Wait for all previously scheduled operations to complete, and then stop.
1211 : ///
1212 : /// Not cancellation safe
1213 18 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1214 18 : // On cancellation the queue is left in ackward state of refusing new operations but
1215 18 : // proper stop is yet to be called. On cancel the original or some later task must call
1216 18 : // `stop` or `shutdown`.
1217 18 : let sg = scopeguard::guard((), |_| {
1218 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
1219 18 : });
1220 :
1221 18 : let fut = {
1222 18 : let mut guard = self.upload_queue.lock().unwrap();
1223 18 : let upload_queue = match &mut *guard {
1224 0 : UploadQueue::Stopped(_) => return,
1225 : UploadQueue::Uninitialized => {
1226 : // transition into Stopped state
1227 0 : self.stop_impl(&mut guard);
1228 0 : return;
1229 : }
1230 18 : UploadQueue::Initialized(ref mut init) => init,
1231 18 : };
1232 18 :
1233 18 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1234 18 : // just don't add more of these as they would never complete.
1235 18 : //
1236 18 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1237 18 : // in every place we would not have to jump through this hoop, and this method could be
1238 18 : // made cancellable.
1239 18 : if !upload_queue.shutting_down {
1240 18 : upload_queue.shutting_down = true;
1241 18 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1242 18 : // this operation is not counted similar to Barrier
1243 18 :
1244 18 : self.launch_queued_tasks(upload_queue);
1245 18 : }
1246 :
1247 18 : upload_queue.shutdown_ready.clone().acquire_owned()
1248 : };
1249 :
1250 18 : let res = fut.await;
1251 :
1252 18 : scopeguard::ScopeGuard::into_inner(sg);
1253 18 :
1254 18 : match res {
1255 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
1256 18 : Err(_closed) => {
1257 18 : // expected
1258 18 : }
1259 18 : }
1260 18 :
1261 18 : self.stop();
1262 18 : }
1263 :
1264 : /// Set the deleted_at field in the remote index file.
1265 : ///
1266 : /// This fails if the upload queue has not been `stop()`ed.
1267 : ///
1268 : /// The caller is responsible for calling `stop()` AND for waiting
1269 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1270 : /// Check method [`RemoteTimelineClient::stop`] for details.
1271 0 : #[instrument(skip_all)]
1272 : pub(crate) async fn persist_index_part_with_deleted_flag(
1273 : self: &Arc<Self>,
1274 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1275 : let index_part_with_deleted_at = {
1276 : let mut locked = self.upload_queue.lock().unwrap();
1277 :
1278 : // We must be in stopped state because otherwise
1279 : // we can have inprogress index part upload that can overwrite the file
1280 : // with missing is_deleted flag that we going to set below
1281 : let stopped = locked.stopped_mut()?;
1282 :
1283 : match stopped.deleted_at {
1284 : SetDeletedFlagProgress::NotRunning => (), // proceed
1285 : SetDeletedFlagProgress::InProgress(at) => {
1286 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1287 : }
1288 : SetDeletedFlagProgress::Successful(at) => {
1289 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1290 : }
1291 : };
1292 : let deleted_at = Utc::now().naive_utc();
1293 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1294 :
1295 : let mut index_part = stopped.upload_queue_for_deletion.dirty.clone();
1296 : index_part.deleted_at = Some(deleted_at);
1297 : index_part
1298 : };
1299 :
1300 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1301 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1302 0 : let stopped = locked
1303 0 : .stopped_mut()
1304 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1305 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1306 0 : });
1307 :
1308 : pausable_failpoint!("persist_deleted_index_part");
1309 :
1310 : backoff::retry(
1311 0 : || {
1312 0 : upload::upload_index_part(
1313 0 : &self.storage_impl,
1314 0 : &self.tenant_shard_id,
1315 0 : &self.timeline_id,
1316 0 : self.generation,
1317 0 : &index_part_with_deleted_at,
1318 0 : &self.cancel,
1319 0 : )
1320 0 : },
1321 0 : |_e| false,
1322 : 1,
1323 : // have just a couple of attempts
1324 : // when executed as part of timeline deletion this happens in context of api call
1325 : // when executed as part of tenant deletion this happens in the background
1326 : 2,
1327 : "persist_index_part_with_deleted_flag",
1328 : &self.cancel,
1329 : )
1330 : .await
1331 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1332 0 : .and_then(|x| x)?;
1333 :
1334 : // all good, disarm the guard and mark as success
1335 : ScopeGuard::into_inner(undo_deleted_at);
1336 : {
1337 : let mut locked = self.upload_queue.lock().unwrap();
1338 :
1339 : let stopped = locked
1340 : .stopped_mut()
1341 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1342 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1343 : index_part_with_deleted_at
1344 : .deleted_at
1345 : .expect("we set it above"),
1346 : );
1347 : }
1348 :
1349 : Ok(())
1350 : }
1351 :
1352 0 : pub(crate) fn is_deleting(&self) -> bool {
1353 0 : let mut locked = self.upload_queue.lock().unwrap();
1354 0 : locked.stopped_mut().is_ok()
1355 0 : }
1356 :
1357 0 : pub(crate) async fn preserve_initdb_archive(
1358 0 : self: &Arc<Self>,
1359 0 : tenant_id: &TenantId,
1360 0 : timeline_id: &TimelineId,
1361 0 : cancel: &CancellationToken,
1362 0 : ) -> anyhow::Result<()> {
1363 0 : backoff::retry(
1364 0 : || async {
1365 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1366 0 : .await
1367 0 : },
1368 0 : TimeoutOrCancel::caused_by_cancel,
1369 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1370 0 : FAILED_REMOTE_OP_RETRIES,
1371 0 : "preserve_initdb_tar_zst",
1372 0 : &cancel.clone(),
1373 0 : )
1374 0 : .await
1375 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1376 0 : .and_then(|x| x)
1377 0 : .context("backing up initdb archive")?;
1378 0 : Ok(())
1379 0 : }
1380 :
1381 : /// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
1382 : ///
1383 : /// This is not normally needed.
1384 0 : pub(crate) async fn upload_layer_file(
1385 0 : self: &Arc<Self>,
1386 0 : uploaded: &ResidentLayer,
1387 0 : cancel: &CancellationToken,
1388 0 : ) -> anyhow::Result<()> {
1389 0 : let remote_path = remote_layer_path(
1390 0 : &self.tenant_shard_id.tenant_id,
1391 0 : &self.timeline_id,
1392 0 : self.tenant_shard_id.to_index(),
1393 0 : &uploaded.layer_desc().layer_name(),
1394 0 : uploaded.metadata().generation,
1395 0 : );
1396 0 :
1397 0 : backoff::retry(
1398 0 : || async {
1399 0 : upload::upload_timeline_layer(
1400 0 : &self.storage_impl,
1401 0 : uploaded.local_path(),
1402 0 : &remote_path,
1403 0 : uploaded.metadata().file_size,
1404 0 : cancel,
1405 0 : )
1406 0 : .await
1407 0 : },
1408 0 : TimeoutOrCancel::caused_by_cancel,
1409 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1410 0 : FAILED_REMOTE_OP_RETRIES,
1411 0 : "upload a layer without adding it to latest files",
1412 0 : cancel,
1413 0 : )
1414 0 : .await
1415 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1416 0 : .and_then(|x| x)
1417 0 : .context("upload a layer without adding it to latest files")
1418 0 : }
1419 :
1420 : /// Copies the `adopted` remote existing layer to the remote path of `adopted_as`. The layer is
1421 : /// not added to be part of a future `index_part.json` upload.
1422 0 : pub(crate) async fn copy_timeline_layer(
1423 0 : self: &Arc<Self>,
1424 0 : adopted: &Layer,
1425 0 : adopted_as: &Layer,
1426 0 : cancel: &CancellationToken,
1427 0 : ) -> anyhow::Result<()> {
1428 0 : let source_remote_path = remote_layer_path(
1429 0 : &self.tenant_shard_id.tenant_id,
1430 0 : &adopted
1431 0 : .get_timeline_id()
1432 0 : .expect("Source timeline should be alive"),
1433 0 : self.tenant_shard_id.to_index(),
1434 0 : &adopted.layer_desc().layer_name(),
1435 0 : adopted.metadata().generation,
1436 0 : );
1437 0 :
1438 0 : let target_remote_path = remote_layer_path(
1439 0 : &self.tenant_shard_id.tenant_id,
1440 0 : &self.timeline_id,
1441 0 : self.tenant_shard_id.to_index(),
1442 0 : &adopted_as.layer_desc().layer_name(),
1443 0 : adopted_as.metadata().generation,
1444 0 : );
1445 0 :
1446 0 : backoff::retry(
1447 0 : || async {
1448 0 : upload::copy_timeline_layer(
1449 0 : &self.storage_impl,
1450 0 : &source_remote_path,
1451 0 : &target_remote_path,
1452 0 : cancel,
1453 0 : )
1454 0 : .await
1455 0 : },
1456 0 : TimeoutOrCancel::caused_by_cancel,
1457 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1458 0 : FAILED_REMOTE_OP_RETRIES,
1459 0 : "copy timeline layer",
1460 0 : cancel,
1461 0 : )
1462 0 : .await
1463 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1464 0 : .and_then(|x| x)
1465 0 : .context("remote copy timeline layer")
1466 0 : }
1467 :
1468 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1469 0 : match tokio::time::timeout(
1470 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1471 0 : self.deletion_queue_client.flush_immediate(),
1472 0 : )
1473 0 : .await
1474 : {
1475 0 : Ok(result) => result,
1476 0 : Err(_timeout) => {
1477 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1478 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1479 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1480 0 : tracing::warn!(
1481 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1482 : );
1483 0 : Ok(())
1484 : }
1485 : }
1486 0 : }
1487 :
1488 : /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set.
1489 : /// The function deletes layer files one by one, then lists the prefix to see if we leaked something
1490 : /// deletes leaked files if any and proceeds with deletion of index file at the end.
1491 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
1492 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1493 :
1494 0 : let layers: Vec<RemotePath> = {
1495 0 : let mut locked = self.upload_queue.lock().unwrap();
1496 0 : let stopped = locked.stopped_mut()?;
1497 :
1498 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1499 0 : anyhow::bail!("deleted_at is not set")
1500 0 : }
1501 0 :
1502 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1503 :
1504 0 : stopped
1505 0 : .upload_queue_for_deletion
1506 0 : .dirty
1507 0 : .layer_metadata
1508 0 : .drain()
1509 0 : .filter(|(_file_name, meta)| {
1510 0 : // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
1511 0 : // all shards anyway, we _could_ delete these, but
1512 0 : // - it creates a potential race if other shards are still
1513 0 : // using the layers while this shard deletes them.
1514 0 : // - it means that if we rolled back the shard split, the ancestor shards would be in a state where
1515 0 : // these timelines are present but corrupt (their index exists but some layers don't)
1516 0 : //
1517 0 : // These layers will eventually be cleaned up by the scrubber when it does physical GC.
1518 0 : meta.shard.shard_number == self.tenant_shard_id.shard_number
1519 0 : && meta.shard.shard_count == self.tenant_shard_id.shard_count
1520 0 : })
1521 0 : .map(|(file_name, meta)| {
1522 0 : remote_layer_path(
1523 0 : &self.tenant_shard_id.tenant_id,
1524 0 : &self.timeline_id,
1525 0 : meta.shard,
1526 0 : &file_name,
1527 0 : meta.generation,
1528 0 : )
1529 0 : })
1530 0 : .collect()
1531 0 : };
1532 0 :
1533 0 : let layer_deletion_count = layers.len();
1534 0 : self.deletion_queue_client.push_immediate(layers).await?;
1535 :
1536 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts of
1537 : // inexistant objects are not considered errors.
1538 0 : let initdb_path =
1539 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1540 0 : self.deletion_queue_client
1541 0 : .push_immediate(vec![initdb_path])
1542 0 : .await?;
1543 :
1544 : // Do not delete index part yet, it is needed for possible retry. If we remove it first
1545 : // and retry will arrive to different pageserver there wont be any traces of it on remote storage
1546 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1547 0 :
1548 0 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1549 0 : // taking the burden of listing all the layers that we already know we should delete.
1550 0 : self.flush_deletion_queue().await?;
1551 :
1552 0 : let cancel = shutdown_token();
1553 :
1554 0 : let remaining = download_retry(
1555 0 : || async {
1556 0 : self.storage_impl
1557 0 : .list(
1558 0 : Some(&timeline_storage_path),
1559 0 : ListingMode::NoDelimiter,
1560 0 : None,
1561 0 : &cancel,
1562 0 : )
1563 0 : .await
1564 0 : },
1565 0 : "list remaining files",
1566 0 : &cancel,
1567 0 : )
1568 0 : .await
1569 0 : .context("list files remaining files")?
1570 : .keys;
1571 :
1572 : // We will delete the current index_part object last, since it acts as a deletion
1573 : // marker via its deleted_at attribute
1574 0 : let latest_index = remaining
1575 0 : .iter()
1576 0 : .filter(|o| {
1577 0 : o.key
1578 0 : .object_name()
1579 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1580 0 : .unwrap_or(false)
1581 0 : })
1582 0 : .filter_map(|o| parse_remote_index_path(o.key.clone()).map(|gen| (o.key.clone(), gen)))
1583 0 : .max_by_key(|i| i.1)
1584 0 : .map(|i| i.0.clone())
1585 0 : .unwrap_or(
1586 0 : // No generation-suffixed indices, assume we are dealing with
1587 0 : // a legacy index.
1588 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1589 0 : );
1590 0 :
1591 0 : let remaining_layers: Vec<RemotePath> = remaining
1592 0 : .into_iter()
1593 0 : .filter_map(|o| {
1594 0 : if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
1595 0 : None
1596 : } else {
1597 0 : Some(o.key)
1598 : }
1599 0 : })
1600 0 : .inspect(|path| {
1601 0 : if let Some(name) = path.object_name() {
1602 0 : info!(%name, "deleting a file not referenced from index_part.json");
1603 : } else {
1604 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1605 : }
1606 0 : })
1607 0 : .collect();
1608 0 :
1609 0 : let not_referenced_count = remaining_layers.len();
1610 0 : if !remaining_layers.is_empty() {
1611 0 : self.deletion_queue_client
1612 0 : .push_immediate(remaining_layers)
1613 0 : .await?;
1614 0 : }
1615 :
1616 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1617 0 : Err(anyhow::anyhow!(
1618 0 : "failpoint: timeline-delete-before-index-delete"
1619 0 : ))?
1620 0 : });
1621 :
1622 0 : debug!("enqueuing index part deletion");
1623 0 : self.deletion_queue_client
1624 0 : .push_immediate([latest_index].to_vec())
1625 0 : .await?;
1626 :
1627 : // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait
1628 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1629 0 : self.flush_deletion_queue().await?;
1630 :
1631 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1632 0 : Err(anyhow::anyhow!(
1633 0 : "failpoint: timeline-delete-after-index-delete"
1634 0 : ))?
1635 0 : });
1636 :
1637 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1638 :
1639 0 : Ok(())
1640 0 : }
1641 :
1642 : ///
1643 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1644 : /// the ordering constraints.
1645 : ///
1646 : /// The caller needs to already hold the `upload_queue` lock.
1647 22179 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1648 35702 : while let Some(next_op) = upload_queue.queued_operations.front() {
1649 : // Can we run this task now?
1650 25853 : let can_run_now = match next_op {
1651 : UploadOp::UploadLayer(..) => {
1652 : // Can always be scheduled.
1653 4542 : true
1654 : }
1655 : UploadOp::UploadMetadata { .. } => {
1656 : // These can only be performed after all the preceding operations
1657 : // have finished.
1658 13040 : upload_queue.inprogress_tasks.is_empty()
1659 : }
1660 : UploadOp::Delete(_) => {
1661 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1662 836 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1663 : }
1664 :
1665 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1666 7435 : upload_queue.inprogress_tasks.is_empty()
1667 : }
1668 : };
1669 :
1670 : // If we cannot launch this task, don't look any further.
1671 : //
1672 : // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
1673 : // them now, but we don't try to do that currently. For example, if the frontmost task
1674 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1675 : // could still start layer uploads that were scheduled later.
1676 25853 : if !can_run_now {
1677 12312 : break;
1678 13541 : }
1679 13541 :
1680 13541 : if let UploadOp::Shutdown = next_op {
1681 : // leave the op in the queue but do not start more tasks; it will be dropped when
1682 : // the stop is called.
1683 18 : upload_queue.shutdown_ready.close();
1684 18 : break;
1685 13523 : }
1686 13523 :
1687 13523 : // We can launch this task. Remove it from the queue first.
1688 13523 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1689 13523 :
1690 13523 : debug!("starting op: {}", next_op);
1691 :
1692 : // Update the counters
1693 13523 : match next_op {
1694 4542 : UploadOp::UploadLayer(_, _) => {
1695 4542 : upload_queue.num_inprogress_layer_uploads += 1;
1696 4542 : }
1697 4287 : UploadOp::UploadMetadata { .. } => {
1698 4287 : upload_queue.num_inprogress_metadata_uploads += 1;
1699 4287 : }
1700 716 : UploadOp::Delete(_) => {
1701 716 : upload_queue.num_inprogress_deletions += 1;
1702 716 : }
1703 3978 : UploadOp::Barrier(sender) => {
1704 3978 : sender.send_replace(());
1705 3978 : continue;
1706 : }
1707 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1708 : };
1709 :
1710 : // Assign unique ID to this task
1711 9545 : upload_queue.task_counter += 1;
1712 9545 : let upload_task_id = upload_queue.task_counter;
1713 9545 :
1714 9545 : // Add it to the in-progress map
1715 9545 : let task = Arc::new(UploadTask {
1716 9545 : task_id: upload_task_id,
1717 9545 : op: next_op,
1718 9545 : retries: AtomicU32::new(0),
1719 9545 : });
1720 9545 : upload_queue
1721 9545 : .inprogress_tasks
1722 9545 : .insert(task.task_id, Arc::clone(&task));
1723 9545 :
1724 9545 : // Spawn task to perform the task
1725 9545 : let self_rc = Arc::clone(self);
1726 9545 : let tenant_shard_id = self.tenant_shard_id;
1727 9545 : let timeline_id = self.timeline_id;
1728 9545 : task_mgr::spawn(
1729 9545 : &self.runtime,
1730 9545 : TaskKind::RemoteUploadTask,
1731 9545 : self.tenant_shard_id,
1732 9545 : Some(self.timeline_id),
1733 9545 : "remote upload",
1734 9049 : async move {
1735 113924 : self_rc.perform_upload_task(task).await;
1736 8681 : Ok(())
1737 8681 : }
1738 9545 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1739 : );
1740 :
1741 : // Loop back to process next task
1742 : }
1743 22179 : }
1744 :
1745 : ///
1746 : /// Perform an upload task.
1747 : ///
1748 : /// The task is in the `inprogress_tasks` list. This function will try to
1749 : /// execute it, retrying forever. On successful completion, the task is
1750 : /// removed it from the `inprogress_tasks` list, and any next task(s) in the
1751 : /// queue that were waiting by the completion are launched.
1752 : ///
1753 : /// The task can be shut down, however. That leads to stopping the whole
1754 : /// queue.
1755 : ///
1756 9049 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1757 9049 : let cancel = shutdown_token();
1758 : // Loop to retry until it completes.
1759 : loop {
1760 : // If we're requested to shut down, close up shop and exit.
1761 : //
1762 : // Note: We only check for the shutdown requests between retries, so
1763 : // if a shutdown request arrives while we're busy uploading, in the
1764 : // upload::upload:*() call below, we will wait not exit until it has
1765 : // finished. We probably could cancel the upload by simply dropping
1766 : // the Future, but we're not 100% sure if the remote storage library
1767 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1768 : // upload finishes or times out soon enough.
1769 9049 : if cancel.is_cancelled() {
1770 0 : info!("upload task cancelled by shutdown request");
1771 0 : self.stop();
1772 0 : return;
1773 9049 : }
1774 :
1775 9049 : let upload_result: anyhow::Result<()> = match &task.op {
1776 4095 : UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
1777 4095 : let local_path = layer.local_path();
1778 4095 :
1779 4095 : // We should only be uploading layers created by this `Tenant`'s lifetime, so
1780 4095 : // the metadata in the upload should always match our current generation.
1781 4095 : assert_eq!(layer_metadata.generation, self.generation);
1782 :
1783 4095 : let remote_path = remote_layer_path(
1784 4095 : &self.tenant_shard_id.tenant_id,
1785 4095 : &self.timeline_id,
1786 4095 : layer_metadata.shard,
1787 4095 : &layer.layer_desc().layer_name(),
1788 4095 : layer_metadata.generation,
1789 4095 : );
1790 4095 :
1791 4095 : upload::upload_timeline_layer(
1792 4095 : &self.storage_impl,
1793 4095 : local_path,
1794 4095 : &remote_path,
1795 4095 : layer_metadata.file_size,
1796 4095 : &self.cancel,
1797 4095 : )
1798 4095 : .measure_remote_op(
1799 4095 : RemoteOpFileKind::Layer,
1800 4095 : RemoteOpKind::Upload,
1801 4095 : Arc::clone(&self.metrics),
1802 4095 : )
1803 96570 : .await
1804 : }
1805 4238 : UploadOp::UploadMetadata { ref uploaded } => {
1806 4238 : let res = upload::upload_index_part(
1807 4238 : &self.storage_impl,
1808 4238 : &self.tenant_shard_id,
1809 4238 : &self.timeline_id,
1810 4238 : self.generation,
1811 4238 : uploaded,
1812 4238 : &self.cancel,
1813 4238 : )
1814 4238 : .measure_remote_op(
1815 4238 : RemoteOpFileKind::Index,
1816 4238 : RemoteOpKind::Upload,
1817 4238 : Arc::clone(&self.metrics),
1818 4238 : )
1819 16672 : .await;
1820 4206 : if res.is_ok() {
1821 4206 : self.update_remote_physical_size_gauge(Some(uploaded));
1822 4206 : let mention_having_future_layers = if cfg!(feature = "testing") {
1823 4206 : uploaded
1824 4206 : .layer_metadata
1825 4206 : .keys()
1826 52464 : .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn()))
1827 : } else {
1828 0 : false
1829 : };
1830 4206 : if mention_having_future_layers {
1831 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1832 30 : tracing::info!(
1833 0 : disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(),
1834 0 : "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"
1835 : );
1836 4176 : }
1837 0 : }
1838 4206 : res
1839 : }
1840 716 : UploadOp::Delete(delete) => {
1841 716 : pausable_failpoint!("before-delete-layer-pausable");
1842 716 : self.deletion_queue_client
1843 716 : .push_layers(
1844 716 : self.tenant_shard_id,
1845 716 : self.timeline_id,
1846 716 : self.generation,
1847 716 : delete.layers.clone(),
1848 716 : )
1849 0 : .await
1850 716 : .map_err(|e| anyhow::anyhow!(e))
1851 : }
1852 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
1853 : // unreachable. Barrier operations are handled synchronously in
1854 : // launch_queued_tasks
1855 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
1856 0 : break;
1857 : }
1858 : };
1859 :
1860 0 : match upload_result {
1861 : Ok(()) => {
1862 8681 : break;
1863 : }
1864 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
1865 0 : // loop around to do the proper stopping
1866 0 : continue;
1867 : }
1868 0 : Err(e) => {
1869 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1870 0 :
1871 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1872 0 : // or other external reasons. Such issues are relatively regular, so log them
1873 0 : // at info level at first, and only WARN if the operation fails repeatedly.
1874 0 : //
1875 0 : // (See similar logic for downloads in `download::download_retry`)
1876 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1877 0 : info!(
1878 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1879 0 : task.op, retries, e
1880 : );
1881 : } else {
1882 0 : warn!(
1883 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1884 0 : task.op, retries, e
1885 : );
1886 : }
1887 :
1888 : // sleep until it's time to retry, or we're cancelled
1889 0 : exponential_backoff(
1890 0 : retries,
1891 0 : DEFAULT_BASE_BACKOFF_SECONDS,
1892 0 : DEFAULT_MAX_BACKOFF_SECONDS,
1893 0 : &cancel,
1894 0 : )
1895 0 : .await;
1896 : }
1897 : }
1898 : }
1899 :
1900 8681 : let retries = task.retries.load(Ordering::SeqCst);
1901 8681 : if retries > 0 {
1902 0 : info!(
1903 0 : "remote task {} completed successfully after {} retries",
1904 0 : task.op, retries
1905 : );
1906 : } else {
1907 8681 : debug!("remote task {} completed successfully", task.op);
1908 : }
1909 :
1910 : // The task has completed successfully. Remove it from the in-progress list.
1911 8681 : let lsn_update = {
1912 8681 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1913 8681 : let upload_queue = match upload_queue_guard.deref_mut() {
1914 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1915 0 : UploadQueue::Stopped(_stopped) => {
1916 0 : None
1917 : },
1918 8681 : UploadQueue::Initialized(qi) => { Some(qi) }
1919 : };
1920 :
1921 8681 : let upload_queue = match upload_queue {
1922 8681 : Some(upload_queue) => upload_queue,
1923 : None => {
1924 0 : info!("another concurrent task already stopped the queue");
1925 0 : return;
1926 : }
1927 : };
1928 :
1929 8681 : upload_queue.inprogress_tasks.remove(&task.task_id);
1930 :
1931 8681 : let lsn_update = match task.op {
1932 : UploadOp::UploadLayer(_, _) => {
1933 3759 : upload_queue.num_inprogress_layer_uploads -= 1;
1934 3759 : None
1935 : }
1936 4206 : UploadOp::UploadMetadata { ref uploaded } => {
1937 4206 : upload_queue.num_inprogress_metadata_uploads -= 1;
1938 4206 :
1939 4206 : // the task id is reused as a monotonicity check for storing the "clean"
1940 4206 : // IndexPart.
1941 4206 : let last_updater = upload_queue.clean.1;
1942 4206 : let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id);
1943 4206 : let monotone = is_later || last_updater.is_none();
1944 :
1945 4206 : assert!(monotone, "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}", task.task_id);
1946 :
1947 : // not taking ownership is wasteful
1948 4206 : upload_queue.clean.0.clone_from(uploaded);
1949 4206 : upload_queue.clean.1 = Some(task.task_id);
1950 4206 :
1951 4206 : let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
1952 4206 :
1953 4206 : if self.generation.is_none() {
1954 : // Legacy mode: skip validating generation
1955 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
1956 0 : None
1957 : } else {
1958 4206 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
1959 : }
1960 : }
1961 : UploadOp::Delete(_) => {
1962 716 : upload_queue.num_inprogress_deletions -= 1;
1963 716 : None
1964 : }
1965 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
1966 : };
1967 :
1968 : // Launch any queued tasks that were unblocked by this one.
1969 8681 : self.launch_queued_tasks(upload_queue);
1970 8681 : lsn_update
1971 : };
1972 :
1973 8681 : if let Some((lsn, slot)) = lsn_update {
1974 : // Updates to the remote_consistent_lsn we advertise to pageservers
1975 : // are all routed through the DeletionQueue, to enforce important
1976 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
1977 4206 : self.deletion_queue_client
1978 4206 : .update_remote_consistent_lsn(
1979 4206 : self.tenant_shard_id,
1980 4206 : self.timeline_id,
1981 4206 : self.generation,
1982 4206 : lsn,
1983 4206 : slot,
1984 4206 : )
1985 0 : .await;
1986 4475 : }
1987 :
1988 8681 : self.metric_end(&task.op);
1989 8681 : }
1990 :
1991 18999 : fn metric_impl(
1992 18999 : &self,
1993 18999 : op: &UploadOp,
1994 18999 : ) -> Option<(
1995 18999 : RemoteOpFileKind,
1996 18999 : RemoteOpKind,
1997 18999 : RemoteTimelineClientMetricsCallTrackSize,
1998 18999 : )> {
1999 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
2000 18999 : let res = match op {
2001 8307 : UploadOp::UploadLayer(_, m) => (
2002 8307 : RemoteOpFileKind::Layer,
2003 8307 : RemoteOpKind::Upload,
2004 8307 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
2005 8307 : ),
2006 8568 : UploadOp::UploadMetadata { .. } => (
2007 8568 : RemoteOpFileKind::Index,
2008 8568 : RemoteOpKind::Upload,
2009 8568 : DontTrackSize {
2010 8568 : reason: "metadata uploads are tiny",
2011 8568 : },
2012 8568 : ),
2013 2106 : UploadOp::Delete(_delete) => (
2014 2106 : RemoteOpFileKind::Layer,
2015 2106 : RemoteOpKind::Delete,
2016 2106 : DontTrackSize {
2017 2106 : reason: "should we track deletes? positive or negative sign?",
2018 2106 : },
2019 2106 : ),
2020 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
2021 : // we do not account these
2022 18 : return None;
2023 : }
2024 : };
2025 18981 : Some(res)
2026 18999 : }
2027 :
2028 10300 : fn metric_begin(&self, op: &UploadOp) {
2029 10300 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2030 10300 : Some(x) => x,
2031 0 : None => return,
2032 : };
2033 10300 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
2034 10300 : guard.will_decrement_manually(); // in metric_end(), see right below
2035 10300 : }
2036 :
2037 8699 : fn metric_end(&self, op: &UploadOp) {
2038 8699 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2039 8681 : Some(x) => x,
2040 18 : None => return,
2041 : };
2042 8681 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
2043 8699 : }
2044 :
2045 : /// Close the upload queue for new operations and cancel queued operations.
2046 : ///
2047 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
2048 : ///
2049 : /// In-progress operations will still be running after this function returns.
2050 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
2051 : /// to wait for them to complete, after calling this function.
2052 42 : pub(crate) fn stop(&self) {
2053 42 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
2054 42 : // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
2055 42 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
2056 42 : let mut guard = self.upload_queue.lock().unwrap();
2057 42 : self.stop_impl(&mut guard);
2058 42 : }
2059 :
2060 42 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
2061 42 : match &mut **guard {
2062 : UploadQueue::Uninitialized => {
2063 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
2064 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
2065 : }
2066 : UploadQueue::Stopped(_) => {
2067 : // nothing to do
2068 18 : info!("another concurrent task already shut down the queue");
2069 : }
2070 24 : UploadQueue::Initialized(initialized) => {
2071 24 : info!("shutting down upload queue");
2072 :
2073 : // Replace the queue with the Stopped state, taking ownership of the old
2074 : // Initialized queue. We will do some checks on it, and then drop it.
2075 24 : let qi = {
2076 : // Here we preserve working version of the upload queue for possible use during deletions.
2077 : // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
2078 : // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point.
2079 : // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it.
2080 24 : let upload_queue_for_deletion = UploadQueueInitialized {
2081 24 : task_counter: 0,
2082 24 : dirty: initialized.dirty.clone(),
2083 24 : clean: initialized.clean.clone(),
2084 24 : latest_files_changes_since_metadata_upload_scheduled: 0,
2085 24 : visible_remote_consistent_lsn: initialized
2086 24 : .visible_remote_consistent_lsn
2087 24 : .clone(),
2088 24 : num_inprogress_layer_uploads: 0,
2089 24 : num_inprogress_metadata_uploads: 0,
2090 24 : num_inprogress_deletions: 0,
2091 24 : inprogress_tasks: HashMap::default(),
2092 24 : queued_operations: VecDeque::default(),
2093 24 : #[cfg(feature = "testing")]
2094 24 : dangling_files: HashMap::default(),
2095 24 : shutting_down: false,
2096 24 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
2097 24 : };
2098 24 :
2099 24 : let upload_queue = std::mem::replace(
2100 24 : &mut **guard,
2101 24 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
2102 24 : UploadQueueStoppedDeletable {
2103 24 : upload_queue_for_deletion,
2104 24 : deleted_at: SetDeletedFlagProgress::NotRunning,
2105 24 : },
2106 24 : )),
2107 24 : );
2108 24 : if let UploadQueue::Initialized(qi) = upload_queue {
2109 24 : qi
2110 : } else {
2111 0 : unreachable!("we checked in the match above that it is Initialized");
2112 : }
2113 : };
2114 :
2115 : // consistency check
2116 24 : assert_eq!(
2117 24 : qi.num_inprogress_layer_uploads
2118 24 : + qi.num_inprogress_metadata_uploads
2119 24 : + qi.num_inprogress_deletions,
2120 24 : qi.inprogress_tasks.len()
2121 24 : );
2122 :
2123 : // We don't need to do anything here for in-progress tasks. They will finish
2124 : // on their own, decrement the unfinished-task counter themselves, and observe
2125 : // that the queue is Stopped.
2126 24 : drop(qi.inprogress_tasks);
2127 :
2128 : // Tear down queued ops
2129 24 : for op in qi.queued_operations.into_iter() {
2130 18 : self.metric_end(&op);
2131 18 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
2132 18 : // which is exactly what we want to happen.
2133 18 : drop(op);
2134 18 : }
2135 : }
2136 : }
2137 42 : }
2138 :
2139 : /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
2140 : /// externally to RemoteTimelineClient.
2141 0 : pub(crate) fn initialized_upload_queue(
2142 0 : &self,
2143 0 : ) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
2144 0 : let mut inner = self.upload_queue.lock().unwrap();
2145 0 : inner.initialized_mut()?;
2146 0 : Ok(UploadQueueAccessor { inner })
2147 0 : }
2148 : }
2149 :
2150 : pub(crate) struct UploadQueueAccessor<'a> {
2151 : inner: std::sync::MutexGuard<'a, UploadQueue>,
2152 : }
2153 :
2154 : impl<'a> UploadQueueAccessor<'a> {
2155 0 : pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
2156 0 : match &*self.inner {
2157 0 : UploadQueue::Initialized(x) => &x.clean.0,
2158 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
2159 0 : unreachable!("checked before constructing")
2160 : }
2161 : }
2162 0 : }
2163 : }
2164 :
2165 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2166 0 : let path = format!("tenants/{tenant_shard_id}");
2167 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2168 0 : }
2169 :
2170 654 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2171 654 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
2172 654 : RemotePath::from_string(&path).expect("Failed to construct path")
2173 654 : }
2174 :
2175 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
2176 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
2177 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2178 0 : }
2179 :
2180 90 : pub fn remote_timeline_path(
2181 90 : tenant_shard_id: &TenantShardId,
2182 90 : timeline_id: &TimelineId,
2183 90 : ) -> RemotePath {
2184 90 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
2185 90 : }
2186 :
2187 : /// Obtains the path of the given Layer in the remote
2188 : ///
2189 : /// Note that the shard component of a remote layer path is _not_ always the same
2190 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
2191 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
2192 4149 : pub fn remote_layer_path(
2193 4149 : tenant_id: &TenantId,
2194 4149 : timeline_id: &TimelineId,
2195 4149 : shard: ShardIndex,
2196 4149 : layer_file_name: &LayerName,
2197 4149 : generation: Generation,
2198 4149 : ) -> RemotePath {
2199 4149 : // Generation-aware key format
2200 4149 : let path = format!(
2201 4149 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
2202 4149 : shard.get_suffix(),
2203 4149 : layer_file_name,
2204 4149 : generation.get_suffix()
2205 4149 : );
2206 4149 :
2207 4149 : RemotePath::from_string(&path).expect("Failed to construct path")
2208 4149 : }
2209 :
2210 12 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
2211 12 : RemotePath::from_string(&format!(
2212 12 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
2213 12 : ))
2214 12 : .expect("Failed to construct path")
2215 12 : }
2216 :
2217 6 : pub fn remote_initdb_preserved_archive_path(
2218 6 : tenant_id: &TenantId,
2219 6 : timeline_id: &TimelineId,
2220 6 : ) -> RemotePath {
2221 6 : RemotePath::from_string(&format!(
2222 6 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
2223 6 : ))
2224 6 : .expect("Failed to construct path")
2225 6 : }
2226 :
2227 4386 : pub fn remote_index_path(
2228 4386 : tenant_shard_id: &TenantShardId,
2229 4386 : timeline_id: &TimelineId,
2230 4386 : generation: Generation,
2231 4386 : ) -> RemotePath {
2232 4386 : RemotePath::from_string(&format!(
2233 4386 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
2234 4386 : IndexPart::FILE_NAME,
2235 4386 : generation.get_suffix()
2236 4386 : ))
2237 4386 : .expect("Failed to construct path")
2238 4386 : }
2239 :
2240 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2241 0 : RemotePath::from_string(&format!(
2242 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
2243 0 : ))
2244 0 : .expect("Failed to construct path")
2245 0 : }
2246 :
2247 : /// Given the key of an index, parse out the generation part of the name
2248 54 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
2249 54 : let file_name = match path.get_path().file_name() {
2250 54 : Some(f) => f,
2251 : None => {
2252 : // Unexpected: we should be seeing index_part.json paths only
2253 0 : tracing::warn!("Malformed index key {}", path);
2254 0 : return None;
2255 : }
2256 : };
2257 :
2258 54 : match file_name.split_once('-') {
2259 36 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2260 18 : None => None,
2261 : }
2262 54 : }
2263 :
2264 : #[cfg(test)]
2265 : mod tests {
2266 : use super::*;
2267 : use crate::{
2268 : context::RequestContext,
2269 : tenant::{
2270 : harness::{TenantHarness, TIMELINE_ID},
2271 : storage_layer::layer::local_layer_path,
2272 : Tenant, Timeline,
2273 : },
2274 : DEFAULT_PG_VERSION,
2275 : };
2276 :
2277 : use std::collections::HashSet;
2278 :
2279 24 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2280 24 : format!("contents for {name}").into()
2281 24 : }
2282 :
2283 6 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2284 6 : let metadata = TimelineMetadata::new(
2285 6 : disk_consistent_lsn,
2286 6 : None,
2287 6 : None,
2288 6 : Lsn(0),
2289 6 : Lsn(0),
2290 6 : Lsn(0),
2291 6 : // Any version will do
2292 6 : // but it should be consistent with the one in the tests
2293 6 : crate::DEFAULT_PG_VERSION,
2294 6 : );
2295 6 :
2296 6 : // go through serialize + deserialize to fix the header, including checksum
2297 6 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2298 6 : }
2299 :
2300 6 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2301 18 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2302 6 : avec.sort();
2303 6 :
2304 6 : let mut bvec = b.to_vec();
2305 6 : bvec.sort_unstable();
2306 6 :
2307 6 : assert_eq!(avec, bvec);
2308 6 : }
2309 :
2310 12 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2311 12 : let mut expected: Vec<String> = expected
2312 12 : .iter()
2313 48 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2314 12 : .collect();
2315 12 : expected.sort();
2316 12 :
2317 12 : let mut found: Vec<String> = Vec::new();
2318 48 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2319 48 : let entry_name = entry.file_name();
2320 48 : let fname = entry_name.to_str().unwrap();
2321 48 : found.push(String::from(fname));
2322 48 : }
2323 12 : found.sort();
2324 12 :
2325 12 : assert_eq!(found, expected);
2326 12 : }
2327 :
2328 : struct TestSetup {
2329 : harness: TenantHarness,
2330 : tenant: Arc<Tenant>,
2331 : timeline: Arc<Timeline>,
2332 : tenant_ctx: RequestContext,
2333 : }
2334 :
2335 : impl TestSetup {
2336 24 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2337 24 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2338 24 : let harness = TenantHarness::create(test_name).await?;
2339 96 : let (tenant, ctx) = harness.load().await;
2340 :
2341 24 : let timeline = tenant
2342 24 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2343 48 : .await?;
2344 :
2345 24 : Ok(Self {
2346 24 : harness,
2347 24 : tenant,
2348 24 : timeline,
2349 24 : tenant_ctx: ctx,
2350 24 : })
2351 24 : }
2352 :
2353 : /// Construct a RemoteTimelineClient in an arbitrary generation
2354 30 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2355 30 : Arc::new(RemoteTimelineClient {
2356 30 : conf: self.harness.conf,
2357 30 : runtime: tokio::runtime::Handle::current(),
2358 30 : tenant_shard_id: self.harness.tenant_shard_id,
2359 30 : timeline_id: TIMELINE_ID,
2360 30 : generation,
2361 30 : storage_impl: self.harness.remote_storage.clone(),
2362 30 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2363 30 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2364 30 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2365 30 : &self.harness.tenant_shard_id,
2366 30 : &TIMELINE_ID,
2367 30 : )),
2368 30 : cancel: CancellationToken::new(),
2369 30 : })
2370 30 : }
2371 :
2372 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2373 : /// and timeline_id are present.
2374 18 : fn span(&self) -> tracing::Span {
2375 18 : tracing::info_span!(
2376 : "test",
2377 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2378 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2379 : timeline_id = %TIMELINE_ID
2380 : )
2381 18 : }
2382 : }
2383 :
2384 : // Test scheduling
2385 : #[tokio::test]
2386 6 : async fn upload_scheduling() {
2387 6 : // Test outline:
2388 6 : //
2389 6 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
2390 6 : // Schedule upload of index. Check that it is queued
2391 6 : // let the layer file uploads finish. Check that the index-upload is now started
2392 6 : // let the index-upload finish.
2393 6 : //
2394 6 : // Download back the index.json. Check that the list of files is correct
2395 6 : //
2396 6 : // Schedule upload. Schedule deletion. Check that the deletion is queued
2397 6 : // let upload finish. Check that deletion is now started
2398 6 : // Schedule another deletion. Check that it's launched immediately.
2399 6 : // Schedule index upload. Check that it's queued
2400 6 :
2401 36 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2402 6 : let span = test_setup.span();
2403 6 : let _guard = span.enter();
2404 6 :
2405 6 : let TestSetup {
2406 6 : harness,
2407 6 : tenant: _tenant,
2408 6 : timeline,
2409 6 : tenant_ctx: _tenant_ctx,
2410 6 : } = test_setup;
2411 6 :
2412 6 : let client = &timeline.remote_client;
2413 6 :
2414 6 : // Download back the index.json, and check that the list of files is correct
2415 6 : let initial_index_part = match client
2416 6 : .download_index_file(&CancellationToken::new())
2417 24 : .await
2418 6 : .unwrap()
2419 6 : {
2420 6 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2421 6 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2422 6 : };
2423 6 : let initial_layers = initial_index_part
2424 6 : .layer_metadata
2425 6 : .keys()
2426 6 : .map(|f| f.to_owned())
2427 6 : .collect::<HashSet<LayerName>>();
2428 6 : let initial_layer = {
2429 6 : assert!(initial_layers.len() == 1);
2430 6 : initial_layers.into_iter().next().unwrap()
2431 6 : };
2432 6 :
2433 6 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2434 6 :
2435 6 : println!("workdir: {}", harness.conf.workdir);
2436 6 :
2437 6 : let remote_timeline_dir = harness
2438 6 : .remote_fs_dir
2439 6 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2440 6 : println!("remote_timeline_dir: {remote_timeline_dir}");
2441 6 :
2442 6 : let generation = harness.generation;
2443 6 : let shard = harness.shard;
2444 6 :
2445 6 : // Create a couple of dummy files, schedule upload for them
2446 6 :
2447 6 : let layers = [
2448 6 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2449 6 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2450 6 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2451 6 : ]
2452 6 : .into_iter()
2453 18 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2454 18 :
2455 18 : let local_path = local_layer_path(
2456 18 : harness.conf,
2457 18 : &timeline.tenant_shard_id,
2458 18 : &timeline.timeline_id,
2459 18 : &name,
2460 18 : &generation,
2461 18 : );
2462 18 : std::fs::write(&local_path, &contents).unwrap();
2463 18 :
2464 18 : Layer::for_resident(
2465 18 : harness.conf,
2466 18 : &timeline,
2467 18 : local_path,
2468 18 : name,
2469 18 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2470 18 : )
2471 18 : }).collect::<Vec<_>>();
2472 6 :
2473 6 : client
2474 6 : .schedule_layer_file_upload(layers[0].clone())
2475 6 : .unwrap();
2476 6 : client
2477 6 : .schedule_layer_file_upload(layers[1].clone())
2478 6 : .unwrap();
2479 6 :
2480 6 : // Check that they are started immediately, not queued
2481 6 : //
2482 6 : // this works because we running within block_on, so any futures are now queued up until
2483 6 : // our next await point.
2484 6 : {
2485 6 : let mut guard = client.upload_queue.lock().unwrap();
2486 6 : let upload_queue = guard.initialized_mut().unwrap();
2487 6 : assert!(upload_queue.queued_operations.is_empty());
2488 6 : assert!(upload_queue.inprogress_tasks.len() == 2);
2489 6 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2490 6 :
2491 6 : // also check that `latest_file_changes` was updated
2492 6 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2493 6 : }
2494 6 :
2495 6 : // Schedule upload of index. Check that it is queued
2496 6 : let metadata = dummy_metadata(Lsn(0x20));
2497 6 : client
2498 6 : .schedule_index_upload_for_full_metadata_update(&metadata)
2499 6 : .unwrap();
2500 6 : {
2501 6 : let mut guard = client.upload_queue.lock().unwrap();
2502 6 : let upload_queue = guard.initialized_mut().unwrap();
2503 6 : assert!(upload_queue.queued_operations.len() == 1);
2504 6 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2505 6 : }
2506 6 :
2507 6 : // Wait for the uploads to finish
2508 6 : client.wait_completion().await.unwrap();
2509 6 : {
2510 6 : let mut guard = client.upload_queue.lock().unwrap();
2511 6 : let upload_queue = guard.initialized_mut().unwrap();
2512 6 :
2513 6 : assert!(upload_queue.queued_operations.is_empty());
2514 6 : assert!(upload_queue.inprogress_tasks.is_empty());
2515 6 : }
2516 6 :
2517 6 : // Download back the index.json, and check that the list of files is correct
2518 6 : let index_part = match client
2519 6 : .download_index_file(&CancellationToken::new())
2520 22 : .await
2521 6 : .unwrap()
2522 6 : {
2523 6 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2524 6 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2525 6 : };
2526 6 :
2527 6 : assert_file_list(
2528 6 : &index_part
2529 6 : .layer_metadata
2530 6 : .keys()
2531 18 : .map(|f| f.to_owned())
2532 6 : .collect(),
2533 6 : &[
2534 6 : &initial_layer.to_string(),
2535 6 : &layers[0].layer_desc().layer_name().to_string(),
2536 6 : &layers[1].layer_desc().layer_name().to_string(),
2537 6 : ],
2538 6 : );
2539 6 : assert_eq!(index_part.metadata, metadata);
2540 6 :
2541 6 : // Schedule upload and then a deletion. Check that the deletion is queued
2542 6 : client
2543 6 : .schedule_layer_file_upload(layers[2].clone())
2544 6 : .unwrap();
2545 6 :
2546 6 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
2547 6 : // keep using schedule_layer_file_deletion because we don't have a way to wait for the
2548 6 : // spawn_blocking started by the drop.
2549 6 : client
2550 6 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
2551 6 : .unwrap();
2552 6 : {
2553 6 : let mut guard = client.upload_queue.lock().unwrap();
2554 6 : let upload_queue = guard.initialized_mut().unwrap();
2555 6 :
2556 6 : // Deletion schedules upload of the index file, and the file deletion itself
2557 6 : assert_eq!(upload_queue.queued_operations.len(), 2);
2558 6 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2559 6 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2560 6 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2561 6 : assert_eq!(
2562 6 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2563 6 : 0
2564 6 : );
2565 6 : }
2566 6 : assert_remote_files(
2567 6 : &[
2568 6 : &initial_layer.to_string(),
2569 6 : &layers[0].layer_desc().layer_name().to_string(),
2570 6 : &layers[1].layer_desc().layer_name().to_string(),
2571 6 : "index_part.json",
2572 6 : ],
2573 6 : &remote_timeline_dir,
2574 6 : generation,
2575 6 : );
2576 6 :
2577 6 : // Finish them
2578 6 : client.wait_completion().await.unwrap();
2579 6 : harness.deletion_queue.pump().await;
2580 6 :
2581 6 : assert_remote_files(
2582 6 : &[
2583 6 : &initial_layer.to_string(),
2584 6 : &layers[1].layer_desc().layer_name().to_string(),
2585 6 : &layers[2].layer_desc().layer_name().to_string(),
2586 6 : "index_part.json",
2587 6 : ],
2588 6 : &remote_timeline_dir,
2589 6 : generation,
2590 6 : );
2591 6 : }
2592 :
2593 : #[tokio::test]
2594 6 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2595 6 : // Setup
2596 6 :
2597 6 : let TestSetup {
2598 6 : harness,
2599 6 : tenant: _tenant,
2600 6 : timeline,
2601 6 : ..
2602 36 : } = TestSetup::new("metrics").await.unwrap();
2603 6 : let client = &timeline.remote_client;
2604 6 :
2605 6 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2606 6 : let local_path = local_layer_path(
2607 6 : harness.conf,
2608 6 : &timeline.tenant_shard_id,
2609 6 : &timeline.timeline_id,
2610 6 : &layer_file_name_1,
2611 6 : &harness.generation,
2612 6 : );
2613 6 : let content_1 = dummy_contents("foo");
2614 6 : std::fs::write(&local_path, &content_1).unwrap();
2615 6 :
2616 6 : let layer_file_1 = Layer::for_resident(
2617 6 : harness.conf,
2618 6 : &timeline,
2619 6 : local_path,
2620 6 : layer_file_name_1.clone(),
2621 6 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2622 6 : );
2623 6 :
2624 6 : #[derive(Debug, PartialEq, Clone, Copy)]
2625 6 : struct BytesStartedFinished {
2626 6 : started: Option<usize>,
2627 6 : finished: Option<usize>,
2628 6 : }
2629 6 : impl std::ops::Add for BytesStartedFinished {
2630 6 : type Output = Self;
2631 12 : fn add(self, rhs: Self) -> Self::Output {
2632 12 : Self {
2633 12 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2634 12 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2635 12 : }
2636 12 : }
2637 6 : }
2638 18 : let get_bytes_started_stopped = || {
2639 18 : let started = client
2640 18 : .metrics
2641 18 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2642 18 : .map(|v| v.try_into().unwrap());
2643 18 : let stopped = client
2644 18 : .metrics
2645 18 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2646 18 : .map(|v| v.try_into().unwrap());
2647 18 : BytesStartedFinished {
2648 18 : started,
2649 18 : finished: stopped,
2650 18 : }
2651 18 : };
2652 6 :
2653 6 : // Test
2654 6 : tracing::info!("now doing actual test");
2655 6 :
2656 6 : let actual_a = get_bytes_started_stopped();
2657 6 :
2658 6 : client
2659 6 : .schedule_layer_file_upload(layer_file_1.clone())
2660 6 : .unwrap();
2661 6 :
2662 6 : let actual_b = get_bytes_started_stopped();
2663 6 :
2664 6 : client.wait_completion().await.unwrap();
2665 6 :
2666 6 : let actual_c = get_bytes_started_stopped();
2667 6 :
2668 6 : // Validate
2669 6 :
2670 6 : let expected_b = actual_a
2671 6 : + BytesStartedFinished {
2672 6 : started: Some(content_1.len()),
2673 6 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
2674 6 : finished: Some(0),
2675 6 : };
2676 6 : assert_eq!(actual_b, expected_b);
2677 6 :
2678 6 : let expected_c = actual_a
2679 6 : + BytesStartedFinished {
2680 6 : started: Some(content_1.len()),
2681 6 : finished: Some(content_1.len()),
2682 6 : };
2683 6 : assert_eq!(actual_c, expected_c);
2684 6 : }
2685 :
2686 36 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
2687 36 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
2688 36 : let example_index_part = IndexPart::example();
2689 36 :
2690 36 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
2691 36 :
2692 36 : let index_path = test_state.harness.remote_fs_dir.join(
2693 36 : remote_index_path(
2694 36 : &test_state.harness.tenant_shard_id,
2695 36 : &TIMELINE_ID,
2696 36 : generation,
2697 36 : )
2698 36 : .get_path(),
2699 36 : );
2700 36 :
2701 36 : std::fs::create_dir_all(index_path.parent().unwrap())
2702 36 : .expect("creating test dir should work");
2703 36 :
2704 36 : eprintln!("Writing {index_path}");
2705 36 : std::fs::write(&index_path, index_part_bytes).unwrap();
2706 36 : example_index_part
2707 36 : }
2708 :
2709 : /// Assert that when a RemoteTimelineclient in generation `get_generation` fetches its
2710 : /// index, the IndexPart returned is equal to `expected`
2711 30 : async fn assert_got_index_part(
2712 30 : test_state: &TestSetup,
2713 30 : get_generation: Generation,
2714 30 : expected: &IndexPart,
2715 30 : ) {
2716 30 : let client = test_state.build_client(get_generation);
2717 :
2718 30 : let download_r = client
2719 30 : .download_index_file(&CancellationToken::new())
2720 179 : .await
2721 30 : .expect("download should always succeed");
2722 30 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
2723 30 : match download_r {
2724 30 : MaybeDeletedIndexPart::IndexPart(index_part) => {
2725 30 : assert_eq!(&index_part, expected);
2726 : }
2727 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
2728 : }
2729 30 : }
2730 :
2731 : #[tokio::test]
2732 6 : async fn index_part_download_simple() -> anyhow::Result<()> {
2733 36 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
2734 6 : let span = test_state.span();
2735 6 : let _guard = span.enter();
2736 6 :
2737 6 : // Simple case: we are in generation N, load the index from generation N - 1
2738 6 : let generation_n = 5;
2739 6 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2740 6 :
2741 30 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
2742 6 :
2743 6 : Ok(())
2744 6 : }
2745 :
2746 : #[tokio::test]
2747 6 : async fn index_part_download_ordering() -> anyhow::Result<()> {
2748 6 : let test_state = TestSetup::new("index_part_download_ordering")
2749 36 : .await
2750 6 : .unwrap();
2751 6 :
2752 6 : let span = test_state.span();
2753 6 : let _guard = span.enter();
2754 6 :
2755 6 : // A generation-less IndexPart exists in the bucket, we should find it
2756 6 : let generation_n = 5;
2757 6 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
2758 42 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
2759 6 :
2760 6 : // If a more recent-than-none generation exists, we should prefer to load that
2761 6 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
2762 42 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2763 6 :
2764 6 : // If a more-recent-than-me generation exists, we should ignore it.
2765 6 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
2766 42 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2767 6 :
2768 6 : // If a directly previous generation exists, _and_ an index exists in my own
2769 6 : // generation, I should prefer my own generation.
2770 6 : let _injected_prev =
2771 6 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2772 6 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
2773 6 : assert_got_index_part(
2774 6 : &test_state,
2775 6 : Generation::new(generation_n),
2776 6 : &injected_current,
2777 6 : )
2778 23 : .await;
2779 6 :
2780 6 : Ok(())
2781 6 : }
2782 : }
|