Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It maintains a queue of pending uploads and performs them in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to get the list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
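//!
//! A minimal sketch of the two initialization paths (assuming `client`,
//! `index_part`, and `local_metadata` are already in hand):
//!
//! ```ignore
//! // Timeline already has state in remote storage:
//! client.init_upload_queue(&index_part)?;
//! // Newly created timeline, nothing uploaded yet:
//! client.init_upload_queue_for_empty_remote(&local_metadata)?;
//! ```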
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
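//!
//! For example, after flushing a new layer file, a caller would typically
//! schedule the layer upload followed by an index update (a sketch;
//! `resident_layer` stands in for the newly flushed [`ResidentLayer`]):
//!
//! ```ignore
//! client.schedule_layer_file_upload(resident_layer)?;
//! client.schedule_index_upload_for_file_changes()?;
//! ```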
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! However, instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
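//!
//! For illustration, the remote prefix of one timeline might contain (generation
//! suffixes elided):
//!
//! ```text
//! tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>
//! tenants/<tenant_id>/timelines/<timeline_id>/index_part.json
//! ```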
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible
64 : //! for scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //!
67 : //! From the user's perspective, the operations are executed sequentially.
68 : //! Internally, the client knows which operations can be performed in parallel,
69 : //! and which operations act like a "barrier" that require preceding operations
70 : //! to finish. The calling code just needs to call the schedule-functions in the
71 : //! correct order, and the client will parallelize the operations in a way that
72 : //! is safe. For more details, see `UploadOp::can_bypass`.
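//!
//! When a caller needs to know that everything scheduled so far has actually
//! reached remote storage, it can wait on a barrier (a sketch, reusing the
//! hypothetical `resident_layer` from above):
//!
//! ```ignore
//! client.schedule_layer_file_upload(resident_layer)?;
//! client.schedule_index_upload_for_file_changes()?;
//! client.wait_completion().await?; // resolves once the index upload is done
//! ```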
73 : //!
74 : //! All of this relies on the following invariants:
75 : //!
76 : //! - Read-after-write consistency in the remote storage.
77 : //! - Layer files are immutable.
78 : //!
79 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
80 : //! storage. Different tenants can be attached to different pageservers, but if the
81 : //! same tenant is attached to two pageservers at the same time, they will overwrite
82 : //! each other's index file updates, and confusion will ensue. There's no interlock or
83 : //! mechanism to detect that in the pageserver, we rely on the control plane to ensure
84 : //! that that doesn't happen.
85 : //!
86 : //! ## Implementation Note
87 : //!
88 : //! The *actual* remote state lags behind the *desired* remote state while
89 : //! there are in-flight operations.
90 : //! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`].
91 : //! It is initialized based on the [`IndexPart`] that was passed during init
92 : //! and updated with every `schedule_*` function call.
93 : //! All this is necessary to compute the future [`IndexPart`]s
94 : //! when scheduling an operation while other operations that also affect the
95 : //! remote [`IndexPart`] are in flight.
96 : //!
97 : //! # Retries & Error Handling
98 : //!
99 : //! The client retries operations indefinitely, using exponential back-off.
100 : //! There is no way to force a retry, i.e., interrupt the back-off.
101 : //! This could be built easily.
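//!
//! The back-off follows the usual capped exponential shape; as an illustrative
//! sketch only (the variable names here are hypothetical, the real computation
//! lives in [`utils::backoff`]):
//!
//! ```ignore
//! // delay doubles with each attempt, up to a fixed cap
//! let delay_secs = (base_secs * 2f64.powi(attempt as i32)).min(max_secs);
//! ```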
102 : //!
103 : //! # Cancellation
104 : //!
105 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
106 : //! the client's tenant and timeline.
107 : //! Dropping the client will drop queued operations but not operations that are already executing.
108 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
109 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
110 : //!
111 : //! # Completion
112 : //!
113 : //! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately,
114 : //! and submit a request through the DeletionQueue to update
115 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
116 : //! validated that our generation is not stale. It is this visible value
117 : //! that is advertised to safekeepers as a signal that they can
118 : //! delete the WAL up to that LSN.
119 : //!
120 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
121 : //! for all pending operations to complete. It does not prevent more
122 : //! operations from getting scheduled.
123 : //!
124 : //! # Crash Consistency
125 : //!
126 : //! We do not persist the upload queue state.
127 : //! If we drop the client, or crash, all unfinished operations are lost.
128 : //!
129 : //! To recover, the following steps need to be taken (sketched after this list):
130 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
131 : //! consistent remote state, assuming the user scheduled the operations in
132 : //! the correct order.
133 : //! - Initialize the upload queue with that [`IndexPart`].
134 : //! - Reschedule all lost operations by comparing the local filesystem state
135 : //! and remote state as per [`IndexPart`]. This is done in
136 : //! [`Tenant::timeline_init_and_sync`].
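//!
//! A sketch of the reschedule step (the helper `resident_layer_for` is
//! hypothetical; the real logic lives in [`Tenant::timeline_init_and_sync`]):
//!
//! ```ignore
//! for (name, _metadata) in local_layers {
//!     if !index_part.layer_metadata.contains_key(&name) {
//!         // Present locally, missing remotely: the upload was lost, redo it.
//!         client.schedule_layer_file_upload(resident_layer_for(&name))?;
//!     }
//! }
//! client.schedule_index_upload_for_file_changes()?;
//! ```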
137 : //!
138 : //! Note that if we crash during file deletion, between the index update
139 : //! that removes the file from the list of files and the deletion of the remote file,
140 : //! the file is leaked in the remote storage. Similarly, if a new file is created
141 : //! and uploaded, but the pageserver dies permanently before updating the
142 : //! remote index file, the new file is leaked in remote storage. We accept and
143 : //! tolerate that for now.
144 : //! Note further that we cannot easily fix this by scheduling deletes for every
145 : //! file that is present only on the remote, because we cannot distinguish the
146 : //! following two cases:
147 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
148 : //! but crashed before it finished remotely.
149 : //! - (2) We never had the file locally because we haven't on-demand downloaded
150 : //! it yet.
151 : //!
152 : //! # Downloads
153 : //!
154 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
155 : //! downloading files from the remote storage. Downloads are performed immediately
156 : //! against the `RemoteStorage`, independently of the upload queue.
157 : //!
158 : //! When we attach a tenant, we perform the following steps (sketched below):
159 : //! - create a `Tenant` object in `TenantState::Attaching` state
160 : //! - list the timelines that are present in remote storage, and for each:
161 : //! - download their remote [`IndexPart`]s
162 : //! - create `Timeline` struct and a `RemoteTimelineClient`
163 : //! - initialize the client's upload queue with its `IndexPart`
164 : //! - schedule uploads for layers that are only present locally.
165 : //! - After the above is done for each timeline, open the tenant for business by
166 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
167 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
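//!
//! A condensed sketch of the per-timeline part of attach (error handling and
//! the deleted-timeline case elided):
//!
//! ```ignore
//! // for each timeline returned by `list_remote_timelines`:
//! let maybe_deleted = client.download_index_file(&cancel).await?;
//! if let MaybeDeletedIndexPart::IndexPart(index_part) = maybe_deleted {
//!     client.init_upload_queue(&index_part)?;
//!     // reschedule uploads for layers that exist only locally, then:
//!     client.schedule_index_upload_for_file_changes()?;
//! }
//! ```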
168 : //!
169 : //! # Operating Without Remote Storage
170 : //!
171 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
172 : //! not created and the uploads are skipped.
173 : //!
174 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
175 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
176 :
177 : pub(crate) mod download;
178 : pub mod index;
179 : pub mod manifest;
180 : pub(crate) mod upload;
181 :
182 : use anyhow::Context;
183 : use camino::Utf8Path;
184 : use chrono::{NaiveDateTime, Utc};
185 :
186 : pub(crate) use download::download_initdb_tar_zst;
187 : use pageserver_api::models::TimelineArchivalState;
188 : use pageserver_api::shard::{ShardIndex, TenantShardId};
189 : use regex::Regex;
190 : use scopeguard::ScopeGuard;
191 : use tokio_util::sync::CancellationToken;
192 : use utils::backoff::{
193 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
194 : };
195 : use utils::pausable_failpoint;
196 : use utils::shard::ShardNumber;
197 :
198 : use std::collections::{HashMap, HashSet, VecDeque};
199 : use std::sync::atomic::{AtomicU32, Ordering};
200 : use std::sync::{Arc, Mutex, OnceLock};
201 : use std::time::Duration;
202 :
203 : use remote_storage::{
204 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
205 : };
206 : use std::ops::DerefMut;
207 : use tracing::{debug, error, info, instrument, warn};
208 : use tracing::{info_span, Instrument};
209 : use utils::lsn::Lsn;
210 :
211 : use crate::context::RequestContext;
212 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
213 : use crate::metrics::{
214 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
215 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
216 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
217 : };
218 : use crate::task_mgr::shutdown_token;
219 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
220 : use crate::tenant::remote_timeline_client::download::download_retry;
221 : use crate::tenant::storage_layer::AsLayerDesc;
222 : use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
223 : use crate::tenant::TIMELINES_SEGMENT_NAME;
224 : use crate::{
225 : config::PageServerConf,
226 : task_mgr,
227 : task_mgr::TaskKind,
228 : task_mgr::BACKGROUND_RUNTIME,
229 : tenant::metadata::TimelineMetadata,
230 : tenant::upload_queue::{
231 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
232 : },
233 : TENANT_HEATMAP_BASENAME,
234 : };
235 :
236 : use utils::id::{TenantId, TimelineId};
237 :
238 : use self::index::IndexPart;
239 :
240 : use super::config::AttachedLocationConfig;
241 : use super::metadata::MetadataUpdate;
242 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
243 : use super::timeline::import_pgdata;
244 : use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
245 : use super::{DeleteTimelineError, Generation};
246 :
247 : pub(crate) use download::{
248 : download_index_part, download_tenant_manifest, is_temp_download_file,
249 : list_remote_tenant_shards, list_remote_timelines,
250 : };
251 : pub(crate) use index::LayerFileMetadata;
252 : pub(crate) use upload::upload_initdb_dir;
253 :
254 : // Occasional network issues and such can cause remote operations to fail, and
255 : // that's expected. If a download fails, we log it at info-level, and retry.
256 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
257 : // level instead, as repeated failures can mean a more serious problem. If it
258 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
259 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
260 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
261 :
262 : // Similarly log failed uploads and deletions at WARN level, after this many
263 : // retries. Uploads and deletions are retried forever, though.
264 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
265 :
266 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
267 :
268 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
269 :
270 : /// Default buffer size when interfacing with [`tokio::fs::File`].
271 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
272 :
273 : /// Doing non-essential flushes of the deletion queue is subject to this timeout, after
274 : /// which we warn and skip.
275 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
276 :
277 : pub enum MaybeDeletedIndexPart {
278 : IndexPart(IndexPart),
279 : Deleted(IndexPart),
280 : }
281 :
282 : #[derive(Debug, thiserror::Error)]
283 : pub enum PersistIndexPartWithDeletedFlagError {
284 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
285 : AlreadyInProgress(NaiveDateTime),
286 : #[error("the deleted_flag was already set, value is {0:?}")]
287 : AlreadyDeleted(NaiveDateTime),
288 : #[error(transparent)]
289 : Other(#[from] anyhow::Error),
290 : }
291 :
292 : #[derive(Debug, thiserror::Error)]
293 : pub enum WaitCompletionError {
294 : #[error(transparent)]
295 : NotInitialized(NotInitialized),
296 : #[error("wait_completion aborted because upload queue was stopped")]
297 : UploadQueueShutDownOrStopped,
298 : }
299 :
300 : #[derive(Debug, thiserror::Error)]
301 : #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
302 : pub struct UploadQueueNotReadyError;
303 :
304 : #[derive(Debug, thiserror::Error)]
305 : pub enum ShutdownIfArchivedError {
306 : #[error(transparent)]
307 : NotInitialized(NotInitialized),
308 : #[error("timeline is not archived")]
309 : NotArchived,
310 : }
311 :
312 : /// Behavioral modes that enable seamless live migration.
313 : ///
314 : /// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
315 : struct RemoteTimelineClientConfig {
316 : /// If this is false, then updates to remote_consistent_lsn are dropped rather
317 : /// than being submitted to DeletionQueue for validation. This behavior is
318 : /// used when a tenant attachment is known to have a stale generation number,
319 : /// such that validation attempts will always fail. This is not necessary
320 : /// for correctness, but avoids spamming error statistics with failed validations
321 : /// when doing migrations of tenants.
322 : process_remote_consistent_lsn_updates: bool,
323 :
324 : /// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
325 : /// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
326 : /// is known to be multi-attached, in order to avoid disrupting other attached tenants
327 : /// whose generations' metadata refers to the deleted objects.
328 : block_deletions: bool,
329 : }
330 :
331 : /// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
332 : /// not carry the entire LocationConf structure: it's much more than we need. The From
333 : /// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
334 : impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
335 456 : fn from(lc: &AttachedLocationConfig) -> Self {
336 456 : Self {
337 456 : block_deletions: !lc.may_delete_layers_hint(),
338 456 : process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
339 456 : }
340 456 : }
341 : }
342 :
343 : /// A client for accessing a timeline's data in remote storage.
344 : ///
345 : /// This takes care of managing the number of connections, and balancing them
346 : /// across tenants. This also handles retries of failed uploads.
347 : ///
348 : /// Upload and delete requests are ordered so that before a deletion is
349 : /// performed, we wait for all preceding uploads to finish. This ensures
350 : /// that if you perform a compaction operation that reshuffles data in layer
351 : /// files, we don't have a transient state where the old files have already been
352 : /// deleted, but new files have not yet been uploaded.
353 : ///
354 : /// Similarly, this enforces an order between index-file uploads, and layer
355 : /// uploads. Before an index-file upload is performed, all preceding layer
356 : /// uploads must be finished.
357 : ///
358 : /// This also maintains a list of remote files, and automatically includes that
359 : /// in the index part file, whenever timeline metadata is uploaded.
360 : ///
361 : /// Downloads are not queued, they are performed immediately.
362 : pub(crate) struct RemoteTimelineClient {
363 : conf: &'static PageServerConf,
364 :
365 : runtime: tokio::runtime::Handle,
366 :
367 : tenant_shard_id: TenantShardId,
368 : timeline_id: TimelineId,
369 : generation: Generation,
370 :
371 : upload_queue: Mutex<UploadQueue>,
372 :
373 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
374 :
375 : storage_impl: GenericRemoteStorage,
376 :
377 : deletion_queue_client: DeletionQueueClient,
378 :
379 : /// Subset of tenant configuration used to control upload behaviors during migrations
380 : config: std::sync::RwLock<RemoteTimelineClientConfig>,
381 :
382 : cancel: CancellationToken,
383 : }
384 :
385 : impl RemoteTimelineClient {
386 : ///
387 : /// Create a remote storage client for given timeline
388 : ///
389 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
390 : /// by calling init_upload_queue.
391 : ///
392 446 : pub(crate) fn new(
393 446 : remote_storage: GenericRemoteStorage,
394 446 : deletion_queue_client: DeletionQueueClient,
395 446 : conf: &'static PageServerConf,
396 446 : tenant_shard_id: TenantShardId,
397 446 : timeline_id: TimelineId,
398 446 : generation: Generation,
399 446 : location_conf: &AttachedLocationConfig,
400 446 : ) -> RemoteTimelineClient {
401 446 : RemoteTimelineClient {
402 446 : conf,
403 446 : runtime: if cfg!(test) {
404 : // remote_timeline_client.rs tests rely on current-thread runtime
405 446 : tokio::runtime::Handle::current()
406 : } else {
407 0 : BACKGROUND_RUNTIME.handle().clone()
408 : },
409 446 : tenant_shard_id,
410 446 : timeline_id,
411 446 : generation,
412 446 : storage_impl: remote_storage,
413 446 : deletion_queue_client,
414 446 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
415 446 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
416 446 : &tenant_shard_id,
417 446 : &timeline_id,
418 446 : )),
419 446 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
420 446 : cancel: CancellationToken::new(),
421 446 : }
422 446 : }
423 :
424 : /// Initialize the upload queue for a remote storage that already received
425 : /// an index file upload, i.e., it's not empty.
426 : /// The given `index_part` must be the one on the remote.
427 6 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
428 6 : // Set the maximum number of inprogress tasks to the remote storage concurrency. There's
429 6 : // certainly no point in starting more upload tasks than this.
430 6 : let inprogress_limit = self
431 6 : .conf
432 6 : .remote_storage_config
433 6 : .as_ref()
434 6 : .and_then(|r| r.concurrency_limit())
435 6 : .unwrap_or(0);
436 6 : let mut upload_queue = self.upload_queue.lock().unwrap();
437 6 : upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
438 6 : self.update_remote_physical_size_gauge(Some(index_part));
439 6 : info!(
440 0 : "initialized upload queue from remote index with {} layer files",
441 0 : index_part.layer_metadata.len()
442 : );
443 6 : Ok(())
444 6 : }
445 :
446 : /// Initialize the upload queue for the case where the remote storage is empty,
447 : /// i.e., it doesn't have an `IndexPart`.
448 440 : pub fn init_upload_queue_for_empty_remote(
449 440 : &self,
450 440 : local_metadata: &TimelineMetadata,
451 440 : ) -> anyhow::Result<()> {
452 440 : // Set the maximum number of inprogress tasks to the remote storage concurrency. There's
453 440 : // certainly no point in starting more upload tasks than this.
454 440 : let inprogress_limit = self
455 440 : .conf
456 440 : .remote_storage_config
457 440 : .as_ref()
458 440 : .and_then(|r| r.concurrency_limit())
459 440 : .unwrap_or(0);
460 440 : let mut upload_queue = self.upload_queue.lock().unwrap();
461 440 : upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
462 440 : self.update_remote_physical_size_gauge(None);
463 440 : info!("initialized upload queue as empty");
464 440 : Ok(())
465 440 : }
466 :
467 : /// Initialize the queue in the stopped state. Used in the startup path
468 : /// to continue a deletion operation interrupted by pageserver crash or restart.
469 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
470 0 : &self,
471 0 : index_part: &IndexPart,
472 0 : ) -> anyhow::Result<()> {
473 : // FIXME: consider newtype for DeletedIndexPart.
474 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
475 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
476 0 : ))?;
477 0 : let inprogress_limit = self
478 0 : .conf
479 0 : .remote_storage_config
480 0 : .as_ref()
481 0 : .and_then(|r| r.concurrency_limit())
482 0 : .unwrap_or(0);
483 0 :
484 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
485 0 : upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
486 0 : self.update_remote_physical_size_gauge(Some(index_part));
487 0 : self.stop_impl(&mut upload_queue);
488 0 :
489 0 : upload_queue
490 0 : .stopped_mut()
491 0 : .expect("stopped above")
492 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
493 0 :
494 0 : Ok(())
495 0 : }
496 :
497 : /// Notify this client of a change to its parent tenant's config, as this may cause us to
498 : /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
499 0 : pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
500 0 : let new_conf = RemoteTimelineClientConfig::from(location_conf);
501 0 : let unblocked = !new_conf.block_deletions;
502 0 :
503 0 : // Update config before draining deletions, so that we don't race with more being
504 0 : // inserted. This can result in deletions happening out of order, but that does not
505 0 : // violate any invariants: deletions only need to be ordered relative to upload of the index
506 0 : // that dereferences the deleted objects, and we are not changing that order.
507 0 : *self.config.write().unwrap() = new_conf;
508 0 :
509 0 : if unblocked {
510 : // If we may now delete layers, drain any that were blocked in our old
511 : // configuration state
512 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
513 :
514 0 : if let Ok(queue) = queue_locked.initialized_mut() {
515 0 : let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
516 0 : for d in blocked_deletions {
517 0 : if let Err(e) = self.deletion_queue_client.push_layers_sync(
518 0 : self.tenant_shard_id,
519 0 : self.timeline_id,
520 0 : self.generation,
521 0 : d.layers,
522 0 : ) {
523 : // This could happen if the pageserver is shut down while a tenant
524 : // is transitioning from a deletion-blocked state: we will leak some
525 : // S3 objects in this case.
526 0 : warn!("Failed to drain blocked deletions: {}", e);
527 0 : break;
528 0 : }
529 : }
530 0 : }
531 0 : }
532 0 : }
533 :
534 : /// Returns `None` if nothing is yet uploaded, `Some(disk_consistent_lsn)` otherwise.
535 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
536 0 : match &mut *self.upload_queue.lock().unwrap() {
537 0 : UploadQueue::Uninitialized => None,
538 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
539 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
540 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
541 0 : .upload_queue_for_deletion
542 0 : .get_last_remote_consistent_lsn_projected(),
543 : }
544 0 : }
545 :
546 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
547 0 : match &mut *self.upload_queue.lock().unwrap() {
548 0 : UploadQueue::Uninitialized => None,
549 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
550 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
551 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
552 0 : q.upload_queue_for_deletion
553 0 : .get_last_remote_consistent_lsn_visible(),
554 0 : ),
555 : }
556 0 : }
557 :
558 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
559 : /// client is currently initialized.
560 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
561 0 : self.upload_queue
562 0 : .lock()
563 0 : .unwrap()
564 0 : .initialized_mut()
565 0 : .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn))
566 0 : .unwrap_or(false)
567 0 : }
568 :
569 : /// Returns whether the timeline is archived.
570 : /// Returns `None` if the remote index_part hasn't been downloaded yet.
571 0 : pub(crate) fn is_archived(&self) -> Option<bool> {
572 0 : self.upload_queue
573 0 : .lock()
574 0 : .unwrap()
575 0 : .initialized_mut()
576 0 : .map(|q| q.clean.0.archived_at.is_some())
577 0 : .ok()
578 0 : }
579 :
580 : /// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
581 : ///
582 : /// Returns `Err(_)` if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
583 2 : pub(crate) fn archived_at_stopped_queue(
584 2 : &self,
585 2 : ) -> Result<Option<NaiveDateTime>, UploadQueueNotReadyError> {
586 2 : self.upload_queue
587 2 : .lock()
588 2 : .unwrap()
589 2 : .stopped_mut()
590 2 : .map(|q| q.upload_queue_for_deletion.clean.0.archived_at)
591 2 : .map_err(|_| UploadQueueNotReadyError)
592 2 : }
593 :
594 1924 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
595 1924 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
596 1484 : current_remote_index_part
597 1484 : .layer_metadata
598 1484 : .values()
599 17767 : .map(|ilmd| ilmd.file_size)
600 1484 : .sum()
601 : } else {
602 440 : 0
603 : };
604 1924 : self.metrics.remote_physical_size_gauge.set(size);
605 1924 : }
606 :
607 2 : pub fn get_remote_physical_size(&self) -> u64 {
608 2 : self.metrics.remote_physical_size_gauge.get()
609 2 : }
610 :
611 : //
612 : // Download operations.
613 : //
614 : // These don't use the per-timeline queue. They do use the global semaphore in
615 : // S3Bucket, to limit the total number of concurrent operations, though.
616 : //
617 :
618 : /// Download index file
619 20 : pub async fn download_index_file(
620 20 : &self,
621 20 : cancel: &CancellationToken,
622 20 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
623 20 : let _unfinished_gauge_guard = self.metrics.call_begin(
624 20 : &RemoteOpFileKind::Index,
625 20 : &RemoteOpKind::Download,
626 20 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
627 20 : reason: "no need for a downloads gauge",
628 20 : },
629 20 : );
630 :
631 20 : let (index_part, index_generation, index_last_modified) = download::download_index_part(
632 20 : &self.storage_impl,
633 20 : &self.tenant_shard_id,
634 20 : &self.timeline_id,
635 20 : self.generation,
636 20 : cancel,
637 20 : )
638 20 : .measure_remote_op(
639 20 : RemoteOpFileKind::Index,
640 20 : RemoteOpKind::Download,
641 20 : Arc::clone(&self.metrics),
642 20 : )
643 20 : .await?;
644 :
645 : // Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
646 : // old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
647 : // in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
648 : // when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
649 : // also a newer index available, that is surprising.
650 : const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
651 20 : let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
652 0 : if e.duration() > Duration::from_secs(5) {
653 : // We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
654 : // timestamp, it is common to be out by at least 1 second.
655 0 : tracing::warn!("Index has modification time in the future: {e}");
656 0 : }
657 0 : Duration::ZERO
658 20 : });
659 20 : if index_age > INDEX_AGE_CHECKS_THRESHOLD {
660 0 : tracing::info!(
661 : ?index_generation,
662 0 : age = index_age.as_secs_f64(),
663 0 : "Loaded an old index, checking for other indices..."
664 : );
665 :
666 : // Find the highest-generation index
667 0 : let (_latest_index_part, latest_index_generation, latest_index_mtime) =
668 0 : download::download_index_part(
669 0 : &self.storage_impl,
670 0 : &self.tenant_shard_id,
671 0 : &self.timeline_id,
672 0 : Generation::MAX,
673 0 : cancel,
674 0 : )
675 0 : .await?;
676 :
677 0 : if latest_index_generation > index_generation {
678 : // Unexpected! Why are we loading such an old index if a more recent one exists?
679 : // We will refuse to proceed, as there is no reasonable scenario where this should happen, but
680 : // there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
681 : // backwards).
682 0 : tracing::error!(
683 : ?index_generation,
684 : ?latest_index_generation,
685 : ?latest_index_mtime,
686 0 : "Found a newer index while loading an old one"
687 : );
688 0 : return Err(DownloadError::Fatal(
689 0 : "Index age exceeds threshold and a newer index exists".into(),
690 0 : ));
691 0 : }
692 20 : }
693 :
694 20 : if index_part.deleted_at.is_some() {
695 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
696 : } else {
697 20 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
698 : }
699 20 : }
700 :
701 : /// Download a (layer) file from `path`, into local filesystem.
702 : ///
703 : /// 'layer_metadata' is the metadata from the remote index file.
704 : ///
705 : /// On success, returns the size of the downloaded file.
706 6 : pub async fn download_layer_file(
707 6 : &self,
708 6 : layer_file_name: &LayerName,
709 6 : layer_metadata: &LayerFileMetadata,
710 6 : local_path: &Utf8Path,
711 6 : gate: &utils::sync::gate::Gate,
712 6 : cancel: &CancellationToken,
713 6 : ctx: &RequestContext,
714 6 : ) -> Result<u64, DownloadError> {
715 6 : let downloaded_size = {
716 6 : let _unfinished_gauge_guard = self.metrics.call_begin(
717 6 : &RemoteOpFileKind::Layer,
718 6 : &RemoteOpKind::Download,
719 6 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
720 6 : reason: "no need for a downloads gauge",
721 6 : },
722 6 : );
723 6 : download::download_layer_file(
724 6 : self.conf,
725 6 : &self.storage_impl,
726 6 : self.tenant_shard_id,
727 6 : self.timeline_id,
728 6 : layer_file_name,
729 6 : layer_metadata,
730 6 : local_path,
731 6 : gate,
732 6 : cancel,
733 6 : ctx,
734 6 : )
735 6 : .measure_remote_op(
736 6 : RemoteOpFileKind::Layer,
737 6 : RemoteOpKind::Download,
738 6 : Arc::clone(&self.metrics),
739 6 : )
740 6 : .await?
741 : };
742 :
743 6 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
744 6 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
745 6 :
746 6 : Ok(downloaded_size)
747 6 : }
748 :
749 : //
750 : // Upload operations.
751 : //
752 :
753 : /// Launch an index-file upload operation in the background, with
754 : /// fully updated metadata.
755 : ///
756 : /// This should only be used to upload initial metadata to remote storage.
757 : ///
758 : /// The upload will be added to the queue immediately, but it
759 : /// won't be performed until all previously scheduled layer file
760 : /// upload operations have completed successfully. This is to
761 : /// ensure that when the index file claims that layers X, Y and Z
762 : /// exist in remote storage, they really do. To wait for the upload
763 : /// to complete, use `wait_completion`.
764 : ///
765 : /// If there were any changes to the list of files, i.e. if any
766 : /// layer file uploads were scheduled, since the last index file
767 : /// upload, those will be included too.
768 230 : pub fn schedule_index_upload_for_full_metadata_update(
769 230 : self: &Arc<Self>,
770 230 : metadata: &TimelineMetadata,
771 230 : ) -> anyhow::Result<()> {
772 230 : let mut guard = self.upload_queue.lock().unwrap();
773 230 : let upload_queue = guard.initialized_mut()?;
774 :
775 : // As documented in the struct definition, it's ok for latest_metadata to be
776 : // ahead of what's _actually_ on the remote during index upload.
777 230 : upload_queue.dirty.metadata = metadata.clone();
778 230 :
779 230 : self.schedule_index_upload(upload_queue);
780 230 :
781 230 : Ok(())
782 230 : }
783 :
784 : /// Launch an index-file upload operation in the background, with only parts of the metadata
785 : /// updated.
786 : ///
787 : /// This is the regular way of updating metadata on layer flushes or Gc.
788 : ///
789 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
790 : /// `index_part.json`, while being more clear on what values update regularly.
791 1222 : pub(crate) fn schedule_index_upload_for_metadata_update(
792 1222 : self: &Arc<Self>,
793 1222 : update: &MetadataUpdate,
794 1222 : ) -> anyhow::Result<()> {
795 1222 : let mut guard = self.upload_queue.lock().unwrap();
796 1222 : let upload_queue = guard.initialized_mut()?;
797 :
798 1222 : upload_queue.dirty.metadata.apply(update);
799 1222 :
800 1222 : self.schedule_index_upload(upload_queue);
801 1222 :
802 1222 : Ok(())
803 1222 : }
804 :
805 : /// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
806 : ///
807 : /// Returns whether the caller needs to wait for the queue to drain before the change is guaranteed
808 : /// to be uploaded: either the change is already sitting in the queue but not committed yet, or it
809 : /// has not been enqueued yet.
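///
/// A usage sketch (assuming `client: &Arc<RemoteTimelineClient>`):
///
/// ```ignore
/// let need_wait = client.schedule_index_upload_for_timeline_archival_state(state)?;
/// if need_wait {
///     client.wait_completion().await?;
/// }
/// ```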
810 2 : pub(crate) fn schedule_index_upload_for_timeline_archival_state(
811 2 : self: &Arc<Self>,
812 2 : state: TimelineArchivalState,
813 2 : ) -> anyhow::Result<bool> {
814 2 : let mut guard = self.upload_queue.lock().unwrap();
815 2 : let upload_queue = guard.initialized_mut()?;
816 :
817 : /// Returns Some(_) if a change is needed, and Some(true) if it's a
818 : /// change needed to set archived_at.
819 4 : fn need_change(
820 4 : archived_at: &Option<NaiveDateTime>,
821 4 : state: TimelineArchivalState,
822 4 : ) -> Option<bool> {
823 4 : match (archived_at, state) {
824 : (Some(_), TimelineArchivalState::Archived)
825 : | (None, TimelineArchivalState::Unarchived) => {
826 : // Nothing to do
827 0 : tracing::info!("intended state matches present state");
828 0 : None
829 : }
830 4 : (None, TimelineArchivalState::Archived) => Some(true),
831 0 : (Some(_), TimelineArchivalState::Unarchived) => Some(false),
832 : }
833 4 : }
834 2 : let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
835 :
836 2 : if let Some(archived_at_set) = need_upload_scheduled {
837 2 : let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
838 2 : upload_queue.dirty.archived_at = intended_archived_at;
839 2 : self.schedule_index_upload(upload_queue);
840 2 : }
841 :
842 2 : let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
843 2 : Ok(need_wait)
844 2 : }
845 :
846 : /// Shuts the timeline client down, but only if the timeline is archived.
847 : ///
848 : /// This function and [`Self::schedule_index_upload_for_timeline_archival_state`] use the
849 : /// same lock to prevent races between unarchival and offloading: unarchival requires the
850 : /// upload queue to be initialized, and leaves behind an upload queue where either dirty
851 : /// or clean has archived_at of `None`. Offloading leaves behind an uninitialized upload
852 : /// queue.
853 2 : pub(crate) async fn shutdown_if_archived(
854 2 : self: &Arc<Self>,
855 2 : ) -> Result<(), ShutdownIfArchivedError> {
856 2 : {
857 2 : let mut guard = self.upload_queue.lock().unwrap();
858 2 : let upload_queue = guard
859 2 : .initialized_mut()
860 2 : .map_err(ShutdownIfArchivedError::NotInitialized)?;
861 :
862 2 : match (
863 2 : upload_queue.dirty.archived_at.is_none(),
864 2 : upload_queue.clean.0.archived_at.is_none(),
865 2 : ) {
866 : // The expected case: the timeline is archived and we don't want to unarchive
867 2 : (false, false) => {}
868 : (true, false) => {
869 0 : tracing::info!("can't shut down timeline: timeline slated for unarchival");
870 0 : return Err(ShutdownIfArchivedError::NotArchived);
871 : }
872 0 : (dirty_archived, true) => {
873 0 : tracing::info!(%dirty_archived, "can't shut down timeline: timeline not archived in remote storage");
874 0 : return Err(ShutdownIfArchivedError::NotArchived);
875 : }
876 : }
877 :
878 : // Set the shutting_down flag while the guard from the archival check is held.
879 : // This prevents a race with unarchival, as initialized_mut will not return
880 : // an upload queue from this point.
881 : // Also launch the queued tasks like shutdown() does.
882 2 : if !upload_queue.shutting_down {
883 2 : upload_queue.shutting_down = true;
884 2 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
885 2 : // this operation is not counted similar to Barrier
886 2 : self.launch_queued_tasks(upload_queue);
887 2 : }
888 : }
889 :
890 2 : self.shutdown().await;
891 :
892 2 : Ok(())
893 2 : }
894 :
895 : /// Launch an index-file upload operation in the background, setting `import_pgdata` field.
896 0 : pub(crate) fn schedule_index_upload_for_import_pgdata_state_update(
897 0 : self: &Arc<Self>,
898 0 : state: Option<import_pgdata::index_part_format::Root>,
899 0 : ) -> anyhow::Result<()> {
900 0 : let mut guard = self.upload_queue.lock().unwrap();
901 0 : let upload_queue = guard.initialized_mut()?;
902 0 : upload_queue.dirty.import_pgdata = state;
903 0 : self.schedule_index_upload(upload_queue);
904 0 : Ok(())
905 0 : }
906 :
907 : ///
908 : /// Launch an index-file upload operation in the background, if necessary.
909 : ///
910 : /// Use this function to schedule the update of the index file after
911 : /// scheduling file uploads or deletions. If no file uploads or deletions
912 : /// have been scheduled since the last index file upload, this does
913 : /// nothing.
914 : ///
915 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
916 : /// the upload to the upload queue and returns quickly.
917 370 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
918 370 : let mut guard = self.upload_queue.lock().unwrap();
919 370 : let upload_queue = guard.initialized_mut()?;
920 :
921 370 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
922 14 : self.schedule_index_upload(upload_queue);
923 356 : }
924 :
925 370 : Ok(())
926 370 : }
927 :
928 : /// Launch an index-file upload operation in the background (internal function)
929 1538 : fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
930 1538 : let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
931 1538 : // fix up the duplicated field
932 1538 : upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
933 1538 :
934 1538 : // make sure it serializes before doing it in perform_upload_task so that it doesn't
935 1538 : // look like a retryable error
936 1538 : let void = std::io::sink();
937 1538 : serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");
938 1538 :
939 1538 : let index_part = &upload_queue.dirty;
940 1538 :
941 1538 : info!(
942 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
943 0 : index_part.layer_metadata.len(),
944 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
945 : );
946 :
947 1538 : let op = UploadOp::UploadMetadata {
948 1538 : uploaded: Box::new(index_part.clone()),
949 1538 : };
950 1538 : self.metric_begin(&op);
951 1538 : upload_queue.queued_operations.push_back(op);
952 1538 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
953 1538 :
954 1538 : // Launch the task immediately, if possible
955 1538 : self.launch_queued_tasks(upload_queue);
956 1538 : }
957 :
958 : /// Reparent this timeline to a new parent.
959 : ///
960 : /// A retryable step of timeline ancestor detach.
961 0 : pub(crate) async fn schedule_reparenting_and_wait(
962 0 : self: &Arc<Self>,
963 0 : new_parent: &TimelineId,
964 0 : ) -> anyhow::Result<()> {
965 0 : let receiver = {
966 0 : let mut guard = self.upload_queue.lock().unwrap();
967 0 : let upload_queue = guard.initialized_mut()?;
968 :
969 0 : let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else {
970 0 : return Err(anyhow::anyhow!(
971 0 : "cannot reparent without a current ancestor"
972 0 : ));
973 : };
974 :
975 0 : let uploaded = &upload_queue.clean.0.metadata;
976 0 :
977 0 : if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
978 : // nothing to do
979 0 : None
980 : } else {
981 0 : upload_queue.dirty.metadata.reparent(new_parent);
982 0 : upload_queue.dirty.lineage.record_previous_ancestor(&prev);
983 0 :
984 0 : self.schedule_index_upload(upload_queue);
985 0 :
986 0 : Some(self.schedule_barrier0(upload_queue))
987 : }
988 : };
989 :
990 0 : if let Some(receiver) = receiver {
991 0 : Self::wait_completion0(receiver).await?;
992 0 : }
993 0 : Ok(())
994 0 : }
995 :
996 : /// Schedules uploading a new version of `index_part.json` with the given layers added
997 : /// and the ancestor detached, and waits for the upload to complete.
998 : ///
999 : /// This is used with `Timeline::detach_ancestor` functionality.
1000 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
1001 0 : self: &Arc<Self>,
1002 0 : layers: &[Layer],
1003 0 : adopted: (TimelineId, Lsn),
1004 0 : ) -> anyhow::Result<()> {
1005 0 : let barrier = {
1006 0 : let mut guard = self.upload_queue.lock().unwrap();
1007 0 : let upload_queue = guard.initialized_mut()?;
1008 :
1009 0 : if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
1010 0 : None
1011 : } else {
1012 0 : upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
1013 0 : upload_queue.dirty.lineage.record_detaching(&adopted);
1014 :
1015 0 : for layer in layers {
1016 0 : let prev = upload_queue
1017 0 : .dirty
1018 0 : .layer_metadata
1019 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
1020 0 : assert!(prev.is_none(), "copied layer existed already {layer}");
1021 : }
1022 :
1023 0 : self.schedule_index_upload(upload_queue);
1024 0 :
1025 0 : Some(self.schedule_barrier0(upload_queue))
1026 : }
1027 : };
1028 :
1029 0 : if let Some(barrier) = barrier {
1030 0 : Self::wait_completion0(barrier).await?;
1031 0 : }
1032 0 : Ok(())
1033 0 : }
1034 :
1035 : /// Adds a gc blocking reason for this timeline if one does not exist already.
1036 : ///
1037 : /// A retryable step of timeline detach ancestor.
1038 : ///
1039 : /// Returns a future which waits until the completion of the upload.
1040 0 : pub(crate) fn schedule_insert_gc_block_reason(
1041 0 : self: &Arc<Self>,
1042 0 : reason: index::GcBlockingReason,
1043 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1044 0 : {
1045 0 : let maybe_barrier = {
1046 0 : let mut guard = self.upload_queue.lock().unwrap();
1047 0 : let upload_queue = guard.initialized_mut()?;
1048 :
1049 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1050 0 : if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
1051 0 : drop(guard);
1052 0 : panic!("cannot start detach ancestor if there is nothing to detach from");
1053 0 : }
1054 0 : }
1055 :
1056 0 : let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
1057 :
1058 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1059 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1060 0 :
1061 0 : match (current, uploaded) {
1062 0 : (x, y) if wanted(x) && wanted(y) => None,
1063 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1064 : // Usual case: !wanted(x) && !wanted(y)
1065 : //
1066 : // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
1067 : // turn on and off some reason.
1068 0 : (x, y) => {
1069 0 : if !wanted(x) && wanted(y) {
1070 : // this could be avoided by having external in-memory synchronization, like
1071 : // timeline detach ancestor
1072 0 : warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason");
1073 0 : }
1074 :
1075 : // at this point, the metadata must always show that there is a parent
1076 0 : upload_queue.dirty.gc_blocking = current
1077 0 : .map(|x| x.with_reason(reason))
1078 0 : .or_else(|| Some(index::GcBlocking::started_now_for(reason)));
1079 0 : self.schedule_index_upload(upload_queue);
1080 0 : Some(self.schedule_barrier0(upload_queue))
1081 : }
1082 : }
1083 : };
1084 :
1085 0 : Ok(async move {
1086 0 : if let Some(barrier) = maybe_barrier {
1087 0 : Self::wait_completion0(barrier).await?;
1088 0 : }
1089 0 : Ok(())
1090 0 : })
1091 0 : }
1092 :
1093 : /// Removes a gc blocking reason for this timeline if one exists.
1094 : ///
1095 : /// A retryable step of timeline detach ancestor.
1096 : ///
1097 : /// Returns a future which waits until the completion of the upload.
1098 0 : pub(crate) fn schedule_remove_gc_block_reason(
1099 0 : self: &Arc<Self>,
1100 0 : reason: index::GcBlockingReason,
1101 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1102 0 : {
1103 0 : let maybe_barrier = {
1104 0 : let mut guard = self.upload_queue.lock().unwrap();
1105 0 : let upload_queue = guard.initialized_mut()?;
1106 :
1107 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1108 0 : if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
1109 0 : drop(guard);
1110 0 : panic!("cannot complete timeline_ancestor_detach while not detached");
1111 0 : }
1112 0 : }
1113 :
1114 0 : let wanted = |x: Option<&index::GcBlocking>| {
1115 0 : x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
1116 0 : };
1117 :
1118 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1119 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1120 0 :
1121 0 : match (current, uploaded) {
1122 0 : (x, y) if wanted(x) && wanted(y) => None,
1123 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1124 0 : (x, y) => {
1125 0 : if !wanted(x) && wanted(y) {
1126 0 : warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
1127 0 : }
1128 :
1129 0 : upload_queue.dirty.gc_blocking =
1130 0 : current.as_ref().and_then(|x| x.without_reason(reason));
1131 0 : assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
1132 0 : self.schedule_index_upload(upload_queue);
1133 0 : Some(self.schedule_barrier0(upload_queue))
1134 : }
1135 : }
1136 : };
1137 :
1138 0 : Ok(async move {
1139 0 : if let Some(barrier) = maybe_barrier {
1140 0 : Self::wait_completion0(barrier).await?;
1141 0 : }
1142 0 : Ok(())
1143 0 : })
1144 0 : }
1145 :
1146 : /// Launch an upload operation in the background; the file is added to be included in the next
1147 : /// `index_part.json` upload.
1148 1218 : pub(crate) fn schedule_layer_file_upload(
1149 1218 : self: &Arc<Self>,
1150 1218 : layer: ResidentLayer,
1151 1218 : ) -> Result<(), NotInitialized> {
1152 1218 : let mut guard = self.upload_queue.lock().unwrap();
1153 1218 : let upload_queue = guard.initialized_mut()?;
1154 :
1155 1218 : self.schedule_layer_file_upload0(upload_queue, layer);
1156 1218 : self.launch_queued_tasks(upload_queue);
1157 1218 : Ok(())
1158 1218 : }
1159 :
1160 1580 : fn schedule_layer_file_upload0(
1161 1580 : self: &Arc<Self>,
1162 1580 : upload_queue: &mut UploadQueueInitialized,
1163 1580 : layer: ResidentLayer,
1164 1580 : ) {
1165 1580 : let metadata = layer.metadata();
1166 1580 :
1167 1580 : upload_queue
1168 1580 : .dirty
1169 1580 : .layer_metadata
1170 1580 : .insert(layer.layer_desc().layer_name(), metadata.clone());
1171 1580 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1172 1580 :
1173 1580 : info!(
1174 : gen=?metadata.generation,
1175 : shard=?metadata.shard,
1176 0 : "scheduled layer file upload {layer}",
1177 : );
1178 :
1179 1580 : let op = UploadOp::UploadLayer(layer, metadata, None);
1180 1580 : self.metric_begin(&op);
1181 1580 : upload_queue.queued_operations.push_back(op);
1182 1580 : }
1183 :
1184 : /// Launch a delete operation in the background.
1185 : ///
1186 : /// The operation does not modify local filesystem state.
1187 : ///
1188 : /// Note: This schedules an index file upload before the deletions. The
1189 : /// deletion won't actually be performed, until all previously scheduled
1190 : /// upload operations, and the index file upload, have completed
1191 : /// successfully.
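///
/// A sketch of the resulting guarantee (`names` is a hypothetical `Vec<LayerName>`):
///
/// ```ignore
/// client.schedule_layer_file_deletion(&names)?;
/// // The index upload that unlinks `names` is queued ahead of the deletions, so
/// // a crash can at worst leak objects, never leave a dangling reference in the
/// // remote index.
/// client.wait_completion().await?;
/// ```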
1192 8 : pub fn schedule_layer_file_deletion(
1193 8 : self: &Arc<Self>,
1194 8 : names: &[LayerName],
1195 8 : ) -> anyhow::Result<()> {
1196 8 : let mut guard = self.upload_queue.lock().unwrap();
1197 8 : let upload_queue = guard.initialized_mut()?;
1198 :
1199 8 : let with_metadata =
1200 8 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
1201 8 :
1202 8 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
1203 8 :
1204 8 : // Launch the tasks immediately, if possible
1205 8 : self.launch_queued_tasks(upload_queue);
1206 8 : Ok(())
1207 8 : }
1208 :
1209 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
1210 : /// layer files, leaving them dangling.
1211 : ///
1212 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
1213 : /// is invoked on them.
1214 4 : pub(crate) fn schedule_gc_update(
1215 4 : self: &Arc<Self>,
1216 4 : gc_layers: &[Layer],
1217 4 : ) -> Result<(), NotInitialized> {
1218 4 : let mut guard = self.upload_queue.lock().unwrap();
1219 4 : let upload_queue = guard.initialized_mut()?;
1220 :
1221 : // just forget the return value; after uploading the next index_part.json, we can consider
1222 : // the layer files as "dangling". this is fine, at worst case we create work for the
1223 : // scrubber.
1224 :
1225 4 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
1226 4 :
1227 4 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1228 4 :
1229 4 : self.launch_queued_tasks(upload_queue);
1230 4 :
1231 4 : Ok(())
1232 4 : }
1233 :
1234 : /// Update the remote index file, removing the to-be-deleted files from the index,
1235 : /// allowing scheduling of actual deletions later.
1236 88 : fn schedule_unlinking_of_layers_from_index_part0<I>(
1237 88 : self: &Arc<Self>,
1238 88 : upload_queue: &mut UploadQueueInitialized,
1239 88 : names: I,
1240 88 : ) -> Vec<(LayerName, LayerFileMetadata)>
1241 88 : where
1242 88 : I: IntoIterator<Item = LayerName>,
1243 88 : {
1244 88 : // Decorate our list of names with each name's metadata, dropping
1245 88 : // names that are unexpectedly missing from our metadata. This metadata
1246 88 : // is later used when physically deleting layers, to construct key paths.
1247 88 : let with_metadata: Vec<_> = names
1248 88 : .into_iter()
1249 512 : .filter_map(|name| {
1250 512 : let meta = upload_queue.dirty.layer_metadata.remove(&name);
1251 :
1252 512 : if let Some(meta) = meta {
1253 446 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1254 446 : Some((name, meta))
1255 : } else {
1256 : // This can only happen if we forgot to schedule the file upload
1257 : // before scheduling the delete. Log it because it is a rare/strange
1258 : // situation, and in case something is misbehaving, we'd like to know which
1259 : // layers experienced this.
1260 66 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
1261 66 : None
1262 : }
1263 512 : })
1264 88 : .collect();
1265 :
1266 : #[cfg(feature = "testing")]
1267 534 : for (name, metadata) in &with_metadata {
1268 446 : let gen = metadata.generation;
1269 446 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
1270 0 : if unexpected == gen {
1271 0 : tracing::error!("{name} was unlinked twice with same generation");
1272 : } else {
1273 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
1274 : }
1275 446 : }
1276 : }
1277 :
1278 : // after unlinking files from the upload_queue.latest_files we must always schedule an
1279 : // index_part update, because that needs to be uploaded before we can actually delete the
1280 : // files.
1281 88 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1282 70 : self.schedule_index_upload(upload_queue);
1283 70 : }
1284 :
1285 88 : with_metadata
1286 88 : }
1287 :
1288 : /// Schedules deletion for layer files which have previously been unlinked from the
1289 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
1290 510 : pub(crate) fn schedule_deletion_of_unlinked(
1291 510 : self: &Arc<Self>,
1292 510 : layers: Vec<(LayerName, LayerFileMetadata)>,
1293 510 : ) -> anyhow::Result<()> {
1294 510 : let mut guard = self.upload_queue.lock().unwrap();
1295 510 : let upload_queue = guard.initialized_mut()?;
1296 :
1297 510 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
1298 510 : self.launch_queued_tasks(upload_queue);
1299 510 : Ok(())
1300 510 : }
1301 :
1302 516 : fn schedule_deletion_of_unlinked0(
1303 516 : self: &Arc<Self>,
1304 516 : upload_queue: &mut UploadQueueInitialized,
1305 516 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
1306 516 : ) {
1307 516 : // Filter out any layers which were not created by this tenant shard. These are
1308 516 : // layers that originate from some ancestor shard after a split, and may still
1309 516 : // be referenced by other shards. We are free to delete them locally and remove
1310 516 : // them from our index (and would have already done so when we reach this point
1311 516 : // in the code), but we may not delete them remotely.
1312 516 : with_metadata.retain(|(name, meta)| {
1313 510 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
1314 510 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
1315 510 : if !retain {
1316 0 : tracing::debug!(
1317 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
1318 : meta.shard
1319 : );
1320 510 : }
1321 510 : retain
1322 516 : });
1323 :
1324 1026 : for (name, meta) in &with_metadata {
1325 510 : info!(
1326 0 : "scheduling deletion of layer {}{} (shard {})",
1327 0 : name,
1328 0 : meta.generation.get_suffix(),
1329 : meta.shard
1330 : );
1331 : }
1332 :
1333 : #[cfg(feature = "testing")]
1334 1026 : for (name, meta) in &with_metadata {
1335 510 : let gen = meta.generation;
1336 510 : match upload_queue.dangling_files.remove(name) {
1337 440 : Some(same) if same == gen => { /* expected */ }
1338 0 : Some(other) => {
1339 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
1340 : }
1341 : None => {
1342 70 : tracing::error!("{name} was unlinked but was not dangling");
1343 : }
1344 : }
1345 : }
1346 :
1347 : // schedule the actual deletions
1348 516 : if with_metadata.is_empty() {
1349 : // avoid scheduling the op & bumping the metric
1350 6 : return;
1351 510 : }
1352 510 : let op = UploadOp::Delete(Delete {
1353 510 : layers: with_metadata,
1354 510 : });
1355 510 : self.metric_begin(&op);
1356 510 : upload_queue.queued_operations.push_back(op);
1357 516 : }
1358 :
1359 : /// Schedules a compaction update to the remote `index_part.json`.
1360 : ///
1361 : /// `compacted_from` contains the names of the L0 layers that were compacted into the `compacted_to` L1 layers.
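 : ///
 : /// A hypothetical caller sketch (variable names are illustrative):
 : ///
 : /// ```ignore
 : /// // Upload the new L1s and unlink the old L0s from the index in one call:
 : /// client.schedule_compaction_update(&old_l0_layers, &new_l1_layers)?;
 : /// // The L0 files themselves are deleted later via schedule_deletion_of_unlinked().
 : /// ```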
1362 76 : pub(crate) fn schedule_compaction_update(
1363 76 : self: &Arc<Self>,
1364 76 : compacted_from: &[Layer],
1365 76 : compacted_to: &[ResidentLayer],
1366 76 : ) -> Result<(), NotInitialized> {
1367 76 : let mut guard = self.upload_queue.lock().unwrap();
1368 76 : let upload_queue = guard.initialized_mut()?;
1369 :
1370 438 : for layer in compacted_to {
1371 362 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
1372 362 : }
1373 :
1374 506 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
1375 76 :
1376 76 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1377 76 : self.launch_queued_tasks(upload_queue);
1378 76 :
1379 76 : Ok(())
1380 76 : }
1381 :
1382 : /// Wait for all previously scheduled uploads/deletions to complete
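 : ///
 : /// A hypothetical caller sketch of the usual "flush" pattern (exact signatures
 : /// of the scheduling call are assumed), relying on the barrier scheduled internally:
 : ///
 : /// ```ignore
 : /// client.schedule_index_upload_for_file_changes()?;
 : /// client.wait_completion().await?; // all operations scheduled before this point have completed
 : /// ```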
1383 1386 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
1384 1386 : let receiver = {
1385 1386 : let mut guard = self.upload_queue.lock().unwrap();
1386 1386 : let upload_queue = guard
1387 1386 : .initialized_mut()
1388 1386 : .map_err(WaitCompletionError::NotInitialized)?;
1389 1386 : self.schedule_barrier0(upload_queue)
1390 1386 : };
1391 1386 :
1392 1386 : Self::wait_completion0(receiver).await
1393 1386 : }
1394 :
1395 1386 : async fn wait_completion0(
1396 1386 : mut receiver: tokio::sync::watch::Receiver<()>,
1397 1386 : ) -> Result<(), WaitCompletionError> {
1398 1386 : if receiver.changed().await.is_err() {
1399 0 : return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
1400 1386 : }
1401 1386 :
1402 1386 : Ok(())
1403 1386 : }
1404 :
1405 6 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
1406 6 : let mut guard = self.upload_queue.lock().unwrap();
1407 6 : let upload_queue = guard.initialized_mut()?;
1408 6 : self.schedule_barrier0(upload_queue);
1409 6 : Ok(())
1410 6 : }
1411 :
1412 1392 : fn schedule_barrier0(
1413 1392 : self: &Arc<Self>,
1414 1392 : upload_queue: &mut UploadQueueInitialized,
1415 1392 : ) -> tokio::sync::watch::Receiver<()> {
1416 1392 : let (sender, receiver) = tokio::sync::watch::channel(());
1417 1392 : let barrier_op = UploadOp::Barrier(sender);
1418 1392 :
1419 1392 : upload_queue.queued_operations.push_back(barrier_op);
1420 1392 : // Don't count this kind of operation!
1421 1392 :
1422 1392 : // Launch the task immediately, if possible
1423 1392 : self.launch_queued_tasks(upload_queue);
1424 1392 :
1425 1392 : receiver
1426 1392 : }
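 :
 : // A minimal standalone sketch of the watch-channel barrier pattern used by
 : // `schedule_barrier0` and `wait_completion0` above, relying only on tokio's
 : // documented `watch` semantics:
 : //
 : //     let (sender, mut receiver) = tokio::sync::watch::channel(());
 : //     // Completing the barrier (as `launch_queued_tasks` does):
 : //     sender.send_replace(());   // receiver.changed().await returns Ok(())
 : //     // Tearing down the queue instead drops the sender:
 : //     drop(sender);              // a waiter that saw no change gets Err(_)
 : //
 : // `wait_completion0` maps that Err into UploadQueueShutDownOrStopped.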
1427 :
1428 : /// Wait for all previously scheduled operations to complete, and then stop.
1429 : ///
1430 : /// Not cancellation safe
1431 10 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1432 10 : // On cancellation the queue is left in ackward state of refusing new operations but
1433 10 : // proper stop is yet to be called. On cancel the original or some later task must call
1434 10 : // `stop` or `shutdown`.
1435 10 : let sg = scopeguard::guard((), |_| {
1436 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
1437 10 : });
1438 :
1439 8 : let fut = {
1440 10 : let mut guard = self.upload_queue.lock().unwrap();
1441 10 : let upload_queue = match &mut *guard {
1442 : UploadQueue::Stopped(_) => {
1443 2 : scopeguard::ScopeGuard::into_inner(sg);
1444 2 : return;
1445 : }
1446 : UploadQueue::Uninitialized => {
1447 : // transition into Stopped state
1448 0 : self.stop_impl(&mut guard);
1449 0 : scopeguard::ScopeGuard::into_inner(sg);
1450 0 : return;
1451 : }
1452 8 : UploadQueue::Initialized(ref mut init) => init,
1453 8 : };
1454 8 :
1455 8 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1456 8 : // just don't add more of these as they would never complete.
1457 8 : //
1458 8 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1459 8 : // in every place we would not have to jump through this hoop, and this method could be
1460 8 : // made cancellable.
1461 8 : if !upload_queue.shutting_down {
1462 6 : upload_queue.shutting_down = true;
1463 6 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1464 6 : // this operation is not counted similar to Barrier
1465 6 :
1466 6 : self.launch_queued_tasks(upload_queue);
1467 6 : }
1468 :
1469 8 : upload_queue.shutdown_ready.clone().acquire_owned()
1470 : };
1471 :
1472 8 : let res = fut.await;
1473 :
1474 8 : scopeguard::ScopeGuard::into_inner(sg);
1475 8 :
1476 8 : match res {
1477 0 : Ok(_permit) => unreachable!("no permits should ever be added to shutdown_ready"),
1478 8 : Err(_closed) => {
1479 8 : // expected
1480 8 : }
1481 8 : }
1482 8 :
1483 8 : self.stop();
1484 10 : }
1485 :
1486 : /// Set the deleted_at field in the remote index file.
1487 : ///
1488 : /// This fails if the upload queue has not been `stop()`ed.
1489 : ///
1490 : /// The caller is responsible for calling `stop()` AND for waiting
1491 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1492 : /// Check method [`RemoteTimelineClient::stop`] for details.
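 : ///
 : /// A hypothetical caller sketch (argument values are illustrative):
 : ///
 : /// ```ignore
 : /// client.stop();
 : /// task_mgr::shutdown_tasks(
 : ///     Some(TaskKind::RemoteUploadTask),
 : ///     Some(tenant_shard_id),
 : ///     Some(timeline_id),
 : /// )
 : /// .await;
 : /// client.persist_index_part_with_deleted_flag().await?;
 : /// ```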
1493 : #[instrument(skip_all)]
1494 : pub(crate) async fn persist_index_part_with_deleted_flag(
1495 : self: &Arc<Self>,
1496 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1497 : let index_part_with_deleted_at = {
1498 : let mut locked = self.upload_queue.lock().unwrap();
1499 :
1500 : // We must be in the stopped state because otherwise
1501 : // an in-progress index part upload could overwrite the file
1502 : // with one missing the is_deleted flag that we are about to set below.
1503 : let stopped = locked.stopped_mut()?;
1504 :
1505 : match stopped.deleted_at {
1506 : SetDeletedFlagProgress::NotRunning => (), // proceed
1507 : SetDeletedFlagProgress::InProgress(at) => {
1508 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1509 : }
1510 : SetDeletedFlagProgress::Successful(at) => {
1511 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1512 : }
1513 : };
1514 : let deleted_at = Utc::now().naive_utc();
1515 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1516 :
1517 : let mut index_part = stopped.upload_queue_for_deletion.dirty.clone();
1518 : index_part.deleted_at = Some(deleted_at);
1519 : index_part
1520 : };
1521 :
1522 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1523 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1524 0 : let stopped = locked
1525 0 : .stopped_mut()
1526 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1527 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1528 0 : });
1529 :
1530 : pausable_failpoint!("persist_deleted_index_part");
1531 :
1532 : backoff::retry(
1533 0 : || {
1534 0 : upload::upload_index_part(
1535 0 : &self.storage_impl,
1536 0 : &self.tenant_shard_id,
1537 0 : &self.timeline_id,
1538 0 : self.generation,
1539 0 : &index_part_with_deleted_at,
1540 0 : &self.cancel,
1541 0 : )
1542 0 : },
1543 0 : |_e| false,
1544 : 1,
1545 : // have just a couple of attempts
1546 : // when executed as part of timeline deletion, this happens in the context of an API call;
1547 : // when executed as part of tenant deletion, it happens in the background
1548 : 2,
1549 : "persist_index_part_with_deleted_flag",
1550 : &self.cancel,
1551 : )
1552 : .await
1553 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1554 0 : .and_then(|x| x)?;
1555 :
1556 : // all good, disarm the guard and mark as success
1557 : ScopeGuard::into_inner(undo_deleted_at);
1558 : {
1559 : let mut locked = self.upload_queue.lock().unwrap();
1560 :
1561 : let stopped = locked
1562 : .stopped_mut()
1563 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1564 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1565 : index_part_with_deleted_at
1566 : .deleted_at
1567 : .expect("we set it above"),
1568 : );
1569 : }
1570 :
1571 : Ok(())
1572 : }
1573 :
1574 0 : pub(crate) fn is_deleting(&self) -> bool {
1575 0 : let mut locked = self.upload_queue.lock().unwrap();
1576 0 : locked.stopped_mut().is_ok()
1577 0 : }
1578 :
1579 0 : pub(crate) async fn preserve_initdb_archive(
1580 0 : self: &Arc<Self>,
1581 0 : tenant_id: &TenantId,
1582 0 : timeline_id: &TimelineId,
1583 0 : cancel: &CancellationToken,
1584 0 : ) -> anyhow::Result<()> {
1585 0 : backoff::retry(
1586 0 : || async {
1587 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1588 0 : .await
1589 0 : },
1590 0 : TimeoutOrCancel::caused_by_cancel,
1591 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1592 0 : FAILED_REMOTE_OP_RETRIES,
1593 0 : "preserve_initdb_tar_zst",
1594 0 : &cancel.clone(),
1595 0 : )
1596 0 : .await
1597 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1598 0 : .and_then(|x| x)
1599 0 : .context("backing up initdb archive")?;
1600 0 : Ok(())
1601 0 : }
1602 :
1603 : /// Uploads the given layer **without** making it part of a future `index_part.json` upload.
1604 : ///
1605 : /// This is not normally needed.
1606 0 : pub(crate) async fn upload_layer_file(
1607 0 : self: &Arc<Self>,
1608 0 : uploaded: &ResidentLayer,
1609 0 : cancel: &CancellationToken,
1610 0 : ) -> anyhow::Result<()> {
1611 0 : let remote_path = remote_layer_path(
1612 0 : &self.tenant_shard_id.tenant_id,
1613 0 : &self.timeline_id,
1614 0 : uploaded.metadata().shard,
1615 0 : &uploaded.layer_desc().layer_name(),
1616 0 : uploaded.metadata().generation,
1617 0 : );
1618 0 :
1619 0 : backoff::retry(
1620 0 : || async {
1621 0 : upload::upload_timeline_layer(
1622 0 : &self.storage_impl,
1623 0 : uploaded.local_path(),
1624 0 : &remote_path,
1625 0 : uploaded.metadata().file_size,
1626 0 : cancel,
1627 0 : )
1628 0 : .await
1629 0 : },
1630 0 : TimeoutOrCancel::caused_by_cancel,
1631 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1632 0 : FAILED_REMOTE_OP_RETRIES,
1633 0 : "upload a layer without adding it to latest files",
1634 0 : cancel,
1635 0 : )
1636 0 : .await
1637 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1638 0 : .and_then(|x| x)
1639 0 : .context("upload a layer without adding it to latest files")
1640 0 : }
1641 :
1642 : /// Copies the existing remote layer `adopted` to the remote path of `adopted_as`. The copy is
1643 : /// not made part of a future `index_part.json` upload.
1644 0 : pub(crate) async fn copy_timeline_layer(
1645 0 : self: &Arc<Self>,
1646 0 : adopted: &Layer,
1647 0 : adopted_as: &Layer,
1648 0 : cancel: &CancellationToken,
1649 0 : ) -> anyhow::Result<()> {
1650 0 : let source_remote_path = remote_layer_path(
1651 0 : &self.tenant_shard_id.tenant_id,
1652 0 : &adopted
1653 0 : .get_timeline_id()
1654 0 : .expect("Source timeline should be alive"),
1655 0 : adopted.metadata().shard,
1656 0 : &adopted.layer_desc().layer_name(),
1657 0 : adopted.metadata().generation,
1658 0 : );
1659 0 :
1660 0 : let target_remote_path = remote_layer_path(
1661 0 : &self.tenant_shard_id.tenant_id,
1662 0 : &self.timeline_id,
1663 0 : adopted_as.metadata().shard,
1664 0 : &adopted_as.layer_desc().layer_name(),
1665 0 : adopted_as.metadata().generation,
1666 0 : );
1667 0 :
1668 0 : backoff::retry(
1669 0 : || async {
1670 0 : upload::copy_timeline_layer(
1671 0 : &self.storage_impl,
1672 0 : &source_remote_path,
1673 0 : &target_remote_path,
1674 0 : cancel,
1675 0 : )
1676 0 : .await
1677 0 : },
1678 0 : TimeoutOrCancel::caused_by_cancel,
1679 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1680 0 : FAILED_REMOTE_OP_RETRIES,
1681 0 : "copy timeline layer",
1682 0 : cancel,
1683 0 : )
1684 0 : .await
1685 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1686 0 : .and_then(|x| x)
1687 0 : .context("remote copy timeline layer")
1688 0 : }
1689 :
1690 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1691 0 : match tokio::time::timeout(
1692 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1693 0 : self.deletion_queue_client.flush_immediate(),
1694 0 : )
1695 0 : .await
1696 : {
1697 0 : Ok(result) => result,
1698 0 : Err(_timeout) => {
1699 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1700 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1701 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1702 0 : tracing::warn!(
1703 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1704 : );
1705 0 : Ok(())
1706 : }
1707 : }
1708 0 : }
1709 :
1710 : /// Prerequisites: the UploadQueue must be in the stopped state, and deleted_at must have been successfully set.
1711 : /// The function deletes the layer files one by one, then lists the prefix to see if we leaked any files,
1712 : /// deletes the leaked files (if any), and finally deletes the index file.
1713 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> Result<(), DeleteTimelineError> {
1714 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1715 :
1716 0 : let layers: Vec<RemotePath> = {
1717 0 : let mut locked = self.upload_queue.lock().unwrap();
1718 0 : let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?;
1719 :
1720 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1721 0 : return Err(DeleteTimelineError::Other(anyhow::anyhow!(
1722 0 : "deleted_at is not set"
1723 0 : )));
1724 0 : }
1725 0 :
1726 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1727 :
1728 0 : stopped
1729 0 : .upload_queue_for_deletion
1730 0 : .dirty
1731 0 : .layer_metadata
1732 0 : .drain()
1733 0 : .filter(|(_file_name, meta)| {
1734 0 : // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
1735 0 : // all shards anyway, we _could_ delete these, but
1736 0 : // - it creates a potential race if other shards are still
1737 0 : // using the layers while this shard deletes them.
1738 0 : // - it means that if we rolled back the shard split, the ancestor shards would be in a state where
1739 0 : // these timelines are present but corrupt (their index exists but some layers don't)
1740 0 : //
1741 0 : // These layers will eventually be cleaned up by the scrubber when it does physical GC.
1742 0 : meta.shard.shard_number == self.tenant_shard_id.shard_number
1743 0 : && meta.shard.shard_count == self.tenant_shard_id.shard_count
1744 0 : })
1745 0 : .map(|(file_name, meta)| {
1746 0 : remote_layer_path(
1747 0 : &self.tenant_shard_id.tenant_id,
1748 0 : &self.timeline_id,
1749 0 : meta.shard,
1750 0 : &file_name,
1751 0 : meta.generation,
1752 0 : )
1753 0 : })
1754 0 : .collect()
1755 0 : };
1756 0 :
1757 0 : let layer_deletion_count = layers.len();
1758 0 : self.deletion_queue_client
1759 0 : .push_immediate(layers)
1760 0 : .await
1761 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1762 :
1763 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts of
1764 : // nonexistent objects are not considered errors.
1765 0 : let initdb_path =
1766 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1767 0 : self.deletion_queue_client
1768 0 : .push_immediate(vec![initdb_path])
1769 0 : .await
1770 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1771 :
1772 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
1773 : // and the retry arrived at a different pageserver, there would be no trace of it in remote storage.
1774 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1775 0 :
1776 0 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1777 0 : // taking the burden of listing all the layers that we already know we should delete.
1778 0 : self.flush_deletion_queue()
1779 0 : .await
1780 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1781 :
1782 0 : let cancel = shutdown_token();
1783 :
1784 0 : let remaining = download_retry(
1785 0 : || async {
1786 0 : self.storage_impl
1787 0 : .list(
1788 0 : Some(&timeline_storage_path),
1789 0 : ListingMode::NoDelimiter,
1790 0 : None,
1791 0 : &cancel,
1792 0 : )
1793 0 : .await
1794 0 : },
1795 0 : "list remaining files",
1796 0 : &cancel,
1797 0 : )
1798 0 : .await
1799 0 : .context("list remaining files")?
1800 : .keys;
1801 :
1802 : // We will delete the current index_part object last, since it acts as a deletion
1803 : // marker via its deleted_at attribute
1804 0 : let latest_index = remaining
1805 0 : .iter()
1806 0 : .filter(|o| {
1807 0 : o.key
1808 0 : .object_name()
1809 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1810 0 : .unwrap_or(false)
1811 0 : })
1812 0 : .filter_map(|o| parse_remote_index_path(o.key.clone()).map(|gen| (o.key.clone(), gen)))
1813 0 : .max_by_key(|i| i.1)
1814 0 : .map(|i| i.0.clone())
1815 0 : .unwrap_or(
1816 0 : // No generation-suffixed indices, assume we are dealing with
1817 0 : // a legacy index.
1818 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1819 0 : );
1820 0 :
1821 0 : let remaining_layers: Vec<RemotePath> = remaining
1822 0 : .into_iter()
1823 0 : .filter_map(|o| {
1824 0 : if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
1825 0 : None
1826 : } else {
1827 0 : Some(o.key)
1828 : }
1829 0 : })
1830 0 : .inspect(|path| {
1831 0 : if let Some(name) = path.object_name() {
1832 0 : info!(%name, "deleting a file not referenced from index_part.json");
1833 : } else {
1834 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1835 : }
1836 0 : })
1837 0 : .collect();
1838 0 :
1839 0 : let not_referenced_count = remaining_layers.len();
1840 0 : if !remaining_layers.is_empty() {
1841 0 : self.deletion_queue_client
1842 0 : .push_immediate(remaining_layers)
1843 0 : .await
1844 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1845 0 : }
1846 :
1847 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1848 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1849 0 : "failpoint: timeline-delete-before-index-delete"
1850 0 : )))?
1851 0 : });
1852 :
1853 0 : debug!("enqueuing index part deletion");
1854 0 : self.deletion_queue_client
1855 0 : .push_immediate([latest_index].to_vec())
1856 0 : .await
1857 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1858 :
1859 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1860 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1861 0 : self.flush_deletion_queue()
1862 0 : .await
1863 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1864 :
1865 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1866 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1867 0 : "failpoint: timeline-delete-after-index-delete"
1868 0 : )))?
1869 0 : });
1870 :
1871 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1872 :
1873 0 : Ok(())
1874 0 : }
1875 :
1876 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1877 : /// the ordering constraints.
1878 : ///
1879 : /// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
1880 : /// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
1881 : /// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
1882 7968 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1883 12788 : while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
1884 4820 : debug!("starting op: {next_op}");
1885 :
1886 : // Prepare upload.
1887 4820 : match &mut next_op {
1888 1580 : UploadOp::UploadLayer(layer, meta, mode) => {
1889 1580 : if upload_queue
1890 1580 : .recently_deleted
1891 1580 : .remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
1892 0 : {
1893 0 : *mode = Some(OpType::FlushDeletion);
1894 0 : } else {
1895 1580 : *mode = Some(OpType::MayReorder)
1896 : }
1897 : }
1898 1501 : UploadOp::UploadMetadata { .. } => {}
1899 347 : UploadOp::Delete(Delete { layers }) => {
1900 694 : for (name, meta) in layers {
1901 347 : upload_queue
1902 347 : .recently_deleted
1903 347 : .insert((name.clone(), meta.generation));
1904 347 : }
1905 : }
1906 1392 : UploadOp::Barrier(sender) => {
1907 1392 : sender.send_replace(());
1908 1392 : continue;
1909 : }
1910 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1911 : };
1912 :
1913 : // Assign unique ID to this task
1914 3428 : upload_queue.task_counter += 1;
1915 3428 : let upload_task_id = upload_queue.task_counter;
1916 3428 :
1917 3428 : // Add it to the in-progress map
1918 3428 : let task = Arc::new(UploadTask {
1919 3428 : task_id: upload_task_id,
1920 3428 : op: next_op,
1921 3428 : coalesced_ops,
1922 3428 : retries: AtomicU32::new(0),
1923 3428 : });
1924 3428 : upload_queue
1925 3428 : .inprogress_tasks
1926 3428 : .insert(task.task_id, Arc::clone(&task));
1927 3428 :
1928 3428 : // Spawn task to perform the task
1929 3428 : let self_rc = Arc::clone(self);
1930 3428 : let tenant_shard_id = self.tenant_shard_id;
1931 3428 : let timeline_id = self.timeline_id;
1932 3428 : task_mgr::spawn(
1933 3428 : &self.runtime,
1934 3428 : TaskKind::RemoteUploadTask,
1935 3428 : self.tenant_shard_id,
1936 3428 : Some(self.timeline_id),
1937 3428 : "remote upload",
1938 3308 : async move {
1939 3308 : self_rc.perform_upload_task(task).await;
1940 3216 : Ok(())
1941 3216 : }
1942 3428 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1943 : );
1944 :
1945 : // Loop back to process next task
1946 : }
1947 7968 : }
1948 :
1949 : ///
1950 : /// Perform an upload task.
1951 : ///
1952 : /// The task is in the `inprogress_tasks` list. This function will try to
1953 : /// execute it, retrying forever. On successful completion, the task is
1954 : /// removed it from the `inprogress_tasks` list, and any next task(s) in the
1955 : /// queue that were waiting by the completion are launched.
1956 : ///
1957 : /// The task can be shut down, however. That leads to stopping the whole
1958 : /// queue.
1959 : ///
1960 3308 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1961 3308 : let cancel = shutdown_token();
1962 : // Loop to retry until it completes.
1963 : loop {
1964 : // If we're requested to shut down, close up shop and exit.
1965 : //
1966 : // Note: We only check for the shutdown requests between retries, so
1967 : // if a shutdown request arrives while we're busy uploading, in the
1968 : // upload::upload_*() call below, we will wait (not exit) until it has
1969 : // finished. We probably could cancel the upload by simply dropping
1970 : // the Future, but we're not 100% sure if the remote storage library
1971 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1972 : // upload finishes or times out soon enough.
1973 3308 : if cancel.is_cancelled() {
1974 0 : info!("upload task cancelled by shutdown request");
1975 0 : self.stop();
1976 0 : return;
1977 3308 : }
1978 3308 :
1979 3308 : // Assert that we don't modify a layer that's referenced by the current index.
1980 3308 : if cfg!(debug_assertions) {
1981 3308 : let modified = match &task.op {
1982 1481 : UploadOp::UploadLayer(layer, layer_metadata, _) => {
1983 1481 : vec![(layer.layer_desc().layer_name(), layer_metadata)]
1984 : }
1985 340 : UploadOp::Delete(delete) => {
1986 340 : delete.layers.iter().map(|(n, m)| (n.clone(), m)).collect()
1987 : }
1988 : // These don't modify layers.
1989 1487 : UploadOp::UploadMetadata { .. } => Vec::new(),
1990 0 : UploadOp::Barrier(_) => Vec::new(),
1991 0 : UploadOp::Shutdown => Vec::new(),
1992 : };
1993 3308 : if let Ok(queue) = self.upload_queue.lock().unwrap().initialized_mut() {
1994 5125 : for (ref name, metadata) in modified {
1995 1821 : debug_assert!(
1996 1821 : !queue.clean.0.references(name, metadata),
1997 4 : "layer {name} modified while referenced by index",
1998 : );
1999 : }
2000 0 : }
2001 0 : }
2002 :
2003 3304 : let upload_result: anyhow::Result<()> = match &task.op {
2004 1481 : UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
2005 : // TODO: check if this mechanism can be removed now that can_bypass() performs
2006 : // conflict checks during scheduling.
2007 1481 : if let Some(OpType::FlushDeletion) = mode {
2008 0 : if self.config.read().unwrap().block_deletions {
2009 : // Of course, this is not efficient... but usually the queue should be empty.
2010 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2011 0 : let mut detected = false;
2012 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2013 0 : for list in queue.blocked_deletions.iter_mut() {
2014 0 : list.layers.retain(|(name, meta)| {
2015 0 : if name == &layer.layer_desc().layer_name()
2016 0 : && meta.generation == layer_metadata.generation
2017 : {
2018 0 : detected = true;
2019 0 : // remove the layer from deletion queue
2020 0 : false
2021 : } else {
2022 : // keep the layer
2023 0 : true
2024 : }
2025 0 : });
2026 0 : }
2027 0 : }
2028 0 : if detected {
2029 0 : info!(
2030 0 : "cancelled blocked deletion of layer {} at gen {:?}",
2031 0 : layer.layer_desc().layer_name(),
2032 : layer_metadata.generation
2033 : );
2034 0 : }
2035 : } else {
2036 : // TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions
2037 : // that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted,
2038 : // which is not possible in the current system.
2039 0 : info!(
2040 0 : "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
2041 0 : layer.layer_desc().layer_name(),
2042 : layer_metadata.generation
2043 : );
2044 : {
2045 : // We are going to flush, so we can clean up the recently-deleted list.
2046 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2047 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2048 0 : queue.recently_deleted.clear();
2049 0 : }
2050 : }
2051 0 : if let Err(e) = self.deletion_queue_client.flush_execute().await {
2052 0 : warn!(
2053 0 : "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
2054 0 : layer.layer_desc().layer_name(),
2055 : layer_metadata.generation
2056 : );
2057 : } else {
2058 0 : info!(
2059 0 : "done flushing deletion queue before uploading layer {} at gen {:?}",
2060 0 : layer.layer_desc().layer_name(),
2061 : layer_metadata.generation
2062 : );
2063 : }
2064 : }
2065 1481 : }
2066 1481 : let local_path = layer.local_path();
2067 1481 :
2068 1481 : // We should only be uploading layers created by this `Tenant`'s lifetime, so
2069 1481 : // the metadata in the upload should always match our current generation.
2070 1481 : assert_eq!(layer_metadata.generation, self.generation);
2071 :
2072 1481 : let remote_path = remote_layer_path(
2073 1481 : &self.tenant_shard_id.tenant_id,
2074 1481 : &self.timeline_id,
2075 1481 : layer_metadata.shard,
2076 1481 : &layer.layer_desc().layer_name(),
2077 1481 : layer_metadata.generation,
2078 1481 : );
2079 1481 :
2080 1481 : upload::upload_timeline_layer(
2081 1481 : &self.storage_impl,
2082 1481 : local_path,
2083 1481 : &remote_path,
2084 1481 : layer_metadata.file_size,
2085 1481 : &self.cancel,
2086 1481 : )
2087 1481 : .measure_remote_op(
2088 1481 : RemoteOpFileKind::Layer,
2089 1481 : RemoteOpKind::Upload,
2090 1481 : Arc::clone(&self.metrics),
2091 1481 : )
2092 1481 : .await
2093 : }
2094 1487 : UploadOp::UploadMetadata { ref uploaded } => {
2095 1487 : let res = upload::upload_index_part(
2096 1487 : &self.storage_impl,
2097 1487 : &self.tenant_shard_id,
2098 1487 : &self.timeline_id,
2099 1487 : self.generation,
2100 1487 : uploaded,
2101 1487 : &self.cancel,
2102 1487 : )
2103 1487 : .measure_remote_op(
2104 1487 : RemoteOpFileKind::Index,
2105 1487 : RemoteOpKind::Upload,
2106 1487 : Arc::clone(&self.metrics),
2107 1487 : )
2108 1487 : .await;
2109 1478 : if res.is_ok() {
2110 1478 : self.update_remote_physical_size_gauge(Some(uploaded));
2111 1478 : let mention_having_future_layers = if cfg!(feature = "testing") {
2112 1478 : uploaded
2113 1478 : .layer_metadata
2114 1478 : .keys()
2115 17694 : .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn()))
2116 : } else {
2117 0 : false
2118 : };
2119 1478 : if mention_having_future_layers {
2120 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
2121 41 : tracing::info!(
2122 0 : disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(),
2123 0 : "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"
2124 : );
2125 1437 : }
2126 0 : }
2127 1478 : res
2128 : }
2129 336 : UploadOp::Delete(delete) => {
2130 336 : if self.config.read().unwrap().block_deletions {
2131 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2132 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2133 0 : queue.blocked_deletions.push(delete.clone());
2134 0 : }
2135 0 : Ok(())
2136 : } else {
2137 336 : pausable_failpoint!("before-delete-layer-pausable");
2138 333 : self.deletion_queue_client
2139 333 : .push_layers(
2140 333 : self.tenant_shard_id,
2141 333 : self.timeline_id,
2142 333 : self.generation,
2143 333 : delete.layers.clone(),
2144 333 : )
2145 333 : .await
2146 333 : .map_err(|e| anyhow::anyhow!(e))
2147 : }
2148 : }
2149 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
2150 : // unreachable. Barrier operations are handled synchronously in
2151 : // launch_queued_tasks
2152 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
2153 0 : break;
2154 : }
2155 : };
2156 :
2157 0 : match upload_result {
2158 : Ok(()) => {
2159 3216 : break;
2160 : }
2161 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
2162 0 : // loop around to do the proper stopping
2163 0 : continue;
2164 : }
2165 0 : Err(e) => {
2166 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
2167 0 :
2168 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
2169 0 : // or other external reasons. Such issues are relatively regular, so log them
2170 0 : // at info level at first, and only WARN if the operation fails repeatedly.
2171 0 : //
2172 0 : // (See similar logic for downloads in `download::download_retry`)
2173 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
2174 0 : info!(
2175 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
2176 0 : task.op, retries, e
2177 : );
2178 : } else {
2179 0 : warn!(
2180 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
2181 0 : task.op, retries, e
2182 : );
2183 : }
2184 :
2185 : // sleep until it's time to retry, or we're cancelled
2186 0 : exponential_backoff(
2187 0 : retries,
2188 0 : DEFAULT_BASE_BACKOFF_SECONDS,
2189 0 : DEFAULT_MAX_BACKOFF_SECONDS,
2190 0 : &cancel,
2191 0 : )
2192 0 : .await;
2193 : }
2194 : }
2195 : }
2196 :
2197 3216 : let retries = task.retries.load(Ordering::SeqCst);
2198 3216 : if retries > 0 {
2199 0 : info!(
2200 0 : "remote task {} completed successfully after {} retries",
2201 0 : task.op, retries
2202 : );
2203 : } else {
2204 3216 : debug!("remote task {} completed successfully", task.op);
2205 : }
2206 :
2207 : // The task has completed successfully. Remove it from the in-progress list.
2208 3216 : let lsn_update = {
2209 3216 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
2210 3216 : let upload_queue = match upload_queue_guard.deref_mut() {
2211 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
2212 0 : UploadQueue::Stopped(_stopped) => {
2213 0 : None
2214 : },
2215 3216 : UploadQueue::Initialized(qi) => { Some(qi) }
2216 : };
2217 :
2218 3216 : let upload_queue = match upload_queue {
2219 3216 : Some(upload_queue) => upload_queue,
2220 : None => {
2221 0 : info!("another concurrent task already stopped the queue");
2222 0 : return;
2223 : }
2224 : };
2225 :
2226 3216 : upload_queue.inprogress_tasks.remove(&task.task_id);
2227 :
2228 3216 : let lsn_update = match task.op {
2229 1405 : UploadOp::UploadLayer(_, _, _) => None,
2230 1478 : UploadOp::UploadMetadata { ref uploaded } => {
2231 1478 : // the task id is reused as a monotonicity check for storing the "clean"
2232 1478 : // IndexPart.
2233 1478 : let last_updater = upload_queue.clean.1;
2234 1478 : let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id);
2235 1478 : let monotone = is_later || last_updater.is_none();
2236 :
2237 1478 : assert!(monotone, "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}", task.task_id);
2238 :
2239 : // not taking ownership is wasteful
2240 1478 : upload_queue.clean.0.clone_from(uploaded);
2241 1478 : upload_queue.clean.1 = Some(task.task_id);
2242 1478 :
2243 1478 : let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
2244 1478 : self.metrics
2245 1478 : .projected_remote_consistent_lsn_gauge
2246 1478 : .set(lsn.0);
2247 1478 :
2248 1478 : if self.generation.is_none() {
2249 : // Legacy mode: skip validating generation
2250 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
2251 0 : None
2252 1478 : } else if self
2253 1478 : .config
2254 1478 : .read()
2255 1478 : .unwrap()
2256 1478 : .process_remote_consistent_lsn_updates
2257 : {
2258 1478 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
2259 : } else {
2260 : // Our config disables remote_consistent_lsn updates: drop it.
2261 0 : None
2262 : }
2263 : }
2264 333 : UploadOp::Delete(_) => None,
2265 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
2266 : };
2267 :
2268 : // Launch any queued tasks that were unblocked by this one.
2269 3216 : self.launch_queued_tasks(upload_queue);
2270 3216 : lsn_update
2271 : };
2272 :
2273 3216 : if let Some((lsn, slot)) = lsn_update {
2274 : // Updates to the remote_consistent_lsn we advertise to pageservers
2275 : // are all routed through the DeletionQueue, to enforce important
2276 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
2277 1478 : self.deletion_queue_client
2278 1478 : .update_remote_consistent_lsn(
2279 1478 : self.tenant_shard_id,
2280 1478 : self.timeline_id,
2281 1478 : self.generation,
2282 1478 : lsn,
2283 1478 : slot,
2284 1478 : )
2285 1478 : .await;
2286 1738 : }
2287 :
2288 3216 : self.metric_end(&task.op);
2289 3216 : for coalesced_op in &task.coalesced_ops {
2290 2 : self.metric_end(coalesced_op);
2291 2 : }
2292 3216 : }
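 :
 : // A minimal sketch of the retry cadence in `perform_upload_task` above. The real
 : // helper is `exponential_backoff` with DEFAULT_BASE_BACKOFF_SECONDS and
 : // DEFAULT_MAX_BACKOFF_SECONDS; the shape and the example base/cap values below
 : // are illustrative assumptions, not the actual implementation:
 : //
 : //     fn backoff_secs(retries: u32, base: f64, cap: f64) -> f64 {
 : //         // no delay before the first retry, then base * 2^(n-1), capped
 : //         if retries == 0 {
 : //             0.0
 : //         } else {
 : //             (base * 2f64.powi(retries as i32 - 1)).min(cap)
 : //         }
 : //     }
 : //
 : //     // e.g. base = 0.5, cap = 60.0 yields delays 0, 0.5, 1, 2, 4, ..., 60, 60, ...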
2293 :
2294 6854 : fn metric_impl(
2295 6854 : &self,
2296 6854 : op: &UploadOp,
2297 6854 : ) -> Option<(
2298 6854 : RemoteOpFileKind,
2299 6854 : RemoteOpKind,
2300 6854 : RemoteTimelineClientMetricsCallTrackSize,
2301 6854 : )> {
2302 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
2303 6854 : let res = match op {
2304 2985 : UploadOp::UploadLayer(_, m, _) => (
2305 2985 : RemoteOpFileKind::Layer,
2306 2985 : RemoteOpKind::Upload,
2307 2985 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
2308 2985 : ),
2309 3018 : UploadOp::UploadMetadata { .. } => (
2310 3018 : RemoteOpFileKind::Index,
2311 3018 : RemoteOpKind::Upload,
2312 3018 : DontTrackSize {
2313 3018 : reason: "metadata uploads are tiny",
2314 3018 : },
2315 3018 : ),
2316 843 : UploadOp::Delete(_delete) => (
2317 843 : RemoteOpFileKind::Layer,
2318 843 : RemoteOpKind::Delete,
2319 843 : DontTrackSize {
2320 843 : reason: "should we track deletes? positive or negative sign?",
2321 843 : },
2322 843 : ),
2323 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
2324 : // we do not account these
2325 8 : return None;
2326 : }
2327 : };
2328 6846 : Some(res)
2329 6854 : }
2330 :
2331 3628 : fn metric_begin(&self, op: &UploadOp) {
2332 3628 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2333 3628 : Some(x) => x,
2334 0 : None => return,
2335 : };
2336 3628 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
2337 3628 : guard.will_decrement_manually(); // in metric_end(), see right below
2338 3628 : }
2339 :
2340 3226 : fn metric_end(&self, op: &UploadOp) {
2341 3226 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2342 3218 : Some(x) => x,
2343 8 : None => return,
2344 : };
2345 3218 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
2346 3226 : }
2347 :
2348 : /// Close the upload queue for new operations and cancel queued operations.
2349 : ///
2350 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
2351 : ///
2352 : /// In-progress operations will still be running after this function returns.
2353 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
2354 : /// to wait for them to complete, after calling this function.
2355 18 : pub(crate) fn stop(&self) {
2356 18 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
2357 18 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
2358 18 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
2359 18 : let mut guard = self.upload_queue.lock().unwrap();
2360 18 : self.stop_impl(&mut guard);
2361 18 : }
2362 :
2363 18 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
2364 18 : match &mut **guard {
2365 : UploadQueue::Uninitialized => {
2366 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
2367 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
2368 : }
2369 : UploadQueue::Stopped(_) => {
2370 : // nothing to do
2371 8 : info!("another concurrent task already shut down the queue");
2372 : }
2373 10 : UploadQueue::Initialized(initialized) => {
2374 10 : info!("shutting down upload queue");
2375 :
2376 : // Replace the queue with the Stopped state, taking ownership of the old
2377 : // Initialized queue. We will do some checks on it, and then drop it.
2378 10 : let qi = {
2379 : // Here we preserve working version of the upload queue for possible use during deletions.
2380 : // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
2381 : // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point.
2382 : // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it.
2383 10 : let upload_queue_for_deletion = UploadQueueInitialized {
2384 10 : inprogress_limit: initialized.inprogress_limit,
2385 10 : task_counter: 0,
2386 10 : dirty: initialized.dirty.clone(),
2387 10 : clean: initialized.clean.clone(),
2388 10 : latest_files_changes_since_metadata_upload_scheduled: 0,
2389 10 : visible_remote_consistent_lsn: initialized
2390 10 : .visible_remote_consistent_lsn
2391 10 : .clone(),
2392 10 : inprogress_tasks: HashMap::default(),
2393 10 : queued_operations: VecDeque::default(),
2394 10 : #[cfg(feature = "testing")]
2395 10 : dangling_files: HashMap::default(),
2396 10 : blocked_deletions: Vec::new(),
2397 10 : shutting_down: false,
2398 10 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
2399 10 : recently_deleted: HashSet::new(),
2400 10 : };
2401 10 :
2402 10 : let upload_queue = std::mem::replace(
2403 10 : &mut **guard,
2404 10 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
2405 10 : UploadQueueStoppedDeletable {
2406 10 : upload_queue_for_deletion,
2407 10 : deleted_at: SetDeletedFlagProgress::NotRunning,
2408 10 : },
2409 10 : )),
2410 10 : );
2411 10 : if let UploadQueue::Initialized(qi) = upload_queue {
2412 10 : qi
2413 : } else {
2414 0 : unreachable!("we checked in the match above that it is Initialized");
2415 : }
2416 : };
2417 :
2418 : // We don't need to do anything here for in-progress tasks. They will finish
2419 : // on their own, decrement the unfinished-task counter themselves, and observe
2420 : // that the queue is Stopped.
2421 10 : drop(qi.inprogress_tasks);
2422 :
2423 : // Tear down queued ops
2424 10 : for op in qi.queued_operations.into_iter() {
2425 8 : self.metric_end(&op);
2426 8 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
2427 8 : // which is exactly what we want to happen.
2428 8 : drop(op);
2429 8 : }
2430 : }
2431 : }
2432 18 : }
2433 :
2434 : /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
2435 : /// externally to RemoteTimelineClient.
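 : ///
 : /// A hypothetical caller sketch:
 : ///
 : /// ```ignore
 : /// let accessor = client.initialized_upload_queue()?;
 : /// let latest_index = accessor.latest_uploaded_index_part();
 : /// // the upload queue mutex stays locked until `accessor` is dropped
 : /// ```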
2436 0 : pub(crate) fn initialized_upload_queue(
2437 0 : &self,
2438 0 : ) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
2439 0 : let mut inner = self.upload_queue.lock().unwrap();
2440 0 : inner.initialized_mut()?;
2441 0 : Ok(UploadQueueAccessor { inner })
2442 0 : }
2443 :
2444 8 : pub(crate) fn no_pending_work(&self) -> bool {
2445 8 : let inner = self.upload_queue.lock().unwrap();
2446 8 : match &*inner {
2447 : UploadQueue::Uninitialized
2448 0 : | UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => true,
2449 8 : UploadQueue::Stopped(UploadQueueStopped::Deletable(x)) => {
2450 8 : x.upload_queue_for_deletion.no_pending_work()
2451 : }
2452 0 : UploadQueue::Initialized(x) => x.no_pending_work(),
2453 : }
2454 8 : }
2455 :
2456 : /// 'foreign' in the sense that it does not belong to this tenant shard. This method
2457 : /// is used during GC for other shards to get the index of shard zero.
2458 0 : pub(crate) async fn download_foreign_index(
2459 0 : &self,
2460 0 : shard_number: ShardNumber,
2461 0 : cancel: &CancellationToken,
2462 0 : ) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
2463 0 : let foreign_shard_id = TenantShardId {
2464 0 : shard_number,
2465 0 : shard_count: self.tenant_shard_id.shard_count,
2466 0 : tenant_id: self.tenant_shard_id.tenant_id,
2467 0 : };
2468 0 : download_index_part(
2469 0 : &self.storage_impl,
2470 0 : &foreign_shard_id,
2471 0 : &self.timeline_id,
2472 0 : Generation::MAX,
2473 0 : cancel,
2474 0 : )
2475 0 : .await
2476 0 : }
2477 : }
2478 :
2479 : pub(crate) struct UploadQueueAccessor<'a> {
2480 : inner: std::sync::MutexGuard<'a, UploadQueue>,
2481 : }
2482 :
2483 : impl UploadQueueAccessor<'_> {
2484 0 : pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
2485 0 : match &*self.inner {
2486 0 : UploadQueue::Initialized(x) => &x.clean.0,
2487 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
2488 0 : unreachable!("checked before constructing")
2489 : }
2490 : }
2491 0 : }
2492 : }
2493 :
2494 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2495 0 : let path = format!("tenants/{tenant_shard_id}");
2496 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2497 0 : }
2498 :
2499 662 : pub fn remote_tenant_manifest_path(
2500 662 : tenant_shard_id: &TenantShardId,
2501 662 : generation: Generation,
2502 662 : ) -> RemotePath {
2503 662 : let path = format!(
2504 662 : "tenants/{tenant_shard_id}/tenant-manifest{}.json",
2505 662 : generation.get_suffix()
2506 662 : );
2507 662 : RemotePath::from_string(&path).expect("Failed to construct path")
2508 662 : }
2509 :
2510 : /// Prefix to all generations' manifest objects in a tenant shard
2511 220 : pub fn remote_tenant_manifest_prefix(tenant_shard_id: &TenantShardId) -> RemotePath {
2512 220 : let path = format!("tenants/{tenant_shard_id}/tenant-manifest",);
2513 220 : RemotePath::from_string(&path).expect("Failed to construct path")
2514 220 : }
2515 :
2516 250 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2517 250 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
2518 250 : RemotePath::from_string(&path).expect("Failed to construct path")
2519 250 : }
2520 :
2521 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
2522 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
2523 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2524 0 : }
2525 :
2526 30 : pub fn remote_timeline_path(
2527 30 : tenant_shard_id: &TenantShardId,
2528 30 : timeline_id: &TimelineId,
2529 30 : ) -> RemotePath {
2530 30 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
2531 30 : }
2532 :
2533 : /// Obtains the path of the given Layer in remote storage.
2534 : ///
2535 : /// Note that the shard component of a remote layer path is _not_ always the same
2536 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
2537 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
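 : ///
 : /// An illustrative key shape (the suffix values are made-up examples):
 : ///
 : /// ```text
 : /// tenants/<tenant_id><shard suffix>/timelines/<timeline_id>/<layer file name>-000000a1
 : /// ```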
2538 1773 : pub fn remote_layer_path(
2539 1773 : tenant_id: &TenantId,
2540 1773 : timeline_id: &TimelineId,
2541 1773 : shard: ShardIndex,
2542 1773 : layer_file_name: &LayerName,
2543 1773 : generation: Generation,
2544 1773 : ) -> RemotePath {
2545 1773 : // Generation-aware key format
2546 1773 : let path = format!(
2547 1773 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
2548 1773 : shard.get_suffix(),
2549 1773 : layer_file_name,
2550 1773 : generation.get_suffix()
2551 1773 : );
2552 1773 :
2553 1773 : RemotePath::from_string(&path).expect("Failed to construct path")
2554 1773 : }
2555 :
2556 : /// Returns true if a and b have the same layer path within a tenant/timeline. This is essentially
2557 : /// remote_layer_path(a) == remote_layer_path(b) without the string allocations.
2558 : ///
2559 : /// TODO: there should be a variant of LayerName for the physical path that contains information
2560 : /// about the shard and generation, such that this could be replaced by a simple comparison.
2561 1175277 : pub fn is_same_remote_layer_path(
2562 1175277 : aname: &LayerName,
2563 1175277 : ameta: &LayerFileMetadata,
2564 1175277 : bname: &LayerName,
2565 1175277 : bmeta: &LayerFileMetadata,
2566 1175277 : ) -> bool {
2567 1175277 : // NB: don't assert remote_layer_path(a) == remote_layer_path(b); too expensive even for debug.
2568 1175277 : aname == bname && ameta.shard == bmeta.shard && ameta.generation == bmeta.generation
2569 1175277 : }
2570 :
2571 4 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
2572 4 : RemotePath::from_string(&format!(
2573 4 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
2574 4 : ))
2575 4 : .expect("Failed to construct path")
2576 4 : }
2577 :
2578 2 : pub fn remote_initdb_preserved_archive_path(
2579 2 : tenant_id: &TenantId,
2580 2 : timeline_id: &TimelineId,
2581 2 : ) -> RemotePath {
2582 2 : RemotePath::from_string(&format!(
2583 2 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
2584 2 : ))
2585 2 : .expect("Failed to construct path")
2586 2 : }
2587 :
2588 1552 : pub fn remote_index_path(
2589 1552 : tenant_shard_id: &TenantShardId,
2590 1552 : timeline_id: &TimelineId,
2591 1552 : generation: Generation,
2592 1552 : ) -> RemotePath {
2593 1552 : RemotePath::from_string(&format!(
2594 1552 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
2595 1552 : IndexPart::FILE_NAME,
2596 1552 : generation.get_suffix()
2597 1552 : ))
2598 1552 : .expect("Failed to construct path")
2599 1552 : }
2600 :
2601 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2602 0 : RemotePath::from_string(&format!(
2603 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
2604 0 : ))
2605 0 : .expect("Failed to construct path")
2606 0 : }
2607 :
2608 : /// Given the key of an index, parse out the generation part of the name
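 : ///
 : /// For example (illustrative keys):
 : ///
 : /// ```text
 : /// .../timelines/<timeline_id>/index_part.json-000000a1  => generation parsed from "000000a1"
 : /// .../timelines/<timeline_id>/index_part.json           => None (legacy, no suffix)
 : /// ```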
2609 18 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
2610 18 : let file_name = match path.get_path().file_name() {
2611 18 : Some(f) => f,
2612 : None => {
2613 : // Unexpected: we should be seeing index_part.json paths only
2614 0 : tracing::warn!("Malformed index key {}", path);
2615 0 : return None;
2616 : }
2617 : };
2618 :
2619 18 : match file_name.split_once('-') {
2620 12 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2621 6 : None => None,
2622 : }
2623 18 : }
2624 :
2625 : /// Given the key of a tenant manifest, parse out the generation number
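 : ///
 : /// For example (illustrative key): `tenants/<tenant_id>/tenant-manifest-000000a1.json`
 : /// yields the generation parsed from `000000a1`.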
2626 0 : pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
2627 : static RE: OnceLock<Regex> = OnceLock::new();
2628 0 : let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap());
2629 0 : re.captures(path.get_path().as_str())
2630 0 : .and_then(|c| c.get(1))
2631 0 : .and_then(|m| Generation::parse_suffix(m.as_str()))
2632 0 : }
2633 :
2634 : #[cfg(test)]
2635 : mod tests {
2636 : use super::*;
2637 : use crate::{
2638 : context::RequestContext,
2639 : tenant::{
2640 : config::AttachmentMode,
2641 : harness::{TenantHarness, TIMELINE_ID},
2642 : storage_layer::layer::local_layer_path,
2643 : Tenant, Timeline,
2644 : },
2645 : DEFAULT_PG_VERSION,
2646 : };
2647 :
2648 : use std::collections::HashSet;
2649 :
2650 8 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2651 8 : format!("contents for {name}").into()
2652 8 : }
2653 :
2654 2 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2655 2 : let metadata = TimelineMetadata::new(
2656 2 : disk_consistent_lsn,
2657 2 : None,
2658 2 : None,
2659 2 : Lsn(0),
2660 2 : Lsn(0),
2661 2 : Lsn(0),
2662 2 : // Any version will do
2663 2 : // but it should be consistent with the one in the tests
2664 2 : crate::DEFAULT_PG_VERSION,
2665 2 : );
2666 2 :
2667 2 : // go through serialize + deserialize to fix the header, including checksum
2668 2 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2669 2 : }
2670 :
2671 2 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2672 6 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2673 2 : avec.sort();
2674 2 :
2675 2 : let mut bvec = b.to_vec();
2676 2 : bvec.sort_unstable();
2677 2 :
2678 2 : assert_eq!(avec, bvec);
2679 2 : }
2680 :
2681 4 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2682 4 : let mut expected: Vec<String> = expected
2683 4 : .iter()
2684 16 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2685 4 : .collect();
2686 4 : expected.sort();
2687 4 :
2688 4 : let mut found: Vec<String> = Vec::new();
2689 16 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2690 16 : let entry_name = entry.file_name();
2691 16 : let fname = entry_name.to_str().unwrap();
2692 16 : found.push(String::from(fname));
2693 16 : }
2694 4 : found.sort();
2695 4 :
2696 4 : assert_eq!(found, expected);
2697 4 : }
2698 :
2699 : struct TestSetup {
2700 : harness: TenantHarness,
2701 : tenant: Arc<Tenant>,
2702 : timeline: Arc<Timeline>,
2703 : tenant_ctx: RequestContext,
2704 : }
2705 :
2706 : impl TestSetup {
2707 8 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2708 8 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2709 8 : let harness = TenantHarness::create(test_name).await?;
2710 8 : let (tenant, ctx) = harness.load().await;
2711 :
2712 8 : let timeline = tenant
2713 8 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2714 8 : .await?;
2715 :
2716 8 : Ok(Self {
2717 8 : harness,
2718 8 : tenant,
2719 8 : timeline,
2720 8 : tenant_ctx: ctx,
2721 8 : })
2722 8 : }
2723 :
2724 : /// Construct a RemoteTimelineClient in an arbitrary generation
2725 10 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2726 10 : let location_conf = AttachedLocationConfig {
2727 10 : generation,
2728 10 : attach_mode: AttachmentMode::Single,
2729 10 : };
2730 10 : Arc::new(RemoteTimelineClient {
2731 10 : conf: self.harness.conf,
2732 10 : runtime: tokio::runtime::Handle::current(),
2733 10 : tenant_shard_id: self.harness.tenant_shard_id,
2734 10 : timeline_id: TIMELINE_ID,
2735 10 : generation,
2736 10 : storage_impl: self.harness.remote_storage.clone(),
2737 10 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2738 10 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2739 10 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2740 10 : &self.harness.tenant_shard_id,
2741 10 : &TIMELINE_ID,
2742 10 : )),
2743 10 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
2744 10 : cancel: CancellationToken::new(),
2745 10 : })
2746 10 : }
2747 :
2748 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2749 : /// and timeline_id are present.
2750 6 : fn span(&self) -> tracing::Span {
2751 6 : tracing::info_span!(
2752 : "test",
2753 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2754 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2755 : timeline_id = %TIMELINE_ID
2756 : )
2757 6 : }
2758 : }
2759 :
2760 : // Test scheduling
2761 : #[tokio::test]
2762 2 : async fn upload_scheduling() {
2763 2 : // Test outline:
2764 2 : //
2765 2 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
2766 2 : // Schedule upload of index. Check that it is queued
2767 2 : // let the layer file uploads finish. Check that the index-upload is now started
2768 2 : // let the index-upload finish.
2769 2 : //
2770 2 : // Download back the index.json. Check that the list of files is correct
2771 2 : //
2772 2 : // Schedule upload. Schedule deletion. Check that the deletion is queued
2773 2 : // let upload finish. Check that deletion is now started
2774 2 : // Schedule another deletion. Check that it's launched immediately.
2775 2 : // Schedule index upload. Check that it's queued
2776 2 :
2777 2 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2778 2 : let span = test_setup.span();
2779 2 : let _guard = span.enter();
2780 2 :
2781 2 : let TestSetup {
2782 2 : harness,
2783 2 : tenant: _tenant,
2784 2 : timeline,
2785 2 : tenant_ctx: _tenant_ctx,
2786 2 : } = test_setup;
2787 2 :
2788 2 : let client = &timeline.remote_client;
2789 2 :
2790 2 : // Download back the index.json, and check that the list of files is correct
2791 2 : let initial_index_part = match client
2792 2 : .download_index_file(&CancellationToken::new())
2793 2 : .await
2794 2 : .unwrap()
2795 2 : {
2796 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2797 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2798 2 : };
2799 2 : let initial_layers = initial_index_part
2800 2 : .layer_metadata
2801 2 : .keys()
2802 2 : .map(|f| f.to_owned())
2803 2 : .collect::<HashSet<LayerName>>();
2804 2 : let initial_layer = {
2805 2 : assert!(initial_layers.len() == 1);
2806 2 : initial_layers.into_iter().next().unwrap()
2807 2 : };
2808 2 :
2809 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2810 2 :
2811 2 : println!("workdir: {}", harness.conf.workdir);
2812 2 :
2813 2 : let remote_timeline_dir = harness
2814 2 : .remote_fs_dir
2815 2 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2816 2 : println!("remote_timeline_dir: {remote_timeline_dir}");
2817 2 :
2818 2 : let generation = harness.generation;
2819 2 : let shard = harness.shard;
2820 2 :
2821 2 : // Create a couple of dummy files, schedule upload for them
2822 2 :
2823 2 : let layers = [
2824 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2825 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2826 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2827 2 : ]
2828 2 : .into_iter()
2829 6 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2830 6 :
2831 6 : let local_path = local_layer_path(
2832 6 : harness.conf,
2833 6 : &timeline.tenant_shard_id,
2834 6 : &timeline.timeline_id,
2835 6 : &name,
2836 6 : &generation,
2837 6 : );
2838 6 : std::fs::write(&local_path, &contents).unwrap();
2839 6 :
2840 6 : Layer::for_resident(
2841 6 : harness.conf,
2842 6 : &timeline,
2843 6 : local_path,
2844 6 : name,
2845 6 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2846 6 : )
2847 6 : }).collect::<Vec<_>>();
2848 2 :
2849 2 : client
2850 2 : .schedule_layer_file_upload(layers[0].clone())
2851 2 : .unwrap();
2852 2 : client
2853 2 : .schedule_layer_file_upload(layers[1].clone())
2854 2 : .unwrap();
2855 2 :
2856 2 : // Check that they are started immediately, not queued
2857 2 : //
2858 2 : // This works because we're running within block_on, so any futures are queued up
2859 2 : // until our next await point.
2860 2 : {
2861 2 : let mut guard = client.upload_queue.lock().unwrap();
2862 2 : let upload_queue = guard.initialized_mut().unwrap();
2863 2 : assert!(upload_queue.queued_operations.is_empty());
2864 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 2);
2865 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);
2866 2 :
2867 2 : // Also check that `latest_files_changes_since_metadata_upload_scheduled` was updated.
2868 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2869 2 : }
2870 2 :
2871 2 : // Schedule upload of index. Check that it is queued
2872 2 : let metadata = dummy_metadata(Lsn(0x20));
2873 2 : client
2874 2 : .schedule_index_upload_for_full_metadata_update(&metadata)
2875 2 : .unwrap();
2876 2 : {
2877 2 : let mut guard = client.upload_queue.lock().unwrap();
2878 2 : let upload_queue = guard.initialized_mut().unwrap();
2879 2 : assert!(upload_queue.queued_operations.len() == 1);
2880 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2881 2 : }
2882 2 :
2883 2 : // Wait for the uploads to finish
2884 2 : client.wait_completion().await.unwrap();
2885 2 : {
2886 2 : let mut guard = client.upload_queue.lock().unwrap();
2887 2 : let upload_queue = guard.initialized_mut().unwrap();
2888 2 :
2889 2 : assert!(upload_queue.queued_operations.is_empty());
2890 2 : assert!(upload_queue.inprogress_tasks.is_empty());
2891 2 : }
2892 2 :
2893 2 : // Download back the index.json, and check that the list of files is correct
2894 2 : let index_part = match client
2895 2 : .download_index_file(&CancellationToken::new())
2896 2 : .await
2897 2 : .unwrap()
2898 2 : {
2899 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2900 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2901 2 : };
2902 2 :
2903 2 : assert_file_list(
2904 2 : &index_part
2905 2 : .layer_metadata
2906 2 : .keys()
2907 6 : .map(|f| f.to_owned())
2908 2 : .collect(),
2909 2 : &[
2910 2 : &initial_layer.to_string(),
2911 2 : &layers[0].layer_desc().layer_name().to_string(),
2912 2 : &layers[1].layer_desc().layer_name().to_string(),
2913 2 : ],
2914 2 : );
2915 2 : assert_eq!(index_part.metadata, metadata);
2916 2 :
2917 2 : // Schedule upload and then a deletion. Check that the deletion is queued
2918 2 : client
2919 2 : .schedule_layer_file_upload(layers[2].clone())
2920 2 : .unwrap();
2921 2 :
2922 2 : // This is no longer consistent with how deletion works via Layer::drop, but this test
2923 2 : // keeps using schedule_layer_file_deletion because we have no way to wait for the
2924 2 : // spawn_blocking task started by the drop.
2925 2 : client
2926 2 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
2927 2 : .unwrap();
2928 2 : {
2929 2 : let mut guard = client.upload_queue.lock().unwrap();
2930 2 : let upload_queue = guard.initialized_mut().unwrap();
2931 2 :
2932 2 : // Scheduling the deletion enqueues an index-file upload plus the file deletion itself.
2933 2 : assert_eq!(upload_queue.queued_operations.len(), 2);
2934 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2935 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
2936 2 : assert_eq!(upload_queue.num_inprogress_deletions(), 0);
2937 2 : assert_eq!(
2938 2 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2939 2 : 0
2940 2 : );
2941 2 : }
2942 2 : assert_remote_files(
2943 2 : &[
2944 2 : &initial_layer.to_string(),
2945 2 : &layers[0].layer_desc().layer_name().to_string(),
2946 2 : &layers[1].layer_desc().layer_name().to_string(),
2947 2 : "index_part.json",
2948 2 : ],
2949 2 : &remote_timeline_dir,
2950 2 : generation,
2951 2 : );
2952 2 :
2953 2 : // Finish them
2954 2 : client.wait_completion().await.unwrap();
2955 2 : harness.deletion_queue.pump().await;
2956 2 :
2957 2 : assert_remote_files(
2958 2 : &[
2959 2 : &initial_layer.to_string(),
2960 2 : &layers[1].layer_desc().layer_name().to_string(),
2961 2 : &layers[2].layer_desc().layer_name().to_string(),
2962 2 : "index_part.json",
2963 2 : ],
2964 2 : &remote_timeline_dir,
2965 2 : generation,
2966 2 : );
2967 2 : }
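 : 
 : // A note on the gating rule the assertions above rely on: layer uploads may run in
 : // parallel, but an index upload must not start while any layer upload scheduled
 : // before it is still in flight, because the index lists those layers. Below is a
 : // minimal sketch of that rule over hypothetical, simplified operation types
 : // (`SketchOp`/`can_start` are illustrative only, not the real `UploadOp` logic).
 : #[allow(dead_code)]
 : enum SketchOp {
 : UploadLayer,
 : UploadIndex,
 : DeleteLayer,
 : }
 : 
 : #[allow(dead_code)]
 : fn can_start(op: &SketchOp, inprogress_layer_uploads: usize) -> bool {
 : match op {
 : // Layer files are independent objects and may upload concurrently.
 : SketchOp::UploadLayer => true,
 : // The index references layers, so it waits for in-flight layer uploads.
 : SketchOp::UploadIndex => inprogress_layer_uploads == 0,
 : // A deletion queues behind the index upload that stops referencing the layer.
 : SketchOp::DeleteLayer => inprogress_layer_uploads == 0,
 : }
 : }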
2968 :
2969 : #[tokio::test]
2970 2 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2971 2 : // Setup
2972 2 :
2973 2 : let TestSetup {
2974 2 : harness,
2975 2 : tenant: _tenant,
2976 2 : timeline,
2977 2 : ..
2978 2 : } = TestSetup::new("metrics").await.unwrap();
2979 2 : let client = &timeline.remote_client;
2980 2 :
2981 2 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2982 2 : let local_path = local_layer_path(
2983 2 : harness.conf,
2984 2 : &timeline.tenant_shard_id,
2985 2 : &timeline.timeline_id,
2986 2 : &layer_file_name_1,
2987 2 : &harness.generation,
2988 2 : );
2989 2 : let content_1 = dummy_contents("foo");
2990 2 : std::fs::write(&local_path, &content_1).unwrap();
2991 2 :
2992 2 : let layer_file_1 = Layer::for_resident(
2993 2 : harness.conf,
2994 2 : &timeline,
2995 2 : local_path,
2996 2 : layer_file_name_1.clone(),
2997 2 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2998 2 : );
2999 2 :
3000 2 : #[derive(Debug, PartialEq, Clone, Copy)]
3001 2 : struct BytesStartedFinished {
3002 2 : started: Option<usize>,
3003 2 : finished: Option<usize>,
3004 2 : }
3005 2 : impl std::ops::Add for BytesStartedFinished {
3006 2 : type Output = Self;
3007 4 : fn add(self, rhs: Self) -> Self::Output {
3008 4 : Self {
3009 4 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
3010 4 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
3011 4 : }
3012 4 : }
3013 2 : }
3014 6 : let get_bytes_started_stopped = || {
3015 6 : let started = client
3016 6 : .metrics
3017 6 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
3018 6 : .map(|v| v.try_into().unwrap());
3019 6 : let stopped = client
3020 6 : .metrics
3021 6 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
3022 6 : .map(|v| v.try_into().unwrap());
3023 6 : BytesStartedFinished {
3024 6 : started,
3025 6 : finished: stopped,
3026 6 : }
3027 6 : };
3028 2 :
3029 2 : // Test
3030 2 : tracing::info!("now doing actual test");
3031 2 :
3032 2 : let actual_a = get_bytes_started_stopped();
3033 2 :
3034 2 : client
3035 2 : .schedule_layer_file_upload(layer_file_1.clone())
3036 2 : .unwrap();
3037 2 :
3038 2 : let actual_b = get_bytes_started_stopped();
3039 2 :
3040 2 : client.wait_completion().await.unwrap();
3041 2 :
3042 2 : let actual_c = get_bytes_started_stopped();
3043 2 :
3044 2 : // Validate
3045 2 :
3046 2 : let expected_b = actual_a
3047 2 : + BytesStartedFinished {
3048 2 : started: Some(content_1.len()),
3049 2 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
3050 2 : finished: Some(0),
3051 2 : };
3052 2 : assert_eq!(actual_b, expected_b);
3053 2 :
3054 2 : let expected_c = actual_a
3055 2 : + BytesStartedFinished {
3056 2 : started: Some(content_1.len()),
3057 2 : finished: Some(content_1.len()),
3058 2 : };
3059 2 : assert_eq!(actual_c, expected_c);
3060 2 : }
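 : 
 : // The started/finished counter pair asserted above is what lets a consumer derive
 : // "bytes currently in flight" by subtraction; eagerly creating the finished counter
 : // (the `finished: Some(0)` assertion) keeps that subtraction well-defined on the
 : // very first sample. A sketch of the derived reading (hypothetical helper, not
 : // part of the metrics API):
 : #[allow(dead_code)]
 : fn bytes_in_flight(started: u64, finished: u64) -> u64 {
 : // saturating_sub guards against transient skew if the two counters are
 : // sampled at slightly different instants.
 : started.saturating_sub(finished)
 : }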
3061 :
3062 12 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
3063 12 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
3064 12 : let example_index_part = IndexPart::example();
3065 12 :
3066 12 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
3067 12 :
3068 12 : let index_path = test_state.harness.remote_fs_dir.join(
3069 12 : remote_index_path(
3070 12 : &test_state.harness.tenant_shard_id,
3071 12 : &TIMELINE_ID,
3072 12 : generation,
3073 12 : )
3074 12 : .get_path(),
3075 12 : );
3076 12 :
3077 12 : std::fs::create_dir_all(index_path.parent().unwrap())
3078 12 : .expect("creating test dir should work");
3079 12 :
3080 12 : eprintln!("Writing {index_path}");
3081 12 : std::fs::write(&index_path, index_part_bytes).unwrap();
3082 12 : example_index_part
3083 12 : }
3084 :
3085 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
3086 : /// index, the IndexPart returned is equal to `expected`.
3087 10 : async fn assert_got_index_part(
3088 10 : test_state: &TestSetup,
3089 10 : get_generation: Generation,
3090 10 : expected: &IndexPart,
3091 10 : ) {
3092 10 : let client = test_state.build_client(get_generation);
3093 :
3094 10 : let download_r = client
3095 10 : .download_index_file(&CancellationToken::new())
3096 10 : .await
3097 10 : .expect("download should always succeed");
3098 10 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
3099 10 : match download_r {
3100 10 : MaybeDeletedIndexPart::IndexPart(index_part) => {
3101 10 : assert_eq!(&index_part, expected);
3102 : }
3103 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
3104 : }
3105 10 : }
3106 :
3107 : #[tokio::test]
3108 2 : async fn index_part_download_simple() -> anyhow::Result<()> {
3109 2 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
3110 2 : let span = test_state.span();
3111 2 : let _guard = span.enter();
3112 2 :
3113 2 : // Simple case: we are in generation N, loading the index from generation N - 1.
3114 2 : let generation_n = 5;
3115 2 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3116 2 :
3117 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
3118 2 :
3119 2 : Ok(())
3120 2 : }
3121 :
3122 : #[tokio::test]
3123 2 : async fn index_part_download_ordering() -> anyhow::Result<()> {
3124 2 : let test_state = TestSetup::new("index_part_download_ordering")
3125 2 : .await
3126 2 : .unwrap();
3127 2 :
3128 2 : let span = test_state.span();
3129 2 : let _guard = span.enter();
3130 2 :
3131 2 : // A generation-less IndexPart exists in the bucket, we should find it
3132 2 : let generation_n = 5;
3133 2 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
3134 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
3135 2 :
3136 2 : // If a more recent-than-none generation exists, we should prefer to load that
3137 2 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
3138 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3139 2 :
3140 2 : // If a more-recent-than-me generation exists, we should ignore it.
3141 2 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
3142 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3143 2 :
3144 2 : // If a directly previous generation exists, _and_ an index exists in my own
3145 2 : // generation, I should prefer my own generation.
3146 2 : let _injected_prev =
3147 2 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3148 2 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
3149 2 : assert_got_index_part(
3150 2 : &test_state,
3151 2 : Generation::new(generation_n),
3152 2 : &injected_current,
3153 2 : )
3154 2 : .await;
3155 2 :
3156 2 : Ok(())
3157 2 : }
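 : 
 : // Taken together, the two download tests above pin down a selection rule: a client
 : // in generation N loads the index with the highest generation that is <= N, and a
 : // generation-less ("legacy") index sorts below any numbered generation. A
 : // standalone sketch of that rule (hypothetical helper; the real download path
 : // resolves this against remote listings instead of an in-memory slice):
 : #[allow(dead_code)]
 : fn pick_index_generation(available: &[Option<u32>], mine: u32) -> Option<Option<u32>> {
 : available
 : .iter()
 : .copied()
 : // Never load an index written by a future generation.
 : .filter(|g| g.map_or(true, |n| n <= mine))
 : // Prefer the newest eligible index; None (generation-less) is oldest.
 : .max_by_key(|g| g.map_or(-1i64, i64::from))
 : }
 : 
 : // E.g. with indexes at [None, Some(1), Some(10)] and mine = 5, this picks
 : // Some(Some(1)), matching the "ignore more-recent-than-me" assertion above.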
3158 : }