Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to get the list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //!
67 : //! From the user's perspective, the operations are executed sequentially.
68 : //! Internally, the client knows which operations can be performed in parallel,
69 : //! and which operations act like a "barrier" that require preceding operations
70 : //! to finish. The calling code just needs to call the schedule-functions in the
71 : //! correct order, and the client will parallelize the operations in a way that
72 : //! is safe. For more details, see `UploadOp::can_bypass`.
73 : //!
74 : //! All of this relies on the following invariants:
75 : //!
76 : //! - We rely on read-after-write consistency in the remote storage.
77 : //! - Layer files are immutable.
78 : //!
79 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
80 : //! storage. Different tenants can be attached to different pageservers, but if the
81 : //! same tenant is attached to two pageservers at the same time, they will overwrite
82 : //! each other's index file updates, and confusion will ensue. There's no interlock or
83 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
84 : //! that this doesn't happen.
85 : //!
86 : //! ## Implementation Note
87 : //!
88 : //! The *actual* remote state lags behind the *desired* remote state while
89 : //! there are in-flight operations.
90 : //! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`].
91 : //! It is initialized based on the [`IndexPart`] that was passed during init
92 : //! and updated with every `schedule_*` function call.
93 : //! All this is necessary to compute the future [`IndexPart`]s
94 : //! when scheduling an operation while other operations that also affect the
95 : //! remote [`IndexPart`] are in flight.
96 : //!
97 : //! # Retries & Error Handling
98 : //!
99 : //! The client retries operations indefinitely, using exponential back-off.
100 : //! There is no way to force a retry, i.e., interrupt the back-off.
101 : //! This could be built easily.
102 : //!
103 : //! # Cancellation
104 : //!
105 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
106 : //! the client's tenant and timeline.
107 : //! Dropping the client will drop queued operations but not executing operations.
108 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
109 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
110 : //!
111 : //! # Completion
112 : //!
113 : //! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately,
114 : //! and submit a request through the DeletionQueue to update
115 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
116 : //! validated that our generation is not stale. It is this visible value
117 : //! that is advertised to safekeepers as a signal that they can
118 : //! delete the WAL up to that LSN.
119 : //!
120 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
121 : //! for all pending operations to complete. It does not prevent more
122 : //! operations from getting scheduled.
123 : //!
124 : //! # Crash Consistency
125 : //!
126 : //! We do not persist the upload queue state.
127 : //! If we drop the client, or crash, all unfinished operations are lost.
128 : //!
129 : //! To recover, the following steps need to be taken:
130 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
131 : //! consistent remote state, assuming the user scheduled the operations in
132 : //! the correct order.
133 : //! - Initiate upload queue with that [`IndexPart`].
134 : //! - Reschedule all lost operations by comparing the local filesystem state
135 : //! and remote state as per [`IndexPart`]. This is done in
136 : //! [`TenantShard::timeline_init_and_sync`].
137 : //!
138 : //! Note that if we crash during file deletion between the index update
139 : //! that removes the file from the list of files, and deleting the remote file,
140 : //! the file is leaked in the remote storage. Similarly, if a new file is created
141 : //! and uploaded, but the pageserver dies permanently before updating the
142 : //! remote index file, the new file is leaked in remote storage. We accept and
143 : //! tolerate that for now.
144 : //! Note further that we cannot easily fix this by scheduling deletes for every
145 : //! file that is present only on the remote, because we cannot distinguish the
146 : //! following two cases:
147 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
148 : //! but crashed before it finished remotely.
149 : //! - (2) We never had the file locally because we haven't on-demand downloaded
150 : //! it yet.
151 : //!
152 : //! # Downloads
153 : //!
154 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
155 : //! downloading files from the remote storage. Downloads are performed immediately
156 : //! against the `RemoteStorage`, independently of the upload queue.
157 : //!
158 : //! When we attach a tenant, we perform the following steps:
159 : //! - create `Tenant` object in `TenantState::Attaching` state
160 : //! - List timelines that are present in remote storage, and for each:
161 : //! - download their remote [`IndexPart`]s
162 : //! - create `Timeline` struct and a `RemoteTimelineClient`
163 : //! - initialize the client's upload queue with its `IndexPart`
164 : //! - schedule uploads for layers that are only present locally.
165 : //! - After the above is done for each timeline, open the tenant for business by
166 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
167 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
168 : //!
169 : //! # Operating Without Remote Storage
170 : //!
171 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
172 : //! not created and the uploads are skipped.
173 : //!
174 : //! [`TenantShard::timeline_init_and_sync`]: super::TenantShard::timeline_init_and_sync
175 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
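The Consistency section above says the client runs some operations in parallel and treats others as barriers, deferring the exact rules to `UploadOp::can_bypass`. Below is a minimal sketch of that idea, assuming a much simpler rule set than the real one: uploads of distinct layer files may overlap, while index uploads and deletions wait for every operation scheduled before them. The `Op` enum, `can_bypass`, and `launchable` are hypothetical stand-ins, not the pageserver's types.

```rust
/// Hypothetical, heavily simplified stand-in for `UploadOp`.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    UploadLayer(String),
    UploadIndex,
    Delete,
}

/// May `next` run while `earlier` (scheduled before it) has not finished?
/// Simplified rule: only uploads of distinct, immutable layer files may overlap.
fn can_bypass(next: &Op, earlier: &Op) -> bool {
    match (next, earlier) {
        (Op::UploadLayer(a), Op::UploadLayer(b)) => a != b,
        // Everything else is a barrier: index uploads and deletions wait for
        // all earlier ops, and no op may overtake them.
        _ => false,
    }
}

/// Indices of the ops in `queue` (in schedule order) that may start now:
/// an op may start only if it can bypass every op scheduled before it.
fn launchable(queue: &[Op]) -> Vec<usize> {
    queue
        .iter()
        .enumerate()
        .filter(|(i, op)| queue[..*i].iter().all(|earlier| can_bypass(op, earlier)))
        .map(|(i, _)| i)
        .collect()
}
```

With the queue `[layer a, layer b, index, delete]`, only the two layer uploads can start; the index upload starts once both have finished, and the deletion only after that index upload.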
176 :
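The Crash Consistency section describes rescheduling lost operations by comparing the local filesystem with the remote `IndexPart`, and explains why files present only on the remote are not deleted. A sketch of that comparison as a set difference, assuming layer files can be identified by name alone; `reschedule_after_crash` is a hypothetical helper, not the logic in `TenantShard::timeline_init_and_sync`.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: which local layer files need their upload rescheduled
/// after a crash, given the file names listed in the remote index.
fn reschedule_after_crash(local: &BTreeSet<String>, remote_index: &BTreeSet<String>) -> Vec<String> {
    // Local-only files are not yet part of the remote state, so their
    // uploads are scheduled again.
    // Remote-only files are deliberately ignored: they may simply not have been
    // downloaded on demand yet, or they may be left over from an interrupted
    // deletion, and we cannot tell these cases apart.
    local.difference(remote_index).cloned().collect()
}
```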
177 : pub(crate) mod download;
178 : pub mod index;
179 : pub mod manifest;
180 : pub(crate) mod upload;
181 :
182 : use std::collections::{HashMap, HashSet, VecDeque};
183 : use std::ops::DerefMut;
184 : use std::sync::atomic::{AtomicU32, Ordering};
185 : use std::sync::{Arc, Mutex, OnceLock};
186 : use std::time::Duration;
187 :
188 : use anyhow::Context;
189 : use camino::Utf8Path;
190 : use chrono::{NaiveDateTime, Utc};
191 : pub(crate) use download::{
192 : download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
193 : list_remote_tenant_shards, list_remote_timelines,
194 : };
195 : use index::GcCompactionState;
196 : pub(crate) use index::LayerFileMetadata;
197 : use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
198 : use pageserver_api::shard::{ShardIndex, TenantShardId};
199 : use regex::Regex;
200 : use remote_storage::{
201 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
202 : };
203 : use scopeguard::ScopeGuard;
204 : use tokio_util::sync::CancellationToken;
205 : use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
206 : pub(crate) use upload::upload_initdb_dir;
207 : use utils::backoff::{
208 : self, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
209 : };
210 : use utils::id::{TenantId, TimelineId};
211 : use utils::lsn::Lsn;
212 : use utils::pausable_failpoint;
213 : use utils::shard::ShardNumber;
214 :
215 : use self::index::IndexPart;
216 : use super::config::AttachedLocationConfig;
217 : use super::metadata::MetadataUpdate;
218 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
219 : use super::timeline::import_pgdata;
220 : use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
221 : use super::{DeleteTimelineError, Generation};
222 : use crate::config::PageServerConf;
223 : use crate::context::RequestContext;
224 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
225 : use crate::metrics::{
226 : MeasureRemoteOp, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
227 : RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
228 : RemoteTimelineClientMetricsCallTrackSize,
229 : };
230 : use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind, shutdown_token};
231 : use crate::tenant::metadata::TimelineMetadata;
232 : use crate::tenant::remote_timeline_client::download::download_retry;
233 : use crate::tenant::storage_layer::AsLayerDesc;
234 : use crate::tenant::upload_queue::{
235 : Delete, OpType, UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped,
236 : UploadQueueStoppedDeletable, UploadTask,
237 : };
238 : use crate::tenant::{TIMELINES_SEGMENT_NAME, debug_assert_current_span_has_tenant_and_timeline_id};
239 : use crate::{TENANT_HEATMAP_BASENAME, task_mgr};
240 :
241 : // Occasional network issues and such can cause remote operations to fail, and
242 : // that's expected. If a download fails, we log it at info-level, and retry.
243 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
244 : // level instead, as repeated failures can mean a more serious problem. If it
245 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
246 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
247 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
248 :
249 : // Similarly log failed uploads and deletions at WARN level, after this many
250 : // retries. Uploads and deletions are retried forever, though.
251 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
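The comments above state that uploads and deletions are retried forever, with their log level escalated to WARN once failures reach `FAILED_UPLOAD_WARN_THRESHOLD`. A sketch of a capped exponential back-off together with that log-level switch, assuming a plain doubling schedule and treating failure counts at or above the threshold as WARN. This is not the formula used by `utils::backoff`; `backoff_delay`, `log_level`, and `Level` are hypothetical names.

```rust
use std::time::Duration;

const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;

#[derive(Debug, PartialEq)]
enum Level {
    Info,
    Warn,
}

/// Hypothetical capped doubling back-off: `base * 2^attempt`, never above `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // `checked_shl` returns None once the shift reaches 32 bits; saturate instead.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.saturating_mul(factor).min(max)
}

/// Assumption: failures at or above the threshold are logged at WARN.
fn log_level(failed_attempts: u32) -> Level {
    if failed_attempts >= FAILED_UPLOAD_WARN_THRESHOLD {
        Level::Warn
    } else {
        Level::Info
    }
}
```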
252 :
253 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
254 :
255 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
256 :
257 : /// Default buffer size when interfacing with [`tokio::fs::File`].
258 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
259 :
260 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
261 : /// which we warn and skip.
262 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
263 :
264 : pub enum MaybeDeletedIndexPart {
265 : IndexPart(IndexPart),
266 : Deleted(IndexPart),
267 : }
268 :
269 : #[derive(Debug, thiserror::Error)]
270 : pub enum PersistIndexPartWithDeletedFlagError {
271 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
272 : AlreadyInProgress(NaiveDateTime),
273 : #[error("the deleted_flag was already set, value is {0:?}")]
274 : AlreadyDeleted(NaiveDateTime),
275 : #[error(transparent)]
276 : Other(#[from] anyhow::Error),
277 : }
278 :
279 : #[derive(Debug, thiserror::Error)]
280 : pub enum WaitCompletionError {
281 : #[error(transparent)]
282 : NotInitialized(NotInitialized),
283 : #[error("wait_completion aborted because upload queue was stopped")]
284 : UploadQueueShutDownOrStopped,
285 : }
286 :
287 : #[derive(Debug, thiserror::Error)]
288 : #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
289 : pub struct UploadQueueNotReadyError;
290 :
291 : #[derive(Debug, thiserror::Error)]
292 : pub enum ShutdownIfArchivedError {
293 : #[error(transparent)]
294 : NotInitialized(NotInitialized),
295 : #[error("timeline is not archived")]
296 : NotArchived,
297 : }
298 :
299 : /// Behavioral modes that enable seamless live migration.
300 : ///
301 : /// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
302 : struct RemoteTimelineClientConfig {
303 : /// If this is false, then updates to remote_consistent_lsn are dropped rather
304 : /// than being submitted to DeletionQueue for validation. This behavior is
305 : /// used when a tenant attachment is known to have a stale generation number,
306 : /// such that validation attempts will always fail. This is not necessary
307 : /// for correctness, but avoids spamming error statistics with failed validations
308 : /// when doing migrations of tenants.
309 : process_remote_consistent_lsn_updates: bool,
310 :
311 : /// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
312 : /// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
313 : /// is known to be multi-attached, in order to avoid disrupting other attached tenants
314 : /// whose generations' metadata refers to the deleted objects.
315 : block_deletions: bool,
316 : }
317 :
318 : /// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
319 : /// not carry the entire LocationConf structure: it's much more than we need. The From
320 : /// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
321 : impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
322 240 : fn from(lc: &AttachedLocationConfig) -> Self {
323 240 : Self {
324 240 : block_deletions: !lc.may_delete_layers_hint(),
325 240 : process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
326 240 : }
327 240 : }
328 : }
329 :
330 : /// A client for accessing a timeline's data in remote storage.
331 : ///
332 : /// This takes care of managing the number of connections, and balancing them
333 : /// across tenants. This also handles retries of failed uploads.
334 : ///
335 : /// Upload and delete requests are ordered so that before a deletion is
336 : /// performed, we wait for all preceding uploads to finish. This ensures
337 : /// that if you perform a compaction operation that reshuffles data in layer
338 : /// files, we don't have a transient state where the old files have already been
339 : /// deleted, but new files have not yet been uploaded.
340 : ///
341 : /// Similarly, this enforces an order between index-file uploads, and layer
342 : /// uploads. Before an index-file upload is performed, all preceding layer
343 : /// uploads must be finished.
344 : ///
345 : /// This also maintains a list of remote files, and automatically includes that
346 : /// in the index part file, whenever timeline metadata is uploaded.
347 : ///
348 : /// Downloads are not queued, they are performed immediately.
349 : pub(crate) struct RemoteTimelineClient {
350 : conf: &'static PageServerConf,
351 :
352 : runtime: tokio::runtime::Handle,
353 :
354 : tenant_shard_id: TenantShardId,
355 : timeline_id: TimelineId,
356 : generation: Generation,
357 :
358 : upload_queue: Mutex<UploadQueue>,
359 :
360 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
361 :
362 : storage_impl: GenericRemoteStorage,
363 :
364 : deletion_queue_client: DeletionQueueClient,
365 :
366 : /// Subset of tenant configuration used to control upload behaviors during migrations
367 : config: std::sync::RwLock<RemoteTimelineClientConfig>,
368 :
369 : cancel: CancellationToken,
370 : }
371 :
372 : impl Drop for RemoteTimelineClient {
373 10 : fn drop(&mut self) {
374 10 : debug!("dropping RemoteTimelineClient");
375 10 : }
376 : }
377 :
378 : impl RemoteTimelineClient {
379 : ///
380 : /// Create a remote storage client for given timeline
381 : ///
382 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
383 : /// by calling one of the `init_upload_queue*` methods.
384 : ///
385 235 : pub(crate) fn new(
386 235 : remote_storage: GenericRemoteStorage,
387 235 : deletion_queue_client: DeletionQueueClient,
388 235 : conf: &'static PageServerConf,
389 235 : tenant_shard_id: TenantShardId,
390 235 : timeline_id: TimelineId,
391 235 : generation: Generation,
392 235 : location_conf: &AttachedLocationConfig,
393 235 : ) -> RemoteTimelineClient {
394 : RemoteTimelineClient {
395 235 : conf,
396 235 : runtime: if cfg!(test) {
397 : // remote_timeline_client.rs tests rely on current-thread runtime
398 235 : tokio::runtime::Handle::current()
399 : } else {
400 0 : BACKGROUND_RUNTIME.handle().clone()
401 : },
402 235 : tenant_shard_id,
403 235 : timeline_id,
404 235 : generation,
405 235 : storage_impl: remote_storage,
406 235 : deletion_queue_client,
407 235 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
408 235 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
409 235 : &tenant_shard_id,
410 235 : &timeline_id,
411 : )),
412 235 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
413 235 : cancel: CancellationToken::new(),
414 : }
415 235 : }
416 :
417 : /// Initialize the upload queue for a remote storage that already received
418 : /// an index file upload, i.e., it's not empty.
419 : /// The given `index_part` must be the one on the remote.
420 3 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
421 : // Set the maximum number of inprogress tasks to the remote storage concurrency. There's
422 : // certainly no point in starting more upload tasks than this.
423 3 : let inprogress_limit = self
424 3 : .conf
425 3 : .remote_storage_config
426 3 : .as_ref()
427 3 : .map_or(0, |r| r.concurrency_limit());
428 3 : let mut upload_queue = self.upload_queue.lock().unwrap();
429 3 : upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
430 3 : self.update_remote_physical_size_gauge(Some(index_part));
431 3 : info!(
432 0 : "initialized upload queue from remote index with {} layer files",
433 0 : index_part.layer_metadata.len()
434 : );
435 3 : Ok(())
436 3 : }
437 :
438 : /// Initialize the upload queue for the case where the remote storage is empty,
439 : /// i.e., it doesn't have an `IndexPart`.
440 : ///
441 : /// `rel_size_v2_migration` and `rel_size_migrated_at` need to be carried over during branching,
442 : /// which is why they are passed in here.
443 232 : pub fn init_upload_queue_for_empty_remote(
444 232 : &self,
445 232 : local_metadata: &TimelineMetadata,
446 232 : rel_size_v2_migration: Option<RelSizeMigration>,
447 232 : rel_size_migrated_at: Option<Lsn>,
448 232 : ) -> anyhow::Result<()> {
449 : // Set the maximum number of inprogress tasks to the remote storage concurrency. There's
450 : // certainly no point in starting more upload tasks than this.
451 232 : let inprogress_limit = self
452 232 : .conf
453 232 : .remote_storage_config
454 232 : .as_ref()
455 232 : .map_or(0, |r| r.concurrency_limit());
456 232 : let mut upload_queue = self.upload_queue.lock().unwrap();
457 232 : let initialized_queue =
458 232 : upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
459 232 : initialized_queue.dirty.rel_size_migration = rel_size_v2_migration;
460 232 : initialized_queue.dirty.rel_size_migrated_at = rel_size_migrated_at;
461 232 : self.update_remote_physical_size_gauge(None);
462 232 : info!("initialized upload queue as empty");
463 232 : Ok(())
464 232 : }
465 :
466 : /// Initialize the queue in stopped state. Used in startup path
467 : /// to continue deletion operation interrupted by pageserver crash or restart.
468 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
469 0 : &self,
470 0 : index_part: &IndexPart,
471 0 : ) -> anyhow::Result<()> {
472 : // FIXME: consider newtype for DeletedIndexPart.
473 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
474 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
475 0 : ))?;
476 0 : let inprogress_limit = self
477 0 : .conf
478 0 : .remote_storage_config
479 0 : .as_ref()
480 0 : .map_or(0, |r| r.concurrency_limit());
481 :
482 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
483 0 : upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
484 0 : self.update_remote_physical_size_gauge(Some(index_part));
485 0 : self.stop_impl(&mut upload_queue);
486 :
487 0 : upload_queue
488 0 : .stopped_mut()
489 0 : .expect("stopped above")
490 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
491 :
492 0 : Ok(())
493 0 : }
494 :
495 : /// Notify this client of a change to its parent tenant's config, as this may cause us to
496 : /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
497 0 : pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
498 0 : let new_conf = RemoteTimelineClientConfig::from(location_conf);
499 0 : let unblocked = !new_conf.block_deletions;
500 :
501 : // Update config before draining deletions, so that we don't race with more being
502 : // inserted. This can result in deletions happening out of order, but that does not
503 : // violate any invariants: deletions only need to be ordered relative to upload of the index
504 : // that dereferences the deleted objects, and we are not changing that order.
505 0 : *self.config.write().unwrap() = new_conf;
506 :
507 0 : if unblocked {
508 : // If we may now delete layers, drain any that were blocked in our old
509 : // configuration state
510 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
511 :
512 0 : if let Ok(queue) = queue_locked.initialized_mut() {
513 0 : let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
514 0 : for d in blocked_deletions {
515 0 : if let Err(e) = self.deletion_queue_client.push_layers(
516 0 : self.tenant_shard_id,
517 0 : self.timeline_id,
518 0 : self.generation,
519 0 : d.layers,
520 0 : ) {
521 : // This could happen if the pageserver is shut down while a tenant
522 : // is transitioning from a deletion-blocked state: we will leak some
523 : // S3 objects in this case.
524 0 : warn!("Failed to drain blocked deletions: {}", e);
525 0 : break;
526 0 : }
527 : }
528 0 : }
529 0 : }
530 0 : }
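`update_config` above flips the `block_deletions` flag before draining the buffered deletions, and stops at the first push error, which leaks the remaining objects. A minimal model of that buffer-then-drain pattern, assuming a fake deletion queue that refuses pushes while shutting down; `FakeDeletionQueue`, `Client`, and `schedule_deletion` are hypothetical and greatly simplified compared with `DeletionQueueClient` and the upload queue.

```rust
/// Hypothetical stand-in for the deletion queue: records pushed batches,
/// or refuses them while shutting down.
struct FakeDeletionQueue {
    accepted: Vec<Vec<String>>,
    shutting_down: bool,
}

impl FakeDeletionQueue {
    fn push_layers(&mut self, layers: Vec<String>) -> Result<(), String> {
        if self.shutting_down {
            return Err("deletion queue is shutting down".to_string());
        }
        self.accepted.push(layers);
        Ok(())
    }
}

/// Hypothetical client holding only the state relevant to blocked deletions.
struct Client {
    block_deletions: bool,
    blocked_deletions: Vec<Vec<String>>,
}

impl Client {
    fn schedule_deletion(&mut self, layers: Vec<String>, dq: &mut FakeDeletionQueue) -> Result<(), String> {
        if self.block_deletions {
            // Hold the batch back, e.g. while the tenant may be multi-attached.
            self.blocked_deletions.push(layers);
            Ok(())
        } else {
            dq.push_layers(layers)
        }
    }

    fn update_config(&mut self, block_deletions: bool, dq: &mut FakeDeletionQueue) {
        // Flip the flag first so new deletions go straight to the queue.
        self.block_deletions = block_deletions;
        if !block_deletions {
            // Move the buffer out; batches after a failed push are dropped (leaked).
            for batch in std::mem::take(&mut self.blocked_deletions) {
                if let Err(e) = dq.push_layers(batch) {
                    eprintln!("Failed to drain blocked deletions: {e}");
                    break;
                }
            }
        }
    }
}
```

Because the buffer is moved out with `std::mem::take` before draining, batches after a failed push are discarded rather than kept for a later retry, which matches the leak the source comment accepts.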
531 :
532 : /// Returns `None` if nothing is yet uploaded, `Some(disk_consistent_lsn)` otherwise.
533 2 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
534 2 : match &mut *self.upload_queue.lock().unwrap() {
535 0 : UploadQueue::Uninitialized => None,
536 2 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
537 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
538 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
539 0 : .upload_queue_for_deletion
540 0 : .get_last_remote_consistent_lsn_projected(),
541 : }
542 2 : }
543 :
544 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
545 0 : match &mut *self.upload_queue.lock().unwrap() {
546 0 : UploadQueue::Uninitialized => None,
547 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
548 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
549 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
550 0 : q.upload_queue_for_deletion
551 0 : .get_last_remote_consistent_lsn_visible(),
552 0 : ),
553 : }
554 0 : }
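The two accessors above correspond to the two LSNs described under Completion in the module docs: the projected value follows completed uploads, while the visible value, the one advertised to safekeepers, only advances after the deletion queue has validated the generation. A toy model of that split, assuming LSNs are plain `u64`s and that a validation round accepts or rejects all pending updates at once; `RemoteLsns` and its methods are hypothetical. The `process_updates` flag mirrors `process_remote_consistent_lsn_updates`, which drops updates instead of submitting them for validation.

```rust
/// Hypothetical model of the projected vs. visible remote_consistent_lsn.
struct RemoteLsns {
    /// Follows completed index uploads.
    projected: Option<u64>,
    /// Advertised value; advances only after generation validation.
    visible: u64,
    /// Updates submitted for validation but not yet validated.
    pending: Vec<u64>,
    /// Mirrors `process_remote_consistent_lsn_updates`.
    process_updates: bool,
}

impl RemoteLsns {
    fn new(process_updates: bool) -> Self {
        Self { projected: None, visible: 0, pending: Vec::new(), process_updates }
    }

    fn on_index_upload_completed(&mut self, disk_consistent_lsn: u64) {
        self.projected = Some(disk_consistent_lsn);
        // With a known-stale generation, drop the update instead of
        // submitting a validation that is bound to fail.
        if self.process_updates {
            self.pending.push(disk_consistent_lsn);
        }
    }

    fn on_validation(&mut self, generation_is_current: bool) {
        let pending = std::mem::take(&mut self.pending);
        if generation_is_current {
            if let Some(max) = pending.into_iter().max() {
                self.visible = self.visible.max(max);
            }
        }
    }
}
```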
555 :
556 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
557 : /// client is currently initialized.
558 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
559 0 : self.upload_queue
560 0 : .lock()
561 0 : .unwrap()
562 0 : .initialized_mut()
563 0 : .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn))
564 0 : .unwrap_or(false)
565 0 : }
566 :
567 : /// Returns whether the timeline is archived.
568 : /// Returns `None` if the remote index_part hasn't been downloaded yet.
569 8 : pub(crate) fn is_archived(&self) -> Option<bool> {
570 8 : self.upload_queue
571 8 : .lock()
572 8 : .unwrap()
573 8 : .initialized_mut()
574 8 : .map(|q| q.clean.0.archived_at.is_some())
575 8 : .ok()
576 8 : }
577 :
578 : /// Returns whether the timeline is invisible in synthetic size calculations, or `None` if the remote index_part hasn't been downloaded yet.
579 8 : pub(crate) fn is_invisible(&self) -> Option<bool> {
580 8 : self.upload_queue
581 8 : .lock()
582 8 : .unwrap()
583 8 : .initialized_mut()
584 8 : .map(|q| q.clean.0.marked_invisible_at.is_some())
585 8 : .ok()
586 8 : }
587 :
588 : /// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
589 : ///
590 : /// Return Err(_) if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
591 1 : pub(crate) fn archived_at_stopped_queue(
592 1 : &self,
593 1 : ) -> Result<Option<NaiveDateTime>, UploadQueueNotReadyError> {
594 1 : self.upload_queue
595 1 : .lock()
596 1 : .unwrap()
597 1 : .stopped_mut()
598 1 : .map(|q| q.upload_queue_for_deletion.clean.0.archived_at)
599 1 : .map_err(|_| UploadQueueNotReadyError)
600 1 : }
601 :
602 1004 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
603 1004 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
604 772 : current_remote_index_part
605 772 : .layer_metadata
606 772 : .values()
607 772 : .map(|ilmd| ilmd.file_size)
608 772 : .sum()
609 : } else {
610 232 : 0
611 : };
612 1004 : self.metrics.remote_physical_size_gauge.set(size);
613 1004 : }
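`update_remote_physical_size_gauge` sums the `file_size` of every layer listed in the index, falling back to zero when there is no remote index yet. The same computation written with `Option::map_or`, using hypothetical `Index` and `LayerMeta` types in place of `IndexPart` and `LayerFileMetadata`:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `LayerFileMetadata`.
struct LayerMeta {
    file_size: u64,
}

/// Hypothetical stand-in for `IndexPart`.
struct Index {
    layer_metadata: HashMap<String, LayerMeta>,
}

/// Total size of all layers referenced by the remote index, or 0 if there is none yet.
fn remote_physical_size(index: Option<&Index>) -> u64 {
    index.map_or(0, |i| i.layer_metadata.values().map(|m| m.file_size).sum())
}
```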
614 :
615 1 : pub fn get_remote_physical_size(&self) -> u64 {
616 1 : self.metrics.remote_physical_size_gauge.get()
617 1 : }
618 :
619 : //
620 : // Download operations.
621 : //
622 : // These don't use the per-timeline queue. They do use the global semaphore in
623 : // S3Bucket, to limit the total number of concurrent operations, though.
624 : //
625 :
626 : /// Download index file
627 10 : pub async fn download_index_file(
628 10 : &self,
629 10 : cancel: &CancellationToken,
630 10 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
631 10 : let _unfinished_gauge_guard = self.metrics.call_begin(
632 10 : &RemoteOpFileKind::Index,
633 10 : &RemoteOpKind::Download,
634 10 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
635 10 : reason: "no need for a downloads gauge",
636 10 : },
637 10 : );
638 :
639 10 : let (index_part, index_generation, index_last_modified) = download::download_index_part(
640 10 : &self.storage_impl,
641 10 : &self.tenant_shard_id,
642 10 : &self.timeline_id,
643 10 : self.generation,
644 10 : cancel,
645 10 : )
646 10 : .measure_remote_op(
647 10 : Option::<TaskKind>::None,
648 10 : RemoteOpFileKind::Index,
649 10 : RemoteOpKind::Download,
650 10 : Arc::clone(&self.metrics),
651 10 : )
652 10 : .await?;
653 :
654 : // Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
655 : // old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
656 : // in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
657 : // when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
658 : // also a newer index available, that is surprising.
659 : const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
660 10 : let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
661 0 : if e.duration() > Duration::from_secs(5) {
662 : // We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
663 : // timestamp, it is common to be out by at least 1 second.
664 0 : tracing::warn!("Index has modification time in the future: {e}");
665 0 : }
666 0 : Duration::ZERO
667 0 : });
668 10 : if index_age > INDEX_AGE_CHECKS_THRESHOLD {
669 0 : tracing::info!(
670 : ?index_generation,
671 0 : age = index_age.as_secs_f64(),
672 0 : "Loaded an old index, checking for other indices..."
673 : );
674 :
675 : // Find the highest-generation index
676 0 : let (_latest_index_part, latest_index_generation, latest_index_mtime) =
677 0 : download::download_index_part(
678 0 : &self.storage_impl,
679 0 : &self.tenant_shard_id,
680 0 : &self.timeline_id,
681 0 : Generation::MAX,
682 0 : cancel,
683 0 : )
684 0 : .await?;
685 :
686 0 : if latest_index_generation > index_generation {
687 : // Unexpected! Why are we loading such an old index if a more recent one exists?
688 : // We will refuse to proceed, as there is no reasonable scenario where this should happen, but
689 : // there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
690 : // backwards).
691 0 : tracing::error!(
692 : ?index_generation,
693 : ?latest_index_generation,
694 : ?latest_index_mtime,
695 0 : "Found a newer index while loading an old one"
696 : );
697 0 : return Err(DownloadError::Fatal(
698 0 : "Index age exceeds threshold and a newer index exists".into(),
699 0 : ));
700 0 : }
701 10 : }
702 :
703 10 : if index_part.deleted_at.is_some() {
704 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
705 : } else {
706 10 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
707 : }
708 10 : }
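The defense-in-depth check above reduces to a small decision. First compute the index age, clamping future mtimes to zero. Then, only for indices older than 14 days, look up the newest generation and refuse if it is newer than the one that was loaded. This is a minimal sketch with several assumptions: generations are plain `u32`, a caller-supplied closure stands in for `download::download_index_part(..., Generation::MAX, ...)`, `index_age` and `check_old_index` are hypothetical names, and the 5-second clock-skew warning is omitted.

```rust
use std::time::{Duration, SystemTime};

// 14 days, matching INDEX_AGE_CHECKS_THRESHOLD in the listing above.
const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);

/// Age of an index object given its modification time. An mtime in the future
/// (clock skew between the object store and this host) is treated as age zero.
fn index_age(last_modified: SystemTime, now: SystemTime) -> Duration {
    now.duration_since(last_modified).unwrap_or(Duration::ZERO)
}

/// Hypothetical helper: `find_latest_generation` stands in for re-reading the
/// newest index, and is only called when the loaded index is old.
fn check_old_index<F>(
    age: Duration,
    loaded_generation: u32,
    find_latest_generation: F,
) -> Result<(), String>
where
    F: FnOnce() -> u32,
{
    if age > INDEX_AGE_CHECKS_THRESHOLD && find_latest_generation() > loaded_generation {
        // An old index is acceptable on its own, but not if a newer one also exists.
        return Err("Index age exceeds threshold and a newer index exists".to_string());
    }
    Ok(())
}
```

Because `&&` short-circuits, the extra lookup is only paid for old indices, which mirrors the source doing the second download only inside the threshold branch.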
709 :
710 : /// Download a (layer) file from `path`, into local filesystem.
711 : ///
712 : /// 'layer_metadata' is the metadata from the remote index file.
713 : ///
714 : /// On success, returns the size of the downloaded file.
715 7 : pub async fn download_layer_file(
716 7 : &self,
717 7 : layer_file_name: &LayerName,
718 7 : layer_metadata: &LayerFileMetadata,
719 7 : local_path: &Utf8Path,
720 7 : gate: &utils::sync::gate::Gate,
721 7 : cancel: &CancellationToken,
722 7 : ctx: &RequestContext,
723 7 : ) -> Result<u64, DownloadError> {
724 7 : let downloaded_size = {
725 7 : let _unfinished_gauge_guard = self.metrics.call_begin(
726 7 : &RemoteOpFileKind::Layer,
727 7 : &RemoteOpKind::Download,
728 7 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
729 7 : reason: "no need for a downloads gauge",
730 7 : },
731 7 : );
732 7 : download::download_layer_file(
733 7 : self.conf,
734 7 : &self.storage_impl,
735 7 : self.tenant_shard_id,
736 7 : self.timeline_id,
737 7 : layer_file_name,
738 7 : layer_metadata,
739 7 : local_path,
740 7 : gate,
741 7 : cancel,
742 7 : ctx,
743 7 : )
744 7 : .measure_remote_op(
745 7 : Some(ctx.task_kind()),
746 7 : RemoteOpFileKind::Layer,
747 7 : RemoteOpKind::Download,
748 7 : Arc::clone(&self.metrics),
749 7 : )
750 7 : .await?
751 : };
752 :
753 7 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
754 7 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
755 :
756 7 : Ok(downloaded_size)
757 7 : }
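In `download_layer_file`, the metrics guard is created inside an inner block. It is therefore dropped as soon as the download finishes, before the global download counters are bumped, whether the download succeeds or `?` returns early. The sketch below illustrates that scoped RAII-guard idiom with a hypothetical `InFlightGauge`. It does not reproduce the real `call_begin` on the client's metrics, whose exact bookkeeping is not shown in this listing.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a "calls in flight" gauge.
#[derive(Default)]
struct InFlightGauge(AtomicI64);

/// Returned by `call_begin`; decrements the gauge when dropped.
struct CallGuard(Arc<InFlightGauge>);

impl InFlightGauge {
    fn call_begin(self: &Arc<Self>) -> CallGuard {
        self.0.fetch_add(1, Ordering::Relaxed);
        CallGuard(Arc::clone(self))
    }

    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

impl Drop for CallGuard {
    fn drop(&mut self) {
        self.0 .0.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Mirrors the shape of `download_layer_file`: the guard lives only inside the
/// inner block, so it is released on both the success and the error path.
fn download(gauge: &Arc<InFlightGauge>, fail: bool) -> Result<u64, String> {
    let downloaded_size = {
        let _guard = gauge.call_begin();
        if fail {
            return Err("simulated download failure".to_string());
        }
        4096
    };
    // Global counters (like REMOTE_ONDEMAND_DOWNLOADED_BYTES) would be bumped here.
    Ok(downloaded_size)
}
```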
758 :
759 : //
760 : // Upload operations.
761 : //
762 :
763 : /// Launch an index-file upload operation in the background, with
764 : /// fully updated metadata.
765 : ///
766 : /// This should only be used to upload initial metadata to remote storage.
767 : ///
768 : /// The upload will be added to the queue immediately, but it
769 : /// won't be performed until all previously scheduled layer file
770 : /// upload operations have completed successfully. This is to
771 : /// ensure that when the index file claims that layers X, Y and Z
772 : /// exist in remote storage, they really do. To wait for the upload
773 : /// to complete, use `wait_completion`.
774 : ///
775 : /// If there were any changes to the list of files, i.e. if any
776 : /// layer file uploads were scheduled, since the last index file
777 : /// upload, those will be included too.
778 118 : pub fn schedule_index_upload_for_full_metadata_update(
779 118 : self: &Arc<Self>,
780 118 : metadata: &TimelineMetadata,
781 118 : ) -> anyhow::Result<()> {
782 118 : let mut guard = self.upload_queue.lock().unwrap();
783 118 : let upload_queue = guard.initialized_mut()?;
784 :
785 : // As documented in the struct definition, it's ok for latest_metadata to be
786 : // ahead of what's _actually_ on the remote during index upload.
787 118 : upload_queue.dirty.metadata = metadata.clone();
788 :
789 118 : self.schedule_index_upload(upload_queue);
790 :
791 118 : Ok(())
792 118 : }
793 :
794 : /// Launch an index-file upload operation in the background, with only parts of the metadata
795 : /// updated.
796 : ///
797 : /// This is the regular way of updating metadata on layer flushes or Gc.
798 : ///
799 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
800 : /// `index_part.json`, while being more clear on what values update regularly.
801 623 : pub(crate) fn schedule_index_upload_for_metadata_update(
802 623 : self: &Arc<Self>,
803 623 : update: &MetadataUpdate,
804 623 : ) -> anyhow::Result<()> {
805 623 : let mut guard = self.upload_queue.lock().unwrap();
806 623 : let upload_queue = guard.initialized_mut()?;
807 :
808 623 : upload_queue.dirty.metadata.apply(update);
809 :
810 : // Defense in depth: if we somehow generated invalid metadata, do not persist it.
811 623 : upload_queue
812 623 : .dirty
813 623 : .validate()
814 623 : .map_err(|e| anyhow::anyhow!(e))?;
815 :
816 623 : self.schedule_index_upload(upload_queue);
817 :
818 623 : Ok(())
819 623 : }
820 :
821 : /// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
822 : ///
823 : /// Returns whether the caller must wait for the queue to drain to ensure that the change is uploaded,
824 : /// i.e. either if the change is already sitting in the queue but not yet committed, or if the change
825 : /// had not been queued yet.
826 1 : pub(crate) fn schedule_index_upload_for_timeline_archival_state(
827 1 : self: &Arc<Self>,
828 1 : state: TimelineArchivalState,
829 1 : ) -> anyhow::Result<bool> {
830 1 : let mut guard = self.upload_queue.lock().unwrap();
831 1 : let upload_queue = guard.initialized_mut()?;
832 :
833 : /// Returns `None` if no change is needed, `Some(true)` if `archived_at` must be set,
834 : /// and `Some(false)` if it must be cleared.
835 2 : fn need_change(
836 2 : archived_at: &Option<NaiveDateTime>,
837 2 : state: TimelineArchivalState,
838 2 : ) -> Option<bool> {
839 2 : match (archived_at, state) {
840 : (Some(_), TimelineArchivalState::Archived)
841 : | (None, TimelineArchivalState::Unarchived) => {
842 : // Nothing to do
843 0 : tracing::info!("intended state matches present state");
844 0 : None
845 : }
846 2 : (None, TimelineArchivalState::Archived) => Some(true),
847 0 : (Some(_), TimelineArchivalState::Unarchived) => Some(false),
848 : }
849 2 : }
850 1 : let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
851 :
852 1 : if let Some(archived_at_set) = need_upload_scheduled {
853 1 : let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
854 1 : upload_queue.dirty.archived_at = intended_archived_at;
855 1 : self.schedule_index_upload(upload_queue);
856 0 : }
857 :
858 1 : let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
859 1 : Ok(need_wait)
860 1 : }
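`schedule_index_upload_for_timeline_archival_state` consults `need_change` twice. It checks the in-memory (`dirty`) index to decide whether to schedule an upload, and the last uploaded (`clean`) index to decide whether the caller must wait. This is a minimal sketch with a few assumptions: `archived_at` is reduced to an `Option<u64>` timestamp, `Utc::now()` is replaced by a `now` argument, and `schedule_archival` is a hypothetical name that returns `(scheduled, need_wait)`.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum TimelineArchivalState {
    Archived,
    Unarchived,
}

/// `None`: nothing to do; `Some(true)`: set `archived_at`; `Some(false)`: clear it.
fn need_change(archived_at: Option<u64>, state: TimelineArchivalState) -> Option<bool> {
    match (archived_at, state) {
        (Some(_), TimelineArchivalState::Archived) | (None, TimelineArchivalState::Unarchived) => None,
        (None, TimelineArchivalState::Archived) => Some(true),
        (Some(_), TimelineArchivalState::Unarchived) => Some(false),
    }
}

/// Hypothetical model: update `dirty` if needed, and report whether the
/// uploaded (`clean`) index still differs from the requested state.
fn schedule_archival(
    dirty: &mut Option<u64>,
    clean: Option<u64>,
    state: TimelineArchivalState,
    now: u64,
) -> (bool, bool) {
    let mut scheduled = false;
    if let Some(set) = need_change(*dirty, state) {
        *dirty = set.then_some(now);
        scheduled = true; // the real code calls schedule_index_upload here
    }
    let need_wait = need_change(clean, state).is_some();
    (scheduled, need_wait)
}
```

The second case in the test below is the "already queued but not yet committed" situation: nothing new is scheduled, yet the caller still has to wait.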
861 :
862 1 : pub(crate) fn schedule_index_upload_for_timeline_invisible_state(
863 1 : self: &Arc<Self>,
864 1 : state: TimelineVisibilityState,
865 1 : ) -> anyhow::Result<()> {
866 1 : let mut guard = self.upload_queue.lock().unwrap();
867 1 : let upload_queue = guard.initialized_mut()?;
868 :
869 1 : fn need_change(
870 1 : marked_invisible_at: &Option<NaiveDateTime>,
871 1 : state: TimelineVisibilityState,
872 1 : ) -> Option<bool> {
873 1 : match (marked_invisible_at, state) {
874 0 : (Some(_), TimelineVisibilityState::Invisible) => Some(false),
875 1 : (None, TimelineVisibilityState::Invisible) => Some(true),
876 0 : (Some(_), TimelineVisibilityState::Visible) => Some(false),
877 0 : (None, TimelineVisibilityState::Visible) => Some(true),
878 : }
879 1 : }
880 :
881 1 : let need_upload_scheduled = need_change(&upload_queue.dirty.marked_invisible_at, state);
882 :
883 1 : if let Some(marked_invisible_at_set) = need_upload_scheduled {
884 1 : let intended_marked_invisible_at =
885 1 : marked_invisible_at_set.then(|| Utc::now().naive_utc());
886 1 : upload_queue.dirty.marked_invisible_at = intended_marked_invisible_at;
887 1 : self.schedule_index_upload(upload_queue);
888 0 : }
889 :
890 1 : Ok(())
891 1 : }
892 :
893 : /// Shuts the timeline client down, but only if the timeline is archived.
894 : ///
895 : /// This function and [`Self::schedule_index_upload_for_timeline_archival_state`] use the
896 : /// same lock to prevent races between unarchival and offloading: unarchival requires the
897 : /// upload queue to be initialized, and leaves behind an upload queue where either dirty
898 : /// or clean has archived_at of `None`. offloading leaves behind an uninitialized upload
899 : /// queue.
900 1 : pub(crate) async fn shutdown_if_archived(
901 1 : self: &Arc<Self>,
902 1 : ) -> Result<(), ShutdownIfArchivedError> {
903 : {
904 1 : let mut guard = self.upload_queue.lock().unwrap();
905 1 : let upload_queue = guard
906 1 : .initialized_mut()
907 1 : .map_err(ShutdownIfArchivedError::NotInitialized)?;
908 :
909 1 : match (
910 1 : upload_queue.dirty.archived_at.is_none(),
911 1 : upload_queue.clean.0.archived_at.is_none(),
912 1 : ) {
913 : // The expected case: the timeline is archived and we don't want to unarchive
914 1 : (false, false) => {}
915 : (true, false) => {
916 0 : tracing::info!("can't shut down timeline: timeline slated for unarchival");
917 0 : return Err(ShutdownIfArchivedError::NotArchived);
918 : }
919 0 : (dirty_archived, true) => {
920 0 : tracing::info!(%dirty_archived, "can't shut down timeline: timeline not archived in remote storage");
921 0 : return Err(ShutdownIfArchivedError::NotArchived);
922 : }
923 : }
924 :
925 : // Set the shutting_down flag while the guard from the archival check is held.
926 : // This prevents a race with unarchival, as initialized_mut will not return
927 : // an upload queue from this point.
928 : // Also launch the queued tasks like shutdown() does.
929 1 : if !upload_queue.shutting_down {
930 1 : upload_queue.shutting_down = true;
931 1 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
932 1 : // this operation is not counted, similar to Barrier
933 1 : self.launch_queued_tasks(upload_queue);
934 1 : }
935 : }
936 :
937 1 : self.shutdown().await;
938 :
939 1 : Ok(())
940 1 : }
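The match in `shutdown_if_archived` is easier to audit as a truth table. This minimal sketch inverts the two `is_none()` checks into hypothetical `dirty_archived` / `clean_archived` booleans. Only the case where both the in-memory and the uploaded index carry `archived_at` allows shutdown.

```rust
#[derive(Debug, PartialEq)]
enum ShutdownIfArchivedError {
    NotArchived,
}

/// `dirty_archived`: `archived_at` is set in the in-memory index.
/// `clean_archived`: `archived_at` is set in the last uploaded index.
fn may_shut_down(dirty_archived: bool, clean_archived: bool) -> Result<(), ShutdownIfArchivedError> {
    match (dirty_archived, clean_archived) {
        // Archived in remote storage and no unarchival pending.
        (true, true) => Ok(()),
        // Archived remotely, but an unarchival is queued.
        (false, true) => Err(ShutdownIfArchivedError::NotArchived),
        // Not (yet) archived in remote storage.
        (_, false) => Err(ShutdownIfArchivedError::NotArchived),
    }
}
```

The sketch omits locking. In the source, the check and the setting of `shutting_down` happen under the same upload-queue lock, and that shared lock is what closes the race with unarchival.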
941 :
942 : /// Launch an index-file upload operation in the background, setting `import_pgdata` field.
943 0 : pub(crate) fn schedule_index_upload_for_import_pgdata_state_update(
944 0 : self: &Arc<Self>,
945 0 : state: Option<import_pgdata::index_part_format::Root>,
946 0 : ) -> anyhow::Result<()> {
947 0 : let mut guard = self.upload_queue.lock().unwrap();
948 0 : let upload_queue = guard.initialized_mut()?;
949 0 : upload_queue.dirty.import_pgdata = state;
950 0 : self.schedule_index_upload(upload_queue);
951 0 : Ok(())
952 0 : }
953 :
954 : /// If the `import_pgdata` field marks the timeline as having an import in progress,
955 : /// launch an index-file upload operation that transitions it to done in the background
956 0 : pub(crate) fn schedule_index_upload_for_import_pgdata_finalize(
957 0 : self: &Arc<Self>,
958 0 : ) -> anyhow::Result<()> {
959 : use import_pgdata::index_part_format;
960 :
961 0 : let mut guard = self.upload_queue.lock().unwrap();
962 0 : let upload_queue = guard.initialized_mut()?;
963 0 : let to_update = match &upload_queue.dirty.import_pgdata {
964 0 : Some(import) if !import.is_done() => Some(import),
965 0 : Some(_) | None => None,
966 : };
967 :
968 0 : if let Some(old) = to_update {
969 0 : let new =
970 0 : index_part_format::Root::V1(index_part_format::V1::Done(index_part_format::Done {
971 0 : idempotency_key: old.idempotency_key().clone(),
972 0 : started_at: *old.started_at(),
973 0 : finished_at: chrono::Utc::now().naive_utc(),
974 0 : }));
975 0 :
976 0 : upload_queue.dirty.import_pgdata = Some(new);
977 0 : self.schedule_index_upload(upload_queue);
978 0 : }
979 :
980 0 : Ok(())
981 0 : }
982 :
983 : /// Launch an index-file upload operation in the background, setting `gc_compaction_state` field.
984 0 : pub(crate) fn schedule_index_upload_for_gc_compaction_state_update(
985 0 : self: &Arc<Self>,
986 0 : gc_compaction_state: GcCompactionState,
987 0 : ) -> anyhow::Result<()> {
988 0 : let mut guard = self.upload_queue.lock().unwrap();
989 0 : let upload_queue = guard.initialized_mut()?;
990 0 : upload_queue.dirty.gc_compaction = Some(gc_compaction_state);
991 0 : self.schedule_index_upload(upload_queue);
992 0 : Ok(())
993 0 : }
994 :
995 : /// Launch an index-file upload operation in the background, setting `rel_size_v2_status` field.
996 0 : pub(crate) fn schedule_index_upload_for_rel_size_v2_status_update(
997 0 : self: &Arc<Self>,
998 0 : rel_size_v2_status: RelSizeMigration,
999 0 : rel_size_migrated_at: Option<Lsn>,
1000 0 : ) -> anyhow::Result<()> {
1001 0 : let mut guard = self.upload_queue.lock().unwrap();
1002 0 : let upload_queue = guard.initialized_mut()?;
1003 0 : upload_queue.dirty.rel_size_migration = Some(rel_size_v2_status);
1004 0 : upload_queue.dirty.rel_size_migrated_at = rel_size_migrated_at;
1005 : // TODO: allow this operation to bypass the validation check because we might upload the index part
1006 : // with no layers but the flag updated. For now, we just modify the index part in memory and the next
1007 : // upload will include the flag.
1008 : // self.schedule_index_upload(upload_queue);
1009 0 : Ok(())
1010 0 : }
1011 :
1012 : ///
1013 : /// Launch an index-file upload operation in the background, if necessary.
1014 : ///
1015 : /// Use this function to schedule the update of the index file after
1016 : /// scheduling file uploads or deletions. If no file uploads or deletions
1017 : /// have been scheduled since the last index file upload, this does
1018 : /// nothing.
1019 : ///
1020 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
1021 : /// the upload to the upload queue and returns quickly.
1022 83 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
1023 83 : let mut guard = self.upload_queue.lock().unwrap();
1024 83 : let upload_queue = guard.initialized_mut()?;
1025 :
1026 83 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1027 7 : self.schedule_index_upload(upload_queue);
1028 76 : }
1029 :
1030 83 : Ok(())
1031 83 : }
1032 :
1033 : /// Only used in the `patch_index_part` HTTP API to force trigger an index upload.
1034 0 : pub fn force_schedule_index_upload(self: &Arc<Self>) -> Result<(), NotInitialized> {
1035 0 : let mut guard = self.upload_queue.lock().unwrap();
1036 0 : let upload_queue = guard.initialized_mut()?;
1037 0 : self.schedule_index_upload(upload_queue);
1038 0 : Ok(())
1039 0 : }
1040 :
1041 : /// Launch an index-file upload operation in the background (internal function)
1042 797 : fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1043 797 : let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
1044 : // fix up the duplicated field
1045 797 : upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
1046 :
1047 : // make sure it serializes before doing it in perform_upload_task so that it doesn't
1048 : // look like a retryable error
1049 797 : let void = std::io::sink();
1050 797 : serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");
1051 :
1052 797 : let index_part = &upload_queue.dirty;
1053 :
1054 797 : info!(
1055 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
1056 0 : index_part.layer_metadata.len(),
1057 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
1058 : );
1059 :
1060 797 : let op = UploadOp::UploadMetadata {
1061 797 : uploaded: Box::new(index_part.clone()),
1062 797 : };
1063 797 : self.metric_begin(&op);
1064 797 : upload_queue.queued_operations.push_back(op);
1065 797 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
1066 :
1067 : // Launch the task immediately, if possible
1068 797 : self.launch_queued_tasks(upload_queue);
1069 797 : }
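The queueing rules in the doc comments above can be modelled with a plain FIFO. Index uploads snapshot `dirty` and are queued behind earlier layer uploads, and `schedule_index_upload_for_file_changes` does nothing when no files changed. A minimal sketch: `UploadQueue`, `IndexPart` and `UploadOp` are reduced, hypothetical stand-ins. The real queue can also run independent operations in parallel, which this sketch does not model.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical, heavily reduced index: layer name -> generation.
#[derive(Clone, Debug, Default, PartialEq)]
struct IndexPart {
    layer_metadata: BTreeMap<String, u32>,
}

#[derive(Debug, PartialEq)]
enum UploadOp {
    UploadLayer(String),
    UploadMetadata(Box<IndexPart>),
}

#[derive(Default)]
struct UploadQueue {
    dirty: IndexPart,
    queued_operations: VecDeque<UploadOp>,
    changes_since_index_scheduled: usize,
}

impl UploadQueue {
    fn schedule_layer_file_upload(&mut self, name: &str, generation: u32) {
        self.dirty.layer_metadata.insert(name.to_string(), generation);
        self.changes_since_index_scheduled += 1;
        self.queued_operations.push_back(UploadOp::UploadLayer(name.to_string()));
    }

    /// Snapshot `dirty` and queue it behind every earlier operation.
    fn schedule_index_upload(&mut self) {
        let op = UploadOp::UploadMetadata(Box::new(self.dirty.clone()));
        self.queued_operations.push_back(op);
        self.changes_since_index_scheduled = 0;
    }

    /// No-op unless layer files changed since the last scheduled index upload.
    fn schedule_index_upload_for_file_changes(&mut self) {
        if self.changes_since_index_scheduled > 0 {
            self.schedule_index_upload();
        }
    }
}
```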
1070 :
1071 : /// Reparent this timeline to a new parent.
1072 : ///
1073 : /// A retryable step of timeline ancestor detach.
1074 0 : pub(crate) async fn schedule_reparenting_and_wait(
1075 0 : self: &Arc<Self>,
1076 0 : new_parent: &TimelineId,
1077 0 : ) -> anyhow::Result<()> {
1078 0 : let receiver = {
1079 0 : let mut guard = self.upload_queue.lock().unwrap();
1080 0 : let upload_queue = guard.initialized_mut()?;
1081 :
1082 0 : let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else {
1083 0 : return Err(anyhow::anyhow!(
1084 0 : "cannot reparent without a current ancestor"
1085 0 : ));
1086 : };
1087 :
1088 0 : let uploaded = &upload_queue.clean.0.metadata;
1089 :
1090 0 : if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
1091 : // nothing to do
1092 0 : None
1093 : } else {
1094 0 : upload_queue.dirty.metadata.reparent(new_parent);
1095 0 : upload_queue.dirty.lineage.record_previous_ancestor(&prev);
1096 :
1097 0 : self.schedule_index_upload(upload_queue);
1098 :
1099 0 : Some(self.schedule_barrier0(upload_queue))
1100 : }
1101 : };
1102 :
1103 0 : if let Some(receiver) = receiver {
1104 0 : Self::wait_completion0(receiver).await?;
1105 0 : }
1106 0 : Ok(())
1107 0 : }
1108 :
1109 : /// Schedules the upload of a new version of `index_part.json` that adds the given layers and
1110 : /// detaches the timeline from its ancestor, then waits for the upload to complete.
1111 : ///
1112 : /// This is used with `Timeline::detach_ancestor` functionality.
1113 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
1114 0 : self: &Arc<Self>,
1115 0 : layers: &[Layer],
1116 0 : adopted: (TimelineId, Lsn),
1117 0 : ) -> anyhow::Result<()> {
1118 0 : let barrier = {
1119 0 : let mut guard = self.upload_queue.lock().unwrap();
1120 0 : let upload_queue = guard.initialized_mut()?;
1121 :
1122 0 : if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
1123 0 : None
1124 : } else {
1125 0 : upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
1126 0 : upload_queue.dirty.lineage.record_detaching(&adopted);
1127 :
1128 0 : for layer in layers {
1129 0 : let prev = upload_queue
1130 0 : .dirty
1131 0 : .layer_metadata
1132 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
1133 0 : assert!(prev.is_none(), "copied layer existed already {layer}");
1134 : }
1135 :
1136 0 : self.schedule_index_upload(upload_queue);
1137 :
1138 0 : Some(self.schedule_barrier0(upload_queue))
1139 : }
1140 : };
1141 :
1142 0 : if let Some(barrier) = barrier {
1143 0 : Self::wait_completion0(barrier).await?;
1144 0 : }
1145 0 : Ok(())
1146 0 : }
1147 :
1148 : /// Adds a gc blocking reason for this timeline if one does not exist already.
1149 : ///
1150 : /// A retryable step of timeline detach ancestor.
1151 : ///
1152 : /// Returns a future which waits until the completion of the upload.
1153 0 : pub(crate) fn schedule_insert_gc_block_reason(
1154 0 : self: &Arc<Self>,
1155 0 : reason: index::GcBlockingReason,
1156 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1157 : {
1158 0 : let maybe_barrier = {
1159 0 : let mut guard = self.upload_queue.lock().unwrap();
1160 0 : let upload_queue = guard.initialized_mut()?;
1161 :
1162 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1163 0 : if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
1164 0 : drop(guard);
1165 0 : panic!("cannot start detach ancestor if there is nothing to detach from");
1166 0 : }
1167 0 : }
1168 :
1169 0 : let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
1170 :
1171 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1172 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1173 :
1174 0 : match (current, uploaded) {
1175 0 : (x, y) if wanted(x) && wanted(y) => None,
1176 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1177 : // Usual case: !wanted(x) && !wanted(y)
1178 : //
1179 : // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
1180 : // turn on and off some reason.
1181 0 : (x, y) => {
1182 0 : if !wanted(x) && wanted(y) {
1183 : // this could be avoided by having external in-memory synchronization, like
1184 : // timeline detach ancestor
1185 0 : warn!(
1186 : ?reason,
1187 : op = "insert",
1188 0 : "unexpected: two racing processes to enable and disable a gc blocking reason"
1189 : );
1190 0 : }
1191 :
1192 : // at this point, the metadata must always show that there is a parent
1193 0 : upload_queue.dirty.gc_blocking = current
1194 0 : .map(|x| x.with_reason(reason))
1195 0 : .or_else(|| Some(index::GcBlocking::started_now_for(reason)));
1196 0 : self.schedule_index_upload(upload_queue);
1197 0 : Some(self.schedule_barrier0(upload_queue))
1198 : }
1199 : }
1200 : };
1201 :
1202 0 : Ok(async move {
1203 0 : if let Some(barrier) = maybe_barrier {
1204 0 : Self::wait_completion0(barrier).await?;
1205 0 : }
1206 0 : Ok(())
1207 0 : })
1208 0 : }
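`schedule_insert_gc_block_reason` picks one of three outcomes based on whether the reason is present in the in-memory and in the uploaded `gc_blocking`. This is a minimal sketch that reduces `GcBlocking` to a set of reason strings; `Action` and `schedule_insert_reason` are hypothetical names. `schedule_remove_gc_block_reason` follows the same table, with `wanted` meaning "reason absent" instead of "reason present".

```rust
use std::collections::BTreeSet;

/// Hypothetical reduction of `index::GcBlocking`: the set of active reasons.
type GcBlocking = BTreeSet<&'static str>;

#[derive(Debug, PartialEq)]
enum Action {
    /// Both the in-memory and the uploaded index already have the reason.
    Nothing,
    /// Only the uploaded index lags behind: wait for the queued upload.
    BarrierOnly,
    /// Update the in-memory index, schedule an upload, then wait.
    UploadAndBarrier,
}

fn schedule_insert_reason(
    dirty: &mut Option<GcBlocking>,
    clean: &Option<GcBlocking>,
    reason: &'static str,
) -> Action {
    let wanted = |x: &Option<GcBlocking>| x.as_ref().is_some_and(|b| b.contains(&reason));
    let (current_ok, uploaded_ok) = (wanted(&*dirty), wanted(clean));
    match (current_ok, uploaded_ok) {
        (true, true) => Action::Nothing,
        (true, false) => Action::BarrierOnly,
        // (false, true) is the "racing processes" case the source warns about.
        (false, _) => {
            dirty.get_or_insert_with(BTreeSet::new).insert(reason);
            Action::UploadAndBarrier
        }
    }
}
```

Retrying the step is cheap by design. A retry before the upload lands only waits, and a retry after it lands does nothing.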
1209 :
1210 : /// Removes a gc blocking reason for this timeline if one exists.
1211 : ///
1212 : /// A retryable step of timeline detach ancestor.
1213 : ///
1214 : /// Returns a future which waits until the completion of the upload.
1215 0 : pub(crate) fn schedule_remove_gc_block_reason(
1216 0 : self: &Arc<Self>,
1217 0 : reason: index::GcBlockingReason,
1218 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1219 : {
1220 0 : let maybe_barrier = {
1221 0 : let mut guard = self.upload_queue.lock().unwrap();
1222 0 : let upload_queue = guard.initialized_mut()?;
1223 :
1224 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1225 0 : if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
1226 0 : drop(guard);
1227 0 : panic!("cannot complete timeline_ancestor_detach while not detached");
1228 0 : }
1229 0 : }
1230 :
1231 0 : let wanted = |x: Option<&index::GcBlocking>| {
1232 0 : x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
1233 0 : };
1234 :
1235 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1236 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1237 :
1238 0 : match (current, uploaded) {
1239 0 : (x, y) if wanted(x) && wanted(y) => None,
1240 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1241 0 : (x, y) => {
1242 0 : if !wanted(x) && wanted(y) {
1243 0 : warn!(
1244 : ?reason,
1245 : op = "remove",
1246 0 : "unexpected: two racing processes to enable and disable a gc blocking reason (remove)"
1247 : );
1248 0 : }
1249 :
1250 : upload_queue.dirty.gc_blocking =
1251 0 : current.as_ref().and_then(|x| x.without_reason(reason));
1252 0 : assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
1253 0 : self.schedule_index_upload(upload_queue);
1254 0 : Some(self.schedule_barrier0(upload_queue))
1255 : }
1256 : }
1257 : };
1258 :
1259 0 : Ok(async move {
1260 0 : if let Some(barrier) = maybe_barrier {
1261 0 : Self::wait_completion0(barrier).await?;
1262 0 : }
1263 0 : Ok(())
1264 0 : })
1265 0 : }
1266 :
1267 : /// Launch a layer file upload in the background; the file will be included in the next
1268 : /// `index_part.json` upload.
1269 705 : pub(crate) fn schedule_layer_file_upload(
1270 705 : self: &Arc<Self>,
1271 705 : layer: ResidentLayer,
1272 705 : ) -> Result<(), NotInitialized> {
1273 705 : let mut guard = self.upload_queue.lock().unwrap();
1274 705 : let upload_queue = guard.initialized_mut()?;
1275 :
1276 705 : self.schedule_layer_file_upload0(upload_queue, layer);
1277 705 : self.launch_queued_tasks(upload_queue);
1278 705 : Ok(())
1279 705 : }
1280 :
1281 895 : fn schedule_layer_file_upload0(
1282 895 : self: &Arc<Self>,
1283 895 : upload_queue: &mut UploadQueueInitialized,
1284 895 : layer: ResidentLayer,
1285 895 : ) {
1286 895 : let metadata = layer.metadata();
1287 :
1288 895 : upload_queue
1289 895 : .dirty
1290 895 : .layer_metadata
1291 895 : .insert(layer.layer_desc().layer_name(), metadata.clone());
1292 895 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1293 :
1294 895 : info!(
1295 : gen=?metadata.generation,
1296 : shard=?metadata.shard,
1297 0 : "scheduled layer file upload {layer}",
1298 : );
1299 :
1300 895 : let op = UploadOp::UploadLayer(layer, metadata, None);
1301 895 : self.metric_begin(&op);
1302 895 : upload_queue.queued_operations.push_back(op);
1303 895 : }
1304 :
1305 : /// Launch a delete operation in the background.
1306 : ///
1307 : /// The operation does not modify local filesystem state.
1308 : ///
1309 : /// Note: This schedules an index file upload before the deletions. The
1310 : /// deletion won't actually be performed, until all previously scheduled
1311 : /// upload operations, and the index file upload, have completed
1312 : /// successfully.
1313 4 : pub fn schedule_layer_file_deletion(
1314 4 : self: &Arc<Self>,
1315 4 : names: &[LayerName],
1316 4 : ) -> anyhow::Result<()> {
1317 4 : let mut guard = self.upload_queue.lock().unwrap();
1318 4 : let upload_queue = guard.initialized_mut()?;
1319 :
1320 4 : let with_metadata =
1321 4 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
1322 :
1323 4 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
1324 :
1325 : // Launch the tasks immediately, if possible
1326 4 : self.launch_queued_tasks(upload_queue);
1327 4 : Ok(())
1328 4 : }
1329 :
1330 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
1331 : /// layer files, leaving them dangling.
1332 : ///
1333 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
1334 : /// is invoked on them.
1335 4 : pub(crate) fn schedule_gc_update(
1336 4 : self: &Arc<Self>,
1337 4 : gc_layers: &[Layer],
1338 4 : ) -> Result<(), NotInitialized> {
1339 4 : let mut guard = self.upload_queue.lock().unwrap();
1340 4 : let upload_queue = guard.initialized_mut()?;
1341 :
1342 : // just forget the return value; after uploading the next index_part.json, we can consider
1343 : // the layer files as "dangling". this is fine, at worst case we create work for the
1344 : // scrubber.
1345 :
1346 5 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
1347 :
1348 4 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1349 :
1350 4 : self.launch_queued_tasks(upload_queue);
1351 :
1352 4 : Ok(())
1353 4 : }
1354 :
1355 0 : pub(crate) fn schedule_unlinking_of_layers_from_index_part<I>(
1356 0 : self: &Arc<Self>,
1357 0 : names: I,
1358 0 : ) -> Result<(), NotInitialized>
1359 0 : where
1360 0 : I: IntoIterator<Item = LayerName>,
1361 : {
1362 0 : let mut guard = self.upload_queue.lock().unwrap();
1363 0 : let upload_queue = guard.initialized_mut()?;
1364 :
1365 0 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1366 :
1367 0 : Ok(())
1368 0 : }
1369 :
1370 : /// Update the remote index file, removing the to-be-deleted files from the index,
1371 : /// allowing scheduling of actual deletions later.
1372 55 : fn schedule_unlinking_of_layers_from_index_part0<I>(
1373 55 : self: &Arc<Self>,
1374 55 : upload_queue: &mut UploadQueueInitialized,
1375 55 : names: I,
1376 55 : ) -> Vec<(LayerName, LayerFileMetadata)>
1377 55 : where
1378 55 : I: IntoIterator<Item = LayerName>,
1379 : {
1380 : // Decorate our list of names with each name's metadata, dropping
1381 : // names that are unexpectedly missing from our metadata. This metadata
1382 : // is later used when physically deleting layers, to construct key paths.
1383 55 : let with_metadata: Vec<_> = names
1384 55 : .into_iter()
1385 259 : .filter_map(|name| {
1386 259 : let meta = upload_queue.dirty.layer_metadata.remove(&name);
1387 :
1388 259 : if let Some(meta) = meta {
1389 259 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1390 259 : Some((name, meta))
1391 : } else {
1392 : // This can only happen if we forgot to schedule the file upload
1393 : // before scheduling the delete. Log it because it is a rare/strange
1394 : // situation, and in case something is misbehaving, we'd like to know which
1395 : // layers experienced this.
1396 0 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
1397 0 : None
1398 : }
1399 259 : })
1400 55 : .collect();
1401 :
1402 : #[cfg(feature = "testing")]
1403 314 : for (name, metadata) in &with_metadata {
1404 259 : let gen_ = metadata.generation;
1405 259 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen_) {
1406 0 : if unexpected == gen_ {
1407 0 : tracing::error!("{name} was unlinked twice with same generation");
1408 : } else {
1409 0 : tracing::error!(
1410 0 : "{name} was unlinked twice with different generations {gen_:?} and {unexpected:?}"
1411 : );
1412 : }
1413 259 : }
1414 : }
1415 :
1416 : // after unlinking files from `upload_queue.dirty.layer_metadata`, we must schedule an
1417 : // index_part update, because the updated index must be uploaded before we can actually
1418 : // delete the files.
1419 55 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1420 47 : self.schedule_index_upload(upload_queue);
1421 47 : }
1422 :
1423 55 : with_metadata
1424 55 : }
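Unlinking removes each name from the in-memory layer map, keeps its metadata for the later physical delete, skips names that were never in the index, and counts every removal so that an index upload gets scheduled. A minimal sketch under these assumptions: `unlink_layers` is a hypothetical name, layer metadata is reduced to a `u32` generation, and `changes` plays the role of `latest_files_changes_since_metadata_upload_scheduled`.

```rust
use std::collections::HashMap;

/// Remove `names` from `layer_metadata`, returning (name, generation) for each
/// layer that was present and bumping `changes` once per removal.
fn unlink_layers<'a, I>(
    layer_metadata: &mut HashMap<String, u32>,
    changes: &mut usize,
    names: I,
) -> Vec<(String, u32)>
where
    I: IntoIterator<Item = &'a str>,
{
    names
        .into_iter()
        .filter_map(|name| {
            // Names missing from the index are skipped (the source logs them).
            let generation = layer_metadata.remove(name)?;
            *changes += 1;
            Some((name.to_string(), generation))
        })
        .collect()
}
```

A caller would then schedule an index upload whenever `changes > 0`, matching the check at the end of `schedule_unlinking_of_layers_from_index_part0`.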
1425 :
1426 : /// Schedules deletion for layer files which have previously been unlinked from the
1427 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
1428 260 : pub(crate) fn schedule_deletion_of_unlinked(
1429 260 : self: &Arc<Self>,
1430 260 : layers: Vec<(LayerName, LayerFileMetadata)>,
1431 260 : ) -> anyhow::Result<()> {
1432 260 : let mut guard = self.upload_queue.lock().unwrap();
1433 260 : let upload_queue = guard.initialized_mut()?;
1434 :
1435 260 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
1436 260 : self.launch_queued_tasks(upload_queue);
1437 260 : Ok(())
1438 260 : }
1439 :
1440 263 : fn schedule_deletion_of_unlinked0(
1441 263 : self: &Arc<Self>,
1442 263 : upload_queue: &mut UploadQueueInitialized,
1443 263 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
1444 263 : ) {
1445 : // Filter out any layers which were not created by this tenant shard. These are
1446 : // layers that originate from some ancestor shard after a split, and may still
1447 : // be referenced by other shards. We are free to delete them locally and remove
1448 : // them from our index (and would have already done so when we reach this point
1449 : // in the code), but we may not delete them remotely.
1450 263 : with_metadata.retain(|(name, meta)| {
1451 260 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
1452 260 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
1453 260 : if !retain {
1454 0 : tracing::debug!(
1455 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
1456 : meta.shard
1457 : );
1458 260 : }
1459 260 : retain
1460 260 : });
1461 :
1462 523 : for (name, meta) in &with_metadata {
1463 260 : info!(
1464 0 : "scheduling deletion of layer {}{} (shard {})",
1465 : name,
1466 0 : meta.generation.get_suffix(),
1467 : meta.shard
1468 : );
1469 : }
1470 :
1471 : #[cfg(feature = "testing")]
1472 523 : for (name, meta) in &with_metadata {
1473 260 : let gen_ = meta.generation;
1474 260 : match upload_queue.dangling_files.remove(name) {
1475 258 : Some(same) if same == gen_ => { /* expected */ }
1476 0 : Some(other) => {
1477 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen_:?}");
1478 : }
1479 : None => {
1480 2 : tracing::error!("{name} was unlinked but was not dangling");
1481 : }
1482 : }
1483 : }
1484 :
1485 : // schedule the actual deletions
1486 263 : if with_metadata.is_empty() {
1487 : // avoid scheduling the op & bumping the metric
1488 3 : return;
1489 260 : }
1490 260 : let op = UploadOp::Delete(Delete {
1491 260 : layers: with_metadata,
1492 260 : });
1493 260 : self.metric_begin(&op);
1494 260 : upload_queue.queued_operations.push_back(op);
1495 263 : }
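The shard filter in `schedule_deletion_of_unlinked0` keeps a layer only if both its shard number and its shard count match this tenant shard, so layers inherited from a pre-split ancestor shard are never deleted remotely. An empty result skips queueing the delete entirely. A minimal sketch, assuming a hypothetical `ShardIndex { number, count }` and a hypothetical `build_delete_op`:

```rust
/// Hypothetical reduction of a shard identity: (shard number, shard count).
#[derive(Clone, Copy, Debug, PartialEq)]
struct ShardIndex {
    number: u8,
    count: u8,
}

/// Keep only layers written by this exact shard; returns `None` when nothing is
/// left, mirroring the early return that avoids queueing an empty delete.
fn build_delete_op(
    own: ShardIndex,
    mut layers: Vec<(String, ShardIndex)>,
) -> Option<Vec<(String, ShardIndex)>> {
    layers.retain(|(_, shard)| shard.number == own.number && shard.count == own.count);
    if layers.is_empty() {
        None
    } else {
        Some(layers)
    }
}
```

Both fields must match. After a split, shard 0 of 2 and shard 0 of 4 share a shard number but are different shards.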
1496 :
1497 : /// Schedules a compaction update to the remote `index_part.json`.
1498 : ///
1499 : /// `compacted_from` lists the L0 layers that have been compacted into the `compacted_to` L1 layers.
1500 47 : pub(crate) fn schedule_compaction_update(
1501 47 : self: &Arc<Self>,
1502 47 : compacted_from: &[Layer],
1503 47 : compacted_to: &[ResidentLayer],
1504 47 : ) -> Result<(), NotInitialized> {
1505 47 : let mut guard = self.upload_queue.lock().unwrap();
1506 47 : let upload_queue = guard.initialized_mut()?;
1507 :
1508 237 : for layer in compacted_to {
1509 190 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
1510 190 : }
1511 :
1512 253 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
1513 :
1514 47 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1515 47 : self.launch_queued_tasks(upload_queue);
1516 :
1517 47 : Ok(())
1518 47 : }
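`schedule_compaction_update` queues the uploads of the new L1 layers before the index change that unlinks the compacted L0 layers. A toy model of that ordering is below. `Op` and `schedule_compaction` are hypothetical names, and the single `UnlinkFromIndex` op stands in for whatever `schedule_unlinking_of_layers_from_index_part0` actually queues.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified queue operations.
#[derive(Debug, PartialEq, Eq)]
pub enum Op {
    UploadLayer(String),
    UnlinkFromIndex(Vec<String>),
}

/// Queue the L1 uploads first, then one op that unlinks the L0 layers.
/// In this FIFO model the unlink op always sits behind the uploads it depends on.
pub fn schedule_compaction(queue: &mut VecDeque<Op>, compacted_from: &[&str], compacted_to: &[&str]) {
    for l1 in compacted_to {
        queue.push_back(Op::UploadLayer(l1.to_string()));
    }
    let names = compacted_from.iter().map(|s| s.to_string()).collect();
    queue.push_back(Op::UnlinkFromIndex(names));
}
```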
1519 :
1520 : /// Wait for all previously scheduled uploads/deletions to complete
1521 118 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
1522 118 : let receiver = {
1523 118 : let mut guard = self.upload_queue.lock().unwrap();
1524 118 : let upload_queue = guard
1525 118 : .initialized_mut()
1526 118 : .map_err(WaitCompletionError::NotInitialized)?;
1527 118 : self.schedule_barrier0(upload_queue)
1528 : };
1529 :
1530 118 : Self::wait_completion0(receiver).await
1531 118 : }
1532 :
1533 118 : async fn wait_completion0(
1534 118 : mut receiver: tokio::sync::watch::Receiver<()>,
1535 118 : ) -> Result<(), WaitCompletionError> {
1536 118 : if receiver.changed().await.is_err() {
1537 0 : return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
1538 118 : }
1539 :
1540 118 : Ok(())
1541 118 : }
1542 :
1543 3 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
1544 3 : let mut guard = self.upload_queue.lock().unwrap();
1545 3 : let upload_queue = guard.initialized_mut()?;
1546 3 : self.schedule_barrier0(upload_queue);
1547 3 : Ok(())
1548 3 : }
1549 :
1550 121 : fn schedule_barrier0(
1551 121 : self: &Arc<Self>,
1552 121 : upload_queue: &mut UploadQueueInitialized,
1553 121 : ) -> tokio::sync::watch::Receiver<()> {
1554 121 : let (sender, receiver) = tokio::sync::watch::channel(());
1555 121 : let barrier_op = UploadOp::Barrier(sender);
1556 :
1557 121 : upload_queue.queued_operations.push_back(barrier_op);
1558 : // Don't count this kind of operation!
1559 :
1560 : // Launch the task immediately, if possible
1561 121 : self.launch_queued_tasks(upload_queue);
1562 :
1563 121 : receiver
1564 121 : }
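A barrier is just an op that carries a sender. When it reaches the front of the queue the waiter is notified; if it is dropped unprocessed (for example because the queue was stopped), the waiter sees an error, which `wait_completion0` maps to `UploadQueueShutDownOrStopped`. The sketch below models this with `std::sync::mpsc` instead of `tokio::sync::watch`, and with a strictly sequential `drain` instead of the concurrent task launcher; `Queue` and `Op` are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, simplified queue operations.
pub enum Op {
    Work(u32),
    Barrier(Sender<()>),
}

pub struct Queue {
    ops: VecDeque<Op>,
    pub done: Vec<u32>,
}

impl Queue {
    pub fn new() -> Self {
        Queue { ops: VecDeque::new(), done: Vec::new() }
    }

    pub fn push_work(&mut self, n: u32) {
        self.ops.push_back(Op::Work(n));
    }

    /// Enqueue a barrier and hand the receiving end to the waiter.
    pub fn schedule_barrier(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.ops.push_back(Op::Barrier(tx));
        rx
    }

    /// Run ops in FIFO order; a barrier fires only after everything queued before it.
    pub fn drain(&mut self) {
        while let Some(op) = self.ops.pop_front() {
            match op {
                Op::Work(n) => self.done.push(n),
                Op::Barrier(tx) => {
                    // The waiter may already be gone; ignore send errors.
                    let _ = tx.send(());
                }
            }
        }
    }

    /// Stopping drops pending ops, including barriers, which disconnects their waiters.
    pub fn stop(&mut self) {
        self.ops.clear();
    }
}
```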
1565 :
1566 : /// Wait for all previously scheduled operations to complete, and then stop.
1567 : ///
1568 : /// Not cancellation safe
1569 5 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1570 : // On cancellation, the queue is left in an awkward state: it refuses new operations, but
1571 : // a proper stop has not been performed yet. After a cancellation, the original task or some
1572 : // later task must call `stop` or `shutdown`.
1573 5 : let sg = scopeguard::guard((), |_| {
1574 0 : tracing::error!(
1575 0 : "RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error"
1576 : )
1577 0 : });
1578 :
1579 4 : let fut = {
1580 5 : let mut guard = self.upload_queue.lock().unwrap();
1581 5 : let upload_queue = match &mut *guard {
1582 : UploadQueue::Stopped(_) => {
1583 1 : scopeguard::ScopeGuard::into_inner(sg);
1584 1 : return;
1585 : }
1586 : UploadQueue::Uninitialized => {
1587 : // transition into Stopped state
1588 0 : self.stop_impl(&mut guard);
1589 0 : scopeguard::ScopeGuard::into_inner(sg);
1590 0 : return;
1591 : }
1592 4 : UploadQueue::Initialized(init) => init,
1593 : };
1594 :
1595 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1596 : // just don't add more of these as they would never complete.
1597 : //
1598 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1599 : // in every place, we would not have to jump through this hoop, and this method could be
1600 : // made cancellable.
1601 4 : if !upload_queue.shutting_down {
1602 3 : upload_queue.shutting_down = true;
1603 3 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1604 3 : // like Barrier, this operation is not counted
1605 3 :
1606 3 : self.launch_queued_tasks(upload_queue);
1607 3 : }
1608 :
1609 4 : upload_queue.shutdown_ready.clone().acquire_owned()
1610 : };
1611 :
1612 4 : let res = fut.await;
1613 :
1614 4 : scopeguard::ScopeGuard::into_inner(sg);
1615 :
1616 4 : match res {
1617 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
1618 4 : Err(_closed) => {
1619 4 : // expected
1620 4 : }
1621 : }
1622 :
1623 4 : self.stop();
1624 5 : }
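`shutdown` arms a `scopeguard` that logs an error if the future is dropped before it completes, and disarms it with `ScopeGuard::into_inner` on every normal exit path; `persist_index_part_with_deleted_flag` uses the same pattern to roll back `deleted_at`. The std-only `Guard` type below is a hypothetical re-implementation of that idea to show the mechanics, not the `scopeguard` crate's code.

```rust
/// Runs `on_drop` when dropped, unless `disarm` was called first.
pub struct Guard<F: FnOnce()> {
    on_drop: Option<F>,
}

impl<F: FnOnce()> Guard<F> {
    pub fn new(on_drop: F) -> Self {
        Guard { on_drop: Some(on_drop) }
    }

    /// Consume the guard without running the callback (cf. `ScopeGuard::into_inner`).
    pub fn disarm(mut self) {
        self.on_drop = None;
    }
}

impl<F: FnOnce()> Drop for Guard<F> {
    fn drop(&mut self) {
        // Only an armed guard runs its callback.
        if let Some(f) = self.on_drop.take() {
            f();
        }
    }
}
```

Dropping an async future runs the destructors of its live locals, which is why a guard like this can detect cancellation of `shutdown`.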
1625 :
1626 : /// Set the deleted_at field in the remote index file.
1627 : ///
1628 : /// This fails if the upload queue has not been `stop()`ed.
1629 : ///
1630 : /// The caller is responsible for calling `stop()` AND for waiting
1631 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1632 : /// Check method [`RemoteTimelineClient::stop`] for details.
1633 : #[instrument(skip_all)]
1634 : pub(crate) async fn persist_index_part_with_deleted_flag(
1635 : self: &Arc<Self>,
1636 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1637 : let index_part_with_deleted_at = {
1638 : let mut locked = self.upload_queue.lock().unwrap();
1639 :
1640 : // We must be in the stopped state, because otherwise an in-progress
1641 : // index part upload could overwrite the file with a version that lacks
1642 : // the deleted_at flag we are going to set below
1643 : let stopped = locked.stopped_mut()?;
1644 :
1645 : match stopped.deleted_at {
1646 : SetDeletedFlagProgress::NotRunning => (), // proceed
1647 : SetDeletedFlagProgress::InProgress(at) => {
1648 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1649 : }
1650 : SetDeletedFlagProgress::Successful(at) => {
1651 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1652 : }
1653 : };
1654 : let deleted_at = Utc::now().naive_utc();
1655 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1656 :
1657 : let mut index_part = stopped.upload_queue_for_deletion.dirty.clone();
1658 : index_part.deleted_at = Some(deleted_at);
1659 : index_part
1660 : };
1661 :
1662 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1663 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1664 0 : let stopped = locked
1665 0 : .stopped_mut()
1666 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1667 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1668 0 : });
1669 :
1670 : pausable_failpoint!("persist_deleted_index_part");
1671 :
1672 : backoff::retry(
1673 0 : || {
1674 0 : upload::upload_index_part(
1675 0 : &self.storage_impl,
1676 0 : &self.tenant_shard_id,
1677 0 : &self.timeline_id,
1678 0 : self.generation,
1679 0 : &index_part_with_deleted_at,
1680 0 : &self.cancel,
1681 : )
1682 0 : },
1683 : |_e| false,
1684 : 1,
1685 : // use just a couple of attempts:
1686 : // when executed as part of timeline deletion, this happens in the context of an API call;
1687 : // when executed as part of tenant deletion, this happens in the background
1688 : 2,
1689 : "persist_index_part_with_deleted_flag",
1690 : &self.cancel,
1691 : )
1692 : .await
1693 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1694 : .and_then(|x| x)?;
1695 :
1696 : // all good, disarm the guard and mark as success
1697 : ScopeGuard::into_inner(undo_deleted_at);
1698 : {
1699 : let mut locked = self.upload_queue.lock().unwrap();
1700 :
1701 : let stopped = locked
1702 : .stopped_mut()
1703 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1704 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1705 : index_part_with_deleted_at
1706 : .deleted_at
1707 : .expect("we set it above"),
1708 : );
1709 : }
1710 :
1711 : Ok(())
1712 : }
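The `deleted_at` bookkeeping is a small state machine: `NotRunning` to `InProgress(at)` to `Successful(at)`, with a rollback to `NotRunning` if the upload fails so that a retry can start over. A synchronous sketch follows. `DeletedFlag`, `FlagError`, and the `u64` timestamp are hypothetical simplifications of `SetDeletedFlagProgress`, `PersistIndexPartWithDeletedFlagError`, and `NaiveDateTime`, and the `upload` closure stands in for the retried `upload_index_part` call.

```rust
/// Hypothetical simplification of `SetDeletedFlagProgress` (timestamps as `u64`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeletedFlag {
    NotRunning,
    InProgress(u64),
    Successful(u64),
}

#[derive(Debug, PartialEq, Eq)]
pub enum FlagError {
    AlreadyInProgress(u64),
    AlreadyDeleted(u64),
    UploadFailed,
}

/// Drive one attempt at persisting the deleted flag. `upload` returns `true` on success.
pub fn persist_deleted_flag(
    state: &mut DeletedFlag,
    now: u64,
    upload: impl FnOnce(u64) -> bool,
) -> Result<(), FlagError> {
    match *state {
        DeletedFlag::NotRunning => {} // proceed
        DeletedFlag::InProgress(at) => return Err(FlagError::AlreadyInProgress(at)),
        DeletedFlag::Successful(at) => return Err(FlagError::AlreadyDeleted(at)),
    }
    *state = DeletedFlag::InProgress(now);
    if upload(now) {
        *state = DeletedFlag::Successful(now);
        Ok(())
    } else {
        // Roll back so a later call can retry; the real code does this in a scope guard.
        *state = DeletedFlag::NotRunning;
        Err(FlagError::UploadFailed)
    }
}
```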
1713 :
1714 0 : pub(crate) fn is_deleting(&self) -> bool {
1715 0 : let mut locked = self.upload_queue.lock().unwrap();
1716 0 : locked.stopped_mut().is_ok()
1717 0 : }
1718 :
1719 0 : pub(crate) async fn preserve_initdb_archive(
1720 0 : self: &Arc<Self>,
1721 0 : tenant_id: &TenantId,
1722 0 : timeline_id: &TimelineId,
1723 0 : cancel: &CancellationToken,
1724 0 : ) -> anyhow::Result<()> {
1725 0 : backoff::retry(
1726 0 : || async {
1727 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1728 0 : .await
1729 0 : },
1730 : TimeoutOrCancel::caused_by_cancel,
1731 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1732 : FAILED_REMOTE_OP_RETRIES,
1733 0 : "preserve_initdb_tar_zst",
1734 0 : &cancel.clone(),
1735 : )
1736 0 : .await
1737 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1738 0 : .and_then(|x| x)
1739 0 : .context("backing up initdb archive")?;
1740 0 : Ok(())
1741 0 : }
1742 :
1743 : /// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
1744 : ///
1745 : /// This is not normally needed.
1746 0 : pub(crate) async fn upload_layer_file(
1747 0 : self: &Arc<Self>,
1748 0 : uploaded: &ResidentLayer,
1749 0 : cancel: &CancellationToken,
1750 0 : ) -> anyhow::Result<()> {
1751 0 : let remote_path = remote_layer_path(
1752 0 : &self.tenant_shard_id.tenant_id,
1753 0 : &self.timeline_id,
1754 0 : uploaded.metadata().shard,
1755 0 : &uploaded.layer_desc().layer_name(),
1756 0 : uploaded.metadata().generation,
1757 : );
1758 :
1759 0 : backoff::retry(
1760 0 : || async {
1761 0 : upload::upload_timeline_layer(
1762 0 : &self.storage_impl,
1763 0 : uploaded.local_path(),
1764 0 : &remote_path,
1765 0 : uploaded.metadata().file_size,
1766 0 : cancel,
1767 0 : )
1768 0 : .await
1769 0 : },
1770 : TimeoutOrCancel::caused_by_cancel,
1771 : FAILED_UPLOAD_WARN_THRESHOLD,
1772 : FAILED_REMOTE_OP_RETRIES,
1773 0 : "upload a layer without adding it to latest files",
1774 0 : cancel,
1775 : )
1776 0 : .await
1777 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1778 0 : .and_then(|x| x)
1779 0 : .context("upload a layer without adding it to latest files")
1780 0 : }
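`preserve_initdb_archive`, `upload_layer_file`, and `copy_timeline_layer` all wrap a remote call in `backoff::retry`, pass `TimeoutOrCancel::caused_by_cancel` as a predicate that (judging by how it is used here) marks errors that should not be retried, and then flatten the `Option<Result<_>>` result, where `None` means cancelled, with `.ok_or_else(...).and_then(|x| x)`. The synchronous `retry` below is a hypothetical stand-in written only to illustrate those semantics: it is not `backoff::retry`'s actual signature, it does not sleep between attempts, and its `max_attempts` counting is an assumption.

```rust
/// Hypothetical synchronous retry helper.
/// Returns `None` if cancelled before an attempt, otherwise the final result.
pub fn retry<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    is_permanent: impl Fn(&E) -> bool,
    warn_threshold: u32,
    max_attempts: u32,
    is_cancelled: impl Fn() -> bool,
) -> Option<Result<T, E>> {
    let mut failures: u32 = 0;
    loop {
        if is_cancelled() {
            return None;
        }
        match op() {
            Ok(value) => return Some(Ok(value)),
            Err(e) => {
                failures += 1;
                // Permanent errors and an exhausted budget go back to the caller.
                if is_permanent(&e) || failures >= max_attempts {
                    return Some(Err(e));
                }
                // Transient failures stay quiet at first and only get loud when they repeat.
                if failures >= warn_threshold {
                    eprintln!("WARN: attempt {failures} failed, retrying");
                }
                // A real implementation would sleep with backoff here.
            }
        }
    }
}
```

A caller can then collapse cancellation into an ordinary error, e.g. `retry(...).ok_or_else(|| "cancelled".to_string()).and_then(|x| x)`.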
1781 :
1782 : /// Copies the existing remote layer `adopted` to the remote path of `adopted_as`. The layer is
1783 : /// not added to a future `index_part.json` upload.
1784 0 : pub(crate) async fn copy_timeline_layer(
1785 0 : self: &Arc<Self>,
1786 0 : adopted: &Layer,
1787 0 : adopted_as: &Layer,
1788 0 : cancel: &CancellationToken,
1789 0 : ) -> anyhow::Result<()> {
1790 0 : let source_remote_path = remote_layer_path(
1791 0 : &self.tenant_shard_id.tenant_id,
1792 0 : &adopted
1793 0 : .get_timeline_id()
1794 0 : .expect("Source timeline should be alive"),
1795 0 : adopted.metadata().shard,
1796 0 : &adopted.layer_desc().layer_name(),
1797 0 : adopted.metadata().generation,
1798 : );
1799 :
1800 0 : let target_remote_path = remote_layer_path(
1801 0 : &self.tenant_shard_id.tenant_id,
1802 0 : &self.timeline_id,
1803 0 : adopted_as.metadata().shard,
1804 0 : &adopted_as.layer_desc().layer_name(),
1805 0 : adopted_as.metadata().generation,
1806 : );
1807 :
1808 0 : backoff::retry(
1809 0 : || async {
1810 0 : upload::copy_timeline_layer(
1811 0 : &self.storage_impl,
1812 0 : &source_remote_path,
1813 0 : &target_remote_path,
1814 0 : cancel,
1815 0 : )
1816 0 : .await
1817 0 : },
1818 : TimeoutOrCancel::caused_by_cancel,
1819 : FAILED_UPLOAD_WARN_THRESHOLD,
1820 : FAILED_REMOTE_OP_RETRIES,
1821 0 : "copy timeline layer",
1822 0 : cancel,
1823 : )
1824 0 : .await
1825 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1826 0 : .and_then(|x| x)
1827 0 : .context("remote copy timeline layer")
1828 0 : }
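The retry loop in `perform_upload_task` sleeps via `exponential_backoff(retries, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, &cancel)` between attempts. That function's formula is not shown in this file, so the doubling schedule below is an assumption chosen only to illustrate a capped exponential backoff; `backoff_delay` is a hypothetical pure helper that computes the delay instead of sleeping.

```rust
use std::time::Duration;

/// Hypothetical backoff schedule: no wait before the first attempt, then
/// `base_secs * 2^(n - 1)` seconds before retry `n`, capped at `max_secs`.
/// Both arguments are expected to be non-negative.
pub fn backoff_delay(n: u32, base_secs: f64, max_secs: f64) -> Duration {
    if n == 0 {
        return Duration::ZERO;
    }
    // Clamp the exponent so huge retry counts cannot wrap when cast to i32.
    let exponent = (n - 1).min(62) as i32;
    let secs = base_secs * 2f64.powi(exponent);
    Duration::from_secs_f64(secs.min(max_secs))
}
```

The cap keeps a long outage from pushing the next attempt arbitrarily far into the future, while cancellation (the `&cancel` argument in the real call) can still cut any single sleep short.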
1829 :
1830 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1831 0 : match tokio::time::timeout(
1832 : DELETION_QUEUE_FLUSH_TIMEOUT,
1833 0 : self.deletion_queue_client.flush_immediate(),
1834 : )
1835 0 : .await
1836 : {
1837 0 : Ok(result) => result,
1838 0 : Err(_timeout) => {
1839 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1840 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1841 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1842 0 : tracing::warn!(
1843 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1844 : );
1845 0 : Ok(())
1846 : }
1847 : }
1848 0 : }
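`flush_deletion_queue` treats a timeout as success, with a warning, rather than as an error, because the flush is a best-effort convenience. The same decision can be sketched with `std::sync::mpsc::Receiver::recv_timeout` in place of `tokio::time::timeout`. The `String` error type and the handling of a disconnected channel are assumptions of this sketch.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Wait for a flush result, but treat a timeout as success: the flush is a
/// best-effort step, not a correctness requirement.
pub fn flush_with_timeout(
    done: &Receiver<Result<(), String>>,
    timeout: Duration,
) -> Result<(), String> {
    match done.recv_timeout(timeout) {
        // The flush finished in time: report its own outcome.
        Ok(result) => result,
        Err(RecvTimeoutError::Timeout) => {
            eprintln!("Timed out waiting for deletion queue flush, acking deletion anyway");
            Ok(())
        }
        // Assumption of this sketch: a vanished flusher is reported as an error.
        Err(RecvTimeoutError::Disconnected) => Err("deletion queue shut down".to_string()),
    }
}
```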
1849 :
1850 : /// Prerequisites: the UploadQueue must be in the stopped state and `deleted_at` must have been set successfully.
1851 : /// The function deletes the layer files referenced by the index, then lists the prefix to find any leaked objects,
1852 : /// deletes those, and deletes the index file last.
1853 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> Result<(), DeleteTimelineError> {
1854 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1855 :
1856 0 : let layers: Vec<RemotePath> = {
1857 0 : let mut locked = self.upload_queue.lock().unwrap();
1858 0 : let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?;
1859 :
1860 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1861 0 : return Err(DeleteTimelineError::Other(anyhow::anyhow!(
1862 0 : "deleted_at is not set"
1863 0 : )));
1864 0 : }
1865 :
1866 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1867 :
1868 0 : stopped
1869 0 : .upload_queue_for_deletion
1870 0 : .dirty
1871 0 : .layer_metadata
1872 0 : .drain()
1873 0 : .filter(|(_file_name, meta)| {
1874 : // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
1875 : // all shards anyway, we _could_ delete these, but
1876 : // - it creates a potential race if other shards are still
1877 : // using the layers while this shard deletes them.
1878 : // - it means that if we rolled back the shard split, the ancestor shards would be in a state where
1879 : // these timelines are present but corrupt (their index exists but some layers don't)
1880 : //
1881 : // These layers will eventually be cleaned up by the scrubber when it does physical GC.
1882 0 : meta.shard.shard_number == self.tenant_shard_id.shard_number
1883 0 : && meta.shard.shard_count == self.tenant_shard_id.shard_count
1884 0 : })
1885 0 : .map(|(file_name, meta)| {
1886 0 : remote_layer_path(
1887 0 : &self.tenant_shard_id.tenant_id,
1888 0 : &self.timeline_id,
1889 0 : meta.shard,
1890 0 : &file_name,
1891 0 : meta.generation,
1892 : )
1893 0 : })
1894 0 : .collect()
1895 : };
1896 :
1897 0 : let layer_deletion_count = layers.len();
1898 0 : self.deletion_queue_client
1899 0 : .push_immediate(layers)
1900 0 : .await
1901 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1902 :
1903 : // Delete the initdb.tar.zst, which is not always present; deletion attempts for
1904 : // nonexistent objects are not considered errors.
1905 0 : let initdb_path =
1906 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1907 0 : self.deletion_queue_client
1908 0 : .push_immediate(vec![initdb_path])
1909 0 : .await
1910 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1911 :
1912 : // Do not delete the index part yet: it is needed for a possible retry. If we removed it first
1913 : // and the retry arrived at a different pageserver, there would be no trace of the timeline on remote storage.
1914 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1915 :
1916 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1917 : // taking the burden of listing all the layers that we already know we should delete.
1918 0 : self.flush_deletion_queue()
1919 0 : .await
1920 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1921 :
1922 0 : let cancel = shutdown_token();
1923 :
1924 0 : let remaining = download_retry(
1925 0 : || async {
1926 0 : self.storage_impl
1927 0 : .list(
1928 0 : Some(&timeline_storage_path),
1929 0 : ListingMode::NoDelimiter,
1930 0 : None,
1931 0 : &cancel,
1932 0 : )
1933 0 : .await
1934 0 : },
1935 0 : "list remaining files",
1936 0 : &cancel,
1937 : )
1938 0 : .await
1939 0 : .context("list remaining files")?
1940 : .keys;
1941 :
1942 : // We will delete the current index_part object last, since it acts as a deletion
1943 : // marker via its deleted_at attribute
1944 0 : let latest_index = remaining
1945 0 : .iter()
1946 0 : .filter(|o| {
1947 0 : o.key
1948 0 : .object_name()
1949 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1950 0 : .unwrap_or(false)
1951 0 : })
1952 0 : .filter_map(|o| {
1953 0 : parse_remote_index_path(o.key.clone()).map(|gen_| (o.key.clone(), gen_))
1954 0 : })
1955 0 : .max_by_key(|i| i.1)
1956 0 : .map(|i| i.0.clone())
1957 0 : .unwrap_or(
1958 : // No generation-suffixed indices, assume we are dealing with
1959 : // a legacy index.
1960 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1961 : );
1962 :
1963 0 : let remaining_layers: Vec<RemotePath> = remaining
1964 0 : .into_iter()
1965 0 : .filter_map(|o| {
1966 0 : if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
1967 0 : None
1968 : } else {
1969 0 : Some(o.key)
1970 : }
1971 0 : })
1972 0 : .inspect(|path| {
1973 0 : if let Some(name) = path.object_name() {
1974 0 : info!(%name, "deleting a file not referenced from index_part.json");
1975 : } else {
1976 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1977 : }
1978 0 : })
1979 0 : .collect();
1980 :
1981 0 : let not_referenced_count = remaining_layers.len();
1982 0 : if !remaining_layers.is_empty() {
1983 0 : self.deletion_queue_client
1984 0 : .push_immediate(remaining_layers)
1985 0 : .await
1986 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1987 0 : }
1988 :
1989 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1990 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1991 0 : "failpoint: timeline-delete-before-index-delete"
1992 0 : )))?
1993 0 : });
1994 :
1995 0 : debug!("enqueuing index part deletion");
1996 0 : self.deletion_queue_client
1997 0 : .push_immediate([latest_index].to_vec())
1998 0 : .await
1999 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
2000 :
2001 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
2002 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
2003 0 : self.flush_deletion_queue()
2004 0 : .await
2005 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
2006 :
2007 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
2008 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
2009 0 : "failpoint: timeline-delete-after-index-delete"
2010 0 : )))?
2011 0 : });
2012 :
2013 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
2014 :
2015 0 : Ok(())
2016 0 : }
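`delete_all` deletes the newest `index_part.json` last, because its `deleted_at` field is the deletion marker that a retry relies on. The selection step can be sketched as below. The `index_part.json-XXXXXXXX` naming (eight hex digits of generation) and the use of bare object names are assumptions for illustration; the real code parses full `RemotePath`s with `parse_remote_index_path` and compares against `INITDB_PRESERVED_PATH`.

```rust
/// Hypothetical naming scheme: "index_part.json-0000000a" is generation 10.
pub fn parse_index_generation(name: &str) -> Option<u32> {
    let suffix = name.strip_prefix("index_part.json-")?;
    if suffix.len() != 8 {
        return None;
    }
    u32::from_str_radix(suffix, 16).ok()
}

/// Pick the highest-generation index; fall back to the legacy unsuffixed name.
pub fn pick_latest_index(keys: &[&str]) -> String {
    keys.iter()
        .filter_map(|k| parse_index_generation(k).map(|generation| (generation, *k)))
        .max_by_key(|(generation, _)| *generation)
        .map(|(_, k)| k.to_string())
        .unwrap_or_else(|| "index_part.json".to_string())
}

/// Split a listing into objects to delete now and the index to delete last.
/// `preserved` is kept, like the preserved initdb archive in `delete_all`.
pub fn split_for_deletion(keys: &[&str], preserved: &str) -> (Vec<String>, String) {
    let latest = pick_latest_index(keys);
    let now: Vec<String> = keys
        .iter()
        .filter(|k| **k != latest.as_str() && **k != preserved)
        .map(|k| k.to_string())
        .collect();
    (now, latest)
}
```

Older, lower-generation indices land in the "delete now" set, just like leaked layer files.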
2017 :
2018 : /// Pick next tasks from the queue, and start as many of them as possible without violating
2019 : /// the ordering constraints.
2020 : ///
2021 : /// The number of inprogress tasks is limited by `Self::inprogress_tasks`, see `next_ready`.
2022 3797 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
2023 5800 : while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
2024 2003 : debug!("starting op: {next_op}");
2025 :
2026 : // Prepare upload.
2027 2003 : match &mut next_op {
2028 895 : UploadOp::UploadLayer(layer, meta, mode) => {
2029 895 : if upload_queue
2030 895 : .recently_deleted
2031 895 : .remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
2032 0 : {
2033 0 : *mode = Some(OpType::FlushDeletion);
2034 0 : } else {
2035 895 : *mode = Some(OpType::MayReorder)
2036 : }
2037 : }
2038 778 : UploadOp::UploadMetadata { .. } => {}
2039 209 : UploadOp::Delete(Delete { layers }) => {
2040 418 : for (name, meta) in layers {
2041 209 : upload_queue
2042 209 : .recently_deleted
2043 209 : .insert((name.clone(), meta.generation));
2044 209 : }
2045 : }
2046 121 : UploadOp::Barrier(sender) => {
2047 121 : sender.send_replace(());
2048 121 : continue;
2049 : }
2050 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
2051 : };
2052 :
2053 : // Assign unique ID to this task
2054 1882 : upload_queue.task_counter += 1;
2055 1882 : let upload_task_id = upload_queue.task_counter;
2056 :
2057 : // Add it to the in-progress map
2058 1882 : let task = Arc::new(UploadTask {
2059 1882 : task_id: upload_task_id,
2060 1882 : op: next_op,
2061 1882 : coalesced_ops,
2062 1882 : retries: AtomicU32::new(0),
2063 1882 : });
2064 1882 : upload_queue
2065 1882 : .inprogress_tasks
2066 1882 : .insert(task.task_id, Arc::clone(&task));
2067 :
2068 : // Spawn a task to perform the operation
2069 1882 : let self_rc = Arc::clone(self);
2070 1882 : let tenant_shard_id = self.tenant_shard_id;
2071 1882 : let timeline_id = self.timeline_id;
2072 1882 : task_mgr::spawn(
2073 1882 : &self.runtime,
2074 1882 : TaskKind::RemoteUploadTask,
2075 1882 : self.tenant_shard_id,
2076 1882 : Some(self.timeline_id),
2077 1882 : "remote upload",
2078 1871 : async move {
2079 1871 : self_rc.perform_upload_task(task).await;
2080 1858 : Ok(())
2081 1858 : }
2082 1882 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
2083 : );
2084 :
2085 : // Loop back to process next task
2086 : }
2087 3797 : }
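`launch_queued_tasks` remembers each `(layer name, generation)` it has scheduled for deletion. If the same layer is then uploaded again, the upload is tagged `OpType::FlushDeletion`, so that `perform_upload_task` flushes (or, with `block_deletions`, cancels) the pending deletion first; every other upload is tagged `MayReorder`. A minimal model, with a hypothetical `Tracker` type and a `u32` generation:

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpType {
    MayReorder,
    FlushDeletion,
}

/// Hypothetical model of the `recently_deleted` bookkeeping.
pub struct Tracker {
    recently_deleted: HashSet<(String, u32)>,
}

impl Tracker {
    pub fn new() -> Self {
        Tracker { recently_deleted: HashSet::new() }
    }

    /// Starting a delete op: remember every (name, generation) it covers.
    pub fn on_delete(&mut self, layers: &[(&str, u32)]) {
        for (name, generation) in layers {
            self.recently_deleted.insert((name.to_string(), *generation));
        }
    }

    /// Starting an upload op: if the same layer was recently deleted, the
    /// upload must flush pending deletions first. The entry is consumed.
    pub fn on_upload(&mut self, name: &str, generation: u32) -> OpType {
        if self.recently_deleted.remove(&(name.to_string(), generation)) {
            OpType::FlushDeletion
        } else {
            OpType::MayReorder
        }
    }
}
```

The real code additionally clears the whole set when it performs such a flush, since every earlier deletion has then been executed.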
2088 :
2089 : ///
2090 : /// Perform an upload task.
2091 : ///
2092 : /// The task is in the `inprogress_tasks` list. This function will try to
2093 : /// execute it, retrying forever. On successful completion, the task is
2094 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
2095 : /// queue that were waiting for its completion are launched.
2096 : ///
2097 : /// The task can be shut down, however. That leads to stopping the whole
2098 : /// queue.
2099 : ///
2100 1871 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
2101 1871 : let cancel = shutdown_token();
2102 : // Loop to retry until it completes.
2103 : loop {
2104 : // If we're requested to shut down, close up shop and exit.
2105 : //
2106 : // Note: We only check for the shutdown requests between retries, so
2107 : // if a shutdown request arrives while we're busy uploading in the
2108 : // upload::upload_*() calls below, we will not exit until the call has
2109 : // finished. We probably could cancel the upload by simply dropping
2110 : // the Future, but we're not 100% sure if the remote storage library
2111 : // is cancellation safe, so we don't dare to do that. Hopefully, the
2112 : // upload finishes or times out soon enough.
2113 1871 : if cancel.is_cancelled() {
2114 0 : info!("upload task cancelled by shutdown request");
2115 0 : self.stop();
2116 0 : return;
2117 1871 : }
2118 :
2119 : // Assert that we don't modify a layer that's referenced by the current index.
2120 1871 : if cfg!(debug_assertions) {
2121 1871 : let modified = match &task.op {
2122 890 : UploadOp::UploadLayer(layer, layer_metadata, _) => {
2123 890 : vec![(layer.layer_desc().layer_name(), layer_metadata)]
2124 : }
2125 209 : UploadOp::Delete(delete) => {
2126 209 : delete.layers.iter().map(|(n, m)| (n.clone(), m)).collect()
2127 : }
2128 : // These don't modify layers.
2129 772 : UploadOp::UploadMetadata { .. } => Vec::new(),
2130 0 : UploadOp::Barrier(_) => Vec::new(),
2131 0 : UploadOp::Shutdown => Vec::new(),
2132 : };
2133 1871 : if let Ok(queue) = self.upload_queue.lock().unwrap().initialized_mut() {
2134 2966 : for (ref name, metadata) in modified {
2135 1099 : debug_assert!(
2136 1099 : !queue.clean.0.references(name, metadata),
2137 2 : "layer {name} modified while referenced by index",
2138 : );
2139 : }
2140 2 : }
2141 0 : }
2142 :
2143 1869 : let upload_result: anyhow::Result<()> = match &task.op {
2144 890 : UploadOp::UploadLayer(layer, layer_metadata, mode) => {
2145 : // TODO: check if this mechanism can be removed now that can_bypass() performs
2146 : // conflict checks during scheduling.
2147 890 : if let Some(OpType::FlushDeletion) = mode {
2148 0 : if self.config.read().unwrap().block_deletions {
2149 : // Of course, this is not efficient... but usually the queue should be empty.
2150 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2151 0 : let mut detected = false;
2152 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2153 0 : for list in queue.blocked_deletions.iter_mut() {
2154 0 : list.layers.retain(|(name, meta)| {
2155 0 : if name == &layer.layer_desc().layer_name()
2156 0 : && meta.generation == layer_metadata.generation
2157 : {
2158 0 : detected = true;
2159 : // remove the layer from deletion queue
2160 0 : false
2161 : } else {
2162 : // keep the layer
2163 0 : true
2164 : }
2165 0 : });
2166 : }
2167 0 : }
2168 0 : if detected {
2169 0 : info!(
2170 0 : "cancelled blocked deletion of layer {} at gen {:?}",
2171 0 : layer.layer_desc().layer_name(),
2172 : layer_metadata.generation
2173 : );
2174 0 : }
2175 : } else {
2176 : // TODO: we do not guarantee that the upload task starts after the deletion task, so there could be race conditions
2177 : // in which the layer still gets deleted. But this only happens if someone creates a layer immediately after it's deleted,
2178 : // which is not possible in the current system.
2179 0 : info!(
2180 0 : "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
2181 0 : layer.layer_desc().layer_name(),
2182 : layer_metadata.generation
2183 : );
2184 : {
2185 : // We are about to flush, so we can clear the recently deleted list.
2186 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2187 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2188 0 : queue.recently_deleted.clear();
2189 0 : }
2190 : }
2191 0 : if let Err(e) = self.deletion_queue_client.flush_execute().await {
2192 0 : warn!(
2193 0 : "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
2194 0 : layer.layer_desc().layer_name(),
2195 : layer_metadata.generation
2196 : );
2197 : } else {
2198 0 : info!(
2199 0 : "done flushing deletion queue before uploading layer {} at gen {:?}",
2200 0 : layer.layer_desc().layer_name(),
2201 : layer_metadata.generation
2202 : );
2203 : }
2204 : }
2205 890 : }
2206 890 : let local_path = layer.local_path();
2207 :
2208 : // We should only be uploading layers created during this `Tenant`'s lifetime, so
2209 : // the metadata in the upload should always match our current generation.
2210 890 : assert_eq!(layer_metadata.generation, self.generation);
2211 :
2212 890 : let remote_path = remote_layer_path(
2213 890 : &self.tenant_shard_id.tenant_id,
2214 890 : &self.timeline_id,
2215 890 : layer_metadata.shard,
2216 890 : &layer.layer_desc().layer_name(),
2217 890 : layer_metadata.generation,
2218 : );
2219 :
2220 890 : upload::upload_timeline_layer(
2221 890 : &self.storage_impl,
2222 890 : local_path,
2223 890 : &remote_path,
2224 890 : layer_metadata.file_size,
2225 890 : &self.cancel,
2226 890 : )
2227 890 : .measure_remote_op(
2228 890 : Some(TaskKind::RemoteUploadTask),
2229 890 : RemoteOpFileKind::Layer,
2230 890 : RemoteOpKind::Upload,
2231 890 : Arc::clone(&self.metrics),
2232 890 : )
2233 890 : .await
2234 : }
2235 772 : UploadOp::UploadMetadata { uploaded } => {
2236 772 : let res = upload::upload_index_part(
2237 772 : &self.storage_impl,
2238 772 : &self.tenant_shard_id,
2239 772 : &self.timeline_id,
2240 772 : self.generation,
2241 772 : uploaded,
2242 772 : &self.cancel,
2243 772 : )
2244 772 : .measure_remote_op(
2245 772 : Some(TaskKind::RemoteUploadTask),
2246 772 : RemoteOpFileKind::Index,
2247 772 : RemoteOpKind::Upload,
2248 772 : Arc::clone(&self.metrics),
2249 772 : )
2250 772 : .await;
2251 769 : if res.is_ok() {
2252 769 : self.update_remote_physical_size_gauge(Some(uploaded));
2253 769 : let mention_having_future_layers = if cfg!(feature = "testing") {
2254 769 : uploaded
2255 769 : .layer_metadata
2256 769 : .keys()
2257 8897 : .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn()))
2258 : } else {
2259 0 : false
2260 : };
2261 769 : if mention_having_future_layers {
2262 : // see the rationale near crate::tenant::timeline::init::cleanup_future_layer
2263 42 : tracing::info!(
2264 0 : disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(),
2265 0 : "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"
2266 : );
2267 727 : }
2268 0 : }
2269 769 : res
2270 : }
2271 : // TODO: this should wait for the deletion to be executed by the deletion queue.
2272 : // Otherwise, the deletion may race with an upload and wrongfully delete a newer
2273 : // file. Some of the above logic attempts to work around this, it should be replaced
2274 : // by the upload queue ordering guarantees (see `can_bypass`). See:
2275 : // <https://github.com/neondatabase/neon/issues/10283>.
2276 207 : UploadOp::Delete(delete) => {
2277 207 : if self.config.read().unwrap().block_deletions {
2278 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2279 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2280 0 : queue.blocked_deletions.push(delete.clone());
2281 0 : }
2282 0 : Ok(())
2283 : } else {
2284 207 : pausable_failpoint!("before-delete-layer-pausable");
2285 207 : self.deletion_queue_client
2286 207 : .push_layers(
2287 207 : self.tenant_shard_id,
2288 207 : self.timeline_id,
2289 207 : self.generation,
2290 207 : delete.layers.clone(),
2291 : )
2292 207 : .map_err(|e| anyhow::anyhow!(e))
2293 : }
2294 : }
2295 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
2296 : // unreachable. Barrier operations are handled synchronously in
2297 : // launch_queued_tasks
2298 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
2299 0 : break;
2300 : }
2301 : };
2302 :
2303 0 : match upload_result {
2304 : Ok(()) => {
2305 1856 : break;
2306 : }
2307 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
2308 : // loop around to do the proper stopping
2309 0 : continue;
2310 : }
2311 0 : Err(e) => {
2312 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
2313 :
2314 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
2315 : // or other external reasons. Such issues are relatively regular, so log them
2316 : // at info level at first, and only WARN if the operation fails repeatedly.
2317 : //
2318 : // (See similar logic for downloads in `download::download_retry`)
2319 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
2320 0 : info!(
2321 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
2322 0 : task.op, retries, e
2323 : );
2324 : } else {
2325 0 : warn!(
2326 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
2327 0 : task.op, retries, e
2328 : );
2329 : }
2330 :
2331 : // sleep until it's time to retry, or we're cancelled
2332 0 : exponential_backoff(
2333 0 : retries,
2334 0 : DEFAULT_BASE_BACKOFF_SECONDS,
2335 0 : DEFAULT_MAX_BACKOFF_SECONDS,
2336 0 : &cancel,
2337 0 : )
2338 0 : .await;
2339 : }
2340 : }
2341 : }
2342 :
2343 1856 : let retries = task.retries.load(Ordering::SeqCst);
2344 1856 : if retries > 0 {
2345 0 : info!(
2346 0 : "remote task {} completed successfully after {} retries",
2347 0 : task.op, retries
2348 : );
2349 : } else {
2350 1856 : debug!("remote task {} completed successfully", task.op);
2351 : }
2352 :
2353 : // The task has completed successfully. Remove it from the in-progress list.
2354 1856 : let lsn_update = {
2355 1856 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
2356 1856 : let upload_queue = match upload_queue_guard.deref_mut() {
2357 0 : UploadQueue::Uninitialized => panic!(
2358 0 : "callers are responsible for ensuring this is only called on an initialized queue"
2359 : ),
2360 0 : UploadQueue::Stopped(_stopped) => None,
2361 1856 : UploadQueue::Initialized(qi) => Some(qi),
2362 : };
2363 :
2364 1856 : let upload_queue = match upload_queue {
2365 1856 : Some(upload_queue) => upload_queue,
2366 : None => {
2367 0 : info!("another concurrent task already stopped the queue");
2368 0 : return;
2369 : }
2370 : };
2371 :
2372 1856 : upload_queue.inprogress_tasks.remove(&task.task_id);
2373 :
2374 1856 : let lsn_update = match task.op {
2375 880 : UploadOp::UploadLayer(_, _, _) => None,
2376 769 : UploadOp::UploadMetadata { ref uploaded } => {
2377 : // the task id is reused as a monotonicity check for storing the "clean"
2378 : // IndexPart.
2379 769 : let last_updater = upload_queue.clean.1;
2380 769 : let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id);
2381 769 : let monotone = is_later || last_updater.is_none();
2382 :
2383 769 : assert!(
2384 769 : monotone,
2385 0 : "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}",
2386 0 : task.task_id
2387 : );
2388 :
2389 : // not taking ownership is wasteful
2390 769 : upload_queue.clean.0.clone_from(uploaded);
2391 769 : upload_queue.clean.1 = Some(task.task_id);
2392 :
2393 769 : let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
2394 769 : self.metrics
2395 769 : .projected_remote_consistent_lsn_gauge
2396 769 : .set(lsn.0);
2397 :
2398 769 : if self.generation.is_none() {
2399 : // Legacy mode: skip validating generation
2400 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
2401 0 : None
2402 769 : } else if self
2403 769 : .config
2404 769 : .read()
2405 769 : .unwrap()
2406 769 : .process_remote_consistent_lsn_updates
2407 : {
2408 769 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
2409 : } else {
2410 : // Our config disables remote_consistent_lsn updates: drop it.
2411 0 : None
2412 : }
2413 : }
2414 207 : UploadOp::Delete(_) => None,
2415 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
2416 : };
2417 :
2418 : // Launch any queued tasks that were unblocked by this one.
2419 1856 : self.launch_queued_tasks(upload_queue);
2420 1856 : lsn_update
2421 : };
2422 :
2423 1856 : if let Some((lsn, slot)) = lsn_update {
2424 : // Updates to the remote_consistent_lsn we advertise to pageservers
2425 : // are all routed through the DeletionQueue, to enforce important
2426 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
2427 769 : self.deletion_queue_client
2428 769 : .update_remote_consistent_lsn(
2429 769 : self.tenant_shard_id,
2430 769 : self.timeline_id,
2431 769 : self.generation,
2432 769 : lsn,
2433 769 : slot,
2434 769 : )
2435 769 : .await;
2436 1087 : }
2437 :
2438 1856 : self.metric_end(&task.op);
2439 1856 : for coalesced_op in &task.coalesced_ops {
2440 3 : self.metric_end(coalesced_op);
2441 3 : }
2442 1856 : }
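The retry path above logs the first few failures at `info`, switches to `warn` once `retries` reaches `FAILED_UPLOAD_WARN_THRESHOLD`, and then sleeps via `exponential_backoff` between attempts. Below is a minimal, standalone sketch of that policy. It assumes a plain doubling curve without jitter, and `WARN_THRESHOLD` is a hypothetical value. The real `exponential_backoff` helper and threshold are not shown in this listing and may differ.

```rust
use std::time::Duration;

/// Hypothetical threshold; the real `FAILED_UPLOAD_WARN_THRESHOLD` value is not shown here.
const WARN_THRESHOLD: u32 = 3;

#[derive(Debug, PartialEq)]
enum LogLevel {
    Info,
    Warn,
}

/// `retries` is the count *before* this failure, as returned by `fetch_add`.
fn log_level_for(retries: u32) -> LogLevel {
    if retries < WARN_THRESHOLD {
        LogLevel::Info
    } else {
        LogLevel::Warn
    }
}

/// Capped exponential backoff: `base * 2^retries` seconds, clamped to `max`.
/// Assumption: plain doubling without jitter (illustrative only).
fn backoff_delay(retries: u32, base_secs: f64, max_secs: f64) -> Duration {
    // Clamp the exponent so the cast to i32 cannot wrap; 2^63 seconds
    // already exceeds any sensible `max_secs`.
    let exp = retries.min(63) as i32;
    let secs = base_secs * 2f64.powi(exp);
    Duration::from_secs_f64(secs.min(max_secs))
}
```

Capping the delay keeps a long outage from pushing retries arbitrarily far apart, while the log-level split keeps routine, transient failures out of the warning stream.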
2443 :
2444 3815 : fn metric_impl(
2445 3815 : &self,
2446 3815 : op: &UploadOp,
2447 3815 : ) -> Option<(
2448 3815 : RemoteOpFileKind,
2449 3815 : RemoteOpKind,
2450 3815 : RemoteTimelineClientMetricsCallTrackSize,
2451 3815 : )> {
2452 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
2453 3815 : let res = match op {
2454 1775 : UploadOp::UploadLayer(_, m, _) => (
2455 1775 : RemoteOpFileKind::Layer,
2456 1775 : RemoteOpKind::Upload,
2457 1775 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
2458 1775 : ),
2459 1569 : UploadOp::UploadMetadata { .. } => (
2460 1569 : RemoteOpFileKind::Index,
2461 1569 : RemoteOpKind::Upload,
2462 1569 : DontTrackSize {
2463 1569 : reason: "metadata uploads are tiny",
2464 1569 : },
2465 1569 : ),
2466 467 : UploadOp::Delete(_delete) => (
2467 467 : RemoteOpFileKind::Layer,
2468 467 : RemoteOpKind::Delete,
2469 467 : DontTrackSize {
2470 467 : reason: "should we track deletes? positive or negative sign?",
2471 467 : },
2472 467 : ),
2473 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
2474 : // we do not account these
2475 4 : return None;
2476 : }
2477 : };
2478 3811 : Some(res)
2479 3815 : }
2480 :
2481 1952 : fn metric_begin(&self, op: &UploadOp) {
2482 1952 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2483 1952 : Some(x) => x,
2484 0 : None => return,
2485 : };
2486 1952 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
2487 1952 : guard.will_decrement_manually(); // in metric_end(), see right below
2488 1952 : }
2489 :
2490 1863 : fn metric_end(&self, op: &UploadOp) {
2491 1863 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2492 1859 : Some(x) => x,
2493 4 : None => return,
2494 : };
2495 1859 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
2496 1863 : }
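`metric_begin` and `metric_end` share `metric_impl`, so an op kind is either counted on both sides or (like `Barrier` and `Shutdown`) on neither. Every counted op must be ended exactly once, either when its task completes (including coalesced ops) or when `stop_impl` tears down the queued ops. Below is a minimal sketch of that pairing, using a hypothetical `Op` enum and a plain `HashMap` in place of the real metrics types. Only layer uploads accumulate bytes, mirroring the `Bytes(m.file_size)` versus `DontTrackSize` split above.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `UploadOp`.
#[derive(Debug, Clone, Copy)]
enum Op {
    UploadLayer { size: u64 },
    UploadIndex,
    Delete,
    Barrier,
}

/// Counter key for an op, or `None` for ops that are not accounted.
fn metric_key(op: &Op) -> Option<&'static str> {
    match op {
        Op::UploadLayer { .. } => Some("layer_upload"),
        Op::UploadIndex => Some("index_upload"),
        Op::Delete => Some("layer_delete"),
        Op::Barrier => None,
    }
}

#[derive(Default)]
struct InFlight {
    counts: HashMap<&'static str, i64>,
    bytes_started: u64,
}

impl InFlight {
    fn begin(&mut self, op: &Op) {
        if let Some(key) = metric_key(op) {
            *self.counts.entry(key).or_insert(0) += 1;
        }
        // Only layer uploads track bytes; index uploads and deletes don't.
        if let Op::UploadLayer { size } = op {
            self.bytes_started += *size;
        }
    }

    fn end(&mut self, op: &Op) {
        if let Some(key) = metric_key(op) {
            *self.counts.entry(key).or_insert(0) -= 1;
        }
    }

    fn in_flight(&self, key: &str) -> i64 {
        self.counts.get(key).copied().unwrap_or(0)
    }
}
```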
2497 :
2498 : /// Close the upload queue for new operations and cancel queued operations.
2499 : ///
2500 : /// Use [`RemoteTimelineClient::shutdown`] for a graceful stop.
2501 : ///
2502 : /// In-progress operations will still be running after this function returns.
2503 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
2504 : /// to wait for them to complete, after calling this function.
2505 9 : pub(crate) fn stop(&self) {
2506 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
2507 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
2508 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
2509 9 : let mut guard = self.upload_queue.lock().unwrap();
2510 9 : self.stop_impl(&mut guard);
2511 9 : }
2512 :
2513 9 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
2514 9 : match &mut **guard {
2515 : UploadQueue::Uninitialized => {
2516 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
2517 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
2518 : }
2519 : UploadQueue::Stopped(_) => {
2520 : // nothing to do
2521 4 : info!("another concurrent task already shut down the queue");
2522 : }
2523 5 : UploadQueue::Initialized(initialized) => {
2524 5 : info!("shutting down upload queue");
2525 :
2526 : // Replace the queue with the Stopped state, taking ownership of the old
2527 : // Initialized queue. We will do some checks on it, and then drop it.
2528 5 : let qi = {
2529 : // Here we preserve a working version of the upload queue for possible use during deletions.
2530 : // In-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
2531 : // but for this use case it doesn't really make sense to bring in unsafe code for a single call site.
2532 : // Deletion is not very performance-sensitive, so cloning a fraction of the queue shouldn't be a problem.
2533 5 : let upload_queue_for_deletion = UploadQueueInitialized {
2534 5 : inprogress_limit: initialized.inprogress_limit,
2535 5 : task_counter: 0,
2536 5 : dirty: initialized.dirty.clone(),
2537 5 : clean: initialized.clean.clone(),
2538 5 : latest_files_changes_since_metadata_upload_scheduled: 0,
2539 5 : visible_remote_consistent_lsn: initialized
2540 5 : .visible_remote_consistent_lsn
2541 5 : .clone(),
2542 5 : inprogress_tasks: HashMap::default(),
2543 5 : queued_operations: VecDeque::default(),
2544 5 : #[cfg(feature = "testing")]
2545 5 : dangling_files: HashMap::default(),
2546 5 : blocked_deletions: Vec::new(),
2547 5 : shutting_down: false,
2548 5 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
2549 5 : recently_deleted: HashSet::new(),
2550 5 : };
2551 :
2552 5 : let upload_queue = std::mem::replace(
2553 5 : &mut **guard,
2554 5 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
2555 5 : UploadQueueStoppedDeletable {
2556 5 : upload_queue_for_deletion,
2557 5 : deleted_at: SetDeletedFlagProgress::NotRunning,
2558 5 : },
2559 5 : )),
2560 : );
2561 5 : if let UploadQueue::Initialized(qi) = upload_queue {
2562 5 : qi
2563 : } else {
2564 0 : unreachable!("we checked in the match above that it is Initialized");
2565 : }
2566 : };
2567 :
2568 : // We don't need to do anything here for in-progress tasks. They will finish
2569 : // on their own, decrement the unfinished-task counter themselves, and observe
2570 : // that the queue is Stopped.
2571 5 : drop(qi.inprogress_tasks);
2572 :
2573 : // Tear down queued ops
2574 5 : for op in qi.queued_operations.into_iter() {
2575 4 : self.metric_end(&op);
2576 4 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
2577 4 : // which is exactly what we want to happen.
2578 4 : drop(op);
2579 4 : }
2580 : }
2581 : }
2582 9 : }
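`stop_impl` avoids an unsafe in-place "take and replace" by swapping the new `Stopped` state in with `std::mem::replace`, which hands back ownership of the old `Initialized` queue so its queued ops can be torn down. Below is a minimal sketch of that pattern on a hypothetical, much-simplified queue enum. It does not reproduce the real `UploadQueueStoppedDeletable` clone.

```rust
use std::collections::VecDeque;
use std::mem;
use std::sync::Mutex;

/// Hypothetical, simplified model of the upload-queue state machine.
#[derive(Debug)]
enum Queue {
    Uninitialized,
    Initialized { queued: VecDeque<String> },
    Stopped,
}

/// Moves the queue into `Stopped` and returns how many queued (not yet
/// launched) ops were dropped. Calling it again is a harmless no-op.
fn stop(queue: &Mutex<Queue>) -> usize {
    let mut guard = queue.lock().unwrap();
    // Swap in the new state and take ownership of the old one; no unsafe
    // code is needed because the guard always holds a valid value.
    let old = mem::replace(&mut *guard, Queue::Stopped);
    match old {
        // The queued ops are dropped when `queued` goes out of scope.
        Queue::Initialized { queued } => queued.len(),
        Queue::Uninitialized | Queue::Stopped => 0,
    }
}
```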
2583 :
2584 : /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
2585 : /// externally to RemoteTimelineClient.
2586 0 : pub(crate) fn initialized_upload_queue(
2587 0 : &self,
2588 0 : ) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
2589 0 : let mut inner = self.upload_queue.lock().unwrap();
2590 0 : inner.initialized_mut()?;
2591 0 : Ok(UploadQueueAccessor { inner })
2592 0 : }
2593 :
2594 4 : pub(crate) fn no_pending_work(&self) -> bool {
2595 4 : let inner = self.upload_queue.lock().unwrap();
2596 4 : match &*inner {
2597 : UploadQueue::Uninitialized
2598 0 : | UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => true,
2599 4 : UploadQueue::Stopped(UploadQueueStopped::Deletable(x)) => {
2600 4 : x.upload_queue_for_deletion.no_pending_work()
2601 : }
2602 0 : UploadQueue::Initialized(x) => x.no_pending_work(),
2603 : }
2604 4 : }
2605 :
2606 : /// 'foreign' in the sense that it does not belong to this tenant shard. This method
2607 : /// is used during GC for other shards to get the index of shard zero.
2608 0 : pub(crate) async fn download_foreign_index(
2609 0 : &self,
2610 0 : shard_number: ShardNumber,
2611 0 : cancel: &CancellationToken,
2612 0 : ) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
2613 0 : let foreign_shard_id = TenantShardId {
2614 0 : shard_number,
2615 0 : shard_count: self.tenant_shard_id.shard_count,
2616 0 : tenant_id: self.tenant_shard_id.tenant_id,
2617 0 : };
2618 0 : download_index_part(
2619 0 : &self.storage_impl,
2620 0 : &foreign_shard_id,
2621 0 : &self.timeline_id,
2622 0 : Generation::MAX,
2623 0 : cancel,
2624 0 : )
2625 0 : .await
2626 0 : }
2627 : }
2628 :
2629 : pub(crate) struct UploadQueueAccessor<'a> {
2630 : inner: std::sync::MutexGuard<'a, UploadQueue>,
2631 : }
2632 :
2633 : impl UploadQueueAccessor<'_> {
2634 0 : pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
2635 0 : match &*self.inner {
2636 0 : UploadQueue::Initialized(x) => &x.clean.0,
2637 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
2638 0 : unreachable!("checked before constructing")
2639 : }
2640 : }
2641 0 : }
2642 : }
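`UploadQueueAccessor` holds the `MutexGuard` for its whole lifetime and is only constructed after `initialized_mut()` succeeds. That is why `latest_uploaded_index_part` can treat the other states as `unreachable!`. Below is a minimal sketch of the same "check, then wrap the guard" pattern with hypothetical `State`, `Accessor`, and `NotInitialized` types.

```rust
use std::sync::{Mutex, MutexGuard};

/// Hypothetical, simplified queue state.
#[derive(Debug)]
enum State {
    Uninitialized,
    Initialized { latest_index: String },
}

#[derive(Debug)]
struct NotInitialized;

/// Holds the mutex for as long as it lives. Only built after the state has
/// been checked, so accessors may treat other states as unreachable.
struct Accessor<'a> {
    inner: MutexGuard<'a, State>,
}

impl Accessor<'_> {
    fn latest_index(&self) -> &str {
        match &*self.inner {
            State::Initialized { latest_index } => latest_index.as_str(),
            State::Uninitialized => unreachable!("checked before constructing"),
        }
    }
}

fn accessor(state: &Mutex<State>) -> Result<Accessor<'_>, NotInitialized> {
    let inner = state.lock().unwrap();
    if !matches!(*inner, State::Initialized { .. }) {
        return Err(NotInitialized);
    }
    Ok(Accessor { inner })
}
```

Because the guard stays inside the accessor, the state cannot change between the check and the read.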
2643 :
2644 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2645 0 : let path = format!("tenants/{tenant_shard_id}");
2646 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2647 0 : }
2648 :
2649 468 : pub fn remote_tenant_manifest_path(
2650 468 : tenant_shard_id: &TenantShardId,
2651 468 : generation: Generation,
2652 468 : ) -> RemotePath {
2653 468 : let path = format!(
2654 468 : "tenants/{tenant_shard_id}/tenant-manifest{}.json",
2655 468 : generation.get_suffix()
2656 : );
2657 468 : RemotePath::from_string(&path).expect("Failed to construct path")
2658 468 : }
2659 :
2660 : /// Prefix to all generations' manifest objects in a tenant shard
2661 119 : pub fn remote_tenant_manifest_prefix(tenant_shard_id: &TenantShardId) -> RemotePath {
2662 119 : let path = format!("tenants/{tenant_shard_id}/tenant-manifest");
2663 119 : RemotePath::from_string(&path).expect("Failed to construct path")
2664 119 : }
2665 :
2666 134 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2667 134 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
2668 134 : RemotePath::from_string(&path).expect("Failed to construct path")
2669 134 : }
2670 :
2671 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
2672 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
2673 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2674 0 : }
2675 :
2676 15 : pub fn remote_timeline_path(
2677 15 : tenant_shard_id: &TenantShardId,
2678 15 : timeline_id: &TimelineId,
2679 15 : ) -> RemotePath {
2680 15 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
2681 15 : }
2682 :
2683 : /// Obtains the path of the given Layer in the remote
2684 : ///
2685 : /// Note that the shard component of a remote layer path is _not_ always the same
2686 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
2687 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
2688 1092 : pub fn remote_layer_path(
2689 1092 : tenant_id: &TenantId,
2690 1092 : timeline_id: &TimelineId,
2691 1092 : shard: ShardIndex,
2692 1092 : layer_file_name: &LayerName,
2693 1092 : generation: Generation,
2694 1092 : ) -> RemotePath {
2695 : // Generation-aware key format
2696 1092 : let path = format!(
2697 1092 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
2698 1092 : shard.get_suffix(),
2699 : layer_file_name,
2700 1092 : generation.get_suffix()
2701 : );
2702 :
2703 1092 : RemotePath::from_string(&path).expect("Failed to construct path")
2704 1092 : }
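`remote_layer_path` places the shard suffix on the tenant component and the generation suffix on the layer file name. Below is a minimal sketch of that key layout with hypothetical helpers. It assumes `TIMELINES_SEGMENT_NAME` is `"timelines"`, a generation suffix of `-` plus 8 hex digits (empty when there is no generation), and a shard suffix of `-` plus 2-digit hex shard number and count (empty for an unsharded tenant). These formats are assumptions, not taken from this listing.

```rust
/// Hypothetical generation suffix; `None` models a generation-less key.
fn generation_suffix(generation: Option<u32>) -> String {
    match generation {
        Some(g) => format!("-{g:08x}"),
        None => String::new(),
    }
}

/// Hypothetical shard suffix; empty for an unsharded tenant (count == 0).
fn shard_suffix(number: u8, count: u8) -> String {
    if count == 0 {
        String::new()
    } else {
        format!("-{number:02x}{count:02x}")
    }
}

/// Illustrative layer key: shard on the tenant, generation on the file name.
fn layer_key(
    tenant_id: &str,
    shard: (u8, u8),
    timeline_id: &str,
    layer_name: &str,
    generation: Option<u32>,
) -> String {
    format!(
        "tenants/{tenant_id}{}/timelines/{timeline_id}/{layer_name}{}",
        shard_suffix(shard.0, shard.1),
        generation_suffix(generation),
    )
}
```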
2705 :
2706 : /// Returns true if a and b have the same layer path within a tenant/timeline. This is essentially
2707 : /// remote_layer_path(a) == remote_layer_path(b) without the string allocations.
2708 : ///
2709 : /// TODO: there should be a variant of LayerName for the physical path that contains information
2710 : /// about the shard and generation, such that this could be replaced by a simple comparison.
2711 892600 : pub fn is_same_remote_layer_path(
2712 892600 : aname: &LayerName,
2713 892600 : ameta: &LayerFileMetadata,
2714 892600 : bname: &LayerName,
2715 892600 : bmeta: &LayerFileMetadata,
2716 892600 : ) -> bool {
2717 : // NB: don't assert remote_layer_path(a) == remote_layer_path(b); too expensive even for debug.
2718 892600 : aname == bname && ameta.shard == bmeta.shard && ameta.generation == bmeta.generation
2719 892600 : }
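`is_same_remote_layer_path` compares the inputs that feed the key (layer name, shard, generation) instead of building two strings, which matters on a hot path like this one. Below is a minimal sketch that checks the field comparison against an allocating reference key. `KeyMeta` is a hypothetical stand-in for `LayerFileMetadata`, and the key layout is illustrative only.

```rust
/// Hypothetical stand-in for `LayerFileMetadata`: only the fields that
/// affect the remote key.
#[derive(Clone, Copy, PartialEq, Debug)]
struct KeyMeta {
    shard: (u8, u8),
    generation: Option<u32>,
}

/// Allocating reference implementation (illustrative key layout).
fn remote_key(name: &str, m: &KeyMeta) -> String {
    let shard = if m.shard.1 == 0 {
        String::new()
    } else {
        format!("-{:02x}{:02x}", m.shard.0, m.shard.1)
    };
    let generation = m.generation.map(|g| format!("-{g:08x}")).unwrap_or_default();
    format!("{shard}/{name}{generation}")
}

/// Allocation-free equivalent: compare the components that feed the key.
fn same_remote_key(a: &str, am: &KeyMeta, b: &str, bm: &KeyMeta) -> bool {
    a == b && am.shard == bm.shard && am.generation == bm.generation
}
```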
2720 :
2721 2 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
2722 2 : RemotePath::from_string(&format!(
2723 2 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
2724 2 : ))
2725 2 : .expect("Failed to construct path")
2726 2 : }
2727 :
2728 1 : pub fn remote_initdb_preserved_archive_path(
2729 1 : tenant_id: &TenantId,
2730 1 : timeline_id: &TimelineId,
2731 1 : ) -> RemotePath {
2732 1 : RemotePath::from_string(&format!(
2733 1 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
2734 1 : ))
2735 1 : .expect("Failed to construct path")
2736 1 : }
2737 :
2738 805 : pub fn remote_index_path(
2739 805 : tenant_shard_id: &TenantShardId,
2740 805 : timeline_id: &TimelineId,
2741 805 : generation: Generation,
2742 805 : ) -> RemotePath {
2743 805 : RemotePath::from_string(&format!(
2744 805 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
2745 805 : IndexPart::FILE_NAME,
2746 805 : generation.get_suffix()
2747 805 : ))
2748 805 : .expect("Failed to construct path")
2749 805 : }
2750 :
2751 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2752 0 : RemotePath::from_string(&format!(
2753 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
2754 0 : ))
2755 0 : .expect("Failed to construct path")
2756 0 : }
2757 :
2758 : /// Given the key of an index, parse out the generation part of the name
2759 9 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
2760 9 : let file_name = match path.get_path().file_name() {
2761 9 : Some(f) => f,
2762 : None => {
2763 : // Unexpected: we should be seeing index_part.json paths only
2764 0 : tracing::warn!("Malformed index key {}", path);
2765 0 : return None;
2766 : }
2767 : };
2768 :
2769 9 : match file_name.split_once('-') {
2770 6 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2771 3 : None => None,
2772 : }
2773 9 : }
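`parse_remote_index_path` takes the file name, splits on the first `-`, and parses the remainder as a generation. A name with no `-` yields `None`. Below is a minimal sketch of the same steps on plain strings. It assumes a generation suffix is exactly 8 lowercase hex digits; the real `Generation::parse_suffix` is not shown here and may accept other forms.

```rust
/// Assumption: a generation suffix is exactly 8 lowercase hex digits.
fn parse_generation_suffix(s: &str) -> Option<u32> {
    let is_hex8 = s.len() == 8 && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
    if is_hex8 {
        u32::from_str_radix(s, 16).ok()
    } else {
        None
    }
}

/// Mirrors the shape of `parse_remote_index_path`: take the last path
/// component, split on the first '-', and parse what follows.
fn parse_index_generation(key: &str) -> Option<u32> {
    let file_name = key.rsplit('/').next()?;
    let (_, suffix) = file_name.split_once('-')?;
    parse_generation_suffix(suffix)
}
```

Only the file name is inspected, so a `-` in the tenant component (such as a shard suffix) does not interfere.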
2774 :
2775 : /// Given the key of a tenant manifest, parse out the generation number
2776 0 : pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
2777 : static RE: OnceLock<Regex> = OnceLock::new();
2778 0 : let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8})\.json").unwrap());
2779 0 : re.captures(path.get_path().as_str())
2780 0 : .and_then(|c| c.get(1))
2781 0 : .and_then(|m| Generation::parse_suffix(m.as_str()))
2782 0 : }
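The manifest key is built by `remote_tenant_manifest_path` and parsed back by `parse_remote_tenant_manifest_path`. Below is a minimal round-trip sketch that uses only the standard library instead of `regex`. It assumes the generation suffix is `-` plus 8 lowercase hex digits (empty when there is no generation). It requires a literal `.json` suffix; in a regex, the dot must be escaped as `\.` to match only a literal dot.

```rust
/// Builds a manifest key; the `-{:08x}` suffix format is an assumption.
fn manifest_key(tenant: &str, generation: Option<u32>) -> String {
    let suffix = generation.map(|g| format!("-{g:08x}")).unwrap_or_default();
    format!("tenants/{tenant}/tenant-manifest{suffix}.json")
}

/// Parses the generation back out, requiring a literal ".json" suffix.
fn parse_manifest_generation(key: &str) -> Option<u32> {
    let file_name = key.rsplit('/').next()?;
    let hex = file_name
        .strip_prefix("tenant-manifest-")?
        .strip_suffix(".json")?;
    let is_hex8 = hex.len() == 8 && hex.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
    if !is_hex8 {
        return None;
    }
    u32::from_str_radix(hex, 16).ok()
}
```

A generation-less manifest (`tenant-manifest.json`) has no `-` after `tenant-manifest`, so it parses to `None`.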
2783 :
2784 : #[cfg(test)]
2785 : mod tests {
2786 : use std::collections::HashSet;
2787 :
2788 : use super::*;
2789 : use crate::DEFAULT_PG_VERSION;
2790 : use crate::context::RequestContext;
2791 : use crate::tenant::config::AttachmentMode;
2792 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
2793 : use crate::tenant::storage_layer::layer::local_layer_path;
2794 : use crate::tenant::{TenantShard, Timeline};
2795 :
2796 4 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2797 4 : format!("contents for {name}").into()
2798 4 : }
2799 :
2800 1 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2801 1 : let metadata = TimelineMetadata::new(
2802 1 : disk_consistent_lsn,
2803 1 : None,
2804 1 : None,
2805 1 : Lsn(0),
2806 1 : Lsn(0),
2807 1 : Lsn(0),
2808 : // Any version will do
2809 : // but it should be consistent with the one in the tests
2810 : crate::DEFAULT_PG_VERSION,
2811 : );
2812 :
2813 : // go through serialize + deserialize to fix the header, including checksum
2814 1 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2815 1 : }
2816 :
2817 1 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2818 3 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2819 1 : avec.sort();
2820 :
2821 1 : let mut bvec = b.to_vec();
2822 1 : bvec.sort_unstable();
2823 :
2824 1 : assert_eq!(avec, bvec);
2825 1 : }
2826 :
2827 2 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2828 2 : let mut expected: Vec<String> = expected
2829 2 : .iter()
2830 8 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2831 2 : .collect();
2832 2 : expected.sort();
2833 :
2834 2 : let mut found: Vec<String> = Vec::new();
2835 8 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2836 8 : let entry_name = entry.file_name();
2837 8 : let fname = entry_name.to_str().unwrap();
2838 8 : found.push(String::from(fname));
2839 8 : }
2840 2 : found.sort();
2841 :
2842 2 : assert_eq!(found, expected);
2843 2 : }
2844 :
2845 : struct TestSetup {
2846 : harness: TenantHarness,
2847 : tenant: Arc<TenantShard>,
2848 : timeline: Arc<Timeline>,
2849 : tenant_ctx: RequestContext,
2850 : }
2851 :
2852 : impl TestSetup {
2853 4 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2854 4 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2855 4 : let harness = TenantHarness::create(test_name).await?;
2856 4 : let (tenant, ctx) = harness.load().await;
2857 :
2858 4 : let timeline = tenant
2859 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2860 4 : .await?;
2861 :
2862 4 : Ok(Self {
2863 4 : harness,
2864 4 : tenant,
2865 4 : timeline,
2866 4 : tenant_ctx: ctx,
2867 4 : })
2868 4 : }
2869 :
2870 : /// Construct a RemoteTimelineClient in an arbitrary generation
2871 5 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2872 5 : let location_conf = AttachedLocationConfig {
2873 5 : generation,
2874 5 : attach_mode: AttachmentMode::Single,
2875 5 : };
2876 5 : Arc::new(RemoteTimelineClient {
2877 5 : conf: self.harness.conf,
2878 5 : runtime: tokio::runtime::Handle::current(),
2879 5 : tenant_shard_id: self.harness.tenant_shard_id,
2880 5 : timeline_id: TIMELINE_ID,
2881 5 : generation,
2882 5 : storage_impl: self.harness.remote_storage.clone(),
2883 5 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2884 5 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2885 5 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2886 5 : &self.harness.tenant_shard_id,
2887 5 : &TIMELINE_ID,
2888 5 : )),
2889 5 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
2890 5 : cancel: CancellationToken::new(),
2891 5 : })
2892 5 : }
2893 :
2894 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2895 : /// and timeline_id are present.
2896 3 : fn span(&self) -> tracing::Span {
2897 3 : tracing::info_span!(
2898 : "test",
2899 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2900 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2901 : timeline_id = %TIMELINE_ID
2902 : )
2903 3 : }
2904 : }
2905 :
2906 : // Test scheduling
2907 : #[tokio::test]
2908 1 : async fn upload_scheduling() {
2909 : // Test outline:
2910 : //
2911 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
2912 : // Schedule upload of index. Check that it is queued
2913 : // let the layer file uploads finish. Check that the index-upload is now started
2914 : // let the index-upload finish.
2915 : //
2916 : // Download back the index.json. Check that the list of files is correct
2917 : //
2918 : // Schedule upload. Schedule deletion. Check that the deletion is queued
2919 : // let upload finish. Check that deletion is now started
2920 : // Schedule another deletion. Check that it's launched immediately.
2921 : // Schedule index upload. Check that it's queued
2922 :
2923 1 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2924 1 : let span = test_setup.span();
2925 1 : let _guard = span.enter();
2926 :
2927 : let TestSetup {
2928 1 : harness,
2929 1 : tenant: _tenant,
2930 1 : timeline,
2931 1 : tenant_ctx: _tenant_ctx,
2932 1 : } = test_setup;
2933 :
2934 1 : let client = &timeline.remote_client;
2935 :
2936 : // Download back the index.json, and check that the list of files is correct
2937 1 : let initial_index_part = match client
2938 1 : .download_index_file(&CancellationToken::new())
2939 1 : .await
2940 1 : .unwrap()
2941 : {
2942 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2943 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2944 : };
2945 1 : let initial_layers = initial_index_part
2946 1 : .layer_metadata
2947 1 : .keys()
2948 1 : .map(|f| f.to_owned())
2949 1 : .collect::<HashSet<LayerName>>();
2950 1 : let initial_layer = {
2951 1 : assert!(initial_layers.len() == 1);
2952 1 : initial_layers.into_iter().next().unwrap()
2953 : };
2954 :
2955 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2956 :
2957 1 : println!("workdir: {}", harness.conf.workdir);
2958 :
2959 1 : let remote_timeline_dir = harness
2960 1 : .remote_fs_dir
2961 1 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2962 1 : println!("remote_timeline_dir: {remote_timeline_dir}");
2963 :
2964 1 : let generation = harness.generation;
2965 1 : let shard = harness.shard;
2966 :
2967 : // Create a couple of dummy files, schedule upload for them
2968 :
2969 1 : let layers = [
2970 1 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2971 1 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2972 1 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2973 1 : ]
2974 1 : .into_iter()
2975 3 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2976 :
2977 3 : let local_path = local_layer_path(
2978 3 : harness.conf,
2979 3 : &timeline.tenant_shard_id,
2980 3 : &timeline.timeline_id,
2981 3 : &name,
2982 3 : &generation,
2983 : );
2984 3 : std::fs::write(&local_path, &contents).unwrap();
2985 :
2986 3 : Layer::for_resident(
2987 3 : harness.conf,
2988 3 : &timeline,
2989 3 : local_path,
2990 3 : name,
2991 3 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2992 : )
2993 3 : }).collect::<Vec<_>>();
2994 :
2995 1 : client
2996 1 : .schedule_layer_file_upload(layers[0].clone())
2997 1 : .unwrap();
2998 1 : client
2999 1 : .schedule_layer_file_upload(layers[1].clone())
3000 1 : .unwrap();
3001 :
3002 : // Check that they are started immediately, not queued
3003 : //
3004 : // this works because we are running within block_on, so any futures are now queued up until
3005 : // our next await point.
3006 : {
3007 1 : let mut guard = client.upload_queue.lock().unwrap();
3008 1 : let upload_queue = guard.initialized_mut().unwrap();
3009 1 : assert!(upload_queue.queued_operations.is_empty());
3010 1 : assert_eq!(upload_queue.inprogress_tasks.len(), 2);
3011 1 : assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);
3012 :
3013 : // also check that `latest_file_changes` was updated
3014 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
3015 : }
3016 :
3017 : // Schedule upload of index. Check that it is queued
3018 1 : let metadata = dummy_metadata(Lsn(0x20));
3019 1 : client
3020 1 : .schedule_index_upload_for_full_metadata_update(&metadata)
3021 1 : .unwrap();
3022 : {
3023 1 : let mut guard = client.upload_queue.lock().unwrap();
3024 1 : let upload_queue = guard.initialized_mut().unwrap();
3025 1 : assert!(upload_queue.queued_operations.len() == 1);
3026 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
3027 : }
3028 :
3029 : // Wait for the uploads to finish
3030 1 : client.wait_completion().await.unwrap();
3031 : {
3032 1 : let mut guard = client.upload_queue.lock().unwrap();
3033 1 : let upload_queue = guard.initialized_mut().unwrap();
3034 :
3035 1 : assert!(upload_queue.queued_operations.is_empty());
3036 1 : assert!(upload_queue.inprogress_tasks.is_empty());
3037 : }
3038 :
3039 : // Download back the index.json, and check that the list of files is correct
3040 1 : let index_part = match client
3041 1 : .download_index_file(&CancellationToken::new())
3042 1 : .await
3043 1 : .unwrap()
3044 : {
3045 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
3046 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
3047 : };
3048 :
3049 1 : assert_file_list(
3050 1 : &index_part
3051 1 : .layer_metadata
3052 1 : .keys()
3053 3 : .map(|f| f.to_owned())
3054 1 : .collect(),
3055 1 : &[
3056 1 : &initial_layer.to_string(),
3057 1 : &layers[0].layer_desc().layer_name().to_string(),
3058 1 : &layers[1].layer_desc().layer_name().to_string(),
3059 1 : ],
3060 : );
3061 1 : assert_eq!(index_part.metadata, metadata);
3062 :
3063 : // Schedule upload and then a deletion. Check that the deletion is queued
3064 1 : client
3065 1 : .schedule_layer_file_upload(layers[2].clone())
3066 1 : .unwrap();
3067 :
3068 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
3069 : // keep using schedule_layer_file_deletion because we don't have a way to wait for the
3070 : // spawn_blocking started by the drop.
3071 1 : client
3072 1 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
3073 1 : .unwrap();
3074 : {
3075 1 : let mut guard = client.upload_queue.lock().unwrap();
3076 1 : let upload_queue = guard.initialized_mut().unwrap();
3077 :
3078 : // Deletion schedules upload of the index file, and the file deletion itself
3079 1 : assert_eq!(upload_queue.queued_operations.len(), 2);
3080 1 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
3081 1 : assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
3082 1 : assert_eq!(upload_queue.num_inprogress_deletions(), 0);
3083 1 : assert_eq!(
3084 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
3085 : 0
3086 : );
3087 : }
3088 1 : assert_remote_files(
3089 1 : &[
3090 1 : &initial_layer.to_string(),
3091 1 : &layers[0].layer_desc().layer_name().to_string(),
3092 1 : &layers[1].layer_desc().layer_name().to_string(),
3093 1 : "index_part.json",
3094 1 : ],
3095 1 : &remote_timeline_dir,
3096 1 : generation,
3097 : );
3098 :
3099 : // Finish them
3100 1 : client.wait_completion().await.unwrap();
3101 1 : harness.deletion_queue.pump().await;
3102 :
3103 1 : assert_remote_files(
3104 1 : &[
3105 1 : &initial_layer.to_string(),
3106 1 : &layers[1].layer_desc().layer_name().to_string(),
3107 1 : &layers[2].layer_desc().layer_name().to_string(),
3108 1 : "index_part.json",
3109 1 : ],
3110 1 : &remote_timeline_dir,
3111 1 : generation,
3112 1 : );
3113 1 : }
3114 :
3115 : #[tokio::test]
3116 1 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
3117 : // Setup
3118 :
3119 : let TestSetup {
3120 1 : harness,
3121 1 : tenant: _tenant,
3122 1 : timeline,
3123 : ..
3124 1 : } = TestSetup::new("metrics").await.unwrap();
3125 1 : let client = &timeline.remote_client;
3126 :
3127 1 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
3128 1 : let local_path = local_layer_path(
3129 1 : harness.conf,
3130 1 : &timeline.tenant_shard_id,
3131 1 : &timeline.timeline_id,
3132 1 : &layer_file_name_1,
3133 1 : &harness.generation,
3134 : );
3135 1 : let content_1 = dummy_contents("foo");
3136 1 : std::fs::write(&local_path, &content_1).unwrap();
3137 :
3138 1 : let layer_file_1 = Layer::for_resident(
3139 1 : harness.conf,
3140 1 : &timeline,
3141 1 : local_path,
3142 1 : layer_file_name_1.clone(),
3143 1 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
3144 : );
3145 :
3146 : #[derive(Debug, PartialEq, Clone, Copy)]
3147 : struct BytesStartedFinished {
3148 : started: Option<usize>,
3149 : finished: Option<usize>,
3150 : }
3151 : impl std::ops::Add for BytesStartedFinished {
3152 : type Output = Self;
3153 2 : fn add(self, rhs: Self) -> Self::Output {
3154 : Self {
3155 2 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
3156 2 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
3157 : }
3158 2 : }
3159 : }
3160 3 : let get_bytes_started_stopped = || {
3161 3 : let started = client
3162 3 : .metrics
3163 3 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
3164 3 : .map(|v| v.try_into().unwrap());
3165 3 : let stopped = client
3166 3 : .metrics
3167 3 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
3168 3 : .map(|v| v.try_into().unwrap());
3169 3 : BytesStartedFinished {
3170 3 : started,
3171 3 : finished: stopped,
3172 3 : }
3173 3 : };
3174 :
3175 : // Test
3176 1 : tracing::info!("now doing actual test");
3177 :
3178 1 : let actual_a = get_bytes_started_stopped();
3179 :
3180 1 : client
3181 1 : .schedule_layer_file_upload(layer_file_1.clone())
3182 1 : .unwrap();
3183 :
3184 1 : let actual_b = get_bytes_started_stopped();
3185 :
3186 1 : client.wait_completion().await.unwrap();
3187 :
3188 1 : let actual_c = get_bytes_started_stopped();
3189 :
3190 : // Validate
3191 :
3192 1 : let expected_b = actual_a
3193 1 : + BytesStartedFinished {
3194 1 : started: Some(content_1.len()),
3195 1 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
3196 1 : finished: Some(0),
3197 1 : };
3198 1 : assert_eq!(actual_b, expected_b);
3199 :
3200 1 : let expected_c = actual_a
3201 1 : + BytesStartedFinished {
3202 1 : started: Some(content_1.len()),
3203 1 : finished: Some(content_1.len()),
3204 1 : };
3205 1 : assert_eq!(actual_c, expected_c);
3206 1 : }
3207 :
3208 6 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
3209 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
3210 6 : let example_index_part = IndexPart::example();
3211 :
3212 6 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
3213 :
3214 6 : let index_path = test_state.harness.remote_fs_dir.join(
3215 6 : remote_index_path(
3216 6 : &test_state.harness.tenant_shard_id,
3217 6 : &TIMELINE_ID,
3218 6 : generation,
3219 6 : )
3220 6 : .get_path(),
3221 6 : );
3222 :
3223 6 : std::fs::create_dir_all(index_path.parent().unwrap())
3224 6 : .expect("creating test dir should work");
3225 :
3226 6 : eprintln!("Writing {index_path}");
3227 6 : std::fs::write(&index_path, index_part_bytes).unwrap();
3228 6 : example_index_part
3229 6 : }
3230 :
3231 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
3232 : /// index, the IndexPart returned is equal to `expected`.
3233 5 : async fn assert_got_index_part(
3234 5 : test_state: &TestSetup,
3235 5 : get_generation: Generation,
3236 5 : expected: &IndexPart,
3237 5 : ) {
3238 5 : let client = test_state.build_client(get_generation);
3239 :
3240 5 : let download_r = client
3241 5 : .download_index_file(&CancellationToken::new())
3242 5 : .await
3243 5 : .expect("download should always succeed");
3244 5 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
3245 5 : match download_r {
3246 5 : MaybeDeletedIndexPart::IndexPart(index_part) => {
3247 5 : assert_eq!(&index_part, expected);
3248 : }
3249 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
3250 : }
3251 5 : }
3252 :
3253 : #[tokio::test]
3254 1 : async fn index_part_download_simple() -> anyhow::Result<()> {
3255 1 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
3256 1 : let span = test_state.span();
3257 1 : let _guard = span.enter();
3258 :
3259 : // Simple case: we are in generation N, load the index from generation N - 1
3260 1 : let generation_n = 5;
3261 1 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3262 :
3263 1 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
3264 :
3265 2 : Ok(())
3266 1 : }
3267 :
3268 : #[tokio::test]
3269 1 : async fn index_part_download_ordering() -> anyhow::Result<()> {
3270 1 : let test_state = TestSetup::new("index_part_download_ordering")
3271 1 : .await
3272 1 : .unwrap();
3273 :
3274 1 : let span = test_state.span();
3275 1 : let _guard = span.enter();
3276 :
3277 : // A generation-less IndexPart exists in the bucket, we should find it
3278 1 : let generation_n = 5;
3279 1 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
3280 1 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
3281 :
3282 : // If a more-recent-than-none generation exists, we should prefer to load it
3283 1 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
3284 1 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3285 :
3286 : // If a more-recent-than-me generation exists, we should ignore it.
3287 1 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
3288 1 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3289 :
3290 : // If a directly previous generation exists, _and_ an index exists in my own
3291 : // generation, I should prefer my own generation.
3292 1 : let _injected_prev =
3293 1 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3294 1 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
3295 1 : assert_got_index_part(
3296 1 : &test_state,
3297 1 : Generation::new(generation_n),
3298 1 : &injected_current,
3299 1 : )
3300 1 : .await;
3301 :
3302 2 : Ok(())
3303 1 : }
3304 : }
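The two download tests pin down which index a client in generation N should load. It should ignore indices from generations newer than N. Among the remaining indices it should take the highest generation, and a generation-less index ranks below every numbered one. The following is a minimal sketch of that selection rule as the tests describe it, not the pageserver's actual lookup. It models `Generation::none()` as `None` and `Generation::new(n)` as `Some(n)`, and `select_index_generation` is a hypothetical helper.

```rust
/// Hypothetical model of a generation: `None` stands in for
/// `Generation::none()`, `Some(n)` for `Generation::new(n)`.
type Gen = Option<u32>;

/// Sketch of the outcome checked by `index_part_download_ordering`:
/// skip indices newer than `mine`, then prefer the highest remaining
/// generation. Returns `None` if no usable index exists.
fn select_index_generation(available: &[Gen], mine: u32) -> Option<Gen> {
    available
        .iter()
        .copied()
        // `Option` orders `None` before every `Some`, so this keeps the
        // generation-less index and any numbered index up to `mine`.
        .filter(|g| *g <= Some(mine))
        // The same ordering makes `max` prefer any numbered generation over none.
        .max()
}
```

Relying on `Option`'s derived ordering keeps the "none is oldest" rule in one place. For example, with indices in generations none, 1, 4, 5 and 10 and `mine = 5`, the sketch picks generation 5, which matches the last assertion in the ordering test.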