Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * Stand-alone function, [`list_remote_timelines`], to get list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [`RemoteTimelineClient`] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
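//! For orientation, here is a hedged sketch of the typical call order (error
//! handling and setup elided; `client`, `index_part`, and `layer` are assumed
//! to exist):
//!
//! ```ignore
//! client.init_upload_queue(&index_part)?;           // or init_upload_queue_for_empty_remote
//! client.schedule_layer_file_upload(layer)?;        // upload the layer file first ...
//! client.schedule_index_upload_for_file_changes()?; // ... then the index that references it
//! client.wait_completion().await?;                  // optionally, block until it is all done
//! ```
//!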
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
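//!
//! To make this concrete, here is a trimmed, hypothetical `index_part.json`
//! (field names follow [`IndexPart`]; the layer name and all values are invented):
//!
//! ```text
//! {
//!   "layer_metadata": {
//!     "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": {
//!       "file_size": 23289856,
//!       "generation": 5
//!     }
//!   },
//!   "disk_consistent_lsn": "0/16B5A51",
//!   "deleted_at": null
//! }
//! ```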
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible
64 : //! for scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //!
67 : //! From the user's perspective, the operations are executed sequentially.
68 : //! Internally, the client knows which operations can be performed in parallel,
69 : //! and which operations act like a "barrier" that require preceding operations
70 : //! to finish. The calling code just needs to call the schedule-functions in the
71 : //! correct order, and the client will parallelize the operations in a way that
72 : //! is safe. For more details, see `UploadOp::can_bypass`.
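//!
//! As an illustration, a compaction that replaces old layers with new ones
//! should be scheduled like this (a sketch; compare
//! [`RemoteTimelineClient::schedule_compaction_update`]):
//!
//! ```ignore
//! // 1. Upload the new layer produced by compaction:
//! client.schedule_layer_file_upload(new_layer)?;
//! // 2. Unlink the old layers and schedule their deletion; internally this
//! //    first uploads an index that no longer references them:
//! client.schedule_layer_file_deletion(&old_layer_names)?;
//! ```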
73 : //!
74 : //! All of this relies on the following invariants:
75 : //!
76 : //! - We rely on read-after write consistency in the remote storage.
77 : //! - Layer files are immutable.
78 : //!
79 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
80 : //! storage. Different tenants can be attached to different pageservers, but if the
81 : //! same tenant is attached to two pageservers at the same time, they will overwrite
82 : //! each other's index file updates, and confusion will ensue. There's no interlock or
83 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
84 : //! that it doesn't happen.
85 : //!
86 : //! ## Implementation Note
87 : //!
88 : //! The *actual* remote state lags behind the *desired* remote state while
89 : //! there are in-flight operations.
90 : //! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`].
91 : //! It is initialized based on the [`IndexPart`] that was passed during init
92 : //! and updated with every `schedule_*` function call.
93 : //! All this is necessary to compute the future [`IndexPart`]s
94 : //! when scheduling an operation while other operations that also affect the
95 : //! remote [`IndexPart`] are in flight.
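//!
//! Roughly, the bookkeeping looks like this (a sketch, not literal code):
//!
//! ```ignore
//! // every schedule_* call mutates the *desired* state ...
//! upload_queue.dirty.layer_metadata.insert(layer_name, metadata);
//! upload_queue.queued_operations.push_back(op); // index ops carry a snapshot of `dirty`
//! // ... and when an index upload completes, the *actual* state catches up:
//! // `clean` is advanced to the IndexPart that was in fact uploaded.
//! ```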
96 : //!
97 : //! # Retries & Error Handling
98 : //!
99 : //! The client retries operations indefinitely, using exponential back-off.
100 : //! There is no way to force a retry, i.e., interrupt the back-off.
101 : //! This could be built easily.
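//!
//! The retry loop has roughly this shape (a simplified sketch; `do_op` is a
//! hypothetical stand-in for the actual upload/delete call):
//!
//! ```ignore
//! let mut attempt: u32 = 0;
//! loop {
//!     match do_op().await {
//!         Ok(()) => break,
//!         Err(e) => {
//!             // info-level below the warn threshold, warn-level above it
//!             if attempt < FAILED_UPLOAD_WARN_THRESHOLD { info!("{e}") } else { warn!("{e}") }
//!             exponential_backoff(attempt, DEFAULT_BASE_BACKOFF_SECONDS,
//!                                 DEFAULT_MAX_BACKOFF_SECONDS, &cancel).await;
//!             attempt += 1;
//!         }
//!     }
//! }
//! ```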
102 : //!
103 : //! # Cancellation
104 : //!
105 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
106 : //! the client's tenant and timeline.
107 : //! Dropping the client will drop queued operations, but not operations that are already executing.
108 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
109 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
110 : //!
111 : //! # Completion
112 : //!
113 : //! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately,
114 : //! and submit a request through the DeletionQueue to update
115 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
116 : //! validated that our generation is not stale. It is this visible value
117 : //! that is advertised to safekeepers as a signal that they can
118 : //! delete the WAL up to that LSN.
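//!
//! The two views are exposed separately (sketch):
//!
//! ```ignore
//! let projected = client.remote_consistent_lsn_projected(); // from the last uploaded index
//! let visible = client.remote_consistent_lsn_visible();     // generation-validated; this is
//!                                                           // the value safekeepers may act on
//! ```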
119 : //!
120 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
121 : //! for all pending operations to complete. It does not prevent more
122 : //! operations from getting scheduled.
123 : //!
124 : //! # Crash Consistency
125 : //!
126 : //! We do not persist the upload queue state.
127 : //! If we drop the client, or crash, all unfinished operations are lost.
128 : //!
129 : //! To recover, the following steps need to be taken:
130 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
131 : //! consistent remote state, assuming the user scheduled the operations in
132 : //! the correct order.
133 : //! - Initialize the upload queue with that [`IndexPart`].
134 : //! - Reschedule all lost operations by comparing the local filesystem state
135 : //! and remote state as per [`IndexPart`]. This is done in
136 : //! [`Tenant::timeline_init_and_sync`].
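//!
//! A hedged sketch of that recovery sequence (helper names are from this
//! module; the surrounding plumbing is elided):
//!
//! ```ignore
//! let (index_part, _gen, _mtime) = download_index_part(
//!     &storage, &tenant_shard_id, &timeline_id, generation, &cancel).await?;
//! client.init_upload_queue(&index_part)?;
//! for layer in layers_present_only_locally {
//!     client.schedule_layer_file_upload(layer)?; // re-schedule lost uploads
//! }
//! client.schedule_index_upload_for_file_changes()?;
//! ```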
137 : //!
138 : //! Note that if we crash during file deletion between the index update
139 : //! that removes the file from the list of files, and deleting the remote file,
140 : //! the file is leaked in the remote storage. Similarly, if a new file is created
141 : //! and uploaded, but the pageserver dies permanently before updating the
142 : //! remote index file, the new file is leaked in remote storage. We accept and
143 : //! tolerate that for now.
144 : //! Note further that we cannot easily fix this by scheduling deletes for every
145 : //! file that is present only on the remote, because we cannot distinguish the
146 : //! following two cases:
147 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
148 : //! but crashed before it finished remotely.
149 : //! - (2) We never had the file locally because we haven't on-demand downloaded
150 : //! it yet.
151 : //!
152 : //! # Downloads
153 : //!
154 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
155 : //! downloading files from the remote storage. Downloads are performed immediately
156 : //! against the `RemoteStorage`, independently of the upload queue.
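//!
//! For example, an on-demand layer download goes straight to remote storage
//! (sketch; arguments abbreviated):
//!
//! ```ignore
//! let num_bytes = client
//!     .download_layer_file(&layer_name, &layer_metadata, &local_path, &gate, &cancel, &ctx)
//!     .await?; // returns the size of the downloaded file
//! ```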
157 : //!
158 : //! When we attach a tenant, we perform the following steps:
159 : //! - create `Tenant` object in `TenantState::Attaching` state
160 : //! - list timelines that are present in remote storage, and for each:
161 : //! - download their remote [`IndexPart`]s
162 : //! - create `Timeline` struct and a `RemoteTimelineClient`
163 : //! - initialize the client's upload queue with its `IndexPart`
164 : //! - schedule uploads for layers that are only present locally.
165 : //! - After the above is done for each timeline, open the tenant for business by
166 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
167 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
168 : //!
169 : //! # Operating Without Remote Storage
170 : //!
171 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
172 : //! not created and the uploads are skipped.
173 : //!
174 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
175 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
176 :
177 : pub(crate) mod download;
178 : pub mod index;
179 : pub mod manifest;
180 : pub(crate) mod upload;
181 :
182 : use std::collections::{HashMap, HashSet, VecDeque};
183 : use std::ops::DerefMut;
184 : use std::sync::atomic::{AtomicU32, Ordering};
185 : use std::sync::{Arc, Mutex, OnceLock};
186 : use std::time::Duration;
187 :
188 : use anyhow::Context;
189 : use camino::Utf8Path;
190 : use chrono::{NaiveDateTime, Utc};
191 : pub(crate) use download::{
192 : download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
193 : list_remote_tenant_shards, list_remote_timelines,
194 : };
195 : use index::GcCompactionState;
196 : pub(crate) use index::LayerFileMetadata;
197 : use pageserver_api::models::TimelineArchivalState;
198 : use pageserver_api::shard::{ShardIndex, TenantShardId};
199 : use regex::Regex;
200 : use remote_storage::{
201 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
202 : };
203 : use scopeguard::ScopeGuard;
204 : use tokio_util::sync::CancellationToken;
205 : use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
206 : pub(crate) use upload::upload_initdb_dir;
207 : use utils::backoff::{
208 : self, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
209 : };
210 : use utils::id::{TenantId, TimelineId};
211 : use utils::lsn::Lsn;
212 : use utils::pausable_failpoint;
213 : use utils::shard::ShardNumber;
214 :
215 : use self::index::IndexPart;
216 : use super::config::AttachedLocationConfig;
217 : use super::metadata::MetadataUpdate;
218 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
219 : use super::timeline::import_pgdata;
220 : use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
221 : use super::{DeleteTimelineError, Generation};
222 : use crate::config::PageServerConf;
223 : use crate::context::RequestContext;
224 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
225 : use crate::metrics::{
226 : MeasureRemoteOp, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
227 : RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
228 : RemoteTimelineClientMetricsCallTrackSize,
229 : };
230 : use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind, shutdown_token};
231 : use crate::tenant::metadata::TimelineMetadata;
232 : use crate::tenant::remote_timeline_client::download::download_retry;
233 : use crate::tenant::storage_layer::AsLayerDesc;
234 : use crate::tenant::upload_queue::{
235 : Delete, OpType, UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped,
236 : UploadQueueStoppedDeletable, UploadTask,
237 : };
238 : use crate::tenant::{TIMELINES_SEGMENT_NAME, debug_assert_current_span_has_tenant_and_timeline_id};
239 : use crate::{TENANT_HEATMAP_BASENAME, task_mgr};
240 :
241 : // Occasional network issues and such can cause remote operations to fail, and
242 : // that's expected. If a download fails, we log it at info-level, and retry.
243 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
244 : // level instead, as repeated failures can mean a more serious problem. If it
245 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
246 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
247 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
248 :
249 : // Similarly log failed uploads and deletions at WARN level, after this many
250 : // retries. Uploads and deletions are retried forever, though.
251 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
252 :
253 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
254 :
255 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
256 :
257 : /// Default buffer size when interfacing with [`tokio::fs::File`].
258 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
259 :
260 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
261 : /// which we warn and skip.
262 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
263 :
264 : pub enum MaybeDeletedIndexPart {
265 : IndexPart(IndexPart),
266 : Deleted(IndexPart),
267 : }
268 :
269 : #[derive(Debug, thiserror::Error)]
270 : pub enum PersistIndexPartWithDeletedFlagError {
271 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
272 : AlreadyInProgress(NaiveDateTime),
273 : #[error("the deleted_flag was already set, value is {0:?}")]
274 : AlreadyDeleted(NaiveDateTime),
275 : #[error(transparent)]
276 : Other(#[from] anyhow::Error),
277 : }
278 :
279 : #[derive(Debug, thiserror::Error)]
280 : pub enum WaitCompletionError {
281 : #[error(transparent)]
282 : NotInitialized(NotInitialized),
283 : #[error("wait_completion aborted because upload queue was stopped")]
284 : UploadQueueShutDownOrStopped,
285 : }
286 :
287 : #[derive(Debug, thiserror::Error)]
288 : #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
289 : pub struct UploadQueueNotReadyError;
290 :
291 : #[derive(Debug, thiserror::Error)]
292 : pub enum ShutdownIfArchivedError {
293 : #[error(transparent)]
294 : NotInitialized(NotInitialized),
295 : #[error("timeline is not archived")]
296 : NotArchived,
297 : }
298 :
299 : /// Behavioral modes that enable seamless live migration.
300 : ///
301 : /// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
302 : struct RemoteTimelineClientConfig {
303 : /// If this is false, then updates to remote_consistent_lsn are dropped rather
304 : /// than being submitted to DeletionQueue for validation. This behavior is
305 : /// used when a tenant attachment is known to have a stale generation number,
306 : /// such that validation attempts will always fail. This is not necessary
307 : /// for correctness, but avoids spamming error statistics with failed validations
308 : /// when doing migrations of tenants.
309 : process_remote_consistent_lsn_updates: bool,
310 :
311 : /// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
312 : /// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
313 : /// is known to be multi-attached, in order to avoid disrupting other attached tenants
314 : /// whose generations' metadata refers to the deleted objects.
315 : block_deletions: bool,
316 : }
317 :
318 : /// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
319 : /// not carry the entire LocationConf structure: it's much more than we need. The From
320 : /// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
321 : impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
322 926 : fn from(lc: &AttachedLocationConfig) -> Self {
323 926 : Self {
324 926 : block_deletions: !lc.may_delete_layers_hint(),
325 926 : process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
326 926 : }
327 926 : }
328 : }
329 :
330 : /// A client for accessing a timeline's data in remote storage.
331 : ///
332 : /// This takes care of managing the number of connections, and balancing them
333 : /// across tenants. This also handles retries of failed uploads.
334 : ///
335 : /// Upload and delete requests are ordered so that before a deletion is
336 : /// performed, we wait for all preceding uploads to finish. This ensures
337 : /// that if you perform a compaction operation that reshuffles data in layer
338 : /// files, we don't have a transient state where the old files have already been
339 : /// deleted, but new files have not yet been uploaded.
340 : ///
341 : /// Similarly, this enforces an order between index-file uploads, and layer
342 : /// uploads. Before an index-file upload is performed, all preceding layer
343 : /// uploads must be finished.
344 : ///
345 : /// This also maintains a list of remote files, and automatically includes that
346 : /// in the index part file, whenever timeline metadata is uploaded.
347 : ///
348 : /// Downloads are not queued, they are performed immediately.
349 : pub(crate) struct RemoteTimelineClient {
350 : conf: &'static PageServerConf,
351 :
352 : runtime: tokio::runtime::Handle,
353 :
354 : tenant_shard_id: TenantShardId,
355 : timeline_id: TimelineId,
356 : generation: Generation,
357 :
358 : upload_queue: Mutex<UploadQueue>,
359 :
360 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
361 :
362 : storage_impl: GenericRemoteStorage,
363 :
364 : deletion_queue_client: DeletionQueueClient,
365 :
366 : /// Subset of tenant configuration used to control upload behaviors during migrations
367 : config: std::sync::RwLock<RemoteTimelineClientConfig>,
368 :
369 : cancel: CancellationToken,
370 : }
371 :
372 : impl Drop for RemoteTimelineClient {
373 40 : fn drop(&mut self) {
374 40 : debug!("dropping RemoteTimelineClient");
375 40 : }
376 : }
377 :
378 : impl RemoteTimelineClient {
379 : ///
380 : /// Create a remote storage client for given timeline
381 : ///
382 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
383 : /// by calling init_upload_queue.
384 : ///
385 906 : pub(crate) fn new(
386 906 : remote_storage: GenericRemoteStorage,
387 906 : deletion_queue_client: DeletionQueueClient,
388 906 : conf: &'static PageServerConf,
389 906 : tenant_shard_id: TenantShardId,
390 906 : timeline_id: TimelineId,
391 906 : generation: Generation,
392 906 : location_conf: &AttachedLocationConfig,
393 906 : ) -> RemoteTimelineClient {
394 906 : RemoteTimelineClient {
395 906 : conf,
396 906 : runtime: if cfg!(test) {
397 : // remote_timeline_client.rs tests rely on current-thread runtime
398 906 : tokio::runtime::Handle::current()
399 : } else {
400 0 : BACKGROUND_RUNTIME.handle().clone()
401 : },
402 906 : tenant_shard_id,
403 906 : timeline_id,
404 906 : generation,
405 906 : storage_impl: remote_storage,
406 906 : deletion_queue_client,
407 906 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
408 906 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
409 906 : &tenant_shard_id,
410 906 : &timeline_id,
411 906 : )),
412 906 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
413 906 : cancel: CancellationToken::new(),
414 906 : }
415 906 : }
416 :
417 : /// Initialize the upload queue for a remote storage that already received
418 : /// an index file upload, i.e., it's not empty.
419 : /// The given `index_part` must be the one on the remote.
420 12 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
421 12 : // Set the maximum number of inprogress tasks to the remote storage concurrency. There's
422 12 : // certainly no point in starting more upload tasks than this.
423 12 : let inprogress_limit = self
424 12 : .conf
425 12 : .remote_storage_config
426 12 : .as_ref()
427 12 : .map_or(0, |r| r.concurrency_limit());
428 12 : let mut upload_queue = self.upload_queue.lock().unwrap();
429 12 : upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
430 12 : self.update_remote_physical_size_gauge(Some(index_part));
431 12 : info!(
432 0 : "initialized upload queue from remote index with {} layer files",
433 0 : index_part.layer_metadata.len()
434 : );
435 12 : Ok(())
436 12 : }
437 :
438 : /// Initialize the upload queue for the case where the remote storage is empty,
439 : /// i.e., it doesn't have an `IndexPart`.
440 894 : pub fn init_upload_queue_for_empty_remote(
441 894 : &self,
442 894 : local_metadata: &TimelineMetadata,
443 894 : ) -> anyhow::Result<()> {
444 894 : // Set the maximum number of inprogress tasks to the remote storage concurrency. There's
445 894 : // certainly no point in starting more upload tasks than this.
446 894 : let inprogress_limit = self
447 894 : .conf
448 894 : .remote_storage_config
449 894 : .as_ref()
450 894 : .map_or(0, |r| r.concurrency_limit());
451 894 : let mut upload_queue = self.upload_queue.lock().unwrap();
452 894 : upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
453 894 : self.update_remote_physical_size_gauge(None);
454 894 : info!("initialized upload queue as empty");
455 894 : Ok(())
456 894 : }
457 :
458 : /// Initialize the queue in the stopped state. Used in the startup path
459 : /// to continue a deletion operation interrupted by pageserver crash or restart.
460 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
461 0 : &self,
462 0 : index_part: &IndexPart,
463 0 : ) -> anyhow::Result<()> {
464 : // FIXME: consider newtype for DeletedIndexPart.
465 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
466 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
467 0 : ))?;
468 0 : let inprogress_limit = self
469 0 : .conf
470 0 : .remote_storage_config
471 0 : .as_ref()
472 0 : .map_or(0, |r| r.concurrency_limit());
473 0 :
474 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
475 0 : upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
476 0 : self.update_remote_physical_size_gauge(Some(index_part));
477 0 : self.stop_impl(&mut upload_queue);
478 0 :
479 0 : upload_queue
480 0 : .stopped_mut()
481 0 : .expect("stopped above")
482 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
483 0 :
484 0 : Ok(())
485 0 : }
486 :
487 : /// Notify this client of a change to its parent tenant's config, as this may cause us to
488 : /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
489 0 : pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
490 0 : let new_conf = RemoteTimelineClientConfig::from(location_conf);
491 0 : let unblocked = !new_conf.block_deletions;
492 0 :
493 0 : // Update config before draining deletions, so that we don't race with more being
494 : 0 : // inserted. This can result in deletions happening out of order, but that does not
495 0 : // violate any invariants: deletions only need to be ordered relative to upload of the index
496 0 : // that dereferences the deleted objects, and we are not changing that order.
497 0 : *self.config.write().unwrap() = new_conf;
498 0 :
499 0 : if unblocked {
500 : // If we may now delete layers, drain any that were blocked in our old
501 : // configuration state
502 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
503 :
504 0 : if let Ok(queue) = queue_locked.initialized_mut() {
505 0 : let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
506 0 : for d in blocked_deletions {
507 0 : if let Err(e) = self.deletion_queue_client.push_layers(
508 0 : self.tenant_shard_id,
509 0 : self.timeline_id,
510 0 : self.generation,
511 0 : d.layers,
512 0 : ) {
513 : // This could happen if the pageserver is shut down while a tenant
514 : // is transitioning from a deletion-blocked state: we will leak some
515 : // S3 objects in this case.
516 0 : warn!("Failed to drain blocked deletions: {}", e);
517 0 : break;
518 0 : }
519 : }
520 0 : }
521 0 : }
522 0 : }
523 :
524 : /// Returns `None` if nothing has been uploaded yet, `Some(disk_consistent_lsn)` otherwise.
525 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
526 0 : match &mut *self.upload_queue.lock().unwrap() {
527 0 : UploadQueue::Uninitialized => None,
528 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
529 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
530 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
531 0 : .upload_queue_for_deletion
532 0 : .get_last_remote_consistent_lsn_projected(),
533 : }
534 0 : }
535 :
536 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
537 0 : match &mut *self.upload_queue.lock().unwrap() {
538 0 : UploadQueue::Uninitialized => None,
539 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
540 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
541 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
542 0 : q.upload_queue_for_deletion
543 0 : .get_last_remote_consistent_lsn_visible(),
544 0 : ),
545 : }
546 0 : }
547 :
548 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
549 : /// client is currently initialized.
550 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
551 0 : self.upload_queue
552 0 : .lock()
553 0 : .unwrap()
554 0 : .initialized_mut()
555 0 : .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn))
556 0 : .unwrap_or(false)
557 0 : }
558 :
559 : /// Returns whether the timeline is archived.
560 : /// Returns `None` if the remote index_part hasn't been downloaded yet.
561 0 : pub(crate) fn is_archived(&self) -> Option<bool> {
562 0 : self.upload_queue
563 0 : .lock()
564 0 : .unwrap()
565 0 : .initialized_mut()
566 0 : .map(|q| q.clean.0.archived_at.is_some())
567 0 : .ok()
568 0 : }
569 :
570 : /// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
571 : ///
572 : /// Returns `Err(_)` if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
573 4 : pub(crate) fn archived_at_stopped_queue(
574 4 : &self,
575 4 : ) -> Result<Option<NaiveDateTime>, UploadQueueNotReadyError> {
576 4 : self.upload_queue
577 4 : .lock()
578 4 : .unwrap()
579 4 : .stopped_mut()
580 4 : .map(|q| q.upload_queue_for_deletion.clean.0.archived_at)
581 4 : .map_err(|_| UploadQueueNotReadyError)
582 4 : }
583 :
584 3883 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
585 3883 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
586 2989 : current_remote_index_part
587 2989 : .layer_metadata
588 2989 : .values()
589 35814 : .map(|ilmd| ilmd.file_size)
590 2989 : .sum()
591 : } else {
592 894 : 0
593 : };
594 3883 : self.metrics.remote_physical_size_gauge.set(size);
595 3883 : }
596 :
597 4 : pub fn get_remote_physical_size(&self) -> u64 {
598 4 : self.metrics.remote_physical_size_gauge.get()
599 4 : }
600 :
601 : //
602 : // Download operations.
603 : //
604 : // These don't use the per-timeline queue. They do use the global semaphore in
605 : // S3Bucket, to limit the total number of concurrent operations, though.
606 : //
607 :
608 : /// Download index file
609 40 : pub async fn download_index_file(
610 40 : &self,
611 40 : cancel: &CancellationToken,
612 40 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
613 40 : let _unfinished_gauge_guard = self.metrics.call_begin(
614 40 : &RemoteOpFileKind::Index,
615 40 : &RemoteOpKind::Download,
616 40 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
617 40 : reason: "no need for a downloads gauge",
618 40 : },
619 40 : );
620 :
621 40 : let (index_part, index_generation, index_last_modified) = download::download_index_part(
622 40 : &self.storage_impl,
623 40 : &self.tenant_shard_id,
624 40 : &self.timeline_id,
625 40 : self.generation,
626 40 : cancel,
627 40 : )
628 40 : .measure_remote_op(
629 40 : RemoteOpFileKind::Index,
630 40 : RemoteOpKind::Download,
631 40 : Arc::clone(&self.metrics),
632 40 : )
633 40 : .await?;
634 :
635 : // Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
636 : // old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
637 : // in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
638 : // when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
639 : // also a newer index available, that is surprising.
640 : const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
641 40 : let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
642 0 : if e.duration() > Duration::from_secs(5) {
643 : // We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
644 : // timestamp, it is common to be out by at least 1 second.
645 0 : tracing::warn!("Index has modification time in the future: {e}");
646 0 : }
647 0 : Duration::ZERO
648 40 : });
649 40 : if index_age > INDEX_AGE_CHECKS_THRESHOLD {
650 0 : tracing::info!(
651 : ?index_generation,
652 0 : age = index_age.as_secs_f64(),
653 0 : "Loaded an old index, checking for other indices..."
654 : );
655 :
656 : // Find the highest-generation index
657 0 : let (_latest_index_part, latest_index_generation, latest_index_mtime) =
658 0 : download::download_index_part(
659 0 : &self.storage_impl,
660 0 : &self.tenant_shard_id,
661 0 : &self.timeline_id,
662 0 : Generation::MAX,
663 0 : cancel,
664 0 : )
665 0 : .await?;
666 :
667 0 : if latest_index_generation > index_generation {
668 : // Unexpected! Why are we loading such an old index if a more recent one exists?
669 : // We will refuse to proceed, as there is no reasonable scenario where this should happen, but
670 : // there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
671 : // backwards).
672 0 : tracing::error!(
673 : ?index_generation,
674 : ?latest_index_generation,
675 : ?latest_index_mtime,
676 0 : "Found a newer index while loading an old one"
677 : );
678 0 : return Err(DownloadError::Fatal(
679 0 : "Index age exceeds threshold and a newer index exists".into(),
680 0 : ));
681 0 : }
682 40 : }
683 :
684 40 : if index_part.deleted_at.is_some() {
685 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
686 : } else {
687 40 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
688 : }
689 40 : }
690 :
691 : /// Download a (layer) file from `path`, into local filesystem.
692 : ///
693 : /// 'layer_metadata' is the metadata from the remote index file.
694 : ///
695 : /// On success, returns the size of the downloaded file.
696 28 : pub async fn download_layer_file(
697 28 : &self,
698 28 : layer_file_name: &LayerName,
699 28 : layer_metadata: &LayerFileMetadata,
700 28 : local_path: &Utf8Path,
701 28 : gate: &utils::sync::gate::Gate,
702 28 : cancel: &CancellationToken,
703 28 : ctx: &RequestContext,
704 28 : ) -> Result<u64, DownloadError> {
705 28 : let downloaded_size = {
706 28 : let _unfinished_gauge_guard = self.metrics.call_begin(
707 28 : &RemoteOpFileKind::Layer,
708 28 : &RemoteOpKind::Download,
709 28 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
710 28 : reason: "no need for a downloads gauge",
711 28 : },
712 28 : );
713 28 : download::download_layer_file(
714 28 : self.conf,
715 28 : &self.storage_impl,
716 28 : self.tenant_shard_id,
717 28 : self.timeline_id,
718 28 : layer_file_name,
719 28 : layer_metadata,
720 28 : local_path,
721 28 : gate,
722 28 : cancel,
723 28 : ctx,
724 28 : )
725 28 : .measure_remote_op(
726 28 : RemoteOpFileKind::Layer,
727 28 : RemoteOpKind::Download,
728 28 : Arc::clone(&self.metrics),
729 28 : )
730 28 : .await?
731 : };
732 :
733 28 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
734 28 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
735 28 :
736 28 : Ok(downloaded_size)
737 28 : }
738 :
739 : //
740 : // Upload operations.
741 : //
742 :
743 : /// Launch an index-file upload operation in the background, with
744 : /// fully updated metadata.
745 : ///
746 : /// This should only be used to upload initial metadata to remote storage.
747 : ///
748 : /// The upload will be added to the queue immediately, but it
749 : /// won't be performed until all previously scheduled layer file
750 : /// upload operations have completed successfully. This is to
751 : /// ensure that when the index file claims that layers X, Y and Z
752 : /// exist in remote storage, they really do. To wait for the upload
753 : /// to complete, use `wait_completion`.
754 : ///
755 : /// If there were any changes to the list of files, i.e. if any
756 : /// layer file uploads were scheduled, since the last index file
757 : /// upload, those will be included too.
758 460 : pub fn schedule_index_upload_for_full_metadata_update(
759 460 : self: &Arc<Self>,
760 460 : metadata: &TimelineMetadata,
761 460 : ) -> anyhow::Result<()> {
762 460 : let mut guard = self.upload_queue.lock().unwrap();
763 460 : let upload_queue = guard.initialized_mut()?;
764 :
765 : // As documented in the struct definition, it's ok for latest_metadata to be
766 : // ahead of what's _actually_ on the remote during index upload.
767 460 : upload_queue.dirty.metadata = metadata.clone();
768 460 :
769 460 : self.schedule_index_upload(upload_queue);
770 460 :
771 460 : Ok(())
772 460 : }
773 :
774 : /// Launch an index-file upload operation in the background, with only parts of the metadata
775 : /// updated.
776 : ///
777 : /// This is the regular way of updating metadata on layer flushes or Gc.
778 : ///
779 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
780 : /// `index_part.json`, while being more clear on what values update regularly.
781 2458 : pub(crate) fn schedule_index_upload_for_metadata_update(
782 2458 : self: &Arc<Self>,
783 2458 : update: &MetadataUpdate,
784 2458 : ) -> anyhow::Result<()> {
785 2458 : let mut guard = self.upload_queue.lock().unwrap();
786 2458 : let upload_queue = guard.initialized_mut()?;
787 :
788 2458 : upload_queue.dirty.metadata.apply(update);
789 2458 :
790 2458 : // Defense in depth: if we somehow generated invalid metadata, do not persist it.
791 2458 : upload_queue
792 2458 : .dirty
793 2458 : .validate()
794 2458 : .map_err(|e| anyhow::anyhow!(e))?;
795 :
796 2458 : self.schedule_index_upload(upload_queue);
797 2458 :
798 2458 : Ok(())
799 2458 : }
800 :
801 : /// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
802 : ///
803 : /// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded:
804 : /// either the change is already sitting in the queue but not committed yet, or the change has not
805 : /// been added to the queue yet.
806 4 : pub(crate) fn schedule_index_upload_for_timeline_archival_state(
807 4 : self: &Arc<Self>,
808 4 : state: TimelineArchivalState,
809 4 : ) -> anyhow::Result<bool> {
810 4 : let mut guard = self.upload_queue.lock().unwrap();
811 4 : let upload_queue = guard.initialized_mut()?;
812 :
813 : /// Returns Some(_) if a change is needed; the contained value is
814 : /// true iff the change sets archived_at.
815 8 : fn need_change(
816 8 : archived_at: &Option<NaiveDateTime>,
817 8 : state: TimelineArchivalState,
818 8 : ) -> Option<bool> {
819 8 : match (archived_at, state) {
820 : (Some(_), TimelineArchivalState::Archived)
821 : | (None, TimelineArchivalState::Unarchived) => {
822 : // Nothing to do
823 0 : tracing::info!("intended state matches present state");
824 0 : None
825 : }
826 8 : (None, TimelineArchivalState::Archived) => Some(true),
827 0 : (Some(_), TimelineArchivalState::Unarchived) => Some(false),
828 : }
829 8 : }
830 4 : let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
831 :
832 4 : if let Some(archived_at_set) = need_upload_scheduled {
833 4 : let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
834 4 : upload_queue.dirty.archived_at = intended_archived_at;
835 4 : self.schedule_index_upload(upload_queue);
836 4 : }
837 :
838 4 : let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
839 4 : Ok(need_wait)
840 4 : }
841 :
842 : /// Shuts the timeline client down, but only if the timeline is archived.
843 : ///
844 : /// This function and [`Self::schedule_index_upload_for_timeline_archival_state`] use the
845 : /// same lock to prevent races between unarchival and offloading: unarchival requires the
846 : /// upload queue to be initialized, and leaves behind an upload queue where either dirty
847 : /// or clean has archived_at of `None`. Offloading leaves behind an uninitialized upload
848 : /// queue.
849 4 : pub(crate) async fn shutdown_if_archived(
850 4 : self: &Arc<Self>,
851 4 : ) -> Result<(), ShutdownIfArchivedError> {
852 4 : {
853 4 : let mut guard = self.upload_queue.lock().unwrap();
854 4 : let upload_queue = guard
855 4 : .initialized_mut()
856 4 : .map_err(ShutdownIfArchivedError::NotInitialized)?;
857 :
858 4 : match (
859 4 : upload_queue.dirty.archived_at.is_none(),
860 4 : upload_queue.clean.0.archived_at.is_none(),
861 4 : ) {
862 : // The expected case: the timeline is archived and we don't want to unarchive
863 4 : (false, false) => {}
864 : (true, false) => {
865 0 : tracing::info!("can't shut down timeline: timeline slated for unarchival");
866 0 : return Err(ShutdownIfArchivedError::NotArchived);
867 : }
868 0 : (dirty_archived, true) => {
869 0 : tracing::info!(%dirty_archived, "can't shut down timeline: timeline not archived in remote storage");
870 0 : return Err(ShutdownIfArchivedError::NotArchived);
871 : }
872 : }
873 :
874 : // Set the shutting_down flag while the guard from the archival check is held.
875 : // This prevents a race with unarchival, as initialized_mut will not return
876 : // an upload queue from this point.
877 : // Also launch the queued tasks like shutdown() does.
878 4 : if !upload_queue.shutting_down {
879 4 : upload_queue.shutting_down = true;
880 4 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
881 4 : // this operation is not counted similar to Barrier
882 4 : self.launch_queued_tasks(upload_queue);
883 4 : }
884 : }
885 :
886 4 : self.shutdown().await;
887 :
888 4 : Ok(())
889 4 : }
890 :
891 : /// Launch an index-file upload operation in the background, setting `import_pgdata` field.
892 0 : pub(crate) fn schedule_index_upload_for_import_pgdata_state_update(
893 0 : self: &Arc<Self>,
894 0 : state: Option<import_pgdata::index_part_format::Root>,
895 0 : ) -> anyhow::Result<()> {
896 0 : let mut guard = self.upload_queue.lock().unwrap();
897 0 : let upload_queue = guard.initialized_mut()?;
898 0 : upload_queue.dirty.import_pgdata = state;
899 0 : self.schedule_index_upload(upload_queue);
900 0 : Ok(())
901 0 : }
902 :
903 : /// Launch an index-file upload operation in the background, setting the `gc_compaction` state field.
904 0 : pub(crate) fn schedule_index_upload_for_gc_compaction_state_update(
905 0 : self: &Arc<Self>,
906 0 : gc_compaction_state: GcCompactionState,
907 0 : ) -> anyhow::Result<()> {
908 0 : let mut guard = self.upload_queue.lock().unwrap();
909 0 : let upload_queue = guard.initialized_mut()?;
910 0 : upload_queue.dirty.gc_compaction = Some(gc_compaction_state);
911 0 : self.schedule_index_upload(upload_queue);
912 0 : Ok(())
913 0 : }
914 :
915 : ///
916 : /// Launch an index-file upload operation in the background, if necessary.
917 : ///
918 : /// Use this function to schedule the update of the index file after
919 : /// scheduling file uploads or deletions. If no file uploads or deletions
920 : /// have been scheduled since the last index file upload, this does
921 : /// nothing.
922 : ///
923 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
924 : /// the upload to the upload queue and returns quickly.
925 740 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
926 740 : let mut guard = self.upload_queue.lock().unwrap();
927 740 : let upload_queue = guard.initialized_mut()?;
928 :
929 740 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
930 28 : self.schedule_index_upload(upload_queue);
931 712 : }
932 :
933 740 : Ok(())
934 740 : }
935 :
936 : /// Launch an index-file upload operation in the background (internal function)
937 3094 : fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
938 3094 : let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
939 3094 : // fix up the duplicated field
940 3094 : upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
941 3094 :
942 3094 : // make sure it serializes before doing it in perform_upload_task so that it doesn't
943 3094 : // look like a retryable error
944 3094 : let void = std::io::sink();
945 3094 : serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");
946 3094 :
947 3094 : let index_part = &upload_queue.dirty;
948 3094 :
949 3094 : info!(
950 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
951 0 : index_part.layer_metadata.len(),
952 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
953 : );
954 :
955 3094 : let op = UploadOp::UploadMetadata {
956 3094 : uploaded: Box::new(index_part.clone()),
957 3094 : };
958 3094 : self.metric_begin(&op);
959 3094 : upload_queue.queued_operations.push_back(op);
960 3094 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
961 3094 :
962 3094 : // Launch the task immediately, if possible
963 3094 : self.launch_queued_tasks(upload_queue);
964 3094 : }
965 :
966 : /// Reparent this timeline to a new parent.
967 : ///
968 : /// A retryable step of timeline ancestor detach.
969 0 : pub(crate) async fn schedule_reparenting_and_wait(
970 0 : self: &Arc<Self>,
971 0 : new_parent: &TimelineId,
972 0 : ) -> anyhow::Result<()> {
973 0 : let receiver = {
974 0 : let mut guard = self.upload_queue.lock().unwrap();
975 0 : let upload_queue = guard.initialized_mut()?;
976 :
977 0 : let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else {
978 0 : return Err(anyhow::anyhow!(
979 0 : "cannot reparent without a current ancestor"
980 0 : ));
981 : };
982 :
983 0 : let uploaded = &upload_queue.clean.0.metadata;
984 0 :
985 0 : if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
986 : // nothing to do
987 0 : None
988 : } else {
989 0 : upload_queue.dirty.metadata.reparent(new_parent);
990 0 : upload_queue.dirty.lineage.record_previous_ancestor(&prev);
991 0 :
992 0 : self.schedule_index_upload(upload_queue);
993 0 :
994 0 : Some(self.schedule_barrier0(upload_queue))
995 : }
996 : };
997 :
998 0 : if let Some(receiver) = receiver {
999 0 : Self::wait_completion0(receiver).await?;
1000 0 : }
1001 0 : Ok(())
1002 0 : }
1003 :
1004 : /// Schedules uploading a new version of `index_part.json` with the given layers added,
1005 : /// detaching from ancestor and waits for it to complete.
1006 : ///
1007 : /// This is used with `Timeline::detach_ancestor` functionality.
1008 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
1009 0 : self: &Arc<Self>,
1010 0 : layers: &[Layer],
1011 0 : adopted: (TimelineId, Lsn),
1012 0 : ) -> anyhow::Result<()> {
1013 0 : let barrier = {
1014 0 : let mut guard = self.upload_queue.lock().unwrap();
1015 0 : let upload_queue = guard.initialized_mut()?;
1016 :
1017 0 : if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
1018 0 : None
1019 : } else {
1020 0 : upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
1021 0 : upload_queue.dirty.lineage.record_detaching(&adopted);
1022 :
1023 0 : for layer in layers {
1024 0 : let prev = upload_queue
1025 0 : .dirty
1026 0 : .layer_metadata
1027 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
1028 0 : assert!(prev.is_none(), "copied layer existed already {layer}");
1029 : }
1030 :
1031 0 : self.schedule_index_upload(upload_queue);
1032 0 :
1033 0 : Some(self.schedule_barrier0(upload_queue))
1034 : }
1035 : };
1036 :
1037 0 : if let Some(barrier) = barrier {
1038 0 : Self::wait_completion0(barrier).await?;
1039 0 : }
1040 0 : Ok(())
1041 0 : }
1042 :
1043 : /// Adds a gc blocking reason for this timeline if one does not exist already.
1044 : ///
1045 : /// A retryable step of timeline detach ancestor.
1046 : ///
1047 : /// Returns a future which waits until the completion of the upload.
1048 0 : pub(crate) fn schedule_insert_gc_block_reason(
1049 0 : self: &Arc<Self>,
1050 0 : reason: index::GcBlockingReason,
1051 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1052 0 : {
1053 0 : let maybe_barrier = {
1054 0 : let mut guard = self.upload_queue.lock().unwrap();
1055 0 : let upload_queue = guard.initialized_mut()?;
1056 :
1057 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1058 0 : if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
1059 0 : drop(guard);
1060 0 : panic!("cannot start detach ancestor if there is nothing to detach from");
1061 0 : }
1062 0 : }
1063 :
1064 0 : let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
1065 :
1066 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1067 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1068 0 :
1069 0 : match (current, uploaded) {
1070 0 : (x, y) if wanted(x) && wanted(y) => None,
1071 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1072 : // Usual case: !wanted(x) && !wanted(y)
1073 : //
1074 : // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
1075 : // turn on and off some reason.
1076 0 : (x, y) => {
1077 0 : if !wanted(x) && wanted(y) {
1078 : // this could be avoided by having external in-memory synchronization, like
1079 : // timeline detach ancestor
1080 0 : warn!(
1081 : ?reason,
1082 : op = "insert",
1083 0 : "unexpected: two racing processes to enable and disable a gc blocking reason"
1084 : );
1085 0 : }
1086 :
1087 : // at this point, the metadata must always show that there is a parent
1088 0 : upload_queue.dirty.gc_blocking = current
1089 0 : .map(|x| x.with_reason(reason))
1090 0 : .or_else(|| Some(index::GcBlocking::started_now_for(reason)));
1091 0 : self.schedule_index_upload(upload_queue);
1092 0 : Some(self.schedule_barrier0(upload_queue))
1093 : }
1094 : }
1095 : };
1096 :
1097 0 : Ok(async move {
1098 0 : if let Some(barrier) = maybe_barrier {
1099 0 : Self::wait_completion0(barrier).await?;
1100 0 : }
1101 0 : Ok(())
1102 0 : })
1103 0 : }
1104 :
1105 : /// Removes a gc blocking reason for this timeline if one exists.
1106 : ///
1107 : /// A retryable step of timeline detach ancestor.
1108 : ///
1109 : /// Returns a future which waits until the completion of the upload.
1110 0 : pub(crate) fn schedule_remove_gc_block_reason(
1111 0 : self: &Arc<Self>,
1112 0 : reason: index::GcBlockingReason,
1113 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1114 0 : {
1115 0 : let maybe_barrier = {
1116 0 : let mut guard = self.upload_queue.lock().unwrap();
1117 0 : let upload_queue = guard.initialized_mut()?;
1118 :
1119 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1120 0 : if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
1121 0 : drop(guard);
1122 0 : panic!("cannot complete timeline_ancestor_detach while not detached");
1123 0 : }
1124 0 : }
1125 :
1126 0 : let wanted = |x: Option<&index::GcBlocking>| {
1127 0 : x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
1128 0 : };
1129 :
1130 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1131 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1132 0 :
1133 0 : match (current, uploaded) {
1134 0 : (x, y) if wanted(x) && wanted(y) => None,
1135 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1136 0 : (x, y) => {
1137 0 : if !wanted(x) && wanted(y) {
1138 0 : warn!(
1139 : ?reason,
1140 : op = "remove",
1141 0 : "unexpected: two racing processes to enable and disable a gc blocking reason (remove)"
1142 : );
1143 0 : }
1144 :
1145 0 : upload_queue.dirty.gc_blocking =
1146 0 : current.as_ref().and_then(|x| x.without_reason(reason));
1147 0 : assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
1148 0 : self.schedule_index_upload(upload_queue);
1149 0 : Some(self.schedule_barrier0(upload_queue))
1150 : }
1151 : }
1152 : };
1153 :
1154 0 : Ok(async move {
1155 0 : if let Some(barrier) = maybe_barrier {
1156 0 : Self::wait_completion0(barrier).await?;
1157 0 : }
1158 0 : Ok(())
1159 0 : })
1160 0 : }
1161 :
1162 : /// Launch an upload operation in the background; the file is added to be included in the next
1163 : /// `index_part.json` upload.
1164 2766 : pub(crate) fn schedule_layer_file_upload(
1165 2766 : self: &Arc<Self>,
1166 2766 : layer: ResidentLayer,
1167 2766 : ) -> Result<(), NotInitialized> {
1168 2766 : let mut guard = self.upload_queue.lock().unwrap();
1169 2766 : let upload_queue = guard.initialized_mut()?;
1170 :
1171 2766 : self.schedule_layer_file_upload0(upload_queue, layer);
1172 2766 : self.launch_queued_tasks(upload_queue);
1173 2766 : Ok(())
1174 2766 : }
1175 :
1176 3490 : fn schedule_layer_file_upload0(
1177 3490 : self: &Arc<Self>,
1178 3490 : upload_queue: &mut UploadQueueInitialized,
1179 3490 : layer: ResidentLayer,
1180 3490 : ) {
1181 3490 : let metadata = layer.metadata();
1182 3490 :
1183 3490 : upload_queue
1184 3490 : .dirty
1185 3490 : .layer_metadata
1186 3490 : .insert(layer.layer_desc().layer_name(), metadata.clone());
1187 3490 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1188 3490 :
1189 3490 : info!(
1190 : gen=?metadata.generation,
1191 : shard=?metadata.shard,
1192 0 : "scheduled layer file upload {layer}",
1193 : );
1194 :
1195 3490 : let op = UploadOp::UploadLayer(layer, metadata, None);
1196 3490 : self.metric_begin(&op);
1197 3490 : upload_queue.queued_operations.push_back(op);
1198 3490 : }
1199 :
1200 : /// Launch a delete operation in the background.
1201 : ///
1202 : /// The operation does not modify local filesystem state.
1203 : ///
1204 : /// Note: This schedules an index file upload before the deletions. The
1205 : /// deletion won't actually be performed until all previously scheduled
1206 : /// upload operations, and the index file upload, have completed
1207 : /// successfully.
1208 16 : pub fn schedule_layer_file_deletion(
1209 16 : self: &Arc<Self>,
1210 16 : names: &[LayerName],
1211 16 : ) -> anyhow::Result<()> {
1212 16 : let mut guard = self.upload_queue.lock().unwrap();
1213 16 : let upload_queue = guard.initialized_mut()?;
1214 :
1215 16 : let with_metadata =
1216 16 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
1217 16 :
1218 16 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
1219 16 :
1220 16 : // Launch the tasks immediately, if possible
1221 16 : self.launch_queued_tasks(upload_queue);
1222 16 : Ok(())
1223 16 : }
1224 :
1225 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
1226 : /// layer files, leaving them dangling.
1227 : ///
1228 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
1229 : /// is invoked on them.
1230 8 : pub(crate) fn schedule_gc_update(
1231 8 : self: &Arc<Self>,
1232 8 : gc_layers: &[Layer],
1233 8 : ) -> Result<(), NotInitialized> {
1234 8 : let mut guard = self.upload_queue.lock().unwrap();
1235 8 : let upload_queue = guard.initialized_mut()?;
1236 :
1237 : // just forget the return value; after uploading the next index_part.json, we can consider
1238 : // the layer files as "dangling". this is fine, at worst case we create work for the
1239 : // scrubber.
1240 :
1241 8 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
1242 8 :
1243 8 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1244 8 :
1245 8 : self.launch_queued_tasks(upload_queue);
1246 8 :
1247 8 : Ok(())
1248 8 : }
1249 :
1250 : /// Update the remote index file, removing the to-be-deleted files from the index,
1251 : /// allowing scheduling of actual deletions later.
1252 176 : fn schedule_unlinking_of_layers_from_index_part0<I>(
1253 176 : self: &Arc<Self>,
1254 176 : upload_queue: &mut UploadQueueInitialized,
1255 176 : names: I,
1256 176 : ) -> Vec<(LayerName, LayerFileMetadata)>
1257 176 : where
1258 176 : I: IntoIterator<Item = LayerName>,
1259 176 : {
1260 176 : // Decorate our list of names with each name's metadata, dropping
1261 176 : // names that are unexpectedly missing from our metadata. This metadata
1262 176 : // is later used when physically deleting layers, to construct key paths.
1263 176 : let with_metadata: Vec<_> = names
1264 176 : .into_iter()
1265 1024 : .filter_map(|name| {
1266 1024 : let meta = upload_queue.dirty.layer_metadata.remove(&name);
1267 :
1268 1024 : if let Some(meta) = meta {
1269 1024 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1270 1024 : Some((name, meta))
1271 : } else {
1272 : // This can only happen if we forgot to schedule the file upload
1273 : // before scheduling the delete. Log it because it is a rare/strange
1274 : // situation, and in case something is misbehaving, we'd like to know which
1275 : // layers experienced this.
1276 0 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
1277 0 : None
1278 : }
1279 1024 : })
1280 176 : .collect();
1281 :
1282 : #[cfg(feature = "testing")]
1283 1200 : for (name, metadata) in &with_metadata {
1284 1024 : let gen_ = metadata.generation;
1285 1024 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen_) {
1286 0 : if unexpected == gen_ {
1287 0 : tracing::error!("{name} was unlinked twice with same generation");
1288 : } else {
1289 0 : tracing::error!(
1290 0 : "{name} was unlinked twice with different generations {gen_:?} and {unexpected:?}"
1291 : );
1292 : }
1293 1024 : }
1294 : }
1295 :
1296 : // after unlinking files from the upload_queue.latest_files we must always schedule an
1297 : // index_part update, because that needs to be uploaded before we can actually delete the
1298 : // files.
1299 176 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1300 144 : self.schedule_index_upload(upload_queue);
1301 144 : }
1302 :
1303 176 : with_metadata
1304 176 : }
1305 :
1306 : /// Schedules deletion for layer files which have previously been unlinked from the
1307 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
1308 1024 : pub(crate) fn schedule_deletion_of_unlinked(
1309 1024 : self: &Arc<Self>,
1310 1024 : layers: Vec<(LayerName, LayerFileMetadata)>,
1311 1024 : ) -> anyhow::Result<()> {
1312 1024 : let mut guard = self.upload_queue.lock().unwrap();
1313 1024 : let upload_queue = guard.initialized_mut()?;
1314 :
1315 1024 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
1316 1024 : self.launch_queued_tasks(upload_queue);
1317 1024 : Ok(())
1318 1024 : }
1319 :
1320 1036 : fn schedule_deletion_of_unlinked0(
1321 1036 : self: &Arc<Self>,
1322 1036 : upload_queue: &mut UploadQueueInitialized,
1323 1036 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
1324 1036 : ) {
1325 1036 : // Filter out any layers which were not created by this tenant shard. These are
1326 1036 : // layers that originate from some ancestor shard after a split, and may still
1327 1036 : // be referenced by other shards. We are free to delete them locally and remove
1328 1036 : // them from our index (and would have already done so when we reach this point
1329 1036 : // in the code), but we may not delete them remotely.
1330 1036 : with_metadata.retain(|(name, meta)| {
1331 1024 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
1332 1024 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
1333 1024 : if !retain {
1334 0 : tracing::debug!(
1335 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
1336 : meta.shard
1337 : );
1338 1024 : }
1339 1024 : retain
1340 1036 : });
1341 :
1342 2060 : for (name, meta) in &with_metadata {
1343 1024 : info!(
1344 0 : "scheduling deletion of layer {}{} (shard {})",
1345 0 : name,
1346 0 : meta.generation.get_suffix(),
1347 : meta.shard
1348 : );
1349 : }
1350 :
1351 : #[cfg(feature = "testing")]
1352 2060 : for (name, meta) in &with_metadata {
1353 1024 : let gen_ = meta.generation;
1354 1024 : match upload_queue.dangling_files.remove(name) {
1355 1016 : Some(same) if same == gen_ => { /* expected */ }
1356 0 : Some(other) => {
1357 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen_:?}");
1358 : }
1359 : None => {
1360 8 : tracing::error!("{name} was unlinked but was not dangling");
1361 : }
1362 : }
1363 : }
1364 :
1365 : // schedule the actual deletions
1366 1036 : if with_metadata.is_empty() {
1367 : // avoid scheduling the op & bumping the metric
1368 12 : return;
1369 1024 : }
1370 1024 : let op = UploadOp::Delete(Delete {
1371 1024 : layers: with_metadata,
1372 1024 : });
1373 1024 : self.metric_begin(&op);
1374 1024 : upload_queue.queued_operations.push_back(op);
1375 1036 : }
1376 :
1377 : /// Schedules a compaction update to the remote `index_part.json`.
1378 : ///
1379 : /// `compacted_from` holds the L0 layers which have been compacted into the `compacted_to` L1 layers.
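     : ///
     : /// Hedged usage sketch (names are illustrative): `old_l0s` are the replaced
     : /// `Layer`s and `new_l1s` the `ResidentLayer`s produced by compaction:
     : ///
     : /// ```ignore
     : /// client.schedule_compaction_update(&old_l0s, &new_l1s)?;
     : /// // The new L1s are scheduled for upload before the index update that
     : /// // unlinks the L0s, so the index never references missing layers.
     : /// ```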
1380 152 : pub(crate) fn schedule_compaction_update(
1381 152 : self: &Arc<Self>,
1382 152 : compacted_from: &[Layer],
1383 152 : compacted_to: &[ResidentLayer],
1384 152 : ) -> Result<(), NotInitialized> {
1385 152 : let mut guard = self.upload_queue.lock().unwrap();
1386 152 : let upload_queue = guard.initialized_mut()?;
1387 :
1388 876 : for layer in compacted_to {
1389 724 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
1390 724 : }
1391 :
1392 1012 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
1393 152 :
1394 152 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1395 152 : self.launch_queued_tasks(upload_queue);
1396 152 :
1397 152 : Ok(())
1398 152 : }
1399 :
1400 : /// Wait for all previously scheduled uploads/deletions to complete
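     : ///
     : /// Sketch (assumes an initialized queue; fails if the queue is shut down or
     : /// stopped before the internal barrier completes):
     : ///
     : /// ```ignore
     : /// client.schedule_index_upload_for_file_changes()?;
     : /// client.wait_completion().await?;
     : /// // Everything scheduled before this point has now been uploaded/deleted.
     : /// ```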
1401 2800 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
1402 2800 : let receiver = {
1403 2800 : let mut guard = self.upload_queue.lock().unwrap();
1404 2800 : let upload_queue = guard
1405 2800 : .initialized_mut()
1406 2800 : .map_err(WaitCompletionError::NotInitialized)?;
1407 2800 : self.schedule_barrier0(upload_queue)
1408 2800 : };
1409 2800 :
1410 2800 : Self::wait_completion0(receiver).await
1411 2800 : }
1412 :
1413 2800 : async fn wait_completion0(
1414 2800 : mut receiver: tokio::sync::watch::Receiver<()>,
1415 2800 : ) -> Result<(), WaitCompletionError> {
1416 2800 : if receiver.changed().await.is_err() {
1417 0 : return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
1418 2800 : }
1419 2800 :
1420 2800 : Ok(())
1421 2800 : }
1422 :
1423 12 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
1424 12 : let mut guard = self.upload_queue.lock().unwrap();
1425 12 : let upload_queue = guard.initialized_mut()?;
1426 12 : self.schedule_barrier0(upload_queue);
1427 12 : Ok(())
1428 12 : }
1429 :
1430 2812 : fn schedule_barrier0(
1431 2812 : self: &Arc<Self>,
1432 2812 : upload_queue: &mut UploadQueueInitialized,
1433 2812 : ) -> tokio::sync::watch::Receiver<()> {
1434 2812 : let (sender, receiver) = tokio::sync::watch::channel(());
1435 2812 : let barrier_op = UploadOp::Barrier(sender);
1436 2812 :
1437 2812 : upload_queue.queued_operations.push_back(barrier_op);
1438 2812 : // Don't count this kind of operation!
1439 2812 :
1440 2812 : // Launch the task immediately, if possible
1441 2812 : self.launch_queued_tasks(upload_queue);
1442 2812 :
1443 2812 : receiver
1444 2812 : }
1445 :
1446 : /// Wait for all previously scheduled operations to complete, and then stop.
1447 : ///
1448 : /// Not cancellation safe
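     : ///
     : /// A minimal sequence sketch (illustrative; per the note above, the future
     : /// returned by `shutdown` must be polled to completion, not dropped):
     : ///
     : /// ```ignore
     : /// client.shutdown().await; // drains queued operations, then stops the queue
     : /// // The queue is now Stopped; scheduling new operations will fail.
     : /// ```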
1449 20 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1450 20 : // On cancellation the queue is left in an awkward state: it refuses new operations,
1451 20 : // but a proper stop has not yet been called. After cancellation, the original task or
1452 20 : // some later task must call `stop` or `shutdown`.
1453 20 : let sg = scopeguard::guard((), |_| {
1454 0 : tracing::error!(
1455 0 : "RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error"
1456 : )
1457 20 : });
1458 :
1459 16 : let fut = {
1460 20 : let mut guard = self.upload_queue.lock().unwrap();
1461 20 : let upload_queue = match &mut *guard {
1462 : UploadQueue::Stopped(_) => {
1463 4 : scopeguard::ScopeGuard::into_inner(sg);
1464 4 : return;
1465 : }
1466 : UploadQueue::Uninitialized => {
1467 : // transition into Stopped state
1468 0 : self.stop_impl(&mut guard);
1469 0 : scopeguard::ScopeGuard::into_inner(sg);
1470 0 : return;
1471 : }
1472 16 : UploadQueue::Initialized(init) => init,
1473 16 : };
1474 16 :
1475 16 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1476 16 : // just don't add more of these as they would never complete.
1477 16 : //
1478 16 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1479 16 : // in every place we would not have to jump through this hoop, and this method could be
1480 16 : // made cancellable.
1481 16 : if !upload_queue.shutting_down {
1482 12 : upload_queue.shutting_down = true;
1483 12 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1484 12 : // this operation is not counted similar to Barrier
1485 12 :
1486 12 : self.launch_queued_tasks(upload_queue);
1487 12 : }
1488 :
1489 16 : upload_queue.shutdown_ready.clone().acquire_owned()
1490 : };
1491 :
1492 16 : let res = fut.await;
1493 :
1494 16 : scopeguard::ScopeGuard::into_inner(sg);
1495 16 :
1496 16 : match res {
1497 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
1498 16 : Err(_closed) => {
1499 16 : // expected
1500 16 : }
1501 16 : }
1502 16 :
1503 16 : self.stop();
1504 20 : }
1505 :
1506 : /// Set the deleted_at field in the remote index file.
1507 : ///
1508 : /// This fails if the upload queue has not been `stop()`ed.
1509 : ///
1510 : /// The caller is responsible for calling `stop()` AND for waiting
1511 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1512 : /// Check method [`RemoteTimelineClient::stop`] for details.
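     : ///
     : /// Sketch of the expected call sequence during timeline deletion (illustrative,
     : /// condensed from the requirements above):
     : ///
     : /// ```ignore
     : /// client.stop();
     : /// // ... wait for in-flight RemoteUploadTask tasks to finish ...
     : /// client.persist_index_part_with_deleted_flag().await?; // write deletion marker
     : /// client.delete_all().await?;                           // physical deletion
     : /// ```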
1513 : #[instrument(skip_all)]
1514 : pub(crate) async fn persist_index_part_with_deleted_flag(
1515 : self: &Arc<Self>,
1516 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1517 : let index_part_with_deleted_at = {
1518 : let mut locked = self.upload_queue.lock().unwrap();
1519 :
1520 : // We must be in the stopped state, because otherwise an
1521 : // in-progress index part upload could overwrite the file,
1522 : // clobbering the deleted_at flag that we are going to set below.
1523 : let stopped = locked.stopped_mut()?;
1524 :
1525 : match stopped.deleted_at {
1526 : SetDeletedFlagProgress::NotRunning => (), // proceed
1527 : SetDeletedFlagProgress::InProgress(at) => {
1528 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1529 : }
1530 : SetDeletedFlagProgress::Successful(at) => {
1531 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1532 : }
1533 : };
1534 : let deleted_at = Utc::now().naive_utc();
1535 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1536 :
1537 : let mut index_part = stopped.upload_queue_for_deletion.dirty.clone();
1538 : index_part.deleted_at = Some(deleted_at);
1539 : index_part
1540 : };
1541 :
1542 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1543 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1544 0 : let stopped = locked
1545 0 : .stopped_mut()
1546 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1547 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1548 0 : });
1549 :
1550 : pausable_failpoint!("persist_deleted_index_part");
1551 :
1552 : backoff::retry(
1553 0 : || {
1554 0 : upload::upload_index_part(
1555 0 : &self.storage_impl,
1556 0 : &self.tenant_shard_id,
1557 0 : &self.timeline_id,
1558 0 : self.generation,
1559 0 : &index_part_with_deleted_at,
1560 0 : &self.cancel,
1561 0 : )
1562 0 : },
1563 0 : |_e| false,
1564 : 1,
1565 : // have just a couple of attempts
1566 : // when executed as part of timeline deletion, this happens in the context of an API call;
1567 : // when executed as part of tenant deletion, it happens in the background
1568 : 2,
1569 : "persist_index_part_with_deleted_flag",
1570 : &self.cancel,
1571 : )
1572 : .await
1573 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1574 0 : .and_then(|x| x)?;
1575 :
1576 : // all good, disarm the guard and mark as success
1577 : ScopeGuard::into_inner(undo_deleted_at);
1578 : {
1579 : let mut locked = self.upload_queue.lock().unwrap();
1580 :
1581 : let stopped = locked
1582 : .stopped_mut()
1583 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1584 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1585 : index_part_with_deleted_at
1586 : .deleted_at
1587 : .expect("we set it above"),
1588 : );
1589 : }
1590 :
1591 : Ok(())
1592 : }
1593 :
1594 0 : pub(crate) fn is_deleting(&self) -> bool {
1595 0 : let mut locked = self.upload_queue.lock().unwrap();
1596 0 : locked.stopped_mut().is_ok()
1597 0 : }
1598 :
1599 0 : pub(crate) async fn preserve_initdb_archive(
1600 0 : self: &Arc<Self>,
1601 0 : tenant_id: &TenantId,
1602 0 : timeline_id: &TimelineId,
1603 0 : cancel: &CancellationToken,
1604 0 : ) -> anyhow::Result<()> {
1605 0 : backoff::retry(
1606 0 : || async {
1607 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1608 0 : .await
1609 0 : },
1610 0 : TimeoutOrCancel::caused_by_cancel,
1611 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1612 0 : FAILED_REMOTE_OP_RETRIES,
1613 0 : "preserve_initdb_tar_zst",
1614 0 : &cancel.clone(),
1615 0 : )
1616 0 : .await
1617 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1618 0 : .and_then(|x| x)
1619 0 : .context("backing up initdb archive")?;
1620 0 : Ok(())
1621 0 : }
1622 :
1623 : /// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
1624 : ///
1625 : /// This is not normally needed.
1626 0 : pub(crate) async fn upload_layer_file(
1627 0 : self: &Arc<Self>,
1628 0 : uploaded: &ResidentLayer,
1629 0 : cancel: &CancellationToken,
1630 0 : ) -> anyhow::Result<()> {
1631 0 : let remote_path = remote_layer_path(
1632 0 : &self.tenant_shard_id.tenant_id,
1633 0 : &self.timeline_id,
1634 0 : uploaded.metadata().shard,
1635 0 : &uploaded.layer_desc().layer_name(),
1636 0 : uploaded.metadata().generation,
1637 0 : );
1638 0 :
1639 0 : backoff::retry(
1640 0 : || async {
1641 0 : upload::upload_timeline_layer(
1642 0 : &self.storage_impl,
1643 0 : uploaded.local_path(),
1644 0 : &remote_path,
1645 0 : uploaded.metadata().file_size,
1646 0 : cancel,
1647 0 : )
1648 0 : .await
1649 0 : },
1650 0 : TimeoutOrCancel::caused_by_cancel,
1651 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1652 0 : FAILED_REMOTE_OP_RETRIES,
1653 0 : "upload a layer without adding it to latest files",
1654 0 : cancel,
1655 0 : )
1656 0 : .await
1657 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1658 0 : .and_then(|x| x)
1659 0 : .context("upload a layer without adding it to latest files")
1660 0 : }
1661 :
1662 : /// Copies the `adopted` remote existing layer to the remote path of `adopted_as`. The layer is
1663 : /// not added to be part of a future `index_part.json` upload.
1664 0 : pub(crate) async fn copy_timeline_layer(
1665 0 : self: &Arc<Self>,
1666 0 : adopted: &Layer,
1667 0 : adopted_as: &Layer,
1668 0 : cancel: &CancellationToken,
1669 0 : ) -> anyhow::Result<()> {
1670 0 : let source_remote_path = remote_layer_path(
1671 0 : &self.tenant_shard_id.tenant_id,
1672 0 : &adopted
1673 0 : .get_timeline_id()
1674 0 : .expect("Source timeline should be alive"),
1675 0 : adopted.metadata().shard,
1676 0 : &adopted.layer_desc().layer_name(),
1677 0 : adopted.metadata().generation,
1678 0 : );
1679 0 :
1680 0 : let target_remote_path = remote_layer_path(
1681 0 : &self.tenant_shard_id.tenant_id,
1682 0 : &self.timeline_id,
1683 0 : adopted_as.metadata().shard,
1684 0 : &adopted_as.layer_desc().layer_name(),
1685 0 : adopted_as.metadata().generation,
1686 0 : );
1687 0 :
1688 0 : backoff::retry(
1689 0 : || async {
1690 0 : upload::copy_timeline_layer(
1691 0 : &self.storage_impl,
1692 0 : &source_remote_path,
1693 0 : &target_remote_path,
1694 0 : cancel,
1695 0 : )
1696 0 : .await
1697 0 : },
1698 0 : TimeoutOrCancel::caused_by_cancel,
1699 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1700 0 : FAILED_REMOTE_OP_RETRIES,
1701 0 : "copy timeline layer",
1702 0 : cancel,
1703 0 : )
1704 0 : .await
1705 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1706 0 : .and_then(|x| x)
1707 0 : .context("remote copy timeline layer")
1708 0 : }
1709 :
1710 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1711 0 : match tokio::time::timeout(
1712 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1713 0 : self.deletion_queue_client.flush_immediate(),
1714 0 : )
1715 0 : .await
1716 : {
1717 0 : Ok(result) => result,
1718 0 : Err(_timeout) => {
1719 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1720 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1721 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1722 0 : tracing::warn!(
1723 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1724 : );
1725 0 : Ok(())
1726 : }
1727 : }
1728 0 : }
1729 :
1730 : /// Prerequisites: the UploadQueue should be in the stopped state and deleted_at should be successfully set.
1731 : /// The function deletes the layer files one by one, then lists the prefix to see if we leaked anything,
1732 : /// deletes any leaked files, and finally deletes the index file.
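     : ///
     : /// Illustrative outline of the steps below (comments only, not additional API):
     : ///
     : /// ```ignore
     : /// // 1. delete the layers referenced by the retained index (this shard's only)
     : /// // 2. delete initdb.tar.zst (it may legitimately be absent)
     : /// // 3. flush the deletion queue, then LIST the prefix for leaked objects
     : /// // 4. delete leaked objects, keeping the newest index_part.json for last
     : /// // 5. delete the index itself; it acts as the deletion marker (deleted_at)
     : /// ```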
1733 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> Result<(), DeleteTimelineError> {
1734 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1735 :
1736 0 : let layers: Vec<RemotePath> = {
1737 0 : let mut locked = self.upload_queue.lock().unwrap();
1738 0 : let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?;
1739 :
1740 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1741 0 : return Err(DeleteTimelineError::Other(anyhow::anyhow!(
1742 0 : "deleted_at is not set"
1743 0 : )));
1744 0 : }
1745 0 :
1746 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1747 :
1748 0 : stopped
1749 0 : .upload_queue_for_deletion
1750 0 : .dirty
1751 0 : .layer_metadata
1752 0 : .drain()
1753 0 : .filter(|(_file_name, meta)| {
1754 0 : // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
1755 0 : // all shards anyway, we _could_ delete these, but
1756 0 : // - it creates a potential race if other shards are still
1757 0 : // using the layers while this shard deletes them.
1758 0 : // - it means that if we rolled back the shard split, the ancestor shards would be in a state where
1759 0 : // these timelines are present but corrupt (their index exists but some layers don't)
1760 0 : //
1761 0 : // These layers will eventually be cleaned up by the scrubber when it does physical GC.
1762 0 : meta.shard.shard_number == self.tenant_shard_id.shard_number
1763 0 : && meta.shard.shard_count == self.tenant_shard_id.shard_count
1764 0 : })
1765 0 : .map(|(file_name, meta)| {
1766 0 : remote_layer_path(
1767 0 : &self.tenant_shard_id.tenant_id,
1768 0 : &self.timeline_id,
1769 0 : meta.shard,
1770 0 : &file_name,
1771 0 : meta.generation,
1772 0 : )
1773 0 : })
1774 0 : .collect()
1775 0 : };
1776 0 :
1777 0 : let layer_deletion_count = layers.len();
1778 0 : self.deletion_queue_client
1779 0 : .push_immediate(layers)
1780 0 : .await
1781 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1782 :
1783 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts of
1784 : // nonexistent objects are not considered errors.
1785 0 : let initdb_path =
1786 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1787 0 : self.deletion_queue_client
1788 0 : .push_immediate(vec![initdb_path])
1789 0 : .await
1790 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1791 :
1792 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
1793 : // and the retry arrived at a different pageserver, there would be no trace of it in remote storage.
1794 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1795 0 :
1796 0 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1797 0 : // taking the burden of listing all the layers that we already know we should delete.
1798 0 : self.flush_deletion_queue()
1799 0 : .await
1800 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1801 :
1802 0 : let cancel = shutdown_token();
1803 :
1804 0 : let remaining = download_retry(
1805 0 : || async {
1806 0 : self.storage_impl
1807 0 : .list(
1808 0 : Some(&timeline_storage_path),
1809 0 : ListingMode::NoDelimiter,
1810 0 : None,
1811 0 : &cancel,
1812 0 : )
1813 0 : .await
1814 0 : },
1815 0 : "list remaining files",
1816 0 : &cancel,
1817 0 : )
1818 0 : .await
1819 0 : .context("list files remaining files")?
1820 : .keys;
1821 :
1822 : // We will delete the current index_part object last, since it acts as a deletion
1823 : // marker via its deleted_at attribute
1824 0 : let latest_index = remaining
1825 0 : .iter()
1826 0 : .filter(|o| {
1827 0 : o.key
1828 0 : .object_name()
1829 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1830 0 : .unwrap_or(false)
1831 0 : })
1832 0 : .filter_map(|o| {
1833 0 : parse_remote_index_path(o.key.clone()).map(|gen_| (o.key.clone(), gen_))
1834 0 : })
1835 0 : .max_by_key(|i| i.1)
1836 0 : .map(|i| i.0.clone())
1837 0 : .unwrap_or(
1838 0 : // No generation-suffixed indices, assume we are dealing with
1839 0 : // a legacy index.
1840 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1841 0 : );
1842 0 :
1843 0 : let remaining_layers: Vec<RemotePath> = remaining
1844 0 : .into_iter()
1845 0 : .filter_map(|o| {
1846 0 : if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
1847 0 : None
1848 : } else {
1849 0 : Some(o.key)
1850 : }
1851 0 : })
1852 0 : .inspect(|path| {
1853 0 : if let Some(name) = path.object_name() {
1854 0 : info!(%name, "deleting a file not referenced from index_part.json");
1855 : } else {
1856 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1857 : }
1858 0 : })
1859 0 : .collect();
1860 0 :
1861 0 : let not_referenced_count = remaining_layers.len();
1862 0 : if !remaining_layers.is_empty() {
1863 0 : self.deletion_queue_client
1864 0 : .push_immediate(remaining_layers)
1865 0 : .await
1866 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1867 0 : }
1868 :
1869 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1870 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1871 0 : "failpoint: timeline-delete-before-index-delete"
1872 0 : )))?
1873 0 : });
1874 :
1875 0 : debug!("enqueuing index part deletion");
1876 0 : self.deletion_queue_client
1877 0 : .push_immediate([latest_index].to_vec())
1878 0 : .await
1879 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1880 :
1881 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1882 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1883 0 : self.flush_deletion_queue()
1884 0 : .await
1885 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1886 :
1887 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1888 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1889 0 : "failpoint: timeline-delete-after-index-delete"
1890 0 : )))?
1891 0 : });
1892 :
1893 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1894 :
1895 0 : Ok(())
1896 0 : }
1897 :
1898 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1899 : /// the ordering constraints.
1900 : ///
1901 : /// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
1902 : /// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
1903 : /// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
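     : ///
     : /// Ordering sketch (hedged; scheduling calls and arguments are illustrative):
     : ///
     : /// ```ignore
     : /// client.schedule_layer_file_upload(new_layer)?;   // starts immediately
     : /// client.schedule_gc_update(&[old_layer])?;        // index waits for uploads
     : /// client.schedule_deletion_of_unlinked(unlinked)?; // waits for the index
     : /// ```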
1904 16619 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1905 26595 : while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
1906 9976 : debug!("starting op: {next_op}");
1907 :
1908 : // Prepare upload.
1909 9976 : match &mut next_op {
1910 3490 : UploadOp::UploadLayer(layer, meta, mode) => {
1911 3490 : if upload_queue
1912 3490 : .recently_deleted
1913 3490 : .remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
1914 0 : {
1915 0 : *mode = Some(OpType::FlushDeletion);
1916 0 : } else {
1917 3490 : *mode = Some(OpType::MayReorder)
1918 : }
1919 : }
1920 3018 : UploadOp::UploadMetadata { .. } => {}
1921 656 : UploadOp::Delete(Delete { layers }) => {
1922 1312 : for (name, meta) in layers {
1923 656 : upload_queue
1924 656 : .recently_deleted
1925 656 : .insert((name.clone(), meta.generation));
1926 656 : }
1927 : }
1928 2812 : UploadOp::Barrier(sender) => {
1929 2812 : sender.send_replace(());
1930 2812 : continue;
1931 : }
1932 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1933 : };
1934 :
1935 : // Assign unique ID to this task
1936 7164 : upload_queue.task_counter += 1;
1937 7164 : let upload_task_id = upload_queue.task_counter;
1938 7164 :
1939 7164 : // Add it to the in-progress map
1940 7164 : let task = Arc::new(UploadTask {
1941 7164 : task_id: upload_task_id,
1942 7164 : op: next_op,
1943 7164 : coalesced_ops,
1944 7164 : retries: AtomicU32::new(0),
1945 7164 : });
1946 7164 : upload_queue
1947 7164 : .inprogress_tasks
1948 7164 : .insert(task.task_id, Arc::clone(&task));
1949 7164 :
1950 7164 : // Spawn a task to perform the operation
1951 7164 : let self_rc = Arc::clone(self);
1952 7164 : let tenant_shard_id = self.tenant_shard_id;
1953 7164 : let timeline_id = self.timeline_id;
1954 7164 : task_mgr::spawn(
1955 7164 : &self.runtime,
1956 7164 : TaskKind::RemoteUploadTask,
1957 7164 : self.tenant_shard_id,
1958 7164 : Some(self.timeline_id),
1959 7164 : "remote upload",
1960 7023 : async move {
1961 7023 : self_rc.perform_upload_task(task).await;
1962 6735 : Ok(())
1963 6735 : }
1964 7164 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1965 : );
1966 :
1967 : // Loop back to process next task
1968 : }
1969 16619 : }
1970 :
1971 : ///
1972 : /// Perform an upload task.
1973 : ///
1974 : /// The task is in the `inprogress_tasks` list. This function will try to
1975 : /// execute it, retrying forever. On successful completion, the task is
1976 : /// removed from the `inprogress_tasks` list, and any queued task(s) that
1977 : /// were waiting on its completion are launched.
1978 : ///
1979 : /// The task can be shut down, however. That leads to stopping the whole
1980 : /// queue.
1981 : ///
1982 7023 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1983 7023 : let cancel = shutdown_token();
1984 : // Loop to retry until it completes.
1985 : loop {
1986 : // If we're requested to shut down, close up shop and exit.
1987 : //
1988 : // Note: We only check for the shutdown requests between retries, so
1989 : // if a shutdown request arrives while we're busy uploading, in the
1990 : // upload::upload_*() call below, we will wait, not exit, until it has
1991 : // finished. We probably could cancel the upload by simply dropping
1992 : // the Future, but we're not 100% sure if the remote storage library
1993 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1994 : // upload finishes or times out soon enough.
1995 7023 : if cancel.is_cancelled() {
1996 0 : info!("upload task cancelled by shutdown request");
1997 0 : self.stop();
1998 0 : return;
1999 7023 : }
2000 7023 :
2001 7023 : // Assert that we don't modify a layer that's referenced by the current index.
2002 7023 : if cfg!(debug_assertions) {
2003 7023 : let modified = match &task.op {
2004 3371 : UploadOp::UploadLayer(layer, layer_metadata, _) => {
2005 3371 : vec![(layer.layer_desc().layer_name(), layer_metadata)]
2006 : }
2007 656 : UploadOp::Delete(delete) => {
2008 656 : delete.layers.iter().map(|(n, m)| (n.clone(), m)).collect()
2009 : }
2010 : // These don't modify layers.
2011 2996 : UploadOp::UploadMetadata { .. } => Vec::new(),
2012 0 : UploadOp::Barrier(_) => Vec::new(),
2013 0 : UploadOp::Shutdown => Vec::new(),
2014 : };
2015 7023 : if let Ok(queue) = self.upload_queue.lock().unwrap().initialized_mut() {
2016 11042 : for (ref name, metadata) in modified {
2017 4027 : debug_assert!(
2018 4027 : !queue.clean.0.references(name, metadata),
2019 8 : "layer {name} modified while referenced by index",
2020 : );
2021 : }
2022 0 : }
2023 0 : }
2024 :
2025 7015 : let upload_result: anyhow::Result<()> = match &task.op {
2026 3371 : UploadOp::UploadLayer(layer, layer_metadata, mode) => {
2027 : // TODO: check if this mechanism can be removed now that can_bypass() performs
2028 : // conflict checks during scheduling.
2029 3371 : if let Some(OpType::FlushDeletion) = mode {
2030 0 : if self.config.read().unwrap().block_deletions {
2031 : // Of course, this is not efficient... but usually the queue should be empty.
2032 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2033 0 : let mut detected = false;
2034 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2035 0 : for list in queue.blocked_deletions.iter_mut() {
2036 0 : list.layers.retain(|(name, meta)| {
2037 0 : if name == &layer.layer_desc().layer_name()
2038 0 : && meta.generation == layer_metadata.generation
2039 : {
2040 0 : detected = true;
2041 0 : // remove the layer from deletion queue
2042 0 : false
2043 : } else {
2044 : // keep the layer
2045 0 : true
2046 : }
2047 0 : });
2048 0 : }
2049 0 : }
2050 0 : if detected {
2051 0 : info!(
2052 0 : "cancelled blocked deletion of layer {} at gen {:?}",
2053 0 : layer.layer_desc().layer_name(),
2054 : layer_metadata.generation
2055 : );
2056 0 : }
2057 : } else {
2058 : // TODO: we do not guarantee that the upload task starts after the deletion task, so there could possibly be
2059 : // race conditions where the layer still gets deleted. But this only happens if someone creates a layer
2060 : // immediately after it's deleted, which is not possible in the current system.
2061 0 : info!(
2062 0 : "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
2063 0 : layer.layer_desc().layer_name(),
2064 : layer_metadata.generation
2065 : );
2066 : {
2067 : // We are going to flush, we can clean up the recently deleted list.
2068 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2069 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2070 0 : queue.recently_deleted.clear();
2071 0 : }
2072 : }
2073 0 : if let Err(e) = self.deletion_queue_client.flush_execute().await {
2074 0 : warn!(
2075 0 : "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
2076 0 : layer.layer_desc().layer_name(),
2077 : layer_metadata.generation
2078 : );
2079 : } else {
2080 0 : info!(
2081 0 : "done flushing deletion queue before uploading layer {} at gen {:?}",
2082 0 : layer.layer_desc().layer_name(),
2083 : layer_metadata.generation
2084 : );
2085 : }
2086 : }
2087 3371 : }
2088 3371 : let local_path = layer.local_path();
2089 3371 :
2090 3371 : // We should only be uploading layers created by this `Tenant`'s lifetime, so
2091 3371 : // the metadata in the upload should always match our current generation.
2092 3371 : assert_eq!(layer_metadata.generation, self.generation);
2093 :
2094 3371 : let remote_path = remote_layer_path(
2095 3371 : &self.tenant_shard_id.tenant_id,
2096 3371 : &self.timeline_id,
2097 3371 : layer_metadata.shard,
2098 3371 : &layer.layer_desc().layer_name(),
2099 3371 : layer_metadata.generation,
2100 3371 : );
2101 3371 :
2102 3371 : upload::upload_timeline_layer(
2103 3371 : &self.storage_impl,
2104 3371 : local_path,
2105 3371 : &remote_path,
2106 3371 : layer_metadata.file_size,
2107 3371 : &self.cancel,
2108 3371 : )
2109 3371 : .measure_remote_op(
2110 3371 : RemoteOpFileKind::Layer,
2111 3371 : RemoteOpKind::Upload,
2112 3371 : Arc::clone(&self.metrics),
2113 3371 : )
2114 3371 : .await
2115 : }
2116 2996 : UploadOp::UploadMetadata { uploaded } => {
2117 2996 : let res = upload::upload_index_part(
2118 2996 : &self.storage_impl,
2119 2996 : &self.tenant_shard_id,
2120 2996 : &self.timeline_id,
2121 2996 : self.generation,
2122 2996 : uploaded,
2123 2996 : &self.cancel,
2124 2996 : )
2125 2996 : .measure_remote_op(
2126 2996 : RemoteOpFileKind::Index,
2127 2996 : RemoteOpKind::Upload,
2128 2996 : Arc::clone(&self.metrics),
2129 2996 : )
2130 2996 : .await;
2131 2977 : if res.is_ok() {
2132 2977 : self.update_remote_physical_size_gauge(Some(uploaded));
2133 2977 : let mention_having_future_layers = if cfg!(feature = "testing") {
2134 2977 : uploaded
2135 2977 : .layer_metadata
2136 2977 : .keys()
2137 35435 : .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn()))
2138 : } else {
2139 0 : false
2140 : };
2141 2977 : if mention_having_future_layers {
2142 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
2143 122 : tracing::info!(
2144 0 : disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(),
2145 0 : "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"
2146 : );
2147 2855 : }
2148 0 : }
2149 2977 : res
2150 : }
2151 648 : UploadOp::Delete(delete) => {
2152 648 : if self.config.read().unwrap().block_deletions {
2153 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2154 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2155 0 : queue.blocked_deletions.push(delete.clone());
2156 0 : }
2157 0 : Ok(())
2158 : } else {
2159 648 : pausable_failpoint!("before-delete-layer-pausable");
2160 648 : self.deletion_queue_client
2161 648 : .push_layers(
2162 648 : self.tenant_shard_id,
2163 648 : self.timeline_id,
2164 648 : self.generation,
2165 648 : delete.layers.clone(),
2166 648 : )
2167 648 : .map_err(|e| anyhow::anyhow!(e))
2168 : }
2169 : }
2170 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
2171 : // unreachable. Barrier operations are handled synchronously in
2172 : // launch_queued_tasks
2173 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
2174 0 : break;
2175 : }
2176 : };
2177 :
2178 0 : match upload_result {
2179 : Ok(()) => {
2180 6735 : break;
2181 : }
2182 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
2183 0 : // loop around to do the proper stopping
2184 0 : continue;
2185 : }
2186 0 : Err(e) => {
2187 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
2188 0 :
2189 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
2190 0 : // or other external reasons. Such issues are relatively regular, so log them
2191 0 : // at info level at first, and only WARN if the operation fails repeatedly.
2192 0 : //
2193 0 : // (See similar logic for downloads in `download::download_retry`)
2194 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
2195 0 : info!(
2196 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
2197 0 : task.op, retries, e
2198 : );
2199 : } else {
2200 0 : warn!(
2201 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
2202 0 : task.op, retries, e
2203 : );
2204 : }
2205 :
2206 : // sleep until it's time to retry, or we're cancelled
2207 0 : exponential_backoff(
2208 0 : retries,
2209 0 : DEFAULT_BASE_BACKOFF_SECONDS,
2210 0 : DEFAULT_MAX_BACKOFF_SECONDS,
2211 0 : &cancel,
2212 0 : )
2213 0 : .await;
2214 : }
2215 : }
2216 : }
2217 :
2218 6735 : let retries = task.retries.load(Ordering::SeqCst);
2219 6735 : if retries > 0 {
2220 0 : info!(
2221 0 : "remote task {} completed successfully after {} retries",
2222 0 : task.op, retries
2223 : );
2224 : } else {
2225 6735 : debug!("remote task {} completed successfully", task.op);
2226 : }
2227 :
2228 : // The task has completed successfully. Remove it from the in-progress list.
2229 6735 : let lsn_update = {
2230 6735 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
2231 6735 : let upload_queue = match upload_queue_guard.deref_mut() {
2232 0 : UploadQueue::Uninitialized => panic!(
2233 0 : "callers are responsible for ensuring this is only called on an initialized queue"
2234 0 : ),
2235 0 : UploadQueue::Stopped(_stopped) => None,
2236 6735 : UploadQueue::Initialized(qi) => Some(qi),
2237 : };
2238 :
2239 6735 : let upload_queue = match upload_queue {
2240 6735 : Some(upload_queue) => upload_queue,
2241 : None => {
2242 0 : info!("another concurrent task already stopped the queue");
2243 0 : return;
2244 : }
2245 : };
2246 :
2247 6735 : upload_queue.inprogress_tasks.remove(&task.task_id);
2248 :
2249 6735 : let lsn_update = match task.op {
2250 3110 : UploadOp::UploadLayer(_, _, _) => None,
2251 2977 : UploadOp::UploadMetadata { ref uploaded } => {
2252 2977 : // the task id is reused as a monotonicity check for storing the "clean"
2253 2977 : // IndexPart.
2254 2977 : let last_updater = upload_queue.clean.1;
2255 2977 : let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id);
2256 2977 : let monotone = is_later || last_updater.is_none();
2257 :
2258 2977 : assert!(
2259 2977 : monotone,
2260 0 : "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}",
2261 0 : task.task_id
2262 : );
2263 :
2264 : // not taking ownership is wasteful
2265 2977 : upload_queue.clean.0.clone_from(uploaded);
2266 2977 : upload_queue.clean.1 = Some(task.task_id);
2267 2977 :
2268 2977 : let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
2269 2977 : self.metrics
2270 2977 : .projected_remote_consistent_lsn_gauge
2271 2977 : .set(lsn.0);
2272 2977 :
2273 2977 : if self.generation.is_none() {
2274 : // Legacy mode: skip validating generation
2275 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
2276 0 : None
2277 2977 : } else if self
2278 2977 : .config
2279 2977 : .read()
2280 2977 : .unwrap()
2281 2977 : .process_remote_consistent_lsn_updates
2282 : {
2283 2977 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
2284 : } else {
2285 : // Our config disables remote_consistent_lsn updates: drop it.
2286 0 : None
2287 : }
2288 : }
2289 648 : UploadOp::Delete(_) => None,
2290 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
2291 : };
2292 :
2293 : // Launch any queued tasks that were unblocked by this one.
2294 6735 : self.launch_queued_tasks(upload_queue);
2295 6735 : lsn_update
2296 : };
2297 :
2298 6735 : if let Some((lsn, slot)) = lsn_update {
2299 : // Updates to the remote_consistent_lsn we advertise to pageservers
2300 : // are all routed through the DeletionQueue, to enforce important
2301 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
2302 2977 : self.deletion_queue_client
2303 2977 : .update_remote_consistent_lsn(
2304 2977 : self.tenant_shard_id,
2305 2977 : self.timeline_id,
2306 2977 : self.generation,
2307 2977 : lsn,
2308 2977 : slot,
2309 2977 : )
2310 2977 : .await;
2311 3758 : }
2312 :
2313 6735 : self.metric_end(&task.op);
2314 6735 : for coalesced_op in &task.coalesced_ops {
2315 2 : self.metric_end(coalesced_op);
2316 2 : }
2317 6735 : }
2318 :
2319 14361 : fn metric_impl(
2320 14361 : &self,
2321 14361 : op: &UploadOp,
2322 14361 : ) -> Option<(
2323 14361 : RemoteOpFileKind,
2324 14361 : RemoteOpKind,
2325 14361 : RemoteTimelineClientMetricsCallTrackSize,
2326 14361 : )> {
2327 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
2328 14361 : let res = match op {
2329 6600 : UploadOp::UploadLayer(_, m, _) => (
2330 6600 : RemoteOpFileKind::Layer,
2331 6600 : RemoteOpKind::Upload,
2332 6600 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
2333 6600 : ),
2334 6073 : UploadOp::UploadMetadata { .. } => (
2335 6073 : RemoteOpFileKind::Index,
2336 6073 : RemoteOpKind::Upload,
2337 6073 : DontTrackSize {
2338 6073 : reason: "metadata uploads are tiny",
2339 6073 : },
2340 6073 : ),
2341 1672 : UploadOp::Delete(_delete) => (
2342 1672 : RemoteOpFileKind::Layer,
2343 1672 : RemoteOpKind::Delete,
2344 1672 : DontTrackSize {
2345 1672 : reason: "should we track deletes? positive or negative sign?",
2346 1672 : },
2347 1672 : ),
2348 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
2349 : // we do not account these
2350 16 : return None;
2351 : }
2352 : };
2353 14345 : Some(res)
2354 14361 : }
2355 :
2356 7608 : fn metric_begin(&self, op: &UploadOp) {
2357 7608 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2358 7608 : Some(x) => x,
2359 0 : None => return,
2360 : };
2361 7608 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
2362 7608 : guard.will_decrement_manually(); // in metric_end(), see right below
2363 7608 : }
2364 :
2365 6753 : fn metric_end(&self, op: &UploadOp) {
2366 6753 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2367 6737 : Some(x) => x,
2368 16 : None => return,
2369 : };
2370 6737 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
2371 6753 : }
2372 :
2373 : /// Close the upload queue for new operations and cancel queued operations.
2374 : ///
2375 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
2376 : ///
2377 : /// In-progress operations will still be running after this function returns.
2378 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
2379 : /// to wait for them to complete, after calling this function.
2380 36 : pub(crate) fn stop(&self) {
2381 36 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
2382 36 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
2383 36 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
2384 36 : let mut guard = self.upload_queue.lock().unwrap();
2385 36 : self.stop_impl(&mut guard);
2386 36 : }
2387 :
2388 36 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
2389 36 : match &mut **guard {
2390 : UploadQueue::Uninitialized => {
2391 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
2392 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
2393 : }
2394 : UploadQueue::Stopped(_) => {
2395 : // nothing to do
2396 16 : info!("another concurrent task already shut down the queue");
2397 : }
2398 20 : UploadQueue::Initialized(initialized) => {
2399 20 : info!("shutting down upload queue");
2400 :
2401 : // Replace the queue with the Stopped state, taking ownership of the old
2402 : // Initialized queue. We will do some checks on it, and then drop it.
2403 20 : let qi = {
2404 : // Here we preserve a working version of the upload queue for possible use during deletions.
2405 : // An in-place replace of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
2406 : // but for this use case it doesn't really make sense to bring in unsafe code for this single usage point.
2407 : // Deletion is not really perf-sensitive, so there shouldn't be any problems with cloning a fraction of it.
2408 20 : let upload_queue_for_deletion = UploadQueueInitialized {
2409 20 : inprogress_limit: initialized.inprogress_limit,
2410 20 : task_counter: 0,
2411 20 : dirty: initialized.dirty.clone(),
2412 20 : clean: initialized.clean.clone(),
2413 20 : latest_files_changes_since_metadata_upload_scheduled: 0,
2414 20 : visible_remote_consistent_lsn: initialized
2415 20 : .visible_remote_consistent_lsn
2416 20 : .clone(),
2417 20 : inprogress_tasks: HashMap::default(),
2418 20 : queued_operations: VecDeque::default(),
2419 20 : #[cfg(feature = "testing")]
2420 20 : dangling_files: HashMap::default(),
2421 20 : blocked_deletions: Vec::new(),
2422 20 : shutting_down: false,
2423 20 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
2424 20 : recently_deleted: HashSet::new(),
2425 20 : };
2426 20 :
2427 20 : let upload_queue = std::mem::replace(
2428 20 : &mut **guard,
2429 20 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
2430 20 : UploadQueueStoppedDeletable {
2431 20 : upload_queue_for_deletion,
2432 20 : deleted_at: SetDeletedFlagProgress::NotRunning,
2433 20 : },
2434 20 : )),
2435 20 : );
2436 20 : if let UploadQueue::Initialized(qi) = upload_queue {
2437 20 : qi
2438 : } else {
2439 0 : unreachable!("we checked in the match above that it is Initialized");
2440 : }
2441 : };
2442 :
2443 : // We don't need to do anything here for in-progress tasks. They will finish
2444 : // on their own, decrement the unfinished-task counter themselves, and observe
2445 : // that the queue is Stopped.
2446 20 : drop(qi.inprogress_tasks);
2447 :
2448 : // Tear down queued ops
2449 20 : for op in qi.queued_operations.into_iter() {
2450 16 : self.metric_end(&op);
2451 16 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
2452 16 : // which is exactly what we want to happen.
2453 16 : drop(op);
2454 16 : }
2455 : }
2456 : }
2457 36 : }
2458 :
2459 : /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
2460 : /// externally to RemoteTimelineClient.
2461 0 : pub(crate) fn initialized_upload_queue(
2462 0 : &self,
2463 0 : ) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
2464 0 : let mut inner = self.upload_queue.lock().unwrap();
2465 0 : inner.initialized_mut()?;
2466 0 : Ok(UploadQueueAccessor { inner })
2467 0 : }
2468 :
2469 16 : pub(crate) fn no_pending_work(&self) -> bool {
2470 16 : let inner = self.upload_queue.lock().unwrap();
2471 16 : match &*inner {
2472 : UploadQueue::Uninitialized
2473 0 : | UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => true,
2474 16 : UploadQueue::Stopped(UploadQueueStopped::Deletable(x)) => {
2475 16 : x.upload_queue_for_deletion.no_pending_work()
2476 : }
2477 0 : UploadQueue::Initialized(x) => x.no_pending_work(),
2478 : }
2479 16 : }
2480 :
2481 : /// 'foreign' in the sense that it does not belong to this tenant shard. This method
2482 : /// is used during GC for other shards to get the index of shard zero.
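     : ///
     : /// Hedged sketch: fetch shard zero's index during GC (`Generation::MAX` lets
     : /// the download resolve the most recent generation, as below):
     : ///
     : /// ```ignore
     : /// let (index_part, generation, modified) = client
     : ///     .download_foreign_index(ShardNumber(0), &cancel)
     : ///     .await?;
     : /// ```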
2483 0 : pub(crate) async fn download_foreign_index(
2484 0 : &self,
2485 0 : shard_number: ShardNumber,
2486 0 : cancel: &CancellationToken,
2487 0 : ) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
2488 0 : let foreign_shard_id = TenantShardId {
2489 0 : shard_number,
2490 0 : shard_count: self.tenant_shard_id.shard_count,
2491 0 : tenant_id: self.tenant_shard_id.tenant_id,
2492 0 : };
2493 0 : download_index_part(
2494 0 : &self.storage_impl,
2495 0 : &foreign_shard_id,
2496 0 : &self.timeline_id,
2497 0 : Generation::MAX,
2498 0 : cancel,
2499 0 : )
2500 0 : .await
2501 0 : }
2502 : }
2503 :
2504 : pub(crate) struct UploadQueueAccessor<'a> {
2505 : inner: std::sync::MutexGuard<'a, UploadQueue>,
2506 : }
2507 :
2508 : impl UploadQueueAccessor<'_> {
2509 0 : pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
2510 0 : match &*self.inner {
2511 0 : UploadQueue::Initialized(x) => &x.clean.0,
2512 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
2513 0 : unreachable!("checked before constructing")
2514 : }
2515 : }
2516 0 : }
2517 : }
2518 :
2519 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2520 0 : let path = format!("tenants/{tenant_shard_id}");
2521 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2522 0 : }
2523 :
2524 1366 : pub fn remote_tenant_manifest_path(
2525 1366 : tenant_shard_id: &TenantShardId,
2526 1366 : generation: Generation,
2527 1366 : ) -> RemotePath {
2528 1366 : let path = format!(
2529 1366 : "tenants/{tenant_shard_id}/tenant-manifest{}.json",
2530 1366 : generation.get_suffix()
2531 1366 : );
2532 1366 : RemotePath::from_string(&path).expect("Failed to construct path")
2533 1366 : }
2534 :
2535 : /// Prefix to all generations' manifest objects in a tenant shard
2536 454 : pub fn remote_tenant_manifest_prefix(tenant_shard_id: &TenantShardId) -> RemotePath {
2537 454 : let path = format!("tenants/{tenant_shard_id}/tenant-manifest",);
2538 454 : RemotePath::from_string(&path).expect("Failed to construct path")
2539 454 : }
2540 :
2541 514 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2542 514 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
2543 514 : RemotePath::from_string(&path).expect("Failed to construct path")
2544 514 : }
2545 :
2546 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
2547 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
2548 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2549 0 : }
2550 :
2551 60 : pub fn remote_timeline_path(
2552 60 : tenant_shard_id: &TenantShardId,
2553 60 : timeline_id: &TimelineId,
2554 60 : ) -> RemotePath {
2555 60 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
2556 60 : }
2557 :
2558 : /// Obtains the path of the given Layer in the remote
2559 : ///
2560 : /// Note that the shard component of a remote layer path is _not_ always the same
2561 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
2562 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
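     : ///
     : /// The resulting key has this shape (the shard and generation suffixes are
     : /// empty for unsharded tenants / legacy generations):
     : ///
     : /// ```text
     : /// tenants/<tenant_id><shard suffix>/timelines/<timeline_id>/<layer name><generation suffix>
     : /// ```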
2563 3984 : pub fn remote_layer_path(
2564 3984 : tenant_id: &TenantId,
2565 3984 : timeline_id: &TimelineId,
2566 3984 : shard: ShardIndex,
2567 3984 : layer_file_name: &LayerName,
2568 3984 : generation: Generation,
2569 3984 : ) -> RemotePath {
2570 3984 : // Generation-aware key format
2571 3984 : let path = format!(
2572 3984 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
2573 3984 : shard.get_suffix(),
2574 3984 : layer_file_name,
2575 3984 : generation.get_suffix()
2576 3984 : );
2577 3984 :
2578 3984 : RemotePath::from_string(&path).expect("Failed to construct path")
2579 3984 : }
2580 :
2581 : /// Returns true if a and b have the same layer path within a tenant/timeline. This is essentially
2582 : /// remote_layer_path(a) == remote_layer_path(b) without the string allocations.
2583 : ///
2584 : /// TODO: there should be a variant of LayerName for the physical path that contains information
2585 : /// about the shard and generation, such that this could be replaced by a simple comparison.
2586 2360323 : pub fn is_same_remote_layer_path(
2587 2360323 : aname: &LayerName,
2588 2360323 : ameta: &LayerFileMetadata,
2589 2360323 : bname: &LayerName,
2590 2360323 : bmeta: &LayerFileMetadata,
2591 2360323 : ) -> bool {
2592 2360323 : // NB: don't assert remote_layer_path(a) == remote_layer_path(b); too expensive even for debug.
2593 2360323 : aname == bname && ameta.shard == bmeta.shard && ameta.generation == bmeta.generation
2594 2360323 : }
2595 :
2596 8 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
2597 8 : RemotePath::from_string(&format!(
2598 8 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
2599 8 : ))
2600 8 : .expect("Failed to construct path")
2601 8 : }
2602 :
2603 4 : pub fn remote_initdb_preserved_archive_path(
2604 4 : tenant_id: &TenantId,
2605 4 : timeline_id: &TimelineId,
2606 4 : ) -> RemotePath {
2607 4 : RemotePath::from_string(&format!(
2608 4 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
2609 4 : ))
2610 4 : .expect("Failed to construct path")
2611 4 : }
2612 :
2613 3125 : pub fn remote_index_path(
2614 3125 : tenant_shard_id: &TenantShardId,
2615 3125 : timeline_id: &TimelineId,
2616 3125 : generation: Generation,
2617 3125 : ) -> RemotePath {
2618 3125 : RemotePath::from_string(&format!(
2619 3125 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
2620 3125 : IndexPart::FILE_NAME,
2621 3125 : generation.get_suffix()
2622 3125 : ))
2623 3125 : .expect("Failed to construct path")
2624 3125 : }
2625 :
2626 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2627 0 : RemotePath::from_string(&format!(
2628 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
2629 0 : ))
2630 0 : .expect("Failed to construct path")
2631 0 : }
2632 :
2633 : /// Given the key of an index, parse out the generation part of the name
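     : ///
     : /// Sketch (the exact suffix format is an assumption, based on `get_suffix`
     : /// usage elsewhere in this file):
     : ///
     : /// ```ignore
     : /// // "index_part.json-00000001" -> Some generation parsed from the suffix
     : /// // "index_part.json"          -> None (legacy, no generation suffix)
     : /// let generation = parse_remote_index_path(path);
     : /// ```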
2634 36 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
2635 36 : let file_name = match path.get_path().file_name() {
2636 36 : Some(f) => f,
2637 : None => {
2638 : // Unexpected: we should be seeing index_part.json paths only
2639 0 : tracing::warn!("Malformed index key {}", path);
2640 0 : return None;
2641 : }
2642 : };
2643 :
2644 36 : match file_name.split_once('-') {
2645 24 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2646 12 : None => None,
2647 : }
2648 36 : }
2649 :
2650 : /// Given the key of a tenant manifest, parse out the generation number
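     : ///
     : /// Sketch, per the regex below: `tenant-manifest-0000000a.json` yields the
     : /// generation parsed from the 8-hex-digit suffix; any other shape yields `None`.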
2651 0 : pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
2652 : static RE: OnceLock<Regex> = OnceLock::new();
2653 0 : let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap());
2654 0 : re.captures(path.get_path().as_str())
2655 0 : .and_then(|c| c.get(1))
2656 0 : .and_then(|m| Generation::parse_suffix(m.as_str()))
2657 0 : }
2658 :
2659 : #[cfg(test)]
2660 : mod tests {
2661 : use std::collections::HashSet;
2662 :
2663 : use super::*;
2664 : use crate::DEFAULT_PG_VERSION;
2665 : use crate::context::RequestContext;
2666 : use crate::tenant::config::AttachmentMode;
2667 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
2668 : use crate::tenant::storage_layer::layer::local_layer_path;
2669 : use crate::tenant::{Tenant, Timeline};
2670 :
2671 16 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2672 16 : format!("contents for {name}").into()
2673 16 : }
2674 :
2675 4 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2676 4 : let metadata = TimelineMetadata::new(
2677 4 : disk_consistent_lsn,
2678 4 : None,
2679 4 : None,
2680 4 : Lsn(0),
2681 4 : Lsn(0),
2682 4 : Lsn(0),
2683 4 : // Any version will do
2684 4 : // but it should be consistent with the one in the tests
2685 4 : crate::DEFAULT_PG_VERSION,
2686 4 : );
2687 4 :
2688 4 : // go through serialize + deserialize to fix the header, including checksum
2689 4 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2690 4 : }
2691 :
2692 4 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2693 12 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2694 4 : avec.sort();
2695 4 :
2696 4 : let mut bvec = b.to_vec();
2697 4 : bvec.sort_unstable();
2698 4 :
2699 4 : assert_eq!(avec, bvec);
2700 4 : }
2701 :
2702 8 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2703 8 : let mut expected: Vec<String> = expected
2704 8 : .iter()
2705 32 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2706 8 : .collect();
2707 8 : expected.sort();
2708 8 :
2709 8 : let mut found: Vec<String> = Vec::new();
2710 32 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2711 32 : let entry_name = entry.file_name();
2712 32 : let fname = entry_name.to_str().unwrap();
2713 32 : found.push(String::from(fname));
2714 32 : }
2715 8 : found.sort();
2716 8 :
2717 8 : assert_eq!(found, expected);
2718 8 : }
2719 :
2720 : struct TestSetup {
2721 : harness: TenantHarness,
2722 : tenant: Arc<Tenant>,
2723 : timeline: Arc<Timeline>,
2724 : tenant_ctx: RequestContext,
2725 : }
2726 :
2727 : impl TestSetup {
2728 16 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2729 16 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2730 16 : let harness = TenantHarness::create(test_name).await?;
2731 16 : let (tenant, ctx) = harness.load().await;
2732 :
2733 16 : let timeline = tenant
2734 16 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2735 16 : .await?;
2736 :
2737 16 : Ok(Self {
2738 16 : harness,
2739 16 : tenant,
2740 16 : timeline,
2741 16 : tenant_ctx: ctx,
2742 16 : })
2743 16 : }
2744 :
2745 : /// Construct a RemoteTimelineClient in an arbitrary generation
2746 20 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2747 20 : let location_conf = AttachedLocationConfig {
2748 20 : generation,
2749 20 : attach_mode: AttachmentMode::Single,
2750 20 : };
2751 20 : Arc::new(RemoteTimelineClient {
2752 20 : conf: self.harness.conf,
2753 20 : runtime: tokio::runtime::Handle::current(),
2754 20 : tenant_shard_id: self.harness.tenant_shard_id,
2755 20 : timeline_id: TIMELINE_ID,
2756 20 : generation,
2757 20 : storage_impl: self.harness.remote_storage.clone(),
2758 20 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2759 20 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2760 20 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2761 20 : &self.harness.tenant_shard_id,
2762 20 : &TIMELINE_ID,
2763 20 : )),
2764 20 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
2765 20 : cancel: CancellationToken::new(),
2766 20 : })
2767 20 : }
2768 :
2769 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2770 : /// and timeline_id are present.
2771 12 : fn span(&self) -> tracing::Span {
2772 12 : tracing::info_span!(
2773 : "test",
2774 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2775 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2776 : timeline_id = %TIMELINE_ID
2777 : )
2778 12 : }
2779 : }
2780 :
2781 : // Test scheduling
2782 : #[tokio::test]
2783 4 : async fn upload_scheduling() {
2784 4 : // Test outline:
2785 4 : //
2786 4 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
2787 4 : // Schedule upload of index. Check that it is queued
2788 4 : // let the layer file uploads finish. Check that the index-upload is now started
2789 4 : // let the index-upload finish.
2790 4 : //
2791 4 : // Download back the index.json. Check that the list of files is correct
2792 4 : //
2793 4 : // Schedule upload. Schedule deletion. Check that the deletion is queued
2794 4 : // let upload finish. Check that deletion is now started
2795 4 : // Schedule another deletion. Check that it's launched immediately.
2796 4 : // Schedule index upload. Check that it's queued
2797 4 :
2798 4 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2799 4 : let span = test_setup.span();
2800 4 : let _guard = span.enter();
2801 4 :
2802 4 : let TestSetup {
2803 4 : harness,
2804 4 : tenant: _tenant,
2805 4 : timeline,
2806 4 : tenant_ctx: _tenant_ctx,
2807 4 : } = test_setup;
2808 4 :
2809 4 : let client = &timeline.remote_client;
2810 4 :
2811 4 : // Download back the index.json, and check that the list of files is correct
2812 4 : let initial_index_part = match client
2813 4 : .download_index_file(&CancellationToken::new())
2814 4 : .await
2815 4 : .unwrap()
2816 4 : {
2817 4 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2818 4 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2819 4 : };
2820 4 : let initial_layers = initial_index_part
2821 4 : .layer_metadata
2822 4 : .keys()
2823 4 : .map(|f| f.to_owned())
2824 4 : .collect::<HashSet<LayerName>>();
2825 4 : let initial_layer = {
2826 4 : assert_eq!(initial_layers.len(), 1);
2827 4 : initial_layers.into_iter().next().unwrap()
2828 4 : };
2829 4 :
2830 4 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2831 4 :
2832 4 : println!("workdir: {}", harness.conf.workdir);
2833 4 :
2834 4 : let remote_timeline_dir = harness
2835 4 : .remote_fs_dir
2836 4 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2837 4 : println!("remote_timeline_dir: {remote_timeline_dir}");
2838 4 :
2839 4 : let generation = harness.generation;
2840 4 : let shard = harness.shard;
2841 4 :
2842 4 : // Create a few dummy layer files, and schedule uploads for the first two of them
2843 4 :
2844 4 : let layers = [
2845 4 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2846 4 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2847 4 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2848 4 : ]
2849 4 : .into_iter()
2850 12 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2851 12 :
2852 12 : let local_path = local_layer_path(
2853 12 : harness.conf,
2854 12 : &timeline.tenant_shard_id,
2855 12 : &timeline.timeline_id,
2856 12 : &name,
2857 12 : &generation,
2858 12 : );
2859 12 : std::fs::write(&local_path, &contents).unwrap();
2860 12 :
2861 12 : Layer::for_resident(
2862 12 : harness.conf,
2863 12 : &timeline,
2864 12 : local_path,
2865 12 : name,
2866 12 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2867 12 : )
2868 12 : }).collect::<Vec<_>>();
2869 4 :
2870 4 : client
2871 4 : .schedule_layer_file_upload(layers[0].clone())
2872 4 : .unwrap();
2873 4 : client
2874 4 : .schedule_layer_file_upload(layers[1].clone())
2875 4 : .unwrap();
2876 4 :
2877 4 : // Check that they are started immediately, not queued
2878 4 : //
2879 4 : // this works because we are running within block_on, so any spawned futures
2880 4 : // remain queued until our next await point.
2881 4 : {
2882 4 : let mut guard = client.upload_queue.lock().unwrap();
2883 4 : let upload_queue = guard.initialized_mut().unwrap();
2884 4 : assert!(upload_queue.queued_operations.is_empty());
2885 4 : assert_eq!(upload_queue.inprogress_tasks.len(), 2);
2886 4 : assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);
2887 4 :
2888 4 : // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
2889 4 : assert_eq!(upload_queue.latest_files_changes_since_metadata_upload_scheduled, 2);
2890 4 : }
2891 4 :
2892 4 : // Schedule upload of index. Check that it is queued
2893 4 : let metadata = dummy_metadata(Lsn(0x20));
2894 4 : client
2895 4 : .schedule_index_upload_for_full_metadata_update(&metadata)
2896 4 : .unwrap();
2897 4 : {
2898 4 : let mut guard = client.upload_queue.lock().unwrap();
2899 4 : let upload_queue = guard.initialized_mut().unwrap();
2900 4 : assert_eq!(upload_queue.queued_operations.len(), 1);
2901 4 : assert_eq!(upload_queue.latest_files_changes_since_metadata_upload_scheduled, 0);
2902 4 : }
2903 4 :
2904 4 : // Wait for the uploads to finish
2905 4 : client.wait_completion().await.unwrap();
2906 4 : {
2907 4 : let mut guard = client.upload_queue.lock().unwrap();
2908 4 : let upload_queue = guard.initialized_mut().unwrap();
2909 4 :
2910 4 : assert!(upload_queue.queued_operations.is_empty());
2911 4 : assert!(upload_queue.inprogress_tasks.is_empty());
2912 4 : }
2913 4 :
2914 4 : // Download back the index.json, and check that the list of files is correct
2915 4 : let index_part = match client
2916 4 : .download_index_file(&CancellationToken::new())
2917 4 : .await
2918 4 : .unwrap()
2919 4 : {
2920 4 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2921 4 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2922 4 : };
2923 4 :
2924 4 : assert_file_list(
2925 4 : &index_part
2926 4 : .layer_metadata
2927 4 : .keys()
2928 12 : .map(|f| f.to_owned())
2929 4 : .collect(),
2930 4 : &[
2931 4 : &initial_layer.to_string(),
2932 4 : &layers[0].layer_desc().layer_name().to_string(),
2933 4 : &layers[1].layer_desc().layer_name().to_string(),
2934 4 : ],
2935 4 : );
2936 4 : assert_eq!(index_part.metadata, metadata);
2937 4 :
2938 4 : // Schedule upload and then a deletion. Check that the deletion is queued
2939 4 : client
2940 4 : .schedule_layer_file_upload(layers[2].clone())
2941 4 : .unwrap();
2942 4 :
2943 4 : // This is no longer consistent with how deletion works with Layer::drop, but this
2944 4 : // test keeps using schedule_layer_file_deletion because we have no way to wait for
2945 4 : // the spawn_blocking task started by the drop.
2946 4 : client
2947 4 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
2948 4 : .unwrap();
2949 4 : {
2950 4 : let mut guard = client.upload_queue.lock().unwrap();
2951 4 : let upload_queue = guard.initialized_mut().unwrap();
2952 4 :
2953 4 : // Deletion schedules upload of the index file, and the file deletion itself
2954 4 : assert_eq!(upload_queue.queued_operations.len(), 2);
2955 4 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2956 4 : assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
2957 4 : assert_eq!(upload_queue.num_inprogress_deletions(), 0);
2958 4 : assert_eq!(
2959 4 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2960 4 : 0
2961 4 : );
2962 4 : }
2963 4 : assert_remote_files(
2964 4 : &[
2965 4 : &initial_layer.to_string(),
2966 4 : &layers[0].layer_desc().layer_name().to_string(),
2967 4 : &layers[1].layer_desc().layer_name().to_string(),
2968 4 : "index_part.json",
2969 4 : ],
2970 4 : &remote_timeline_dir,
2971 4 : generation,
2972 4 : );
2973 4 :
2974 4 : // Finish them
2975 4 : client.wait_completion().await.unwrap();
2976 4 : harness.deletion_queue.pump().await;
2977 4 :
2978 4 : assert_remote_files(
2979 4 : &[
2980 4 : &initial_layer.to_string(),
2981 4 : &layers[1].layer_desc().layer_name().to_string(),
2982 4 : &layers[2].layer_desc().layer_name().to_string(),
2983 4 : "index_part.json",
2984 4 : ],
2985 4 : &remote_timeline_dir,
2986 4 : generation,
2987 4 : );
2988 4 : }
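     :
     : // A toy model of the ordering discipline the assertions above exercise. `ToyOp`
     : // and `toy_can_launch` are illustrations local to this test module, not the real
     : // `UploadOp` machinery: layer uploads start immediately and run in parallel,
     : // while index uploads and deletions act as barriers behind earlier work.
     : #[allow(dead_code)]
     : #[derive(PartialEq)]
     : enum ToyOp {
     : LayerUpload,
     : IndexUpload,
     : Deletion,
     : }
     :
     : #[allow(dead_code)]
     : fn toy_can_launch(next: &ToyOp, inprogress: &[ToyOp]) -> bool {
     : match next {
     : // Layer uploads don't depend on each other; start them right away.
     : ToyOp::LayerUpload => true,
     : // An index upload must not overtake the layer uploads it references.
     : ToyOp::IndexUpload => !inprogress.contains(&ToyOp::LayerUpload),
     : // A deletion must wait until nothing that may still reference the
     : // file is in flight.
     : ToyOp::Deletion => inprogress.is_empty(),
     : }
     : }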
2989 :
2990 : #[tokio::test]
2991 4 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2992 4 : // Setup
2993 4 :
2994 4 : let TestSetup {
2995 4 : harness,
2996 4 : tenant: _tenant,
2997 4 : timeline,
2998 4 : ..
2999 4 : } = TestSetup::new("metrics").await.unwrap();
3000 4 : let client = &timeline.remote_client;
3001 4 :
3002 4 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
3003 4 : let local_path = local_layer_path(
3004 4 : harness.conf,
3005 4 : &timeline.tenant_shard_id,
3006 4 : &timeline.timeline_id,
3007 4 : &layer_file_name_1,
3008 4 : &harness.generation,
3009 4 : );
3010 4 : let content_1 = dummy_contents("foo");
3011 4 : std::fs::write(&local_path, &content_1).unwrap();
3012 4 :
3013 4 : let layer_file_1 = Layer::for_resident(
3014 4 : harness.conf,
3015 4 : &timeline,
3016 4 : local_path,
3017 4 : layer_file_name_1.clone(),
3018 4 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
3019 4 : );
3020 4 :
3021 4 : #[derive(Debug, PartialEq, Clone, Copy)]
3022 4 : struct BytesStartedFinished {
3023 4 : started: Option<usize>,
3024 4 : finished: Option<usize>,
3025 4 : }
3026 4 : impl std::ops::Add for BytesStartedFinished {
3027 4 : type Output = Self;
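     : // Note the asymmetry: the sum is `None` wherever `self` is `None`, while a
     : // `None` in `rhs` merely contributes zero.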
3028 8 : fn add(self, rhs: Self) -> Self::Output {
3029 8 : Self {
3030 8 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
3031 8 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
3032 8 : }
3033 8 : }
3034 4 : }
3035 12 : let get_bytes_started_finished = || {
3036 12 : let started = client
3037 12 : .metrics
3038 12 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
3039 12 : .map(|v| v.try_into().unwrap());
3040 12 : let finished = client
3041 12 : .metrics
3042 12 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
3043 12 : .map(|v| v.try_into().unwrap());
3044 12 : BytesStartedFinished {
3045 12 : started,
3046 12 : finished,
3047 12 : }
3048 12 : };
3049 4 :
3050 4 : // Test
3051 4 : tracing::info!("now doing actual test");
3052 4 :
3053 4 : let actual_a = get_bytes_started_finished();
3054 4 :
3055 4 : client
3056 4 : .schedule_layer_file_upload(layer_file_1.clone())
3057 4 : .unwrap();
3058 4 :
3059 4 : let actual_b = get_bytes_started_finished();
3060 4 :
3061 4 : client.wait_completion().await.unwrap();
3062 4 :
3063 4 : let actual_c = get_bytes_started_finished();
3064 4 :
3065 4 : // Validate
3066 4 :
3067 4 : let expected_b = actual_a
3068 4 : + BytesStartedFinished {
3069 4 : started: Some(content_1.len()),
3070 4 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
3071 4 : finished: Some(0),
3072 4 : };
3073 4 : assert_eq!(actual_b, expected_b);
3074 4 :
3075 4 : let expected_c = actual_a
3076 4 : + BytesStartedFinished {
3077 4 : started: Some(content_1.len()),
3078 4 : finished: Some(content_1.len()),
3079 4 : };
3080 4 : assert_eq!(actual_c, expected_c);
3081 4 : }
3082 :
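     : // `remote_index_path` encodes the generation in the object name (a suffixed
     : // variant of index_part.json; the exact suffix format is an implementation
     : // detail), which is what lets the ordering test below inject several competing
     : // indices side by side.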
3083 24 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
3084 24 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
3085 24 : let example_index_part = IndexPart::example();
3086 24 :
3087 24 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
3088 24 :
3089 24 : let index_path = test_state.harness.remote_fs_dir.join(
3090 24 : remote_index_path(
3091 24 : &test_state.harness.tenant_shard_id,
3092 24 : &TIMELINE_ID,
3093 24 : generation,
3094 24 : )
3095 24 : .get_path(),
3096 24 : );
3097 24 :
3098 24 : std::fs::create_dir_all(index_path.parent().unwrap())
3099 24 : .expect("creating test dir should work");
3100 24 :
3101 24 : eprintln!("Writing {index_path}");
3102 24 : std::fs::write(&index_path, index_part_bytes).unwrap();
3103 24 : example_index_part
3104 24 : }
3105 :
3106 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
3107 : /// index, the IndexPart returned is equal to `expected`.
3108 20 : async fn assert_got_index_part(
3109 20 : test_state: &TestSetup,
3110 20 : get_generation: Generation,
3111 20 : expected: &IndexPart,
3112 20 : ) {
3113 20 : let client = test_state.build_client(get_generation);
3114 :
3115 20 : let download_r = client
3116 20 : .download_index_file(&CancellationToken::new())
3117 20 : .await
3118 20 : .expect("download should always succeed");
3119 20 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
3120 20 : match download_r {
3121 20 : MaybeDeletedIndexPart::IndexPart(index_part) => {
3122 20 : assert_eq!(&index_part, expected);
3123 : }
3124 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
3125 : }
3126 20 : }
3127 :
3128 : #[tokio::test]
3129 4 : async fn index_part_download_simple() -> anyhow::Result<()> {
3130 4 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
3131 4 : let span = test_state.span();
3132 4 : let _guard = span.enter();
3133 4 :
3134 4 : // Simple case: we are in generation N, load the index from generation N - 1
3135 4 : let generation_n = 5;
3136 4 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3137 4 :
3138 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
3139 4 :
3140 4 : Ok(())
3141 4 : }
3142 :
3143 : #[tokio::test]
3144 4 : async fn index_part_download_ordering() -> anyhow::Result<()> {
3145 4 : let test_state = TestSetup::new("index_part_download_ordering")
3146 4 : .await
3147 4 : .unwrap();
3148 4 :
3149 4 : let span = test_state.span();
3150 4 : let _guard = span.enter();
3151 4 :
3152 4 : // A generation-less IndexPart exists in the bucket; we should find it.
3153 4 : let generation_n = 5;
3154 4 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
3155 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
3156 4 :
3157 4 : // If a more-recent-than-none generation exists, we should prefer to load that.
3158 4 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
3159 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3160 4 :
3161 4 : // If a more-recent-than-me generation exists, we should ignore it.
3162 4 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
3163 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3164 4 :
3165 4 : // If a directly previous generation exists, _and_ an index exists in my own
3166 4 : // generation, I should prefer my own generation.
3167 4 : let _injected_prev =
3168 4 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3169 4 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
3170 4 : assert_got_index_part(
3171 4 : &test_state,
3172 4 : Generation::new(generation_n),
3173 4 : &injected_current,
3174 4 : )
3175 4 : .await;
3176 4 :
3177 4 : Ok(())
3178 4 : }
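     :
     : // A minimal sketch of the generation-selection preference pinned down by the
     : // assertions above; `select_index_generation` is an illustration, not the actual
     : // `download_index_file` implementation. `None` models a generation-less index.
     : #[allow(dead_code)]
     : fn select_index_generation(mine: u32, available: &[Option<u32>]) -> Option<Option<u32>> {
     : // An index in our own generation wins outright.
     : if available.contains(&Some(mine)) {
     : return Some(Some(mine));
     : }
     : // Otherwise, take the newest generation strictly older than ours...
     : if let Some(best) = available.iter().flatten().filter(|&&g| g < mine).max() {
     : return Some(Some(*best));
     : }
     : // ...and fall back to a generation-less index only when nothing else matches.
     : if available.contains(&None) {
     : return Some(None);
     : }
     : None
     : }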
3179 : }