1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to get the list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
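//!
//! As an illustration, a typical sequence after flushing a new layer might look
//! like this (a minimal sketch; `client` and `layer` are hypothetical bindings
//! for an `Arc<RemoteTimelineClient>` and a `ResidentLayer`, error handling elided):
//!
//! ```ignore
//! // Upload the new layer file first ...
//! client.schedule_layer_file_upload(layer)?;
//! // ... then publish an index_part.json that references it.
//! client.schedule_index_upload_for_file_changes()?;
//! // Optionally block until the remote state reflects both operations.
//! client.wait_completion().await?;
//! ```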
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! However, instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
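//!
//! For illustration, the remote "directory" of a timeline therefore looks
//! roughly like this (layer file names abbreviated):
//!
//! ```text
//! tenants/<tenant_id>/timelines/<timeline_id>/index_part.json
//! tenants/<tenant_id>/timelines/<timeline_id>/<layer file>
//! tenants/<tenant_id>/timelines/<timeline_id>/<layer file>
//! ```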
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible
64 : //! for scheduling operations in an order that keeps the remote state consistent
65 : //! as described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that require preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
72 : //!
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! the `wait_completion` function (more on that later).
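//!
//! For example (a hedged sketch; `layer` and `local_path` are hypothetical):
//!
//! ```ignore
//! client.schedule_layer_file_upload(layer)?;
//! // ... before removing the local file, wait for the upload to finish:
//! client.wait_completion().await?;
//! std::fs::remove_file(local_path)?;
//! ```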
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
88 : //! that this doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`].
95 : //! It is initialized based on the [`IndexPart`] that was passed during init
96 : //! and updated with every `schedule_*` function call.
97 : //! All this is necessary to compute the future [`IndexPart`]s
98 : //! when scheduling an operation while other operations that also affect the
99 : //! remote [`IndexPart`] are in flight.
100 : //!
101 : //! # Retries & Error Handling
102 : //!
103 : //! The client retries operations indefinitely, using exponential back-off.
104 : //! There is no way to force a retry, i.e., interrupt the back-off.
105 : //! This could be built easily.
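//!
//! For intuition only, the usual exponential back-off shape looks like the
//! following (a sketch; `attempt` is hypothetical, and this is not necessarily
//! the exact formula used by `utils::backoff`):
//!
//! ```ignore
//! // Delay grows with each failed attempt, clamped to a maximum.
//! let delay_secs = (DEFAULT_BASE_BACKOFF_SECONDS as f64 * 2f64.powi(attempt))
//!     .min(DEFAULT_MAX_BACKOFF_SECONDS as f64);
//! ```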
106 : //!
107 : //! # Cancellation
108 : //!
109 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
110 : //! the client's tenant and timeline.
111 : //! Dropping the client will drop queued operations, but not operations that are already executing.
112 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
113 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
114 : //!
115 : //! # Completion
116 : //!
117 : //! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately,
118 : //! and submit a request through the DeletionQueue to update
119 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
120 : //! validated that our generation is not stale. It is this visible value
121 : //! that is advertised to safekeepers as a signal that they can
122 : //! delete the WAL up to that LSN.
123 : //!
124 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
125 : //! for all pending operations to complete. It does not prevent more
126 : //! operations from getting scheduled.
127 : //!
128 : //! # Crash Consistency
129 : //!
130 : //! We do not persist the upload queue state.
131 : //! If we drop the client, or crash, all unfinished operations are lost.
132 : //!
133 : //! To recover, the following steps need to be taken:
134 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
135 : //! consistent remote state, assuming the user scheduled the operations in
136 : //! the correct order.
137 : //! - Initialize the upload queue with that [`IndexPart`].
138 : //! - Reschedule all lost operations by comparing the local filesystem state
139 : //! and remote state as per [`IndexPart`]. This is done in
140 : //! [`Tenant::timeline_init_and_sync`].
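//!
//! A condensed sketch of that recovery sequence (the real logic lives in
//! [`Tenant::timeline_init_and_sync`]; `resume_deletion` is a hypothetical
//! stand-in, error handling elided):
//!
//! ```ignore
//! let index_part = match client.download_index_file(&cancel).await? {
//!     MaybeDeletedIndexPart::IndexPart(p) => p,
//!     MaybeDeletedIndexPart::Deleted(p) => {
//!         // The timeline was being deleted; resume that instead.
//!         return resume_deletion(p);
//!     }
//! };
//! client.init_upload_queue(&index_part)?;
//! // Compare local filesystem state against index_part and re-schedule
//! // uploads for anything present locally but missing remotely.
//! ```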
141 : //!
142 : //! Note that if we crash during file deletion, between the index upload that
143 : //! removes the file from the list of files and the actual remote file deletion,
144 : //! the file is leaked in the remote storage. Similarly, if a new file is created
145 : //! and uploaded, but the pageserver dies permanently before updating the
146 : //! remote index file, the new file is leaked in remote storage. We accept and
147 : //! tolerate that for now.
148 : //! Note further that we cannot easily fix this by scheduling deletes for every
149 : //! file that is present only on the remote, because we cannot distinguish the
150 : //! following two cases:
151 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
152 : //! but crashed before it finished remotely.
153 : //! - (2) We never had the file locally because we haven't on-demand downloaded
154 : //! it yet.
155 : //!
156 : //! # Downloads
157 : //!
158 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
159 : //! downloading files from the remote storage. Downloads are performed immediately
160 : //! against the `RemoteStorage`, independently of the upload queue.
161 : //!
162 : //! When we attach a tenant, we perform the following steps:
163 : //! - Create a `Tenant` object in `TenantState::Attaching` state.
164 : //! - List the timelines that are present in remote storage, and for each:
165 : //! - download their remote [`IndexPart`]s
166 : //! - create `Timeline` struct and a `RemoteTimelineClient`
167 : //! - initialize the client's upload queue with its `IndexPart`
168 : //! - schedule uploads for layers that are only present locally.
169 : //! - After the above is done for each timeline, open the tenant for business by
170 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
171 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
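//!
//! In code form, the attach flow is roughly the following (a sketch with
//! simplified, partly elided signatures):
//!
//! ```ignore
//! for timeline_id in list_remote_timelines(/* storage, tenant, ... */).await? {
//!     let index_part = download_index_part(/* ..., timeline_id, ... */).await?;
//!     // Create the Timeline struct and its RemoteTimelineClient, then:
//!     client.init_upload_queue(&index_part)?;
//!     // ... and schedule uploads for layers that exist only locally.
//! }
//! // Finally: transition TenantState::Attaching -> TenantState::Active.
//! ```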
172 : //!
173 : //! # Operating Without Remote Storage
174 : //!
175 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
176 : //! not created and the uploads are skipped.
177 : //!
178 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
179 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
180 :
181 : pub(crate) mod download;
182 : pub mod index;
183 : pub mod manifest;
184 : pub(crate) mod upload;
185 :
186 : use anyhow::Context;
187 : use camino::Utf8Path;
188 : use chrono::{NaiveDateTime, Utc};
189 :
190 : pub(crate) use download::download_initdb_tar_zst;
191 : use pageserver_api::models::TimelineArchivalState;
192 : use pageserver_api::shard::{ShardIndex, TenantShardId};
193 : use regex::Regex;
194 : use scopeguard::ScopeGuard;
195 : use tokio_util::sync::CancellationToken;
196 : use utils::backoff::{
197 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
198 : };
199 : use utils::pausable_failpoint;
200 : use utils::shard::ShardNumber;
201 :
202 : use std::collections::{HashMap, HashSet, VecDeque};
203 : use std::sync::atomic::{AtomicU32, Ordering};
204 : use std::sync::{Arc, Mutex, OnceLock};
205 : use std::time::Duration;
206 :
207 : use remote_storage::{
208 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
209 : };
210 : use std::ops::DerefMut;
211 : use tracing::{debug, error, info, instrument, warn};
212 : use tracing::{info_span, Instrument};
213 : use utils::lsn::Lsn;
214 :
215 : use crate::context::RequestContext;
216 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
217 : use crate::metrics::{
218 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
219 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
220 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
221 : };
222 : use crate::task_mgr::shutdown_token;
223 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
224 : use crate::tenant::remote_timeline_client::download::download_retry;
225 : use crate::tenant::storage_layer::AsLayerDesc;
226 : use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
227 : use crate::tenant::TIMELINES_SEGMENT_NAME;
228 : use crate::{
229 : config::PageServerConf,
230 : task_mgr,
231 : task_mgr::TaskKind,
232 : task_mgr::BACKGROUND_RUNTIME,
233 : tenant::metadata::TimelineMetadata,
234 : tenant::upload_queue::{
235 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
236 : },
237 : TENANT_HEATMAP_BASENAME,
238 : };
239 :
240 : use utils::id::{TenantId, TimelineId};
241 :
242 : use self::index::IndexPart;
243 :
244 : use super::config::AttachedLocationConfig;
245 : use super::metadata::MetadataUpdate;
246 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
247 : use super::timeline::import_pgdata;
248 : use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
249 : use super::{DeleteTimelineError, Generation};
250 :
251 : pub(crate) use download::{
252 : download_index_part, download_tenant_manifest, is_temp_download_file,
253 : list_remote_tenant_shards, list_remote_timelines,
254 : };
255 : pub(crate) use index::LayerFileMetadata;
256 : pub(crate) use upload::upload_initdb_dir;
257 :
258 : // Occasional network issues and such can cause remote operations to fail, and
259 : // that's expected. If a download fails, we log it at info-level, and retry.
260 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
261 : // level instead, as repeated failures can mean a more serious problem. If it
262 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
263 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
264 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
265 :
266 : // Similarly log failed uploads and deletions at WARN level, after this many
267 : // retries. Uploads and deletions are retried forever, though.
268 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
269 :
270 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
271 :
272 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
273 :
274 : /// Default buffer size when interfacing with [`tokio::fs::File`].
275 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
276 :
277 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
278 : /// which we warn and skip.
279 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
280 :
281 : pub enum MaybeDeletedIndexPart {
282 : IndexPart(IndexPart),
283 : Deleted(IndexPart),
284 : }
285 :
286 : #[derive(Debug, thiserror::Error)]
287 : pub enum PersistIndexPartWithDeletedFlagError {
288 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
289 : AlreadyInProgress(NaiveDateTime),
290 : #[error("the deleted_flag was already set, value is {0:?}")]
291 : AlreadyDeleted(NaiveDateTime),
292 : #[error(transparent)]
293 : Other(#[from] anyhow::Error),
294 : }
295 :
296 : #[derive(Debug, thiserror::Error)]
297 : pub enum WaitCompletionError {
298 : #[error(transparent)]
299 : NotInitialized(NotInitialized),
300 : #[error("wait_completion aborted because upload queue was stopped")]
301 : UploadQueueShutDownOrStopped,
302 : }
303 :
304 : #[derive(Debug, thiserror::Error)]
305 : #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
306 : pub struct UploadQueueNotReadyError;
307 : /// Behavioral modes that enable seamless live migration.
308 : ///
309 : /// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
310 : struct RemoteTimelineClientConfig {
311 : /// If this is false, then updates to remote_consistent_lsn are dropped rather
312 : /// than being submitted to DeletionQueue for validation. This behavior is
313 : /// used when a tenant attachment is known to have a stale generation number,
314 : /// such that validation attempts will always fail. This is not necessary
315 : /// for correctness, but avoids spamming error statistics with failed validations
316 : /// when doing migrations of tenants.
317 : process_remote_consistent_lsn_updates: bool,
318 :
319 : /// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
320 : /// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
321 : /// is known to be multi-attached, in order to avoid disrupting other attached tenants
322 : /// whose generations' metadata refers to the deleted objects.
323 : block_deletions: bool,
324 : }
325 :
326 : /// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
327 : /// not carry the entire LocationConf structure: it's much more than we need. The From
328 : /// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
329 : impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
330 432 : fn from(lc: &AttachedLocationConfig) -> Self {
331 432 : Self {
332 432 : block_deletions: !lc.may_delete_layers_hint(),
333 432 : process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
334 432 : }
335 432 : }
336 : }
337 :
338 : /// A client for accessing a timeline's data in remote storage.
339 : ///
340 : /// This takes care of managing the number of connections, and balancing them
341 : /// across tenants. This also handles retries of failed uploads.
342 : ///
343 : /// Upload and delete requests are ordered so that before a deletion is
344 : /// performed, we wait for all preceding uploads to finish. This ensures
345 : /// that if you perform a compaction operation that reshuffles data in layer
346 : /// files, we don't have a transient state where the old files have already been
347 : /// deleted, but new files have not yet been uploaded.
348 : ///
349 : /// Similarly, this enforces an order between index-file uploads, and layer
350 : /// uploads. Before an index-file upload is performed, all preceding layer
351 : /// uploads must be finished.
352 : ///
353 : /// This also maintains a list of remote files, and automatically includes that
354 : /// in the index part file, whenever timeline metadata is uploaded.
355 : ///
356 : /// Downloads are not queued, they are performed immediately.
357 : pub(crate) struct RemoteTimelineClient {
358 : conf: &'static PageServerConf,
359 :
360 : runtime: tokio::runtime::Handle,
361 :
362 : tenant_shard_id: TenantShardId,
363 : timeline_id: TimelineId,
364 : generation: Generation,
365 :
366 : upload_queue: Mutex<UploadQueue>,
367 :
368 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
369 :
370 : storage_impl: GenericRemoteStorage,
371 :
372 : deletion_queue_client: DeletionQueueClient,
373 :
374 : /// Subset of tenant configuration used to control upload behaviors during migrations
375 : config: std::sync::RwLock<RemoteTimelineClientConfig>,
376 :
377 : cancel: CancellationToken,
378 : }
379 :
380 : impl RemoteTimelineClient {
381 : ///
382 : /// Create a remote storage client for given timeline
383 : ///
384 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
385 : /// by calling init_upload_queue.
386 : ///
387 422 : pub(crate) fn new(
388 422 : remote_storage: GenericRemoteStorage,
389 422 : deletion_queue_client: DeletionQueueClient,
390 422 : conf: &'static PageServerConf,
391 422 : tenant_shard_id: TenantShardId,
392 422 : timeline_id: TimelineId,
393 422 : generation: Generation,
394 422 : location_conf: &AttachedLocationConfig,
395 422 : ) -> RemoteTimelineClient {
396 422 : RemoteTimelineClient {
397 422 : conf,
398 422 : runtime: if cfg!(test) {
399 : // remote_timeline_client.rs tests rely on current-thread runtime
400 422 : tokio::runtime::Handle::current()
401 : } else {
402 0 : BACKGROUND_RUNTIME.handle().clone()
403 : },
404 422 : tenant_shard_id,
405 422 : timeline_id,
406 422 : generation,
407 422 : storage_impl: remote_storage,
408 422 : deletion_queue_client,
409 422 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
410 422 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
411 422 : &tenant_shard_id,
412 422 : &timeline_id,
413 422 : )),
414 422 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
415 422 : cancel: CancellationToken::new(),
416 422 : }
417 422 : }
418 :
419 : /// Initialize the upload queue for a remote storage that already received
420 : /// an index file upload, i.e., it's not empty.
421 : /// The given `index_part` must be the one on the remote.
422 6 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
423 6 : let mut upload_queue = self.upload_queue.lock().unwrap();
424 6 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
425 6 : self.update_remote_physical_size_gauge(Some(index_part));
426 6 : info!(
427 0 : "initialized upload queue from remote index with {} layer files",
428 0 : index_part.layer_metadata.len()
429 : );
430 6 : Ok(())
431 6 : }
432 :
433 : /// Initialize the upload queue for the case where the remote storage is empty,
434 : /// i.e., it doesn't have an `IndexPart`.
435 416 : pub fn init_upload_queue_for_empty_remote(
436 416 : &self,
437 416 : local_metadata: &TimelineMetadata,
438 416 : ) -> anyhow::Result<()> {
439 416 : let mut upload_queue = self.upload_queue.lock().unwrap();
440 416 : upload_queue.initialize_empty_remote(local_metadata)?;
441 416 : self.update_remote_physical_size_gauge(None);
442 416 : info!("initialized upload queue as empty");
443 416 : Ok(())
444 416 : }
445 :
446 : /// Initialize the queue in the stopped state. Used in the startup path
447 : /// to continue a deletion operation interrupted by a pageserver crash or restart.
448 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
449 0 : &self,
450 0 : index_part: &IndexPart,
451 0 : ) -> anyhow::Result<()> {
452 : // FIXME: consider newtype for DeletedIndexPart.
453 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
454 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
455 0 : ))?;
456 :
457 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
458 0 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
459 0 : self.update_remote_physical_size_gauge(Some(index_part));
460 0 : self.stop_impl(&mut upload_queue);
461 0 :
462 0 : upload_queue
463 0 : .stopped_mut()
464 0 : .expect("stopped above")
465 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
466 0 :
467 0 : Ok(())
468 0 : }
469 :
470 : /// Notify this client of a change to its parent tenant's config, as this may cause us to
471 : /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle).
472 0 : pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
473 0 : let new_conf = RemoteTimelineClientConfig::from(location_conf);
474 0 : let unblocked = !new_conf.block_deletions;
475 0 :
476 0 : // Update config before draining deletions, so that we don't race with more being
477 0 : // inserted. This can result in deletions happening out of order, but that does not
478 0 : // violate any invariants: deletions only need to be ordered relative to upload of the index
479 0 : // that dereferences the deleted objects, and we are not changing that order.
480 0 : *self.config.write().unwrap() = new_conf;
481 0 :
482 0 : if unblocked {
483 : // If we may now delete layers, drain any that were blocked in our old
484 : // configuration state
485 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
486 :
487 0 : if let Ok(queue) = queue_locked.initialized_mut() {
488 0 : let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
489 0 : for d in blocked_deletions {
490 0 : if let Err(e) = self.deletion_queue_client.push_layers_sync(
491 0 : self.tenant_shard_id,
492 0 : self.timeline_id,
493 0 : self.generation,
494 0 : d.layers,
495 0 : ) {
496 : // This could happen if the pageserver is shut down while a tenant
497 : // is transitioning from a deletion-blocked state: we will leak some
498 : // S3 objects in this case.
499 0 : warn!("Failed to drain blocked deletions: {}", e);
500 0 : break;
501 0 : }
502 : }
503 0 : }
504 0 : }
505 0 : }
506 :
507 : /// Returns `None` if nothing is yet uploaded, `Some(disk_consistent_lsn)` otherwise.
508 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
509 0 : match &mut *self.upload_queue.lock().unwrap() {
510 0 : UploadQueue::Uninitialized => None,
511 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
512 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
513 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
514 0 : .upload_queue_for_deletion
515 0 : .get_last_remote_consistent_lsn_projected(),
516 : }
517 0 : }
518 :
519 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
520 0 : match &mut *self.upload_queue.lock().unwrap() {
521 0 : UploadQueue::Uninitialized => None,
522 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
523 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
524 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
525 0 : q.upload_queue_for_deletion
526 0 : .get_last_remote_consistent_lsn_visible(),
527 0 : ),
528 : }
529 0 : }
530 :
531 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
532 : /// client is currently initialized.
533 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
534 0 : self.upload_queue
535 0 : .lock()
536 0 : .unwrap()
537 0 : .initialized_mut()
538 0 : .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn))
539 0 : .unwrap_or(false)
540 0 : }
541 :
542 : /// Returns whether the timeline is archived.
543 : /// Returns `None` if the remote index_part hasn't been downloaded yet.
544 2 : pub(crate) fn is_archived(&self) -> Option<bool> {
545 2 : self.upload_queue
546 2 : .lock()
547 2 : .unwrap()
548 2 : .initialized_mut()
549 2 : .map(|q| q.clean.0.archived_at.is_some())
550 2 : .ok()
551 2 : }
552 :
553 : /// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
554 : ///
555 : /// Returns `Err(_)` if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
556 2 : pub(crate) fn archived_at_stopped_queue(
557 2 : &self,
558 2 : ) -> Result<Option<NaiveDateTime>, UploadQueueNotReadyError> {
559 2 : self.upload_queue
560 2 : .lock()
561 2 : .unwrap()
562 2 : .stopped_mut()
563 2 : .map(|q| q.upload_queue_for_deletion.clean.0.archived_at)
564 2 : .map_err(|_| UploadQueueNotReadyError)
565 2 : }
566 :
567 1826 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
568 1826 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
569 1410 : current_remote_index_part
570 1410 : .layer_metadata
571 1410 : .values()
572 17355 : .map(|ilmd| ilmd.file_size)
573 1410 : .sum()
574 : } else {
575 416 : 0
576 : };
577 1826 : self.metrics.remote_physical_size_gauge.set(size);
578 1826 : }
579 :
580 2 : pub fn get_remote_physical_size(&self) -> u64 {
581 2 : self.metrics.remote_physical_size_gauge.get()
582 2 : }
583 :
584 : //
585 : // Download operations.
586 : //
587 : // These don't use the per-timeline queue. They do use the global semaphore in
588 : // S3Bucket, to limit the total number of concurrent operations, though.
589 : //
590 :
591 : /// Download index file
592 20 : pub async fn download_index_file(
593 20 : &self,
594 20 : cancel: &CancellationToken,
595 20 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
596 20 : let _unfinished_gauge_guard = self.metrics.call_begin(
597 20 : &RemoteOpFileKind::Index,
598 20 : &RemoteOpKind::Download,
599 20 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
600 20 : reason: "no need for a downloads gauge",
601 20 : },
602 20 : );
603 :
604 20 : let (index_part, index_generation, index_last_modified) = download::download_index_part(
605 20 : &self.storage_impl,
606 20 : &self.tenant_shard_id,
607 20 : &self.timeline_id,
608 20 : self.generation,
609 20 : cancel,
610 20 : )
611 20 : .measure_remote_op(
612 20 : RemoteOpFileKind::Index,
613 20 : RemoteOpKind::Download,
614 20 : Arc::clone(&self.metrics),
615 20 : )
616 20 : .await?;
617 :
618 : // Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
619 : // old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
620 : // in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
621 : // when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
622 : // also a newer index available, that is surprising.
623 : const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
624 20 : let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
625 0 : if e.duration() > Duration::from_secs(5) {
626 : // We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
627 : // timestamp, it is common to be out by at least 1 second.
628 0 : tracing::warn!("Index has modification time in the future: {e}");
629 0 : }
630 0 : Duration::ZERO
631 20 : });
632 20 : if index_age > INDEX_AGE_CHECKS_THRESHOLD {
633 0 : tracing::info!(
634 : ?index_generation,
635 0 : age = index_age.as_secs_f64(),
636 0 : "Loaded an old index, checking for other indices..."
637 : );
638 :
639 : // Find the highest-generation index
640 0 : let (_latest_index_part, latest_index_generation, latest_index_mtime) =
641 0 : download::download_index_part(
642 0 : &self.storage_impl,
643 0 : &self.tenant_shard_id,
644 0 : &self.timeline_id,
645 0 : Generation::MAX,
646 0 : cancel,
647 0 : )
648 0 : .await?;
649 :
650 0 : if latest_index_generation > index_generation {
651 : // Unexpected! Why are we loading such an old index if a more recent one exists?
652 : // We will refuse to proceed, as there is no reasonable scenario where this should happen, but
653 : // there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
654 : // backwards).
655 0 : tracing::error!(
656 : ?index_generation,
657 : ?latest_index_generation,
658 : ?latest_index_mtime,
659 0 : "Found a newer index while loading an old one"
660 : );
661 0 : return Err(DownloadError::Fatal(
662 0 : "Index age exceeds threshold and a newer index exists".into(),
663 0 : ));
664 0 : }
665 20 : }
666 :
667 20 : if index_part.deleted_at.is_some() {
668 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
669 : } else {
670 20 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
671 : }
672 20 : }
673 :
674 : /// Download a (layer) file from remote storage into the local filesystem, at `local_path`.
675 : ///
676 : /// 'layer_metadata' is the metadata from the remote index file.
677 : ///
678 : /// On success, returns the size of the downloaded file.
679 6 : pub async fn download_layer_file(
680 6 : &self,
681 6 : layer_file_name: &LayerName,
682 6 : layer_metadata: &LayerFileMetadata,
683 6 : local_path: &Utf8Path,
684 6 : gate: &utils::sync::gate::Gate,
685 6 : cancel: &CancellationToken,
686 6 : ctx: &RequestContext,
687 6 : ) -> Result<u64, DownloadError> {
688 6 : let downloaded_size = {
689 6 : let _unfinished_gauge_guard = self.metrics.call_begin(
690 6 : &RemoteOpFileKind::Layer,
691 6 : &RemoteOpKind::Download,
692 6 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
693 6 : reason: "no need for a downloads gauge",
694 6 : },
695 6 : );
696 6 : download::download_layer_file(
697 6 : self.conf,
698 6 : &self.storage_impl,
699 6 : self.tenant_shard_id,
700 6 : self.timeline_id,
701 6 : layer_file_name,
702 6 : layer_metadata,
703 6 : local_path,
704 6 : gate,
705 6 : cancel,
706 6 : ctx,
707 6 : )
708 6 : .measure_remote_op(
709 6 : RemoteOpFileKind::Layer,
710 6 : RemoteOpKind::Download,
711 6 : Arc::clone(&self.metrics),
712 6 : )
713 6 : .await?
714 : };
715 :
716 6 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
717 6 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
718 6 :
719 6 : Ok(downloaded_size)
720 6 : }
721 :
722 : //
723 : // Upload operations.
724 : //
725 :
726 : /// Launch an index-file upload operation in the background, with
727 : /// fully updated metadata.
728 : ///
729 : /// This should only be used to upload initial metadata to remote storage.
730 : ///
731 : /// The upload will be added to the queue immediately, but it
732 : /// won't be performed until all previously scheduled layer file
733 : /// upload operations have completed successfully. This is to
734 : /// ensure that when the index file claims that layers X, Y and Z
735 : /// exist in remote storage, they really do. To wait for the upload
736 : /// to complete, use `wait_completion`.
737 : ///
738 : /// If there were any changes to the list of files, i.e. if any
739 : /// layer file uploads were scheduled, since the last index file
740 : /// upload, those will be included too.
741 230 : pub fn schedule_index_upload_for_full_metadata_update(
742 230 : self: &Arc<Self>,
743 230 : metadata: &TimelineMetadata,
744 230 : ) -> anyhow::Result<()> {
745 230 : let mut guard = self.upload_queue.lock().unwrap();
746 230 : let upload_queue = guard.initialized_mut()?;
747 :
748 : // As documented in the struct definition, it's ok for latest_metadata to be
749 : // ahead of what's _actually_ on the remote during index upload.
750 230 : upload_queue.dirty.metadata = metadata.clone();
751 230 :
752 230 : self.schedule_index_upload(upload_queue);
753 230 :
754 230 : Ok(())
755 230 : }
756 :
757 : /// Launch an index-file upload operation in the background, with only parts of the metadata
758 : /// updated.
759 : ///
760 : /// This is the regular way of updating metadata on layer flushes or Gc.
761 : ///
762 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
763 : /// `index_part.json`, while being clearer about which values update regularly.
764 1150 : pub(crate) fn schedule_index_upload_for_metadata_update(
765 1150 : self: &Arc<Self>,
766 1150 : update: &MetadataUpdate,
767 1150 : ) -> anyhow::Result<()> {
768 1150 : let mut guard = self.upload_queue.lock().unwrap();
769 1150 : let upload_queue = guard.initialized_mut()?;
770 :
771 1150 : upload_queue.dirty.metadata.apply(update);
772 1150 :
773 1150 : self.schedule_index_upload(upload_queue);
774 1150 :
775 1150 : Ok(())
776 1150 : }
777 :
778 : /// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
779 : ///
780 : /// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded:
781 : /// either the change is already sitting in the queue but not yet committed, or it has not been
782 : /// added to the queue yet.
783 2 : pub(crate) fn schedule_index_upload_for_timeline_archival_state(
784 2 : self: &Arc<Self>,
785 2 : state: TimelineArchivalState,
786 2 : ) -> anyhow::Result<bool> {
787 2 : let mut guard = self.upload_queue.lock().unwrap();
788 2 : let upload_queue = guard.initialized_mut()?;
789 :
790 : /// Returns Some(_) if a change is needed, and Some(true) if it's a
791 : /// change needed to set archived_at.
792 4 : fn need_change(
793 4 : archived_at: &Option<NaiveDateTime>,
794 4 : state: TimelineArchivalState,
795 4 : ) -> Option<bool> {
796 4 : match (archived_at, state) {
797 : (Some(_), TimelineArchivalState::Archived)
798 : | (None, TimelineArchivalState::Unarchived) => {
799 : // Nothing to do
800 0 : tracing::info!("intended state matches present state");
801 0 : None
802 : }
803 4 : (None, TimelineArchivalState::Archived) => Some(true),
804 0 : (Some(_), TimelineArchivalState::Unarchived) => Some(false),
805 : }
806 4 : }
807 2 : let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
808 :
809 2 : if let Some(archived_at_set) = need_upload_scheduled {
810 2 : let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
811 2 : upload_queue.dirty.archived_at = intended_archived_at;
812 2 : self.schedule_index_upload(upload_queue);
813 2 : }
814 :
815 2 : let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
816 2 : Ok(need_wait)
817 2 : }
818 :
819 : /// Launch an index-file upload operation in the background, setting `import_pgdata` field.
820 0 : pub(crate) fn schedule_index_upload_for_import_pgdata_state_update(
821 0 : self: &Arc<Self>,
822 0 : state: Option<import_pgdata::index_part_format::Root>,
823 0 : ) -> anyhow::Result<()> {
824 0 : let mut guard = self.upload_queue.lock().unwrap();
825 0 : let upload_queue = guard.initialized_mut()?;
826 0 : upload_queue.dirty.import_pgdata = state;
827 0 : self.schedule_index_upload(upload_queue);
828 0 : Ok(())
829 0 : }
830 :
831 : ///
832 : /// Launch an index-file upload operation in the background, if necessary.
833 : ///
834 : /// Use this function to schedule the update of the index file after
835 : /// scheduling file uploads or deletions. If no file uploads or deletions
836 : /// have been scheduled since the last index file upload, this does
837 : /// nothing.
838 : ///
839 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
840 : /// the upload to the upload queue and returns quickly.
841 370 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
842 370 : let mut guard = self.upload_queue.lock().unwrap();
843 370 : let upload_queue = guard.initialized_mut()?;
844 :
845 370 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
846 14 : self.schedule_index_upload(upload_queue);
847 356 : }
848 :
849 370 : Ok(())
850 370 : }
851 :
852 : /// Launch an index-file upload operation in the background (internal function)
853 1466 : fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
854 1466 : let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
855 1466 : // fix up the duplicated field
856 1466 : upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
857 1466 :
858 1466 : // make sure it serializes before doing it in perform_upload_task so that it doesn't
859 1466 : // look like a retryable error
860 1466 : let void = std::io::sink();
861 1466 : serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");
862 1466 :
863 1466 : let index_part = &upload_queue.dirty;
864 1466 :
865 1466 : info!(
866 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
867 0 : index_part.layer_metadata.len(),
868 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
869 : );
870 :
871 1466 : let op = UploadOp::UploadMetadata {
872 1466 : uploaded: Box::new(index_part.clone()),
873 1466 : };
874 1466 : self.metric_begin(&op);
875 1466 : upload_queue.queued_operations.push_back(op);
876 1466 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
877 1466 :
878 1466 : // Launch the task immediately, if possible
879 1466 : self.launch_queued_tasks(upload_queue);
880 1466 : }
881 :
882 : /// Reparent this timeline to a new parent.
883 : ///
884 : /// A retryable step of timeline ancestor detach.
885 0 : pub(crate) async fn schedule_reparenting_and_wait(
886 0 : self: &Arc<Self>,
887 0 : new_parent: &TimelineId,
888 0 : ) -> anyhow::Result<()> {
889 0 : let receiver = {
890 0 : let mut guard = self.upload_queue.lock().unwrap();
891 0 : let upload_queue = guard.initialized_mut()?;
892 :
893 0 : let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else {
894 0 : return Err(anyhow::anyhow!(
895 0 : "cannot reparent without a current ancestor"
896 0 : ));
897 : };
898 :
899 0 : let uploaded = &upload_queue.clean.0.metadata;
900 0 :
901 0 : if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
902 : // nothing to do
903 0 : None
904 : } else {
905 0 : upload_queue.dirty.metadata.reparent(new_parent);
906 0 : upload_queue.dirty.lineage.record_previous_ancestor(&prev);
907 0 :
908 0 : self.schedule_index_upload(upload_queue);
909 0 :
910 0 : Some(self.schedule_barrier0(upload_queue))
911 : }
912 : };
913 :
914 0 : if let Some(receiver) = receiver {
915 0 : Self::wait_completion0(receiver).await?;
916 0 : }
917 0 : Ok(())
918 0 : }
919 :
920 : /// Schedules uploading a new version of `index_part.json` with the given layers added,
921 : /// detaching from ancestor and waits for it to complete.
922 : ///
923 : /// This is used with `Timeline::detach_ancestor` functionality.
924 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
925 0 : self: &Arc<Self>,
926 0 : layers: &[Layer],
927 0 : adopted: (TimelineId, Lsn),
928 0 : ) -> anyhow::Result<()> {
929 0 : let barrier = {
930 0 : let mut guard = self.upload_queue.lock().unwrap();
931 0 : let upload_queue = guard.initialized_mut()?;
932 :
933 0 : if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
934 0 : None
935 : } else {
936 0 : upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
937 0 : upload_queue.dirty.lineage.record_detaching(&adopted);
938 :
939 0 : for layer in layers {
940 0 : let prev = upload_queue
941 0 : .dirty
942 0 : .layer_metadata
943 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
944 0 : assert!(prev.is_none(), "copied layer existed already {layer}");
945 : }
946 :
947 0 : self.schedule_index_upload(upload_queue);
948 0 :
949 0 : Some(self.schedule_barrier0(upload_queue))
950 : }
951 : };
952 :
953 0 : if let Some(barrier) = barrier {
954 0 : Self::wait_completion0(barrier).await?;
955 0 : }
956 0 : Ok(())
957 0 : }
958 :
959 : /// Adds a gc blocking reason for this timeline if one does not exist already.
960 : ///
961 : /// A retryable step of timeline detach ancestor.
962 : ///
963 : /// Returns a future which waits until the completion of the upload.
964 0 : pub(crate) fn schedule_insert_gc_block_reason(
965 0 : self: &Arc<Self>,
966 0 : reason: index::GcBlockingReason,
967 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
968 0 : {
969 0 : let maybe_barrier = {
970 0 : let mut guard = self.upload_queue.lock().unwrap();
971 0 : let upload_queue = guard.initialized_mut()?;
972 :
973 0 : if let index::GcBlockingReason::DetachAncestor = reason {
974 0 : if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
975 0 : drop(guard);
976 0 : panic!("cannot start detach ancestor if there is nothing to detach from");
977 0 : }
978 0 : }
979 :
980 0 : let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
981 :
982 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
983 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
984 0 :
985 0 : match (current, uploaded) {
986 0 : (x, y) if wanted(x) && wanted(y) => None,
987 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
988 : // Usual case: !wanted(x) && !wanted(y)
989 : //
990 : // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
991 : // turn on and off some reason.
992 0 : (x, y) => {
993 0 : if !wanted(x) && wanted(y) {
994 : // this could be avoided by having external in-memory synchronization, like
995 : // timeline detach ancestor
996 0 : warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason");
997 0 : }
998 :
999 : // at this point, the metadata must always show that there is a parent
1000 0 : upload_queue.dirty.gc_blocking = current
1001 0 : .map(|x| x.with_reason(reason))
1002 0 : .or_else(|| Some(index::GcBlocking::started_now_for(reason)));
1003 0 : self.schedule_index_upload(upload_queue);
1004 0 : Some(self.schedule_barrier0(upload_queue))
1005 : }
1006 : }
1007 : };
1008 :
1009 0 : Ok(async move {
1010 0 : if let Some(barrier) = maybe_barrier {
1011 0 : Self::wait_completion0(barrier).await?;
1012 0 : }
1013 0 : Ok(())
1014 0 : })
1015 0 : }
1016 :
1017 : /// Removes a gc blocking reason for this timeline if one exists.
1018 : ///
1019 : /// A retryable step of timeline detach ancestor.
1020 : ///
1021 : /// Returns a future which waits until the completion of the upload.
1022 0 : pub(crate) fn schedule_remove_gc_block_reason(
1023 0 : self: &Arc<Self>,
1024 0 : reason: index::GcBlockingReason,
1025 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
1026 0 : {
1027 0 : let maybe_barrier = {
1028 0 : let mut guard = self.upload_queue.lock().unwrap();
1029 0 : let upload_queue = guard.initialized_mut()?;
1030 :
1031 0 : if let index::GcBlockingReason::DetachAncestor = reason {
1032 0 : if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
1033 0 : drop(guard);
1034 0 : panic!("cannot complete timeline_ancestor_detach while not detached");
1035 0 : }
1036 0 : }
1037 :
1038 0 : let wanted = |x: Option<&index::GcBlocking>| {
1039 0 : x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
1040 0 : };
1041 :
1042 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
1043 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
1044 0 :
1045 0 : match (current, uploaded) {
1046 0 : (x, y) if wanted(x) && wanted(y) => None,
1047 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
1048 0 : (x, y) => {
1049 0 : if !wanted(x) && wanted(y) {
1050 0 : warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
1051 0 : }
1052 :
1053 0 : upload_queue.dirty.gc_blocking =
1054 0 : current.as_ref().and_then(|x| x.without_reason(reason));
1055 0 : assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
1056 0 : self.schedule_index_upload(upload_queue);
1057 0 : Some(self.schedule_barrier0(upload_queue))
1058 : }
1059 : }
1060 : };
1061 :
1062 0 : Ok(async move {
1063 0 : if let Some(barrier) = maybe_barrier {
1064 0 : Self::wait_completion0(barrier).await?;
1065 0 : }
1066 0 : Ok(())
1067 0 : })
1068 0 : }
1069 :
1070 : /// Launch an upload operation in the background; the file is added to be included in next
1071 : /// `index_part.json` upload.
1072 1194 : pub(crate) fn schedule_layer_file_upload(
1073 1194 : self: &Arc<Self>,
1074 1194 : layer: ResidentLayer,
1075 1194 : ) -> Result<(), NotInitialized> {
1076 1194 : let mut guard = self.upload_queue.lock().unwrap();
1077 1194 : let upload_queue = guard.initialized_mut()?;
1078 :
1079 1194 : self.schedule_layer_file_upload0(upload_queue, layer);
1080 1194 : self.launch_queued_tasks(upload_queue);
1081 1194 : Ok(())
1082 1194 : }
1083 :
1084 1556 : fn schedule_layer_file_upload0(
1085 1556 : self: &Arc<Self>,
1086 1556 : upload_queue: &mut UploadQueueInitialized,
1087 1556 : layer: ResidentLayer,
1088 1556 : ) {
1089 1556 : let metadata = layer.metadata();
1090 1556 :
1091 1556 : upload_queue
1092 1556 : .dirty
1093 1556 : .layer_metadata
1094 1556 : .insert(layer.layer_desc().layer_name(), metadata.clone());
1095 1556 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1096 1556 :
1097 1556 : info!(
1098 : gen=?metadata.generation,
1099 : shard=?metadata.shard,
1100 0 : "scheduled layer file upload {layer}",
1101 : );
1102 :
1103 1556 : let op = UploadOp::UploadLayer(layer, metadata, None);
1104 1556 : self.metric_begin(&op);
1105 1556 : upload_queue.queued_operations.push_back(op);
1106 1556 : }
1107 :
1108 : /// Launch a delete operation in the background.
1109 : ///
1110 : /// The operation does not modify local filesystem state.
1111 : ///
1112 : /// Note: This schedules an index file upload before the deletions. The
1113 : /// deletion won't actually be performed, until all previously scheduled
1114 : /// upload operations, and the index file upload, have completed
1115 : /// successfully.
1116 8 : pub fn schedule_layer_file_deletion(
1117 8 : self: &Arc<Self>,
1118 8 : names: &[LayerName],
1119 8 : ) -> anyhow::Result<()> {
1120 8 : let mut guard = self.upload_queue.lock().unwrap();
1121 8 : let upload_queue = guard.initialized_mut()?;
1122 :
1123 8 : let with_metadata =
1124 8 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
1125 8 :
1126 8 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
1127 8 :
1128 8 : // Launch the tasks immediately, if possible
1129 8 : self.launch_queued_tasks(upload_queue);
1130 8 : Ok(())
1131 8 : }
1132 :
1133 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
1134 : /// layer files, leaving them dangling.
1135 : ///
1136 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
1137 : /// is invoked on them.
1138 4 : pub(crate) fn schedule_gc_update(
1139 4 : self: &Arc<Self>,
1140 4 : gc_layers: &[Layer],
1141 4 : ) -> Result<(), NotInitialized> {
1142 4 : let mut guard = self.upload_queue.lock().unwrap();
1143 4 : let upload_queue = guard.initialized_mut()?;
1144 :
1145 : // just forget the return value; after uploading the next index_part.json, we can consider
1146 : // the layer files as "dangling". This is fine; in the worst case we create work for the
1147 : // scrubber.
1148 :
1149 4 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
1150 4 :
1151 4 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1152 4 :
1153 4 : self.launch_queued_tasks(upload_queue);
1154 4 :
1155 4 : Ok(())
1156 4 : }
1157 :
1158 : /// Update the remote index file, removing the to-be-deleted files from the index,
1159 : /// allowing scheduling of actual deletions later.
1160 88 : fn schedule_unlinking_of_layers_from_index_part0<I>(
1161 88 : self: &Arc<Self>,
1162 88 : upload_queue: &mut UploadQueueInitialized,
1163 88 : names: I,
1164 88 : ) -> Vec<(LayerName, LayerFileMetadata)>
1165 88 : where
1166 88 : I: IntoIterator<Item = LayerName>,
1167 88 : {
1168 88 : // Decorate our list of names with each name's metadata, dropping
1169 88 : // names that are unexpectedly missing from our metadata. This metadata
1170 88 : // is later used when physically deleting layers, to construct key paths.
1171 88 : let with_metadata: Vec<_> = names
1172 88 : .into_iter()
1173 512 : .filter_map(|name| {
1174 512 : let meta = upload_queue.dirty.layer_metadata.remove(&name);
1175 :
1176 512 : if let Some(meta) = meta {
1177 446 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1178 446 : Some((name, meta))
1179 : } else {
1180 : // This can only happen if we forgot to schedule the file upload
1181 : // before scheduling the delete. Log it because it is a rare/strange
1182 : // situation, and in case something is misbehaving, we'd like to know which
1183 : // layers experienced this.
1184 66 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
1185 66 : None
1186 : }
1187 512 : })
1188 88 : .collect();
1189 :
1190 : #[cfg(feature = "testing")]
1191 534 : for (name, metadata) in &with_metadata {
1192 446 : let gen = metadata.generation;
1193 446 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
1194 0 : if unexpected == gen {
1195 0 : tracing::error!("{name} was unlinked twice with same generation");
1196 : } else {
1197 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
1198 : }
1199 446 : }
1200 : }
1201 :
1202 : // after unlinking files from the upload_queue.latest_files we must always schedule an
1203 : // index_part update, because that needs to be uploaded before we can actually delete the
1204 : // files.
1205 88 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1206 70 : self.schedule_index_upload(upload_queue);
1207 70 : }
1208 :
1209 88 : with_metadata
1210 88 : }
1211 :
1212 : /// Schedules deletion for layer files which have previously been unlinked from the
1213 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
1214 505 : pub(crate) fn schedule_deletion_of_unlinked(
1215 505 : self: &Arc<Self>,
1216 505 : layers: Vec<(LayerName, LayerFileMetadata)>,
1217 505 : ) -> anyhow::Result<()> {
1218 505 : let mut guard = self.upload_queue.lock().unwrap();
1219 505 : let upload_queue = guard.initialized_mut()?;
1220 :
1221 505 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
1222 505 : self.launch_queued_tasks(upload_queue);
1223 505 : Ok(())
1224 505 : }
1225 :
1226 513 : fn schedule_deletion_of_unlinked0(
1227 513 : self: &Arc<Self>,
1228 513 : upload_queue: &mut UploadQueueInitialized,
1229 513 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
1230 513 : ) {
1231 513 : // Filter out any layers which were not created by this tenant shard. These are
1232 513 : // layers that originate from some ancestor shard after a split, and may still
1233 513 : // be referenced by other shards. We are free to delete them locally and remove
1234 513 : // them from our index (and would have already done so when we reach this point
1235 513 : // in the code), but we may not delete them remotely.
1236 513 : with_metadata.retain(|(name, meta)| {
1237 507 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
1238 507 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
1239 507 : if !retain {
1240 0 : tracing::debug!(
1241 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
1242 : meta.shard
1243 : );
1244 507 : }
1245 507 : retain
1246 513 : });
1247 :
1248 1020 : for (name, meta) in &with_metadata {
1249 507 : info!(
1250 0 : "scheduling deletion of layer {}{} (shard {})",
1251 0 : name,
1252 0 : meta.generation.get_suffix(),
1253 : meta.shard
1254 : );
1255 : }
1256 :
1257 : #[cfg(feature = "testing")]
1258 1020 : for (name, meta) in &with_metadata {
1259 507 : let gen = meta.generation;
1260 507 : match upload_queue.dangling_files.remove(name) {
1261 435 : Some(same) if same == gen => { /* expected */ }
1262 0 : Some(other) => {
1263 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
1264 : }
1265 : None => {
1266 72 : tracing::error!("{name} was unlinked but was not dangling");
1267 : }
1268 : }
1269 : }
1270 :
1271 : // schedule the actual deletions
1272 513 : if with_metadata.is_empty() {
1273 : // avoid scheduling the op & bumping the metric
1274 6 : return;
1275 507 : }
1276 507 : let op = UploadOp::Delete(Delete {
1277 507 : layers: with_metadata,
1278 507 : });
1279 507 : self.metric_begin(&op);
1280 507 : upload_queue.queued_operations.push_back(op);
1281 513 : }
1282 :
1283 : /// Schedules a compaction update to the remote `index_part.json`.
1284 : ///
1285 : /// `compacted_from` represents the L0 layer names which have been compacted into the `compacted_to` L1 layers.
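///
/// A hedged usage sketch (hypothetical `old_l0s: Vec<Layer>` and
/// `new_l1s: Vec<ResidentLayer>` bindings):
///
/// ```ignore
/// // After compaction rewrote the L0s in `old_l0s` into the L1s in `new_l1s`:
/// client.schedule_compaction_update(&old_l0s, &new_l1s)?;
/// // The old L0s are now unlinked from the index; their remote deletion is
/// // scheduled separately via schedule_deletion_of_unlinked.
/// ```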
1286 76 : pub(crate) fn schedule_compaction_update(
1287 76 : self: &Arc<Self>,
1288 76 : compacted_from: &[Layer],
1289 76 : compacted_to: &[ResidentLayer],
1290 76 : ) -> Result<(), NotInitialized> {
1291 76 : let mut guard = self.upload_queue.lock().unwrap();
1292 76 : let upload_queue = guard.initialized_mut()?;
1293 :
1294 438 : for layer in compacted_to {
1295 362 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
1296 362 : }
1297 :
1298 506 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
1299 76 :
1300 76 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
1301 76 : self.launch_queued_tasks(upload_queue);
1302 76 :
1303 76 : Ok(())
1304 76 : }
1305 :
1306 : /// Wait for all previously scheduled uploads/deletions to complete
1307 190 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
1308 190 : let receiver = {
1309 190 : let mut guard = self.upload_queue.lock().unwrap();
1310 190 : let upload_queue = guard
1311 190 : .initialized_mut()
1312 190 : .map_err(WaitCompletionError::NotInitialized)?;
1313 190 : self.schedule_barrier0(upload_queue)
1314 190 : };
1315 190 :
1316 190 : Self::wait_completion0(receiver).await
1317 190 : }
1318 :
1319 190 : async fn wait_completion0(
1320 190 : mut receiver: tokio::sync::watch::Receiver<()>,
1321 190 : ) -> Result<(), WaitCompletionError> {
1322 190 : if receiver.changed().await.is_err() {
1323 0 : return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
1324 190 : }
1325 190 :
1326 190 : Ok(())
1327 190 : }
1328 :
1329 6 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
1330 6 : let mut guard = self.upload_queue.lock().unwrap();
1331 6 : let upload_queue = guard.initialized_mut()?;
1332 6 : self.schedule_barrier0(upload_queue);
1333 6 : Ok(())
1334 6 : }
1335 :
1336 196 : fn schedule_barrier0(
1337 196 : self: &Arc<Self>,
1338 196 : upload_queue: &mut UploadQueueInitialized,
1339 196 : ) -> tokio::sync::watch::Receiver<()> {
1340 196 : let (sender, receiver) = tokio::sync::watch::channel(());
1341 196 : let barrier_op = UploadOp::Barrier(sender);
1342 196 :
1343 196 : upload_queue.queued_operations.push_back(barrier_op);
1344 196 : // Don't count this kind of operation!
1345 196 :
1346 196 : // Launch the task immediately, if possible
1347 196 : self.launch_queued_tasks(upload_queue);
1348 196 :
1349 196 : receiver
1350 196 : }
1351 :
1352 : /// Wait for all previously scheduled operations to complete, and then stop.
1353 : ///
1354 : /// Not cancellation safe
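     : ///
     : /// A sketch of the intended use (illustrative):
     : ///
     : /// ```ignore
     : /// // Drain all previously scheduled operations, then stop the queue:
     : /// remote_client.shutdown().await;
     : /// // From here on, scheduling new uploads/deletions fails, because the
     : /// // queue has transitioned into the stopped state.
     : /// ```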
1355 8 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1356 8 : // On cancellation, the queue is left in an awkward state: it refuses new operations,
1357 8 : // but a proper stop has not yet been performed. On cancel, the original task or some
1358 8 : // later task must call `stop` or `shutdown`.
1359 8 : let sg = scopeguard::guard((), |_| {
1360 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
1361 8 : });
1362 :
1363 8 : let fut = {
1364 8 : let mut guard = self.upload_queue.lock().unwrap();
1365 8 : let upload_queue = match &mut *guard {
1366 : UploadQueue::Stopped(_) => {
1367 0 : scopeguard::ScopeGuard::into_inner(sg);
1368 0 : return;
1369 : }
1370 : UploadQueue::Uninitialized => {
1371 : // transition into Stopped state
1372 0 : self.stop_impl(&mut guard);
1373 0 : scopeguard::ScopeGuard::into_inner(sg);
1374 0 : return;
1375 : }
1376 8 : UploadQueue::Initialized(ref mut init) => init,
1377 8 : };
1378 8 :
1379 8 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1380 8 : // just don't add more of these as they would never complete.
1381 8 : //
1382 8 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1383 8 : // in every place we would not have to jump through this hoop, and this method could be
1384 8 : // made cancellable.
1385 8 : if !upload_queue.shutting_down {
1386 8 : upload_queue.shutting_down = true;
1387 8 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1388 8 : // this operation is not counted similar to Barrier
1389 8 : // this operation is not counted, similar to Barrier
1390 8 : self.launch_queued_tasks(upload_queue);
1391 8 : }
1392 :
1393 8 : upload_queue.shutdown_ready.clone().acquire_owned()
1394 : };
1395 :
1396 8 : let res = fut.await;
1397 :
1398 8 : scopeguard::ScopeGuard::into_inner(sg);
1399 8 :
1400 8 : match res {
1401 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
1402 8 : Err(_closed) => {
1403 8 : // expected
1404 8 : }
1405 8 : }
1406 8 :
1407 8 : self.stop();
1408 8 : }
1409 :
1410 : /// Set the deleted_at field in the remote index file.
1411 : ///
1412 : /// This fails if the upload queue has not been `stop()`ed.
1413 : ///
1414 : /// The caller is responsible for calling `stop()` AND for waiting
1415 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1416 : /// See [`RemoteTimelineClient::stop`] for details.
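     : ///
     : /// A sketch of the expected call sequence during timeline deletion
     : /// (illustrative; the waiting step is abbreviated):
     : ///
     : /// ```ignore
     : /// client.stop();
     : /// // ... wait for in-flight RemoteUploadTask tasks to finish ...
     : /// client.persist_index_part_with_deleted_flag().await?;
     : /// // With deleted_at now set remotely, the actual deletion may proceed:
     : /// client.delete_all().await?;
     : /// ```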
1417 0 : #[instrument(skip_all)]
1418 : pub(crate) async fn persist_index_part_with_deleted_flag(
1419 : self: &Arc<Self>,
1420 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1421 : let index_part_with_deleted_at = {
1422 : let mut locked = self.upload_queue.lock().unwrap();
1423 :
1424 : // We must be in stopped state, because otherwise an in-progress
1425 : // index part upload could overwrite the file with one that is
1426 : // missing the deleted flag we are going to set below.
1427 : let stopped = locked.stopped_mut()?;
1428 :
1429 : match stopped.deleted_at {
1430 : SetDeletedFlagProgress::NotRunning => (), // proceed
1431 : SetDeletedFlagProgress::InProgress(at) => {
1432 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1433 : }
1434 : SetDeletedFlagProgress::Successful(at) => {
1435 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1436 : }
1437 : };
1438 : let deleted_at = Utc::now().naive_utc();
1439 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1440 :
1441 : let mut index_part = stopped.upload_queue_for_deletion.dirty.clone();
1442 : index_part.deleted_at = Some(deleted_at);
1443 : index_part
1444 : };
1445 :
1446 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1447 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1448 0 : let stopped = locked
1449 0 : .stopped_mut()
1450 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1451 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1452 0 : });
1453 :
1454 : pausable_failpoint!("persist_deleted_index_part");
1455 :
1456 : backoff::retry(
1457 0 : || {
1458 0 : upload::upload_index_part(
1459 0 : &self.storage_impl,
1460 0 : &self.tenant_shard_id,
1461 0 : &self.timeline_id,
1462 0 : self.generation,
1463 0 : &index_part_with_deleted_at,
1464 0 : &self.cancel,
1465 0 : )
1466 0 : },
1467 0 : |_e| false,
1468 : 1,
1469 : // have just a couple of attempts
1470 : // when executed as part of timeline deletion this happens in context of api call
1471 : // when executed as part of tenant deletion this happens in the background
1472 : 2,
1473 : "persist_index_part_with_deleted_flag",
1474 : &self.cancel,
1475 : )
1476 : .await
1477 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1478 0 : .and_then(|x| x)?;
1479 :
1480 : // all good, disarm the guard and mark as success
1481 : ScopeGuard::into_inner(undo_deleted_at);
1482 : {
1483 : let mut locked = self.upload_queue.lock().unwrap();
1484 :
1485 : let stopped = locked
1486 : .stopped_mut()
1487 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1488 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1489 : index_part_with_deleted_at
1490 : .deleted_at
1491 : .expect("we set it above"),
1492 : );
1493 : }
1494 :
1495 : Ok(())
1496 : }
1497 :
1498 0 : pub(crate) fn is_deleting(&self) -> bool {
1499 0 : let mut locked = self.upload_queue.lock().unwrap();
1500 0 : locked.stopped_mut().is_ok()
1501 0 : }
1502 :
1503 0 : pub(crate) async fn preserve_initdb_archive(
1504 0 : self: &Arc<Self>,
1505 0 : tenant_id: &TenantId,
1506 0 : timeline_id: &TimelineId,
1507 0 : cancel: &CancellationToken,
1508 0 : ) -> anyhow::Result<()> {
1509 0 : backoff::retry(
1510 0 : || async {
1511 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1512 0 : .await
1513 0 : },
1514 0 : TimeoutOrCancel::caused_by_cancel,
1515 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1516 0 : FAILED_REMOTE_OP_RETRIES,
1517 0 : "preserve_initdb_tar_zst",
1518 0 : &cancel.clone(),
1519 0 : )
1520 0 : .await
1521 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1522 0 : .and_then(|x| x)
1523 0 : .context("backing up initdb archive")?;
1524 0 : Ok(())
1525 0 : }
1526 :
1527 : /// Uploads the given layer **without** adding it to a future `index_part.json` upload.
1528 : ///
1529 : /// This is not normally needed.
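     : ///
     : /// Because no index upload is scheduled, the uploaded object remains
     : /// unreferenced by `index_part.json` unless some later index includes it.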
1530 0 : pub(crate) async fn upload_layer_file(
1531 0 : self: &Arc<Self>,
1532 0 : uploaded: &ResidentLayer,
1533 0 : cancel: &CancellationToken,
1534 0 : ) -> anyhow::Result<()> {
1535 0 : let remote_path = remote_layer_path(
1536 0 : &self.tenant_shard_id.tenant_id,
1537 0 : &self.timeline_id,
1538 0 : uploaded.metadata().shard,
1539 0 : &uploaded.layer_desc().layer_name(),
1540 0 : uploaded.metadata().generation,
1541 0 : );
1542 0 :
1543 0 : backoff::retry(
1544 0 : || async {
1545 0 : upload::upload_timeline_layer(
1546 0 : &self.storage_impl,
1547 0 : uploaded.local_path(),
1548 0 : &remote_path,
1549 0 : uploaded.metadata().file_size,
1550 0 : cancel,
1551 0 : )
1552 0 : .await
1553 0 : },
1554 0 : TimeoutOrCancel::caused_by_cancel,
1555 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1556 0 : FAILED_REMOTE_OP_RETRIES,
1557 0 : "upload a layer without adding it to latest files",
1558 0 : cancel,
1559 0 : )
1560 0 : .await
1561 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1562 0 : .and_then(|x| x)
1563 0 : .context("upload a layer without adding it to latest files")
1564 0 : }
1565 :
1566 : /// Copies the existing remote layer `adopted` to the remote path of `adopted_as`. The layer is
1567 : /// not added to a future `index_part.json` upload.
1568 0 : pub(crate) async fn copy_timeline_layer(
1569 0 : self: &Arc<Self>,
1570 0 : adopted: &Layer,
1571 0 : adopted_as: &Layer,
1572 0 : cancel: &CancellationToken,
1573 0 : ) -> anyhow::Result<()> {
1574 0 : let source_remote_path = remote_layer_path(
1575 0 : &self.tenant_shard_id.tenant_id,
1576 0 : &adopted
1577 0 : .get_timeline_id()
1578 0 : .expect("Source timeline should be alive"),
1579 0 : adopted.metadata().shard,
1580 0 : &adopted.layer_desc().layer_name(),
1581 0 : adopted.metadata().generation,
1582 0 : );
1583 0 :
1584 0 : let target_remote_path = remote_layer_path(
1585 0 : &self.tenant_shard_id.tenant_id,
1586 0 : &self.timeline_id,
1587 0 : adopted_as.metadata().shard,
1588 0 : &adopted_as.layer_desc().layer_name(),
1589 0 : adopted_as.metadata().generation,
1590 0 : );
1591 0 :
1592 0 : backoff::retry(
1593 0 : || async {
1594 0 : upload::copy_timeline_layer(
1595 0 : &self.storage_impl,
1596 0 : &source_remote_path,
1597 0 : &target_remote_path,
1598 0 : cancel,
1599 0 : )
1600 0 : .await
1601 0 : },
1602 0 : TimeoutOrCancel::caused_by_cancel,
1603 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1604 0 : FAILED_REMOTE_OP_RETRIES,
1605 0 : "copy timeline layer",
1606 0 : cancel,
1607 0 : )
1608 0 : .await
1609 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1610 0 : .and_then(|x| x)
1611 0 : .context("remote copy timeline layer")
1612 0 : }
1613 :
1614 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1615 0 : match tokio::time::timeout(
1616 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1617 0 : self.deletion_queue_client.flush_immediate(),
1618 0 : )
1619 0 : .await
1620 : {
1621 0 : Ok(result) => result,
1622 0 : Err(_timeout) => {
1623 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1624 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1625 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1626 0 : tracing::warn!(
1627 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1628 : );
1629 0 : Ok(())
1630 : }
1631 : }
1632 0 : }
1633 :
1634 : /// Prerequisites: the UploadQueue must be in stopped state, and deleted_at must have been successfully set.
1635 : /// The function deletes the layer files, then lists the prefix to see if we leaked anything,
1636 : /// deletes any leaked files, and finally deletes the index file.
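     : ///
     : /// The order is (the index goes last, since its `deleted_at` field serves as
     : /// the deletion marker that a retry relies on):
     : /// 1. layers referenced by the index, filtered to this shard
     : /// 2. `initdb.tar.zst`, if present
     : /// 3. any leaked objects found by listing the timeline prefix
     : /// 4. the latest `index_part.json`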
1637 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> Result<(), DeleteTimelineError> {
1638 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1639 :
1640 0 : let layers: Vec<RemotePath> = {
1641 0 : let mut locked = self.upload_queue.lock().unwrap();
1642 0 : let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?;
1643 :
1644 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1645 0 : return Err(DeleteTimelineError::Other(anyhow::anyhow!(
1646 0 : "deleted_at is not set"
1647 0 : )));
1648 0 : }
1649 0 :
1650 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1651 :
1652 0 : stopped
1653 0 : .upload_queue_for_deletion
1654 0 : .dirty
1655 0 : .layer_metadata
1656 0 : .drain()
1657 0 : .filter(|(_file_name, meta)| {
1658 0 : // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
1659 0 : // all shards anyway, we _could_ delete these, but
1660 0 : // - it creates a potential race if other shards are still
1661 0 : // using the layers while this shard deletes them.
1662 0 : // - it means that if we rolled back the shard split, the ancestor shards would be in a state where
1663 0 : // these timelines are present but corrupt (their index exists but some layers don't)
1664 0 : //
1665 0 : // These layers will eventually be cleaned up by the scrubber when it does physical GC.
1666 0 : meta.shard.shard_number == self.tenant_shard_id.shard_number
1667 0 : && meta.shard.shard_count == self.tenant_shard_id.shard_count
1668 0 : })
1669 0 : .map(|(file_name, meta)| {
1670 0 : remote_layer_path(
1671 0 : &self.tenant_shard_id.tenant_id,
1672 0 : &self.timeline_id,
1673 0 : meta.shard,
1674 0 : &file_name,
1675 0 : meta.generation,
1676 0 : )
1677 0 : })
1678 0 : .collect()
1679 0 : };
1680 0 :
1681 0 : let layer_deletion_count = layers.len();
1682 0 : self.deletion_queue_client
1683 0 : .push_immediate(layers)
1684 0 : .await
1685 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1686 :
1687 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts on
1688 : // nonexistent objects are not considered errors.
1689 0 : let initdb_path =
1690 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1691 0 : self.deletion_queue_client
1692 0 : .push_immediate(vec![initdb_path])
1693 0 : .await
1694 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1695 :
1696 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
1697 : // and the retry arrived at a different pageserver, there would be no trace of it in remote storage.
1698 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1699 0 :
1700 0 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1701 0 : // taking the burden of listing all the layers that we already know we should delete.
1702 0 : self.flush_deletion_queue()
1703 0 : .await
1704 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1705 :
1706 0 : let cancel = shutdown_token();
1707 :
1708 0 : let remaining = download_retry(
1709 0 : || async {
1710 0 : self.storage_impl
1711 0 : .list(
1712 0 : Some(&timeline_storage_path),
1713 0 : ListingMode::NoDelimiter,
1714 0 : None,
1715 0 : &cancel,
1716 0 : )
1717 0 : .await
1718 0 : },
1719 0 : "list remaining files",
1720 0 : &cancel,
1721 0 : )
1722 0 : .await
1723 0 : .context("list remaining files")?
1724 : .keys;
1725 :
1726 : // We will delete the current index_part object last, since it acts as a deletion
1727 : // marker via its deleted_at attribute
1728 0 : let latest_index = remaining
1729 0 : .iter()
1730 0 : .filter(|o| {
1731 0 : o.key
1732 0 : .object_name()
1733 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1734 0 : .unwrap_or(false)
1735 0 : })
1736 0 : .filter_map(|o| parse_remote_index_path(o.key.clone()).map(|gen| (o.key.clone(), gen)))
1737 0 : .max_by_key(|i| i.1)
1738 0 : .map(|i| i.0.clone())
1739 0 : .unwrap_or(
1740 0 : // No generation-suffixed indices, assume we are dealing with
1741 0 : // a legacy index.
1742 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1743 0 : );
1744 0 :
1745 0 : let remaining_layers: Vec<RemotePath> = remaining
1746 0 : .into_iter()
1747 0 : .filter_map(|o| {
1748 0 : if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
1749 0 : None
1750 : } else {
1751 0 : Some(o.key)
1752 : }
1753 0 : })
1754 0 : .inspect(|path| {
1755 0 : if let Some(name) = path.object_name() {
1756 0 : info!(%name, "deleting a file not referenced from index_part.json");
1757 : } else {
1758 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1759 : }
1760 0 : })
1761 0 : .collect();
1762 0 :
1763 0 : let not_referenced_count = remaining_layers.len();
1764 0 : if !remaining_layers.is_empty() {
1765 0 : self.deletion_queue_client
1766 0 : .push_immediate(remaining_layers)
1767 0 : .await
1768 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1769 0 : }
1770 :
1771 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1772 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1773 0 : "failpoint: timeline-delete-before-index-delete"
1774 0 : )))?
1775 0 : });
1776 :
1777 0 : debug!("enqueuing index part deletion");
1778 0 : self.deletion_queue_client
1779 0 : .push_immediate([latest_index].to_vec())
1780 0 : .await
1781 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1782 :
1783 : // Timeline deletion is rare, and we have probably emitted a reasonable number of objects: wait
1784 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1785 0 : self.flush_deletion_queue()
1786 0 : .await
1787 0 : .map_err(|_| DeleteTimelineError::Cancelled)?;
1788 :
1789 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1790 0 : Err(DeleteTimelineError::Other(anyhow::anyhow!(
1791 0 : "failpoint: timeline-delete-after-index-delete"
1792 0 : )))?
1793 0 : });
1794 :
1795 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1796 :
1797 0 : Ok(())
1798 0 : }
1799 :
1800 : ///
1801 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1802 : /// the ordering constraints.
1803 : ///
1804 : /// The caller needs to already hold the `upload_queue` lock.
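     : ///
     : /// The ordering constraints, as encoded in the match below:
     : /// - layer uploads can always start;
     : /// - index uploads, barriers, and shutdown wait until no task is in progress;
     : /// - deletions wait for preceding uploads, but may run concurrently with
     : ///   other deletions.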
1805 6493 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1806 9936 : while let Some(next_op) = upload_queue.queued_operations.front() {
1807 : // Can we run this task now?
1808 5860 : let can_run_now = match next_op {
1809 : UploadOp::UploadLayer(..) => {
1810 : // Can always be scheduled.
1811 1544 : true
1812 : }
1813 : UploadOp::UploadMetadata { .. } => {
1814 : // These can only be performed after all the preceding operations
1815 : // have finished.
1816 3594 : upload_queue.inprogress_tasks.is_empty()
1817 : }
1818 : UploadOp::Delete(..) => {
1819 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1820 358 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1821 : }
1822 :
1823 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1824 364 : upload_queue.inprogress_tasks.is_empty()
1825 : }
1826 : };
1827 :
1828 : // If we cannot launch this task, don't look any further.
1829 : //
1830 : // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
1831 : // them now, but we don't try to do that currently. For example, if the frontmost task
1832 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1833 : // could still start layer uploads that were scheduled later.
1834 5860 : if !can_run_now {
1835 2409 : break;
1836 3451 : }
1837 3451 :
1838 3451 : if let UploadOp::Shutdown = next_op {
1839 : // leave the op in the queue but do not start more tasks; it will be dropped when
1840 : // the stop is called.
1841 8 : upload_queue.shutdown_ready.close();
1842 8 : break;
1843 3443 : }
1844 3443 :
1845 3443 : // We can launch this task. Remove it from the queue first.
1846 3443 : let mut next_op = upload_queue.queued_operations.pop_front().unwrap();
1847 3443 :
1848 3443 : debug!("starting op: {}", next_op);
1849 :
1850 : // Update the counters and prepare
1851 3443 : match &mut next_op {
1852 1544 : UploadOp::UploadLayer(layer, meta, mode) => {
1853 1544 : if upload_queue
1854 1544 : .recently_deleted
1855 1544 : .remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
1856 0 : {
1857 0 : *mode = Some(OpType::FlushDeletion);
1858 0 : } else {
1859 1544 : *mode = Some(OpType::MayReorder)
1860 : }
1861 1544 : upload_queue.num_inprogress_layer_uploads += 1;
1862 : }
1863 1423 : UploadOp::UploadMetadata { .. } => {
1864 1423 : upload_queue.num_inprogress_metadata_uploads += 1;
1865 1423 : }
1866 280 : UploadOp::Delete(Delete { layers }) => {
1867 560 : for (name, meta) in layers {
1868 280 : upload_queue
1869 280 : .recently_deleted
1870 280 : .insert((name.clone(), meta.generation));
1871 280 : }
1872 280 : upload_queue.num_inprogress_deletions += 1;
1873 : }
1874 196 : UploadOp::Barrier(sender) => {
1875 196 : sender.send_replace(());
1876 196 : continue;
1877 : }
1878 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1879 : };
1880 :
1881 : // Assign unique ID to this task
1882 3247 : upload_queue.task_counter += 1;
1883 3247 : let upload_task_id = upload_queue.task_counter;
1884 3247 :
1885 3247 : // Add it to the in-progress map
1886 3247 : let task = Arc::new(UploadTask {
1887 3247 : task_id: upload_task_id,
1888 3247 : op: next_op,
1889 3247 : retries: AtomicU32::new(0),
1890 3247 : });
1891 3247 : upload_queue
1892 3247 : .inprogress_tasks
1893 3247 : .insert(task.task_id, Arc::clone(&task));
1894 3247 :
1895 3247 : // Spawn task to perform the task
1896 3247 : let self_rc = Arc::clone(self);
1897 3247 : let tenant_shard_id = self.tenant_shard_id;
1898 3247 : let timeline_id = self.timeline_id;
1899 3247 : task_mgr::spawn(
1900 3247 : &self.runtime,
1901 3247 : TaskKind::RemoteUploadTask,
1902 3247 : self.tenant_shard_id,
1903 3247 : Some(self.timeline_id),
1904 3247 : "remote upload",
1905 3143 : async move {
1906 3143 : self_rc.perform_upload_task(task).await;
1907 3036 : Ok(())
1908 3036 : }
1909 3247 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1910 : );
1911 :
1912 : // Loop back to process next task
1913 : }
1914 6493 : }
1915 :
1916 : ///
1917 : /// Perform an upload task.
1918 : ///
1919 : /// The task is in the `inprogress_tasks` list. This function will try to
1920 : /// execute it, retrying forever. On successful completion, the task is
1921 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1922 : /// queue that were waiting on its completion are launched.
1923 : ///
1924 : /// The task can be shut down, however. That leads to stopping the whole
1925 : /// queue.
1926 : ///
1927 3143 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1928 3143 : let cancel = shutdown_token();
1929 : // Loop to retry until it completes.
1930 : loop {
1931 : // If we're requested to shut down, close up shop and exit.
1932 : //
1933 : // Note: We only check for shutdown requests between retries, so
1934 : // if a shutdown request arrives while we're busy uploading, in the
1935 : // upload::upload_*() call below, we will wait until it has finished
1936 : // rather than exit. We probably could cancel the upload by simply dropping
1937 : // the Future, but we're not 100% sure if the remote storage library
1938 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1939 : // upload finishes or times out soon enough.
1940 3143 : if cancel.is_cancelled() {
1941 0 : info!("upload task cancelled by shutdown request");
1942 0 : self.stop();
1943 0 : return;
1944 3143 : }
1945 :
1946 3143 : let upload_result: anyhow::Result<()> = match &task.op {
1947 1450 : UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
1948 1450 : if let Some(OpType::FlushDeletion) = mode {
1949 0 : if self.config.read().unwrap().block_deletions {
1950 : // Of course, this is not efficient... but usually the queue should be empty.
1951 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
1952 0 : let mut detected = false;
1953 0 : if let Ok(queue) = queue_locked.initialized_mut() {
1954 0 : for list in queue.blocked_deletions.iter_mut() {
1955 0 : list.layers.retain(|(name, meta)| {
1956 0 : if name == &layer.layer_desc().layer_name()
1957 0 : && meta.generation == layer_metadata.generation
1958 : {
1959 0 : detected = true;
1960 0 : // remove the layer from deletion queue
1961 0 : false
1962 : } else {
1963 : // keep the layer
1964 0 : true
1965 : }
1966 0 : });
1967 0 : }
1968 0 : }
1969 0 : if detected {
1970 0 : info!(
1971 0 : "cancelled blocked deletion of layer {} at gen {:?}",
1972 0 : layer.layer_desc().layer_name(),
1973 : layer_metadata.generation
1974 : );
1975 0 : }
1976 : } else {
1977 : // TODO: we do not guarantee that the upload task starts after the deletion task, so there could be race conditions
1978 : // in which the layer still gets deleted. But this only happens if someone creates a layer immediately after it's deleted,
1979 : // which is not possible in the current system.
1980 0 : info!(
1981 0 : "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
1982 0 : layer.layer_desc().layer_name(),
1983 : layer_metadata.generation
1984 : );
1985 : {
1986 : // We are going to flush, we can clean up the recently deleted list.
1987 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
1988 0 : if let Ok(queue) = queue_locked.initialized_mut() {
1989 0 : queue.recently_deleted.clear();
1990 0 : }
1991 : }
1992 0 : if let Err(e) = self.deletion_queue_client.flush_execute().await {
1993 0 : warn!(
1994 0 : "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
1995 0 : layer.layer_desc().layer_name(),
1996 : layer_metadata.generation
1997 : );
1998 : } else {
1999 0 : info!(
2000 0 : "done flushing deletion queue before uploading layer {} at gen {:?}",
2001 0 : layer.layer_desc().layer_name(),
2002 : layer_metadata.generation
2003 : );
2004 : }
2005 : }
2006 1450 : }
2007 1450 : let local_path = layer.local_path();
2008 1450 :
2009 1450 : // We should only be uploading layers created during this `Tenant`'s lifetime, so
2010 1450 : // the metadata in the upload should always match our current generation.
2011 1450 : assert_eq!(layer_metadata.generation, self.generation);
2012 :
2013 1450 : let remote_path = remote_layer_path(
2014 1450 : &self.tenant_shard_id.tenant_id,
2015 1450 : &self.timeline_id,
2016 1450 : layer_metadata.shard,
2017 1450 : &layer.layer_desc().layer_name(),
2018 1450 : layer_metadata.generation,
2019 1450 : );
2020 1450 :
2021 1450 : upload::upload_timeline_layer(
2022 1450 : &self.storage_impl,
2023 1450 : local_path,
2024 1450 : &remote_path,
2025 1450 : layer_metadata.file_size,
2026 1450 : &self.cancel,
2027 1450 : )
2028 1450 : .measure_remote_op(
2029 1450 : RemoteOpFileKind::Layer,
2030 1450 : RemoteOpKind::Upload,
2031 1450 : Arc::clone(&self.metrics),
2032 1450 : )
2033 1450 : .await
2034 : }
2035 1413 : UploadOp::UploadMetadata { ref uploaded } => {
2036 1413 : let res = upload::upload_index_part(
2037 1413 : &self.storage_impl,
2038 1413 : &self.tenant_shard_id,
2039 1413 : &self.timeline_id,
2040 1413 : self.generation,
2041 1413 : uploaded,
2042 1413 : &self.cancel,
2043 1413 : )
2044 1413 : .measure_remote_op(
2045 1413 : RemoteOpFileKind::Index,
2046 1413 : RemoteOpKind::Upload,
2047 1413 : Arc::clone(&self.metrics),
2048 1413 : )
2049 1413 : .await;
2050 1404 : if res.is_ok() {
2051 1404 : self.update_remote_physical_size_gauge(Some(uploaded));
2052 1404 : let mention_having_future_layers = if cfg!(feature = "testing") {
2053 1404 : uploaded
2054 1404 : .layer_metadata
2055 1404 : .keys()
2056 17308 : .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn()))
2057 : } else {
2058 0 : false
2059 : };
2060 1404 : if mention_having_future_layers {
2061 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
2062 22 : tracing::info!(
2063 0 : disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(),
2064 0 : "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"
2065 : );
2066 1382 : }
2067 0 : }
2068 1404 : res
2069 : }
2070 280 : UploadOp::Delete(delete) => {
2071 280 : if self.config.read().unwrap().block_deletions {
2072 0 : let mut queue_locked = self.upload_queue.lock().unwrap();
2073 0 : if let Ok(queue) = queue_locked.initialized_mut() {
2074 0 : queue.blocked_deletions.push(delete.clone());
2075 0 : }
2076 0 : Ok(())
2077 : } else {
2078 280 : pausable_failpoint!("before-delete-layer-pausable");
2079 280 : self.deletion_queue_client
2080 280 : .push_layers(
2081 280 : self.tenant_shard_id,
2082 280 : self.timeline_id,
2083 280 : self.generation,
2084 280 : delete.layers.clone(),
2085 280 : )
2086 280 : .await
2087 280 : .map_err(|e| anyhow::anyhow!(e))
2088 : }
2089 : }
2090 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
2091 : // unreachable. Barrier operations are handled synchronously in
2092 : // launch_queued_tasks
2093 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
2094 0 : break;
2095 : }
2096 : };
2097 :
2098 0 : match upload_result {
2099 : Ok(()) => {
2100 3036 : break;
2101 : }
2102 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
2103 0 : // loop around to do the proper stopping
2104 0 : continue;
2105 : }
2106 0 : Err(e) => {
2107 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
2108 0 :
2109 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
2110 0 : // or other external reasons. Such issues are relatively regular, so log them
2111 0 : // at info level at first, and only WARN if the operation fails repeatedly.
2112 0 : //
2113 0 : // (See similar logic for downloads in `download::download_retry`)
2114 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
2115 0 : info!(
2116 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
2117 0 : task.op, retries, e
2118 : );
2119 : } else {
2120 0 : warn!(
2121 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
2122 0 : task.op, retries, e
2123 : );
2124 : }
2125 :
2126 : // sleep until it's time to retry, or we're cancelled
2127 0 : exponential_backoff(
2128 0 : retries,
2129 0 : DEFAULT_BASE_BACKOFF_SECONDS,
2130 0 : DEFAULT_MAX_BACKOFF_SECONDS,
2131 0 : &cancel,
2132 0 : )
2133 0 : .await;
2134 : }
2135 : }
2136 : }
2137 :
2138 3036 : let retries = task.retries.load(Ordering::SeqCst);
2139 3036 : if retries > 0 {
2140 0 : info!(
2141 0 : "remote task {} completed successfully after {} retries",
2142 0 : task.op, retries
2143 : );
2144 : } else {
2145 3036 : debug!("remote task {} completed successfully", task.op);
2146 : }
2147 :
2148 : // The task has completed successfully. Remove it from the in-progress list.
2149 3036 : let lsn_update = {
2150 3036 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
2151 3036 : let upload_queue = match upload_queue_guard.deref_mut() {
2152 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
2153 0 : UploadQueue::Stopped(_stopped) => {
2154 0 : None
2155 : },
2156 3036 : UploadQueue::Initialized(qi) => { Some(qi) }
2157 : };
2158 :
2159 3036 : let upload_queue = match upload_queue {
2160 3036 : Some(upload_queue) => upload_queue,
2161 : None => {
2162 0 : info!("another concurrent task already stopped the queue");
2163 0 : return;
2164 : }
2165 : };
2166 :
2167 3036 : upload_queue.inprogress_tasks.remove(&task.task_id);
2168 :
2169 3036 : let lsn_update = match task.op {
2170 : UploadOp::UploadLayer(_, _, _) => {
2171 1352 : upload_queue.num_inprogress_layer_uploads -= 1;
2172 1352 : None
2173 : }
2174 1404 : UploadOp::UploadMetadata { ref uploaded } => {
2175 1404 : upload_queue.num_inprogress_metadata_uploads -= 1;
2176 1404 :
2177 1404 : // the task id is reused as a monotonicity check for storing the "clean"
2178 1404 : // IndexPart.
2179 1404 : let last_updater = upload_queue.clean.1;
2180 1404 : let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id);
2181 1404 : let monotone = is_later || last_updater.is_none();
2182 :
2183 1404 : assert!(monotone, "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}", task.task_id);
2184 :
2185 : // cloning instead of taking ownership here is wasteful
2186 1404 : upload_queue.clean.0.clone_from(uploaded);
2187 1404 : upload_queue.clean.1 = Some(task.task_id);
2188 1404 :
2189 1404 : let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
2190 1404 : self.metrics
2191 1404 : .projected_remote_consistent_lsn_gauge
2192 1404 : .set(lsn.0);
2193 1404 :
2194 1404 : if self.generation.is_none() {
2195 : // Legacy mode: skip validating generation
2196 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
2197 0 : None
2198 1404 : } else if self
2199 1404 : .config
2200 1404 : .read()
2201 1404 : .unwrap()
2202 1404 : .process_remote_consistent_lsn_updates
2203 : {
2204 1404 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
2205 : } else {
2206 : // Our config disables remote_consistent_lsn updates: drop it.
2207 0 : None
2208 : }
2209 : }
2210 : UploadOp::Delete(_) => {
2211 280 : upload_queue.num_inprogress_deletions -= 1;
2212 280 : None
2213 : }
2214 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
2215 : };
2216 :
2217 : // Launch any queued tasks that were unblocked by this one.
2218 3036 : self.launch_queued_tasks(upload_queue);
2219 3036 : lsn_update
2220 : };
2221 :
2222 3036 : if let Some((lsn, slot)) = lsn_update {
2223 : // Updates to the remote_consistent_lsn we advertise to pageservers
2224 : // are all routed through the DeletionQueue, to enforce important
2225 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
2226 1404 : self.deletion_queue_client
2227 1404 : .update_remote_consistent_lsn(
2228 1404 : self.tenant_shard_id,
2229 1404 : self.timeline_id,
2230 1404 : self.generation,
2231 1404 : lsn,
2232 1404 : slot,
2233 1404 : )
2234 1404 : .await;
2235 1632 : }
2236 :
2237 3036 : self.metric_end(&task.op);
2238 3036 : }
2239 :
2240 6573 : fn metric_impl(
2241 6573 : &self,
2242 6573 : op: &UploadOp,
2243 6573 : ) -> Option<(
2244 6573 : RemoteOpFileKind,
2245 6573 : RemoteOpKind,
2246 6573 : RemoteTimelineClientMetricsCallTrackSize,
2247 6573 : )> {
2248 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
2249 6573 : let res = match op {
2250 2908 : UploadOp::UploadLayer(_, m, _) => (
2251 2908 : RemoteOpFileKind::Layer,
2252 2908 : RemoteOpKind::Upload,
2253 2908 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
2254 2908 : ),
2255 2870 : UploadOp::UploadMetadata { .. } => (
2256 2870 : RemoteOpFileKind::Index,
2257 2870 : RemoteOpKind::Upload,
2258 2870 : DontTrackSize {
2259 2870 : reason: "metadata uploads are tiny",
2260 2870 : },
2261 2870 : ),
2262 787 : UploadOp::Delete(_delete) => (
2263 787 : RemoteOpFileKind::Layer,
2264 787 : RemoteOpKind::Delete,
2265 787 : DontTrackSize {
2266 787 : reason: "should we track deletes? positive or negative sign?",
2267 787 : },
2268 787 : ),
2269 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
2270 : // we do not account these
2271 8 : return None;
2272 : }
2273 : };
2274 6565 : Some(res)
2275 6573 : }
2276 :
2277 3529 : fn metric_begin(&self, op: &UploadOp) {
2278 3529 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2279 3529 : Some(x) => x,
2280 0 : None => return,
2281 : };
2282 3529 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
2283 3529 : guard.will_decrement_manually(); // in metric_end(), see right below
2284 3529 : }
2285 :
2286 3044 : fn metric_end(&self, op: &UploadOp) {
2287 3044 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2288 3036 : Some(x) => x,
2289 8 : None => return,
2290 : };
2291 3036 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
2292 3044 : }
2293 :
2294 : /// Close the upload queue for new operations and cancel queued operations.
2295 : ///
2296 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
2297 : ///
2298 : /// In-progress operations will still be running after this function returns.
2299 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
2300 : /// to wait for them to complete, after calling this function.
2301 18 : pub(crate) fn stop(&self) {
2302 18 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
2303 18 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
2304 18 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
2305 18 : let mut guard = self.upload_queue.lock().unwrap();
2306 18 : self.stop_impl(&mut guard);
2307 18 : }
2308 :
2309 18 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
2310 18 : match &mut **guard {
2311 : UploadQueue::Uninitialized => {
2312 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
2313 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
2314 : }
2315 : UploadQueue::Stopped(_) => {
2316 : // nothing to do
2317 8 : info!("another concurrent task already shut down the queue");
2318 : }
2319 10 : UploadQueue::Initialized(initialized) => {
2320 10 : info!("shutting down upload queue");
2321 :
2322 : // Replace the queue with the Stopped state, taking ownership of the old
2323 : // Initialized queue. We will do some checks on it, and then drop it.
2324 10 : let qi = {
2325 : // Here we preserve working version of the upload queue for possible use during deletions.
2326 : // Here we preserve a working version of the upload queue for possible use during deletions.
2327 : // In-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
2328 : // but for this use case it doesn't really make sense to bring in unsafe code just for this one spot.
2329 : // Deletion is not really perf-sensitive, so there shouldn't be any problems with cloning a fraction of it.
2330 10 : task_counter: 0,
2331 10 : dirty: initialized.dirty.clone(),
2332 10 : clean: initialized.clean.clone(),
2333 10 : latest_files_changes_since_metadata_upload_scheduled: 0,
2334 10 : visible_remote_consistent_lsn: initialized
2335 10 : .visible_remote_consistent_lsn
2336 10 : .clone(),
2337 10 : num_inprogress_layer_uploads: 0,
2338 10 : num_inprogress_metadata_uploads: 0,
2339 10 : num_inprogress_deletions: 0,
2340 10 : inprogress_tasks: HashMap::default(),
2341 10 : queued_operations: VecDeque::default(),
2342 10 : #[cfg(feature = "testing")]
2343 10 : dangling_files: HashMap::default(),
2344 10 : blocked_deletions: Vec::new(),
2345 10 : shutting_down: false,
2346 10 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
2347 10 : recently_deleted: HashSet::new(),
2348 10 : };
2349 10 :
2350 10 : let upload_queue = std::mem::replace(
2351 10 : &mut **guard,
2352 10 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
2353 10 : UploadQueueStoppedDeletable {
2354 10 : upload_queue_for_deletion,
2355 10 : deleted_at: SetDeletedFlagProgress::NotRunning,
2356 10 : },
2357 10 : )),
2358 10 : );
2359 10 : if let UploadQueue::Initialized(qi) = upload_queue {
2360 10 : qi
2361 : } else {
2362 0 : unreachable!("we checked in the match above that it is Initialized");
2363 : }
2364 : };
2365 :
2366 : // consistency check
2367 10 : assert_eq!(
2368 10 : qi.num_inprogress_layer_uploads
2369 10 : + qi.num_inprogress_metadata_uploads
2370 10 : + qi.num_inprogress_deletions,
2371 10 : qi.inprogress_tasks.len()
2372 10 : );
2373 :
2374 : // We don't need to do anything here for in-progress tasks. They will finish
2375 : // on their own, decrement the unfinished-task counter themselves, and observe
2376 : // that the queue is Stopped.
2377 10 : drop(qi.inprogress_tasks);
2378 :
2379 : // Tear down queued ops
2380 10 : for op in qi.queued_operations.into_iter() {
2381 8 : self.metric_end(&op);
2382 8 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
2383 8 : // which is exactly what we want to happen.
2384 8 : drop(op);
2385 8 : }
2386 : }
2387 : }
2388 18 : }
2389 :
2390 : /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
2391 : /// externally to RemoteTimelineClient.
2392 0 : pub(crate) fn initialized_upload_queue(
2393 0 : &self,
2394 0 : ) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
2395 0 : let mut inner = self.upload_queue.lock().unwrap();
2396 0 : inner.initialized_mut()?;
2397 0 : Ok(UploadQueueAccessor { inner })
2398 0 : }
2399 :
2400 8 : pub(crate) fn no_pending_work(&self) -> bool {
2401 8 : let inner = self.upload_queue.lock().unwrap();
2402 8 : match &*inner {
2403 : UploadQueue::Uninitialized
2404 0 : | UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => true,
2405 8 : UploadQueue::Stopped(UploadQueueStopped::Deletable(x)) => {
2406 8 : x.upload_queue_for_deletion.no_pending_work()
2407 : }
2408 0 : UploadQueue::Initialized(x) => x.no_pending_work(),
2409 : }
2410 8 : }
2411 :
2412 : /// 'foreign' in the sense that it does not belong to this tenant shard. This method
2413 : /// is used during GC for other shards to get the index of shard zero.
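     : ///
     : /// For example (illustrative):
     : ///
     : /// ```ignore
     : /// // From a non-zero shard, fetch shard zero's latest index during GC:
     : /// let (index_part, generation, mtime) = client
     : ///     .download_foreign_index(ShardNumber(0), &cancel)
     : ///     .await?;
     : /// ```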
2414 0 : pub(crate) async fn download_foreign_index(
2415 0 : &self,
2416 0 : shard_number: ShardNumber,
2417 0 : cancel: &CancellationToken,
2418 0 : ) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
2419 0 : let foreign_shard_id = TenantShardId {
2420 0 : shard_number,
2421 0 : shard_count: self.tenant_shard_id.shard_count,
2422 0 : tenant_id: self.tenant_shard_id.tenant_id,
2423 0 : };
2424 0 : download_index_part(
2425 0 : &self.storage_impl,
2426 0 : &foreign_shard_id,
2427 0 : &self.timeline_id,
2428 0 : Generation::MAX,
2429 0 : cancel,
2430 0 : )
2431 0 : .await
2432 0 : }
2433 : }
2434 :
2435 : pub(crate) struct UploadQueueAccessor<'a> {
2436 : inner: std::sync::MutexGuard<'a, UploadQueue>,
2437 : }
2438 :
2439 : impl UploadQueueAccessor<'_> {
2440 0 : pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
2441 0 : match &*self.inner {
2442 0 : UploadQueue::Initialized(x) => &x.clean.0,
2443 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
2444 0 : unreachable!("checked before constructing")
2445 : }
2446 : }
2447 0 : }
2448 : }
2449 :
2450 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2451 0 : let path = format!("tenants/{tenant_shard_id}");
2452 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2453 0 : }
2454 :
2455 590 : pub fn remote_tenant_manifest_path(
2456 590 : tenant_shard_id: &TenantShardId,
2457 590 : generation: Generation,
2458 590 : ) -> RemotePath {
2459 590 : let path = format!(
2460 590 : "tenants/{tenant_shard_id}/tenant-manifest{}.json",
2461 590 : generation.get_suffix()
2462 590 : );
2463 590 : RemotePath::from_string(&path).expect("Failed to construct path")
2464 590 : }
2465 :
2466 : /// Prefix to all generations' manifest objects in a tenant shard
2467 196 : pub fn remote_tenant_manifest_prefix(tenant_shard_id: &TenantShardId) -> RemotePath {
2468 196 : let path = format!("tenants/{tenant_shard_id}/tenant-manifest",);
2469 196 : RemotePath::from_string(&path).expect("Failed to construct path")
2470 196 : }
2471 :
2472 226 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2473 226 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
2474 226 : RemotePath::from_string(&path).expect("Failed to construct path")
2475 226 : }
2476 :
2477 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
2478 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
2479 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2480 0 : }
2481 :
2482 30 : pub fn remote_timeline_path(
2483 30 : tenant_shard_id: &TenantShardId,
2484 30 : timeline_id: &TimelineId,
2485 30 : ) -> RemotePath {
2486 30 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
2487 30 : }
2488 :
2489 : /// Obtains the path of the given Layer in remote storage.
2490 : ///
2491 : /// Note that the shard component of a remote layer path is _not_ always the same
2492 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
2493 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
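     : ///
     : /// For a sharded tenant the resulting key looks roughly like
     : /// `tenants/<tenant_id>-<shard suffix>/timelines/<timeline_id>/<layer name>-<generation suffix>`
     : /// (the suffix values shown are placeholders); for an unsharded tenant or an
     : /// unset generation, the corresponding suffix is empty.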
2494 1727 : pub fn remote_layer_path(
2495 1727 : tenant_id: &TenantId,
2496 1727 : timeline_id: &TimelineId,
2497 1727 : shard: ShardIndex,
2498 1727 : layer_file_name: &LayerName,
2499 1727 : generation: Generation,
2500 1727 : ) -> RemotePath {
2501 1727 : // Generation-aware key format
2502 1727 : let path = format!(
2503 1727 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
2504 1727 : shard.get_suffix(),
2505 1727 : layer_file_name,
2506 1727 : generation.get_suffix()
2507 1727 : );
2508 1727 :
2509 1727 : RemotePath::from_string(&path).expect("Failed to construct path")
2510 1727 : }
2511 :
2512 4 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
2513 4 : RemotePath::from_string(&format!(
2514 4 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
2515 4 : ))
2516 4 : .expect("Failed to construct path")
2517 4 : }
2518 :
2519 2 : pub fn remote_initdb_preserved_archive_path(
2520 2 : tenant_id: &TenantId,
2521 2 : timeline_id: &TimelineId,
2522 2 : ) -> RemotePath {
2523 2 : RemotePath::from_string(&format!(
2524 2 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
2525 2 : ))
2526 2 : .expect("Failed to construct path")
2527 2 : }
2528 :
2529 1476 : pub fn remote_index_path(
2530 1476 : tenant_shard_id: &TenantShardId,
2531 1476 : timeline_id: &TimelineId,
2532 1476 : generation: Generation,
2533 1476 : ) -> RemotePath {
2534 1476 : RemotePath::from_string(&format!(
2535 1476 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
2536 1476 : IndexPart::FILE_NAME,
2537 1476 : generation.get_suffix()
2538 1476 : ))
2539 1476 : .expect("Failed to construct path")
2540 1476 : }
2541 :
2542 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2543 0 : RemotePath::from_string(&format!(
2544 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
2545 0 : ))
2546 0 : .expect("Failed to construct path")
2547 0 : }
2548 :
2549 : /// Given the key of an index, parse out the generation part of the name
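     : ///
     : /// For example, `index_part.json-000000ab` yields the generation parsed from
     : /// the `000000ab` hex suffix, while a legacy `index_part.json` without a
     : /// suffix yields `None`.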
2550 18 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
2551 18 : let file_name = match path.get_path().file_name() {
2552 18 : Some(f) => f,
2553 : None => {
2554 : // Unexpected: we should be seeing index_part.json paths only
2555 0 : tracing::warn!("Malformed index key {}", path);
2556 0 : return None;
2557 : }
2558 : };
2559 :
2560 18 : match file_name.split_once('-') {
2561 12 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2562 6 : None => None,
2563 : }
2564 18 : }
2565 :
2566 : /// Given the key of a tenant manifest, parse out the generation number
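     : ///
     : /// For example, `tenants/<tenant_shard_id>/tenant-manifest-0000002a.json`
     : /// parses to the generation encoded by the `0000002a` hex suffix.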
2567 0 : pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
2568 : static RE: OnceLock<Regex> = OnceLock::new();
2569 0 : let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap());
2570 0 : re.captures(path.get_path().as_str())
2571 0 : .and_then(|c| c.get(1))
2572 0 : .and_then(|m| Generation::parse_suffix(m.as_str()))
2573 0 : }
2574 :
2575 : #[cfg(test)]
2576 : mod tests {
2577 : use super::*;
2578 : use crate::{
2579 : context::RequestContext,
2580 : tenant::{
2581 : config::AttachmentMode,
2582 : harness::{TenantHarness, TIMELINE_ID},
2583 : storage_layer::layer::local_layer_path,
2584 : Tenant, Timeline,
2585 : },
2586 : DEFAULT_PG_VERSION,
2587 : };
2588 :
2589 : use std::collections::HashSet;
2590 :
2591 8 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2592 8 : format!("contents for {name}").into()
2593 8 : }
2594 :
2595 2 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2596 2 : let metadata = TimelineMetadata::new(
2597 2 : disk_consistent_lsn,
2598 2 : None,
2599 2 : None,
2600 2 : Lsn(0),
2601 2 : Lsn(0),
2602 2 : Lsn(0),
2603 2 : // Any version will do
2604 2 : // but it should be consistent with the one in the tests
2605 2 : crate::DEFAULT_PG_VERSION,
2606 2 : );
2607 2 :
2608 2 : // go through serialize + deserialize to fix the header, including checksum
2609 2 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2610 2 : }
2611 :
2612 2 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2613 6 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2614 2 : avec.sort();
2615 2 :
2616 2 : let mut bvec = b.to_vec();
2617 2 : bvec.sort_unstable();
2618 2 :
2619 2 : assert_eq!(avec, bvec);
2620 2 : }
2621 :
2622 4 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2623 4 : let mut expected: Vec<String> = expected
2624 4 : .iter()
2625 16 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2626 4 : .collect();
2627 4 : expected.sort();
2628 4 :
2629 4 : let mut found: Vec<String> = Vec::new();
2630 16 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2631 16 : let entry_name = entry.file_name();
2632 16 : let fname = entry_name.to_str().unwrap();
2633 16 : found.push(String::from(fname));
2634 16 : }
2635 4 : found.sort();
2636 4 :
2637 4 : assert_eq!(found, expected);
2638 4 : }
2639 :
2640 : struct TestSetup {
2641 : harness: TenantHarness,
2642 : tenant: Arc<Tenant>,
2643 : timeline: Arc<Timeline>,
2644 : tenant_ctx: RequestContext,
2645 : }
2646 :
2647 : impl TestSetup {
2648 8 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2649 8 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2650 8 : let harness = TenantHarness::create(test_name).await?;
2651 8 : let (tenant, ctx) = harness.load().await;
2652 :
2653 8 : let timeline = tenant
2654 8 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2655 8 : .await?;
2656 :
2657 8 : Ok(Self {
2658 8 : harness,
2659 8 : tenant,
2660 8 : timeline,
2661 8 : tenant_ctx: ctx,
2662 8 : })
2663 8 : }
2664 :
2665 : /// Construct a RemoteTimelineClient in an arbitrary generation
2666 10 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2667 10 : let location_conf = AttachedLocationConfig {
2668 10 : generation,
2669 10 : attach_mode: AttachmentMode::Single,
2670 10 : };
2671 10 : Arc::new(RemoteTimelineClient {
2672 10 : conf: self.harness.conf,
2673 10 : runtime: tokio::runtime::Handle::current(),
2674 10 : tenant_shard_id: self.harness.tenant_shard_id,
2675 10 : timeline_id: TIMELINE_ID,
2676 10 : generation,
2677 10 : storage_impl: self.harness.remote_storage.clone(),
2678 10 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2679 10 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2680 10 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2681 10 : &self.harness.tenant_shard_id,
2682 10 : &TIMELINE_ID,
2683 10 : )),
2684 10 : config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
2685 10 : cancel: CancellationToken::new(),
2686 10 : })
2687 10 : }
2688 :
2689 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2690 : /// and timeline_id are present.
2691 6 : fn span(&self) -> tracing::Span {
2692 6 : tracing::info_span!(
2693 : "test",
2694 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2695 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2696 : timeline_id = %TIMELINE_ID
2697 : )
2698 6 : }
2699 : }
2700 :
2701 : // Test scheduling
2702 : #[tokio::test]
2703 2 : async fn upload_scheduling() {
2704 2 : // Test outline:
2705 2 : //
2706 2 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
2707 2 : // Schedule upload of index. Check that it is queued
2708 2 : // let the layer file uploads finish. Check that the index-upload is now started
2709 2 : // let the index-upload finish.
2710 2 : //
2711 2 : // Download back the index.json. Check that the list of files is correct
2712 2 : //
2713 2 : // Schedule upload. Schedule deletion. Check that the deletion is queued
2714 2 : // let upload finish. Check that deletion is now started
2715 2 : // Schedule another deletion. Check that it's launched immediately.
2716 2 : // Schedule index upload. Check that it's queued
2717 2 :
2718 2 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2719 2 : let span = test_setup.span();
2720 2 : let _guard = span.enter();
2721 2 :
2722 2 : let TestSetup {
2723 2 : harness,
2724 2 : tenant: _tenant,
2725 2 : timeline,
2726 2 : tenant_ctx: _tenant_ctx,
2727 2 : } = test_setup;
2728 2 :
2729 2 : let client = &timeline.remote_client;
2730 2 :
2731 2 : // Download back the index.json, and check that the list of files is correct
2732 2 : let initial_index_part = match client
2733 2 : .download_index_file(&CancellationToken::new())
2734 2 : .await
2735 2 : .unwrap()
2736 2 : {
2737 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2738 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2739 2 : };
2740 2 : let initial_layers = initial_index_part
2741 2 : .layer_metadata
2742 2 : .keys()
2743 2 : .map(|f| f.to_owned())
2744 2 : .collect::<HashSet<LayerName>>();
2745 2 : let initial_layer = {
2746 2 : assert!(initial_layers.len() == 1);
2747 2 : initial_layers.into_iter().next().unwrap()
2748 2 : };
2749 2 :
2750 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2751 2 :
2752 2 : println!("workdir: {}", harness.conf.workdir);
2753 2 :
2754 2 : let remote_timeline_dir = harness
2755 2 : .remote_fs_dir
2756 2 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2757 2 : println!("remote_timeline_dir: {remote_timeline_dir}");
2758 2 :
2759 2 : let generation = harness.generation;
2760 2 : let shard = harness.shard;
2761 2 :
2762 2 : // Create a couple of dummy files, schedule upload for them
2763 2 :
2764 2 : let layers = [
2765 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2766 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2767 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2768 2 : ]
2769 2 : .into_iter()
2770 6 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2771 6 :
2772 6 : let local_path = local_layer_path(
2773 6 : harness.conf,
2774 6 : &timeline.tenant_shard_id,
2775 6 : &timeline.timeline_id,
2776 6 : &name,
2777 6 : &generation,
2778 6 : );
2779 6 : std::fs::write(&local_path, &contents).unwrap();
2780 6 :
2781 6 : Layer::for_resident(
2782 6 : harness.conf,
2783 6 : &timeline,
2784 6 : local_path,
2785 6 : name,
2786 6 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2787 6 : )
2788 6 : }).collect::<Vec<_>>();
2789 2 :
2790 2 : client
2791 2 : .schedule_layer_file_upload(layers[0].clone())
2792 2 : .unwrap();
2793 2 : client
2794 2 : .schedule_layer_file_upload(layers[1].clone())
2795 2 : .unwrap();
2796 2 :
2797 2 : // Check that they are started immediately, not queued
2798 2 : //
2799 2 : // this works because we are running within block_on, so any futures are now queued up until
2800 2 : // our next await point.
2801 2 : {
2802 2 : let mut guard = client.upload_queue.lock().unwrap();
2803 2 : let upload_queue = guard.initialized_mut().unwrap();
2804 2 : assert!(upload_queue.queued_operations.is_empty());
2805 2 : assert!(upload_queue.inprogress_tasks.len() == 2);
2806 2 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2807 2 :
2808 2 : // also check that `latest_file_changes` was updated
2809 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2810 2 : }
2811 2 :
2812 2 : // Schedule upload of index. Check that it is queued
2813 2 : let metadata = dummy_metadata(Lsn(0x20));
2814 2 : client
2815 2 : .schedule_index_upload_for_full_metadata_update(&metadata)
2816 2 : .unwrap();
2817 2 : {
2818 2 : let mut guard = client.upload_queue.lock().unwrap();
2819 2 : let upload_queue = guard.initialized_mut().unwrap();
2820 2 : assert!(upload_queue.queued_operations.len() == 1);
2821 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2822 2 : }
2823 2 :
2824 2 : // Wait for the uploads to finish
2825 2 : client.wait_completion().await.unwrap();
2826 2 : {
2827 2 : let mut guard = client.upload_queue.lock().unwrap();
2828 2 : let upload_queue = guard.initialized_mut().unwrap();
2829 2 :
2830 2 : assert!(upload_queue.queued_operations.is_empty());
2831 2 : assert!(upload_queue.inprogress_tasks.is_empty());
2832 2 : }
2833 2 :
2834 2 : // Download back the index_part.json and check that the list of files is correct
2835 2 : let index_part = match client
2836 2 : .download_index_file(&CancellationToken::new())
2837 2 : .await
2838 2 : .unwrap()
2839 2 : {
2840 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2841 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2842 2 : };
2843 2 :
2844 2 : assert_file_list(
2845 2 : &index_part
2846 2 : .layer_metadata
2847 2 : .keys()
2848 6 : .map(|f| f.to_owned())
2849 2 : .collect(),
2850 2 : &[
2851 2 : &initial_layer.to_string(),
2852 2 : &layers[0].layer_desc().layer_name().to_string(),
2853 2 : &layers[1].layer_desc().layer_name().to_string(),
2854 2 : ],
2855 2 : );
2856 2 : assert_eq!(index_part.metadata, metadata);
2857 2 :
2858 2 : // Schedule upload and then a deletion. Check that the deletion is queued
2859 2 : client
2860 2 : .schedule_layer_file_upload(layers[2].clone())
2861 2 : .unwrap();
2862 2 :
2863 2 : // This is no longer consistent with how deletion works via Layer::drop, but this test
2864 2 : // keeps using schedule_layer_file_deletion because we have no way to wait for the
2865 2 : // spawn_blocking started by the drop.
2866 2 : client
2867 2 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
2868 2 : .unwrap();
2869 2 : {
2870 2 : let mut guard = client.upload_queue.lock().unwrap();
2871 2 : let upload_queue = guard.initialized_mut().unwrap();
2872 2 :
2873 2 : // A deletion schedules an upload of the index file, plus the file deletion itself
2874 2 : assert_eq!(upload_queue.queued_operations.len(), 2);
2875 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2876 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2877 2 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2878 2 : assert_eq!(
2879 2 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2880 2 : 0
2881 2 : );
2882 2 : }
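 : // (The deletion and the index rewrite are still only queued, so layers[0] must
 : // remain present in the remote listing below; layers[2]'s upload is in flight
 : // and not visible yet either.)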
2883 2 : assert_remote_files(
2884 2 : &[
2885 2 : &initial_layer.to_string(),
2886 2 : &layers[0].layer_desc().layer_name().to_string(),
2887 2 : &layers[1].layer_desc().layer_name().to_string(),
2888 2 : "index_part.json",
2889 2 : ],
2890 2 : &remote_timeline_dir,
2891 2 : generation,
2892 2 : );
2893 2 :
2894 2 : // Finish them
2895 2 : client.wait_completion().await.unwrap();
2896 2 : harness.deletion_queue.pump().await;
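 : // (Deletions go through the deletion queue; pumping it ensures the removal has
 : // actually reached remote storage before we re-check the listing.)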
2897 2 :
2898 2 : assert_remote_files(
2899 2 : &[
2900 2 : &initial_layer.to_string(),
2901 2 : &layers[1].layer_desc().layer_name().to_string(),
2902 2 : &layers[2].layer_desc().layer_name().to_string(),
2903 2 : "index_part.json",
2904 2 : ],
2905 2 : &remote_timeline_dir,
2906 2 : generation,
2907 2 : );
2908 2 : }
2909 :
2910 : #[tokio::test]
2911 2 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2912 2 : // Setup
2913 2 :
2914 2 : let TestSetup {
2915 2 : harness,
2916 2 : tenant: _tenant,
2917 2 : timeline,
2918 2 : ..
2919 2 : } = TestSetup::new("metrics").await.unwrap();
2920 2 : let client = &timeline.remote_client;
2921 2 :
2922 2 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2923 2 : let local_path = local_layer_path(
2924 2 : harness.conf,
2925 2 : &timeline.tenant_shard_id,
2926 2 : &timeline.timeline_id,
2927 2 : &layer_file_name_1,
2928 2 : &harness.generation,
2929 2 : );
2930 2 : let content_1 = dummy_contents("foo");
2931 2 : std::fs::write(&local_path, &content_1).unwrap();
2932 2 :
2933 2 : let layer_file_1 = Layer::for_resident(
2934 2 : harness.conf,
2935 2 : &timeline,
2936 2 : local_path,
2937 2 : layer_file_name_1.clone(),
2938 2 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2939 2 : );
2940 2 :
2941 2 : #[derive(Debug, PartialEq, Clone, Copy)]
2942 2 : struct BytesStartedFinished {
2943 2 : started: Option<usize>,
2944 2 : finished: Option<usize>,
2945 2 : }
2946 2 : impl std::ops::Add for BytesStartedFinished {
2947 2 : type Output = Self;
2948 4 : fn add(self, rhs: Self) -> Self::Output {
2949 4 : Self {
2950 4 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2951 4 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2952 4 : }
2953 4 : }
2954 2 : }
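 : // (The Add impl keeps `None` when the left-hand sample had no metric yet, and
 : // treats a missing right-hand value as 0.)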
2955 6 : let get_bytes_started_stopped = || {
2956 6 : let started = client
2957 6 : .metrics
2958 6 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2959 6 : .map(|v| v.try_into().unwrap());
2960 6 : let finished = client
2961 6 : .metrics
2962 6 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2963 6 : .map(|v| v.try_into().unwrap());
2964 6 : BytesStartedFinished {
2965 6 : started,
2966 6 : finished,
2967 6 : }
2968 6 : };
2969 2 :
2970 2 : // Test
2971 2 : tracing::info!("now doing actual test");
2972 2 :
2973 2 : let actual_a = get_bytes_started_stopped();
2974 2 :
2975 2 : client
2976 2 : .schedule_layer_file_upload(layer_file_1.clone())
2977 2 : .unwrap();
2978 2 :
2979 2 : let actual_b = get_bytes_started_stopped();
2980 2 :
2981 2 : client.wait_completion().await.unwrap();
2982 2 :
2983 2 : let actual_c = get_bytes_started_stopped();
2984 2 :
2985 2 : // Validate
2986 2 :
2987 2 : let expected_b = actual_a
2988 2 : + BytesStartedFinished {
2989 2 : started: Some(content_1.len()),
2990 2 : // assert that the _finished metric is created eagerly so that subtractions work on the first sample
2991 2 : finished: Some(0),
2992 2 : };
2993 2 : assert_eq!(actual_b, expected_b);
2994 2 :
2995 2 : let expected_c = actual_a
2996 2 : + BytesStartedFinished {
2997 2 : started: Some(content_1.len()),
2998 2 : finished: Some(content_1.len()),
2999 2 : };
3000 2 : assert_eq!(actual_c, expected_c);
3001 2 : }
3002 :
3003 12 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
3004 12 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
3005 12 : let example_index_part = IndexPart::example();
3006 12 :
3007 12 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
3008 12 :
3009 12 : let index_path = test_state.harness.remote_fs_dir.join(
3010 12 : remote_index_path(
3011 12 : &test_state.harness.tenant_shard_id,
3012 12 : &TIMELINE_ID,
3013 12 : generation,
3014 12 : )
3015 12 : .get_path(),
3016 12 : );
3017 12 :
3018 12 : std::fs::create_dir_all(index_path.parent().unwrap())
3019 12 : .expect("creating test dir should work");
3020 12 :
3021 12 : eprintln!("Writing {index_path}");
3022 12 : std::fs::write(&index_path, index_part_bytes).unwrap();
3023 12 : example_index_part
3024 12 : }
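 : // (`remote_index_path` takes the generation, so each generation gets its own
 : // index object name; that is what lets several of them coexist in the tests below.)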
3025 :
3026 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
3027 : /// index, the IndexPart returned is equal to `expected`
3028 10 : async fn assert_got_index_part(
3029 10 : test_state: &TestSetup,
3030 10 : get_generation: Generation,
3031 10 : expected: &IndexPart,
3032 10 : ) {
3033 10 : let client = test_state.build_client(get_generation);
3034 :
3035 10 : let download_r = client
3036 10 : .download_index_file(&CancellationToken::new())
3037 10 : .await
3038 10 : .expect("download should always succeed");
3039 10 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
3040 10 : match download_r {
3041 10 : MaybeDeletedIndexPart::IndexPart(index_part) => {
3042 10 : assert_eq!(&index_part, expected);
3043 : }
3044 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
3045 : }
3046 10 : }
3047 :
3048 : #[tokio::test]
3049 2 : async fn index_part_download_simple() -> anyhow::Result<()> {
3050 2 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
3051 2 : let span = test_state.span();
3052 2 : let _guard = span.enter();
3053 2 :
3054 2 : // Simple case: we are in generation N, load the index from generation N - 1
3055 2 : let generation_n = 5;
3056 2 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3057 2 :
3058 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
3059 2 :
3060 2 : Ok(())
3061 2 : }
3062 :
3063 : #[tokio::test]
3064 2 : async fn index_part_download_ordering() -> anyhow::Result<()> {
3065 2 : let test_state = TestSetup::new("index_part_download_ordering")
3066 2 : .await
3067 2 : .unwrap();
3068 2 :
3069 2 : let span = test_state.span();
3070 2 : let _guard = span.enter();
3071 2 :
3072 2 : // A generation-less IndexPart exists in the bucket; we should find it
3073 2 : let generation_n = 5;
3074 2 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
3075 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
3076 2 :
3077 2 : // If a more-recent-than-none generation exists, we should prefer to load that
3078 2 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
3079 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3080 2 :
3081 2 : // If a more-recent-than-me generation exists, we should ignore it.
3082 2 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
3083 2 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
3084 2 :
3085 2 : // If a directly previous generation exists, _and_ an index exists in my own
3086 2 : // generation, I should prefer my own generation.
3087 2 : let _injected_prev =
3088 2 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
3089 2 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
3090 2 : assert_got_index_part(
3091 2 : &test_state,
3092 2 : Generation::new(generation_n),
3093 2 : &injected_current,
3094 2 : )
3095 2 : .await;
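 : // In summary: download prefers an index at exactly the reader's generation,
 : // otherwise the nearest preceding one, and ignores generations from the future.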
3096 2 :
3097 2 : Ok(())
3098 2 : }
3099 : }