Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], lists the timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that require preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
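The barrier behaviour described above can be illustrated with a minimal sketch. The `Op` enum, `can_start`, and `launch_ready` below are hypothetical names invented for this illustration and are not the real `UploadOp` machinery. The sketch assumes a simplified rule: layer uploads may run concurrently with each other, while index uploads and deletions wait until nothing scheduled before them is still in flight.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified operation kinds (not the real `UploadOp`).
#[derive(Debug, Clone, PartialEq)]
enum Op {
    UploadLayer(String),
    UploadIndex,
    Delete(String),
}

/// Decide whether `next` may start, given the operations currently in flight.
/// Assumption of this sketch: layer uploads can overlap with other layer
/// uploads; index uploads and deletions are barriers.
fn can_start(next: &Op, in_flight: &[Op]) -> bool {
    match next {
        Op::UploadLayer(_) => in_flight
            .iter()
            .all(|op| matches!(op, Op::UploadLayer(_))),
        Op::UploadIndex | Op::Delete(_) => in_flight.is_empty(),
    }
}

/// Move operations from the front of `queue` into `in_flight` for as long as
/// the next one is allowed to start. Returns how many were started.
fn launch_ready(queue: &mut VecDeque<Op>, in_flight: &mut Vec<Op>) -> usize {
    let mut started = 0;
    while let Some(next) = queue.front() {
        if !can_start(next, &in_flight[..]) {
            break;
        }
        let op = queue.pop_front().expect("front() returned Some");
        in_flight.push(op);
        started += 1;
    }
    started
}
```

With a queue of two layer uploads followed by an index upload, the first call starts both layer uploads, and the index upload starts only once they have left `in_flight`. The caller still schedules in order; only the launch decision is parallelized.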
72 : //!
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! the `wait_completion` function (more on that later).
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
88 : //! that this doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in [`UploadQueueInitialized::dirty`].
95 : //! It is initialized based on the [`IndexPart`] that was passed during init
96 : //! and updated with every `schedule_*` function call.
97 : //! All this is necessary to compute the future [`IndexPart`]s
98 : //! when scheduling an operation while other operations that also affect the
99 : //! remote [`IndexPart`] are in flight.
100 : //!
101 : //! # Retries & Error Handling
102 : //!
103 : //! The client retries operations indefinitely, using exponential back-off.
104 : //! There is no way to force a retry, i.e., interrupt the back-off.
105 : //! This could be built easily.
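As a rough illustration of exponential back-off with a cap, here is a minimal sketch. `backoff_delay` is a hypothetical helper written for this example; it is not the `utils::backoff::exponential_backoff` function this module imports, and the doubling factor and parameters are assumptions.

```rust
use std::time::Duration;

/// Hypothetical helper: how long to sleep before retry number `attempt`.
/// Attempt 0 is the first try and does not wait. After that the delay
/// starts at `base`, doubles on every further attempt, and is capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    if attempt == 0 {
        return Duration::ZERO;
    }
    // Saturating arithmetic keeps very large attempt counts from overflowing.
    let factor = 2u32.saturating_pow(attempt - 1);
    base.saturating_mul(factor).min(max)
}
```

Because the delay is capped, an operation that is retried indefinitely settles into retrying once every `max` interval instead of backing off without bound.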
106 : //!
107 : //! # Cancellation
108 : //!
109 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
110 : //! the client's tenant and timeline.
111 : //! Dropping the client will drop queued operations but not executing operations.
112 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
113 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
114 : //!
115 : //! # Completion
116 : //!
117 : //! Once an operation has completed, we update [`UploadQueueInitialized::clean`] immediately,
118 : //! and submit a request through the DeletionQueue to update
119 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
120 : //! validated that our generation is not stale. It is this visible value
121 : //! that is advertised to safekeepers as a signal that they can
122 : //! delete the WAL up to that LSN.
123 : //!
124 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
125 : //! for all pending operations to complete. It does not prevent more
126 : //! operations from getting scheduled.
127 : //!
128 : //! # Crash Consistency
129 : //!
130 : //! We do not persist the upload queue state.
131 : //! If we drop the client, or crash, all unfinished operations are lost.
132 : //!
133 : //! To recover, the following steps need to be taken:
134 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
135 : //! consistent remote state, assuming the user scheduled the operations in
136 : //! the correct order.
137 : //! - Initialize the upload queue with that [`IndexPart`].
138 : //! - Reschedule all lost operations by comparing the local filesystem state
139 : //! and remote state as per [`IndexPart`]. This is done in
140 : //! [`Tenant::timeline_init_and_sync`].
141 : //!
142 : //! Note that if we crash during file deletion between the index update
143 : //! that removes the file from the list of files, and deleting the remote file,
144 : //! the file is leaked in the remote storage. Similarly, if a new file is created
145 : //! and uploaded, but the pageserver dies permanently before updating the
146 : //! remote index file, the new file is leaked in remote storage. We accept and
147 : //! tolerate that for now.
148 : //! Note further that we cannot easily fix this by scheduling deletes for every
149 : //! file that is present only on the remote, because we cannot distinguish the
150 : //! following two cases:
151 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
152 : //! but crashed before it finished remotely.
153 : //! - (2) We never had the file locally because we haven't on-demand downloaded
154 : //! it yet.
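The recovery comparison described above can be sketched as a pair of set differences. `RecoveryPlan` and `plan_recovery` are hypothetical names for this illustration, and layer files are represented by plain strings; the real logic lives in `Tenant::timeline_init_and_sync` and is not reproduced here.

```rust
use std::collections::BTreeSet;

/// Hypothetical result of comparing local files against the remote index.
#[derive(Debug, PartialEq)]
struct RecoveryPlan {
    /// Present locally but missing from the index: uploads to reschedule.
    to_upload: Vec<String>,
    /// Listed in the index but not present locally. These are NOT deleted,
    /// because a lost remote delete and a layer that was never downloaded
    /// look identical from here.
    remote_only: Vec<String>,
}

/// Compare the local layer file names with those listed in the remote index.
fn plan_recovery(local: &BTreeSet<String>, remote_index: &BTreeSet<String>) -> RecoveryPlan {
    RecoveryPlan {
        to_upload: local.difference(remote_index).cloned().collect(),
        remote_only: remote_index.difference(local).cloned().collect(),
    }
}
```

The sketch only reports `remote_only` entries and takes no action on them, which mirrors the ambiguity between cases (1) and (2) above.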
155 : //!
156 : //! # Downloads
157 : //!
158 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
159 : //! downloading files from the remote storage. Downloads are performed immediately
160 : //! against the `RemoteStorage`, independently of the upload queue.
161 : //!
162 : //! When we attach a tenant, we perform the following steps:
163 : //! - create `Tenant` object in `TenantState::Attaching` state
164 : //! - List timelines that are present in remote storage, and for each:
165 : //! - download their remote [`IndexPart`]s
166 : //! - create `Timeline` struct and a `RemoteTimelineClient`
167 : //! - initialize the client's upload queue with its `IndexPart`
168 : //! - schedule uploads for layers that are only present locally.
169 : //! - After the above is done for each timeline, open the tenant for business by
170 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
171 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
172 : //!
173 : //! # Operating Without Remote Storage
174 : //!
175 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
176 : //! not created and the uploads are skipped.
177 : //!
178 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
179 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
180 :
181 : pub(crate) mod download;
182 : pub mod index;
183 : pub mod manifest;
184 : pub(crate) mod upload;
185 :
186 : use anyhow::Context;
187 : use camino::Utf8Path;
188 : use chrono::{NaiveDateTime, Utc};
189 :
190 : pub(crate) use download::download_initdb_tar_zst;
191 : use pageserver_api::models::TimelineArchivalState;
192 : use pageserver_api::shard::{ShardIndex, TenantShardId};
193 : use scopeguard::ScopeGuard;
194 : use tokio_util::sync::CancellationToken;
195 : use utils::backoff::{
196 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
197 : };
198 : use utils::pausable_failpoint;
199 :
200 : use std::collections::{HashMap, VecDeque};
201 : use std::sync::atomic::{AtomicU32, Ordering};
202 : use std::sync::{Arc, Mutex};
203 : use std::time::Duration;
204 :
205 : use remote_storage::{
206 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
207 : };
208 : use std::ops::DerefMut;
209 : use tracing::{debug, error, info, instrument, warn};
210 : use tracing::{info_span, Instrument};
211 : use utils::lsn::Lsn;
212 :
213 : use crate::context::RequestContext;
214 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
215 : use crate::metrics::{
216 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
217 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
218 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
219 : };
220 : use crate::task_mgr::shutdown_token;
221 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
222 : use crate::tenant::remote_timeline_client::download::download_retry;
223 : use crate::tenant::storage_layer::AsLayerDesc;
224 : use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
225 : use crate::tenant::TIMELINES_SEGMENT_NAME;
226 : use crate::{
227 : config::PageServerConf,
228 : task_mgr,
229 : task_mgr::TaskKind,
230 : task_mgr::BACKGROUND_RUNTIME,
231 : tenant::metadata::TimelineMetadata,
232 : tenant::upload_queue::{
233 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
234 : },
235 : TENANT_HEATMAP_BASENAME,
236 : };
237 :
238 : use utils::id::{TenantId, TimelineId};
239 :
240 : use self::index::IndexPart;
241 :
242 : use super::metadata::MetadataUpdate;
243 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
244 : use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
245 : use super::Generation;
246 :
247 : pub(crate) use download::{
248 : do_download_tenant_manifest, download_index_part, is_temp_download_file,
249 : list_remote_tenant_shards, list_remote_timelines,
250 : };
251 : pub(crate) use index::LayerFileMetadata;
252 : pub(crate) use upload::{upload_initdb_dir, upload_tenant_manifest};
253 :
254 : // Occasional network issues and such can cause remote operations to fail, and
255 : // that's expected. If a download fails, we log it at info-level, and retry.
256 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
257 : // level instead, as repeated failures can mean a more serious problem. If it
258 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
259 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
260 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
261 :
262 : // Similarly log failed uploads and deletions at WARN level, after this many
263 : // retries. Uploads and deletions are retried forever, though.
264 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
265 :
266 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
267 :
268 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
269 :
270 : /// Default buffer size when interfacing with [`tokio::fs::File`].
271 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
272 :
273 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
274 : /// which we warn and skip.
275 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
276 :
277 : /// Hardcode a generation for the tenant manifest for now so that we don't
278 : /// need to deal with generation-less manifests in the future.
279 : ///
280 : /// TODO: add proper generation support to all the places that use this.
281 : pub(crate) const TENANT_MANIFEST_GENERATION: Generation = Generation::new(1);
282 :
283 : pub enum MaybeDeletedIndexPart {
284 : IndexPart(IndexPart),
285 : Deleted(IndexPart),
286 : }
287 :
288 0 : #[derive(Debug, thiserror::Error)]
289 : pub enum PersistIndexPartWithDeletedFlagError {
290 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
291 : AlreadyInProgress(NaiveDateTime),
292 : #[error("the deleted_flag was already set, value is {0:?}")]
293 : AlreadyDeleted(NaiveDateTime),
294 : #[error(transparent)]
295 : Other(#[from] anyhow::Error),
296 : }
297 :
298 0 : #[derive(Debug, thiserror::Error)]
299 : pub enum WaitCompletionError {
300 : #[error(transparent)]
301 : NotInitialized(NotInitialized),
302 : #[error("wait_completion aborted because upload queue was stopped")]
303 : UploadQueueShutDownOrStopped,
304 : }
305 :
306 0 : #[derive(Debug, thiserror::Error)]
307 : #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
308 : pub struct UploadQueueNotReadyError;
309 :
310 : /// A client for accessing a timeline's data in remote storage.
311 : ///
312 : /// This takes care of managing the number of connections, and balancing them
313 : /// across tenants. This also handles retries of failed uploads.
314 : ///
315 : /// Upload and delete requests are ordered so that before a deletion is
316 : /// performed, we wait for all preceding uploads to finish. This ensures
317 : /// that if you perform a compaction operation that reshuffles data in layer
318 : /// files, we don't have a transient state where the old files have already been
319 : /// deleted, but new files have not yet been uploaded.
320 : ///
321 : /// Similarly, this enforces an order between index-file uploads, and layer
322 : /// uploads. Before an index-file upload is performed, all preceding layer
323 : /// uploads must be finished.
324 : ///
325 : /// This also maintains a list of remote files, and automatically includes that
326 : /// in the index part file, whenever timeline metadata is uploaded.
327 : ///
328 : /// Downloads are not queued; they are performed immediately.
329 : pub struct RemoteTimelineClient {
330 : conf: &'static PageServerConf,
331 :
332 : runtime: tokio::runtime::Handle,
333 :
334 : tenant_shard_id: TenantShardId,
335 : timeline_id: TimelineId,
336 : generation: Generation,
337 :
338 : upload_queue: Mutex<UploadQueue>,
339 :
340 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
341 :
342 : storage_impl: GenericRemoteStorage,
343 :
344 : deletion_queue_client: DeletionQueueClient,
345 :
346 : cancel: CancellationToken,
347 : }
348 :
349 : impl RemoteTimelineClient {
350 : ///
351 : /// Create a remote storage client for given timeline
352 : ///
353 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
354 : /// by calling init_upload_queue.
355 : ///
356 410 : pub fn new(
357 410 : remote_storage: GenericRemoteStorage,
358 410 : deletion_queue_client: DeletionQueueClient,
359 410 : conf: &'static PageServerConf,
360 410 : tenant_shard_id: TenantShardId,
361 410 : timeline_id: TimelineId,
362 410 : generation: Generation,
363 410 : ) -> RemoteTimelineClient {
364 410 : RemoteTimelineClient {
365 410 : conf,
366 410 : runtime: if cfg!(test) {
367 : // remote_timeline_client.rs tests rely on current-thread runtime
368 410 : tokio::runtime::Handle::current()
369 : } else {
370 0 : BACKGROUND_RUNTIME.handle().clone()
371 : },
372 410 : tenant_shard_id,
373 410 : timeline_id,
374 410 : generation,
375 410 : storage_impl: remote_storage,
376 410 : deletion_queue_client,
377 410 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
378 410 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
379 410 : &tenant_shard_id,
380 410 : &timeline_id,
381 410 : )),
382 410 : cancel: CancellationToken::new(),
383 410 : }
384 410 : }
385 :
386 : /// Initialize the upload queue for a remote storage that already received
387 : /// an index file upload, i.e., it's not empty.
388 : /// The given `index_part` must be the one on the remote.
389 6 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
390 6 : let mut upload_queue = self.upload_queue.lock().unwrap();
391 6 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
392 6 : self.update_remote_physical_size_gauge(Some(index_part));
393 6 : info!(
394 0 : "initialized upload queue from remote index with {} layer files",
395 0 : index_part.layer_metadata.len()
396 : );
397 6 : Ok(())
398 6 : }
399 :
400 : /// Initialize the upload queue for the case where the remote storage is empty,
401 : /// i.e., it doesn't have an `IndexPart`.
402 404 : pub fn init_upload_queue_for_empty_remote(
403 404 : &self,
404 404 : local_metadata: &TimelineMetadata,
405 404 : ) -> anyhow::Result<()> {
406 404 : let mut upload_queue = self.upload_queue.lock().unwrap();
407 404 : upload_queue.initialize_empty_remote(local_metadata)?;
408 404 : self.update_remote_physical_size_gauge(None);
409 404 : info!("initialized upload queue as empty");
410 404 : Ok(())
411 404 : }
412 :
413 : /// Initialize the queue in stopped state. Used in startup path
414 : /// to continue deletion operation interrupted by pageserver crash or restart.
415 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
416 0 : &self,
417 0 : index_part: &IndexPart,
418 0 : ) -> anyhow::Result<()> {
419 : // FIXME: consider newtype for DeletedIndexPart.
420 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
421 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
422 0 : ))?;
423 :
424 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
425 0 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
426 0 : self.update_remote_physical_size_gauge(Some(index_part));
427 0 : self.stop_impl(&mut upload_queue);
428 0 :
429 0 : upload_queue
430 0 : .stopped_mut()
431 0 : .expect("stopped above")
432 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
433 0 :
434 0 : Ok(())
435 0 : }
436 :
437 : /// Returns `None` if nothing is yet uploaded, `Some(disk_consistent_lsn)` otherwise.
438 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
439 0 : match &mut *self.upload_queue.lock().unwrap() {
440 0 : UploadQueue::Uninitialized => None,
441 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
442 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
443 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
444 0 : .upload_queue_for_deletion
445 0 : .get_last_remote_consistent_lsn_projected(),
446 : }
447 0 : }
448 :
449 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
450 0 : match &mut *self.upload_queue.lock().unwrap() {
451 0 : UploadQueue::Uninitialized => None,
452 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
453 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
454 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
455 0 : q.upload_queue_for_deletion
456 0 : .get_last_remote_consistent_lsn_visible(),
457 0 : ),
458 : }
459 0 : }
460 :
461 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
462 : /// client is currently initialized.
463 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
464 0 : self.upload_queue
465 0 : .lock()
466 0 : .unwrap()
467 0 : .initialized_mut()
468 0 : .map(|uq| uq.clean.0.lineage.is_previous_ancestor_lsn(lsn))
469 0 : .unwrap_or(false)
470 0 : }
471 :
472 : /// Returns whether the timeline is archived.
473 : /// Return None if the remote index_part hasn't been downloaded yet.
474 0 : pub(crate) fn is_archived(&self) -> Option<bool> {
475 0 : self.upload_queue
476 0 : .lock()
477 0 : .unwrap()
478 0 : .initialized_mut()
479 0 : .map(|q| q.clean.0.archived_at.is_some())
480 0 : .ok()
481 0 : }
482 :
483 : /// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
484 : ///
485 : /// Return Err(_) if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
486 0 : pub(crate) fn archived_at_stopped_queue(
487 0 : &self,
488 0 : ) -> Result<Option<NaiveDateTime>, UploadQueueNotReadyError> {
489 0 : self.upload_queue
490 0 : .lock()
491 0 : .unwrap()
492 0 : .stopped_mut()
493 0 : .map(|q| q.upload_queue_for_deletion.clean.0.archived_at)
494 0 : .map_err(|_| UploadQueueNotReadyError)
495 0 : }
496 :
497 1805 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
498 1805 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
499 1401 : current_remote_index_part
500 1401 : .layer_metadata
501 1401 : .values()
502 17506 : .map(|ilmd| ilmd.file_size)
503 1401 : .sum()
504 : } else {
505 404 : 0
506 : };
507 1805 : self.metrics.remote_physical_size_gauge.set(size);
508 1805 : }
509 :
510 2 : pub fn get_remote_physical_size(&self) -> u64 {
511 2 : self.metrics.remote_physical_size_gauge.get()
512 2 : }
513 :
514 : //
515 : // Download operations.
516 : //
517 : // These don't use the per-timeline queue. They do use the global semaphore in
518 : // S3Bucket, to limit the total number of concurrent operations, though.
519 : //
520 :
521 : /// Download index file
522 20 : pub async fn download_index_file(
523 20 : &self,
524 20 : cancel: &CancellationToken,
525 20 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
526 20 : let _unfinished_gauge_guard = self.metrics.call_begin(
527 20 : &RemoteOpFileKind::Index,
528 20 : &RemoteOpKind::Download,
529 20 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
530 20 : reason: "no need for a downloads gauge",
531 20 : },
532 20 : );
533 :
534 20 : let (index_part, index_generation, index_last_modified) = download::download_index_part(
535 20 : &self.storage_impl,
536 20 : &self.tenant_shard_id,
537 20 : &self.timeline_id,
538 20 : self.generation,
539 20 : cancel,
540 20 : )
541 20 : .measure_remote_op(
542 20 : RemoteOpFileKind::Index,
543 20 : RemoteOpKind::Download,
544 20 : Arc::clone(&self.metrics),
545 20 : )
546 76 : .await?;
547 :
548 : // Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
549 : // old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
550 : // in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
551 : // when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
552 : // also a newer index available, that is surprising.
553 : const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
554 20 : let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
555 0 : if e.duration() > Duration::from_secs(5) {
556 : // We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
557 : // timestamp, it is common to be out by at least 1 second.
558 0 : tracing::warn!("Index has modification time in the future: {e}");
559 0 : }
560 0 : Duration::ZERO
561 20 : });
562 20 : if index_age > INDEX_AGE_CHECKS_THRESHOLD {
563 0 : tracing::info!(
564 : ?index_generation,
565 0 : age = index_age.as_secs_f64(),
566 0 : "Loaded an old index, checking for other indices..."
567 : );
568 :
569 : // Find the highest-generation index
570 0 : let (_latest_index_part, latest_index_generation, latest_index_mtime) =
571 0 : download::download_index_part(
572 0 : &self.storage_impl,
573 0 : &self.tenant_shard_id,
574 0 : &self.timeline_id,
575 0 : Generation::MAX,
576 0 : cancel,
577 0 : )
578 0 : .await?;
579 :
580 0 : if latest_index_generation > index_generation {
581 : // Unexpected! Why are we loading such an old index if a more recent one exists?
582 0 : tracing::warn!(
583 : ?index_generation,
584 : ?latest_index_generation,
585 : ?latest_index_mtime,
586 0 : "Found a newer index while loading an old one"
587 : );
588 0 : }
589 20 : }
590 :
591 20 : if index_part.deleted_at.is_some() {
592 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
593 : } else {
594 20 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
595 : }
596 20 : }
597 :
598 : /// Download a (layer) file from `path`, into local filesystem.
599 : ///
600 : /// 'layer_metadata' is the metadata from the remote index file.
601 : ///
602 : /// On success, returns the size of the downloaded file.
603 6 : pub async fn download_layer_file(
604 6 : &self,
605 6 : layer_file_name: &LayerName,
606 6 : layer_metadata: &LayerFileMetadata,
607 6 : local_path: &Utf8Path,
608 6 : cancel: &CancellationToken,
609 6 : ctx: &RequestContext,
610 6 : ) -> Result<u64, DownloadError> {
611 6 : let downloaded_size = {
612 6 : let _unfinished_gauge_guard = self.metrics.call_begin(
613 6 : &RemoteOpFileKind::Layer,
614 6 : &RemoteOpKind::Download,
615 6 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
616 6 : reason: "no need for a downloads gauge",
617 6 : },
618 6 : );
619 6 : download::download_layer_file(
620 6 : self.conf,
621 6 : &self.storage_impl,
622 6 : self.tenant_shard_id,
623 6 : self.timeline_id,
624 6 : layer_file_name,
625 6 : layer_metadata,
626 6 : local_path,
627 6 : cancel,
628 6 : ctx,
629 6 : )
630 6 : .measure_remote_op(
631 6 : RemoteOpFileKind::Layer,
632 6 : RemoteOpKind::Download,
633 6 : Arc::clone(&self.metrics),
634 6 : )
635 65 : .await?
636 : };
637 :
638 6 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
639 6 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
640 6 :
641 6 : Ok(downloaded_size)
642 6 : }
643 :
644 : //
645 : // Upload operations.
646 : //
647 :
648 : /// Launch an index-file upload operation in the background, with
649 : /// fully updated metadata.
650 : ///
651 : /// This should only be used to upload initial metadata to remote storage.
652 : ///
653 : /// The upload will be added to the queue immediately, but it
654 : /// won't be performed until all previously scheduled layer file
655 : /// upload operations have completed successfully. This is to
656 : /// ensure that when the index file claims that layers X, Y and Z
657 : /// exist in remote storage, they really do. To wait for the upload
658 : /// to complete, use `wait_completion`.
659 : ///
660 : /// If there were any changes to the list of files, i.e. if any
661 : /// layer file uploads were scheduled, since the last index file
662 : /// upload, those will be included too.
663 228 : pub fn schedule_index_upload_for_full_metadata_update(
664 228 : self: &Arc<Self>,
665 228 : metadata: &TimelineMetadata,
666 228 : ) -> anyhow::Result<()> {
667 228 : let mut guard = self.upload_queue.lock().unwrap();
668 228 : let upload_queue = guard.initialized_mut()?;
669 :
670 : // As documented in the struct definition, it's ok for latest_metadata to be
671 : // ahead of what's _actually_ on the remote during index upload.
672 228 : upload_queue.dirty.metadata = metadata.clone();
673 228 :
674 228 : self.schedule_index_upload(upload_queue)?;
675 :
676 228 : Ok(())
677 228 : }
678 :
679 : /// Launch an index-file upload operation in the background, with only parts of the metadata
680 : /// updated.
681 : ///
682 : /// This is the regular way of updating metadata on layer flushes or Gc.
683 : ///
684 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
685 : /// `index_part.json`, while being more clear on what values update regularly.
686 1140 : pub(crate) fn schedule_index_upload_for_metadata_update(
687 1140 : self: &Arc<Self>,
688 1140 : update: &MetadataUpdate,
689 1140 : ) -> anyhow::Result<()> {
690 1140 : let mut guard = self.upload_queue.lock().unwrap();
691 1140 : let upload_queue = guard.initialized_mut()?;
692 :
693 1140 : upload_queue.dirty.metadata.apply(update);
694 1140 :
695 1140 : self.schedule_index_upload(upload_queue)?;
696 :
697 1140 : Ok(())
698 1140 : }
699 :
700 : /// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
701 : ///
702 : /// Returns whether the caller must wait for the queue to drain to ensure that the change is uploaded.
703 : /// This is the case if the change is already sitting in the queue but not committed yet, or if the
704 : /// change has not been queued yet.
705 0 : pub(crate) fn schedule_index_upload_for_timeline_archival_state(
706 0 : self: &Arc<Self>,
707 0 : state: TimelineArchivalState,
708 0 : ) -> anyhow::Result<bool> {
709 0 : let mut guard = self.upload_queue.lock().unwrap();
710 0 : let upload_queue = guard.initialized_mut()?;
711 :
712 : /// Returns Some(_) if a change is needed, and Some(true) if it's a
713 : /// change needed to set archived_at.
714 0 : fn need_change(
715 0 : archived_at: &Option<NaiveDateTime>,
716 0 : state: TimelineArchivalState,
717 0 : ) -> Option<bool> {
718 0 : match (archived_at, state) {
719 : (Some(_), TimelineArchivalState::Archived)
720 : | (None, TimelineArchivalState::Unarchived) => {
721 : // Nothing to do
722 0 : tracing::info!("intended state matches present state");
723 0 : None
724 : }
725 0 : (None, TimelineArchivalState::Archived) => Some(true),
726 0 : (Some(_), TimelineArchivalState::Unarchived) => Some(false),
727 : }
728 0 : }
729 0 : let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
730 :
731 0 : if let Some(archived_at_set) = need_upload_scheduled {
732 0 : let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
733 0 : upload_queue.dirty.archived_at = intended_archived_at;
734 0 : self.schedule_index_upload(upload_queue)?;
735 0 : }
736 :
737 0 : let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
738 0 : Ok(need_wait)
739 0 : }
740 :
741 : ///
742 : /// Launch an index-file upload operation in the background, if necessary.
743 : ///
744 : /// Use this function to schedule the update of the index file after
745 : /// scheduling file uploads or deletions. If no file uploads or deletions
746 : /// have been scheduled since the last index file upload, this does
747 : /// nothing.
748 : ///
749 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
750 : /// the upload to the upload queue and returns quickly.
751 370 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
752 370 : let mut guard = self.upload_queue.lock().unwrap();
753 370 : let upload_queue = guard.initialized_mut()?;
754 :
755 370 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
756 14 : self.schedule_index_upload(upload_queue)?;
757 356 : }
758 :
759 370 : Ok(())
760 370 : }
761 :
762 : /// Launch an index-file upload operation in the background (internal function)
763 1428 : fn schedule_index_upload(
764 1428 : self: &Arc<Self>,
765 1428 : upload_queue: &mut UploadQueueInitialized,
766 1428 : ) -> Result<(), NotInitialized> {
767 1428 : let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
768 1428 : // fix up the duplicated field
769 1428 : upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
770 1428 :
771 1428 : // make sure it serializes before doing it in perform_upload_task so that it doesn't
772 1428 : // look like a retryable error
773 1428 : let void = std::io::sink();
774 1428 : serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");
775 1428 :
776 1428 : let index_part = &upload_queue.dirty;
777 1428 :
778 1428 : info!(
779 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
780 0 : index_part.layer_metadata.len(),
781 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
782 : );
783 :
784 1428 : let op = UploadOp::UploadMetadata {
785 1428 : uploaded: Box::new(index_part.clone()),
786 1428 : };
787 1428 : self.metric_begin(&op);
788 1428 : upload_queue.queued_operations.push_back(op);
789 1428 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
790 1428 :
791 1428 : // Launch the task immediately, if possible
792 1428 : self.launch_queued_tasks(upload_queue);
793 1428 : Ok(())
794 1428 : }
795 :
796 : /// Reparent this timeline to a new parent.
797 : ///
798 : /// A retryable step of timeline ancestor detach.
799 0 : pub(crate) async fn schedule_reparenting_and_wait(
800 0 : self: &Arc<Self>,
801 0 : new_parent: &TimelineId,
802 0 : ) -> anyhow::Result<()> {
803 0 : let receiver = {
804 0 : let mut guard = self.upload_queue.lock().unwrap();
805 0 : let upload_queue = guard.initialized_mut()?;
806 :
807 0 : let Some(prev) = upload_queue.dirty.metadata.ancestor_timeline() else {
808 0 : return Err(anyhow::anyhow!(
809 0 : "cannot reparent without a current ancestor"
810 0 : ));
811 : };
812 :
813 0 : let uploaded = &upload_queue.clean.0.metadata;
814 0 :
815 0 : if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
816 : // nothing to do
817 0 : None
818 : } else {
819 0 : upload_queue.dirty.metadata.reparent(new_parent);
820 0 : upload_queue.dirty.lineage.record_previous_ancestor(&prev);
821 0 :
822 0 : self.schedule_index_upload(upload_queue)?;
823 :
824 0 : Some(self.schedule_barrier0(upload_queue))
825 : }
826 : };
827 :
828 0 : if let Some(receiver) = receiver {
829 0 : Self::wait_completion0(receiver).await?;
830 0 : }
831 0 : Ok(())
832 0 : }
833 :
834 : /// Schedules uploading a new version of `index_part.json` with the given layers added and the
835 : /// timeline detached from its ancestor, then waits for the upload to complete.
836 : ///
837 : /// This is used with `Timeline::detach_ancestor` functionality.
838 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
839 0 : self: &Arc<Self>,
840 0 : layers: &[Layer],
841 0 : adopted: (TimelineId, Lsn),
842 0 : ) -> anyhow::Result<()> {
843 0 : let barrier = {
844 0 : let mut guard = self.upload_queue.lock().unwrap();
845 0 : let upload_queue = guard.initialized_mut()?;
846 :
847 0 : if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
848 0 : None
849 : } else {
850 0 : upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
851 0 : upload_queue.dirty.lineage.record_detaching(&adopted);
852 :
853 0 : for layer in layers {
854 0 : let prev = upload_queue
855 0 : .dirty
856 0 : .layer_metadata
857 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
858 0 : assert!(prev.is_none(), "copied layer existed already {layer}");
859 : }
860 :
861 0 : self.schedule_index_upload(upload_queue)?;
862 :
863 0 : Some(self.schedule_barrier0(upload_queue))
864 : }
865 : };
866 :
867 0 : if let Some(barrier) = barrier {
868 0 : Self::wait_completion0(barrier).await?;
869 0 : }
870 0 : Ok(())
871 0 : }
872 :
873 : /// Adds a gc blocking reason for this timeline if one does not exist already.
874 : ///
875 : /// A retryable step of timeline detach ancestor.
876 : ///
877 : /// Returns a future which waits until the completion of the upload.
878 0 : pub(crate) fn schedule_insert_gc_block_reason(
879 0 : self: &Arc<Self>,
880 0 : reason: index::GcBlockingReason,
881 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
882 0 : {
883 0 : let maybe_barrier = {
884 0 : let mut guard = self.upload_queue.lock().unwrap();
885 0 : let upload_queue = guard.initialized_mut()?;
886 :
887 0 : if let index::GcBlockingReason::DetachAncestor = reason {
888 0 : if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
889 0 : drop(guard);
890 0 : panic!("cannot start detach ancestor if there is nothing to detach from");
891 0 : }
892 0 : }
893 :
894 0 : let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
895 :
896 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
897 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
898 0 :
899 0 : match (current, uploaded) {
900 0 : (x, y) if wanted(x) && wanted(y) => None,
901 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
902 : // Usual case: !wanted(x) && !wanted(y)
903 : //
904 : // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
905 : // turn on and off some reason.
906 0 : (x, y) => {
907 0 : if !wanted(x) && wanted(y) {
908 : // this could be avoided by having external in-memory synchronization, like
909 : // timeline detach ancestor
910 0 : warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason");
911 0 : }
912 :
913 : // at this point, the metadata must always show that there is a parent
914 0 : upload_queue.dirty.gc_blocking = current
915 0 : .map(|x| x.with_reason(reason))
916 0 : .or_else(|| Some(index::GcBlocking::started_now_for(reason)));
917 0 : self.schedule_index_upload(upload_queue)?;
918 0 : Some(self.schedule_barrier0(upload_queue))
919 : }
920 : }
921 : };
922 :
923 0 : Ok(async move {
924 0 : if let Some(barrier) = maybe_barrier {
925 0 : Self::wait_completion0(barrier).await?;
926 0 : }
927 0 : Ok(())
928 0 : })
929 0 : }
930 :
931 : /// Removes a gc blocking reason for this timeline if one exists.
932 : ///
933 : /// A retryable step of timeline detach ancestor.
934 : ///
935 : /// Returns a future which waits until the completion of the upload.
936 0 : pub(crate) fn schedule_remove_gc_block_reason(
937 0 : self: &Arc<Self>,
938 0 : reason: index::GcBlockingReason,
939 0 : ) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
940 0 : {
941 0 : let maybe_barrier = {
942 0 : let mut guard = self.upload_queue.lock().unwrap();
943 0 : let upload_queue = guard.initialized_mut()?;
944 :
945 0 : if let index::GcBlockingReason::DetachAncestor = reason {
946 0 : if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
947 0 : drop(guard);
948 0 : panic!("cannot complete timeline_ancestor_detach while not detached");
949 0 : }
950 0 : }
951 :
952 0 : let wanted = |x: Option<&index::GcBlocking>| {
953 0 : x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
954 0 : };
955 :
956 0 : let current = upload_queue.dirty.gc_blocking.as_ref();
957 0 : let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
958 0 :
959 0 : match (current, uploaded) {
960 0 : (x, y) if wanted(x) && wanted(y) => None,
961 0 : (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
962 0 : (x, y) => {
963 0 : if !wanted(x) && wanted(y) {
964 0 : warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
965 0 : }
966 :
967 0 : upload_queue.dirty.gc_blocking =
968 0 : current.as_ref().and_then(|x| x.without_reason(reason));
969 0 : assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
970 : // FIXME: bogus ?
971 0 : self.schedule_index_upload(upload_queue)?;
972 0 : Some(self.schedule_barrier0(upload_queue))
973 : }
974 : }
975 : };
976 :
977 0 : Ok(async move {
978 0 : if let Some(barrier) = maybe_barrier {
979 0 : Self::wait_completion0(barrier).await?;
980 0 : }
981 0 : Ok(())
982 0 : })
983 0 : }
984 :
985 : /// Launch an upload operation in the background; the file is added to be included in the next
986 : /// `index_part.json` upload.
987 1184 : pub(crate) fn schedule_layer_file_upload(
988 1184 : self: &Arc<Self>,
989 1184 : layer: ResidentLayer,
990 1184 : ) -> Result<(), NotInitialized> {
991 1184 : let mut guard = self.upload_queue.lock().unwrap();
992 1184 : let upload_queue = guard.initialized_mut()?;
993 :
994 1184 : self.schedule_layer_file_upload0(upload_queue, layer);
995 1184 : self.launch_queued_tasks(upload_queue);
996 1184 : Ok(())
997 1184 : }
998 :
999 1514 : fn schedule_layer_file_upload0(
1000 1514 : self: &Arc<Self>,
1001 1514 : upload_queue: &mut UploadQueueInitialized,
1002 1514 : layer: ResidentLayer,
1003 1514 : ) {
1004 1514 : let metadata = layer.metadata();
1005 1514 :
1006 1514 : upload_queue
1007 1514 : .dirty
1008 1514 : .layer_metadata
1009 1514 : .insert(layer.layer_desc().layer_name(), metadata.clone());
1010 1514 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1011 1514 :
1012 1514 : info!(
1013 : gen=?metadata.generation,
1014 : shard=?metadata.shard,
1015 0 : "scheduled layer file upload {layer}",
1016 : );
1017 :
1018 1514 : let op = UploadOp::UploadLayer(layer, metadata);
1019 1514 : self.metric_begin(&op);
1020 1514 : upload_queue.queued_operations.push_back(op);
1021 1514 : }
1022 :
1023 : /// Launch a delete operation in the background.
1024 : ///
1025 : /// The operation does not modify local filesystem state.
1026 : ///
1027 : /// Note: This schedules an index file upload before the deletions. The
1028 : /// deletion won't actually be performed, until all previously scheduled
1029 : /// upload operations, and the index file upload, have completed
1030 : /// successfully.
1031 8 : pub fn schedule_layer_file_deletion(
1032 8 : self: &Arc<Self>,
1033 8 : names: &[LayerName],
1034 8 : ) -> anyhow::Result<()> {
1035 8 : let mut guard = self.upload_queue.lock().unwrap();
1036 8 : let upload_queue = guard.initialized_mut()?;
1037 :
1038 8 : let with_metadata = self
1039 8 : .schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned())?;
1040 :
1041 8 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
1042 8 :
1043 8 : // Launch the tasks immediately, if possible
1044 8 : self.launch_queued_tasks(upload_queue);
1045 8 : Ok(())
1046 8 : }
1047 :
1048 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
1049 : /// layer files, leaving them dangling.
1050 : ///
1051 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
1052 : /// is invoked on them.
1053 4 : pub(crate) fn schedule_gc_update(
1054 4 : self: &Arc<Self>,
1055 4 : gc_layers: &[Layer],
1056 4 : ) -> Result<(), NotInitialized> {
1057 4 : let mut guard = self.upload_queue.lock().unwrap();
1058 4 : let upload_queue = guard.initialized_mut()?;
1059 :
1060 : // just forget the return value; after uploading the next index_part.json, we can consider
1061 : // the layer files as "dangling". this is fine, in the worst case we create work for the
1062 : // scrubber.
1063 :
1064 4 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
1065 4 :
1066 4 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?;
1067 :
1068 4 : self.launch_queued_tasks(upload_queue);
1069 4 :
1070 4 : Ok(())
1071 4 : }
1072 :
1073 : /// Update the remote index file, removing the to-be-deleted files from the index,
1074 : /// allowing scheduling of actual deletions later.
1075 62 : fn schedule_unlinking_of_layers_from_index_part0<I>(
1076 62 : self: &Arc<Self>,
1077 62 : upload_queue: &mut UploadQueueInitialized,
1078 62 : names: I,
1079 62 : ) -> Result<Vec<(LayerName, LayerFileMetadata)>, NotInitialized>
1080 62 : where
1081 62 : I: IntoIterator<Item = LayerName>,
1082 62 : {
1083 62 : // Decorate our list of names with each name's metadata, dropping
1084 62 : // names that are unexpectedly missing from our metadata. This metadata
1085 62 : // is later used when physically deleting layers, to construct key paths.
1086 62 : let with_metadata: Vec<_> = names
1087 62 : .into_iter()
1088 456 : .filter_map(|name| {
1089 456 : let meta = upload_queue.dirty.layer_metadata.remove(&name);
1090 :
1091 456 : if let Some(meta) = meta {
1092 422 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
1093 422 : Some((name, meta))
1094 : } else {
1095 : // This can only happen if we forgot to schedule the file upload
1096 : // before scheduling the delete. Log it because it is a rare/strange
1097 : // situation, and in case something is misbehaving, we'd like to know which
1098 : // layers experienced this.
1099 34 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
1100 34 : None
1101 : }
1102 456 : })
1103 62 : .collect();
1104 :
1105 : #[cfg(feature = "testing")]
1106 484 : for (name, metadata) in &with_metadata {
1107 422 : let gen = metadata.generation;
1108 422 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
1109 0 : if unexpected == gen {
1110 0 : tracing::error!("{name} was unlinked twice with same generation");
1111 : } else {
1112 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
1113 : }
1114 422 : }
1115 : }
1116 :
1117 : // after unlinking files from the upload_queue.latest_files we must always schedule an
1118 : // index_part update, because that needs to be uploaded before we can actually delete the
1119 : // files.
1120 62 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
1121 46 : self.schedule_index_upload(upload_queue)?;
1122 16 : }
1123 :
1124 62 : Ok(with_metadata)
1125 62 : }
1126 :
1127 : /// Schedules deletion for layer files which have previously been unlinked from the
1128 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
1129 456 : pub(crate) fn schedule_deletion_of_unlinked(
1130 456 : self: &Arc<Self>,
1131 456 : layers: Vec<(LayerName, LayerFileMetadata)>,
1132 456 : ) -> anyhow::Result<()> {
1133 456 : let mut guard = self.upload_queue.lock().unwrap();
1134 456 : let upload_queue = guard.initialized_mut()?;
1135 :
1136 456 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
1137 456 : self.launch_queued_tasks(upload_queue);
1138 456 : Ok(())
1139 456 : }
1140 :
1141 464 : fn schedule_deletion_of_unlinked0(
1142 464 : self: &Arc<Self>,
1143 464 : upload_queue: &mut UploadQueueInitialized,
1144 464 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
1145 464 : ) {
1146 464 : // Filter out any layers which were not created by this tenant shard. These are
1147 464 : // layers that originate from some ancestor shard after a split, and may still
1148 464 : // be referenced by other shards. We are free to delete them locally and remove
1149 464 : // them from our index (and would have already done so when we reach this point
1150 464 : // in the code), but we may not delete them remotely.
1151 464 : with_metadata.retain(|(name, meta)| {
1152 458 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
1153 458 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
1154 458 : if !retain {
1155 0 : tracing::debug!(
1156 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
1157 : meta.shard
1158 : );
1159 458 : }
1160 458 : retain
1161 464 : });
1162 :
1163 922 : for (name, meta) in &with_metadata {
1164 458 : info!(
1165 0 : "scheduling deletion of layer {}{} (shard {})",
1166 0 : name,
1167 0 : meta.generation.get_suffix(),
1168 : meta.shard
1169 : );
1170 : }
1171 :
1172 : #[cfg(feature = "testing")]
1173 922 : for (name, meta) in &with_metadata {
1174 458 : let gen = meta.generation;
1175 458 : match upload_queue.dangling_files.remove(name) {
1176 418 : Some(same) if same == gen => { /* expected */ }
1177 0 : Some(other) => {
1178 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
1179 : }
1180 : None => {
1181 40 : tracing::error!("{name} was unlinked but was not dangling");
1182 : }
1183 : }
1184 : }
1185 :
1186 : // schedule the actual deletions
1187 464 : if with_metadata.is_empty() {
1188 : // avoid scheduling the op & bumping the metric
1189 6 : return;
1190 458 : }
1191 458 : let op = UploadOp::Delete(Delete {
1192 458 : layers: with_metadata,
1193 458 : });
1194 458 : self.metric_begin(&op);
1195 458 : upload_queue.queued_operations.push_back(op);
1196 464 : }
1197 :
1198 : /// Schedules a compaction update to the remote `index_part.json`.
1199 : ///
1200 : /// `compacted_from` represents the L0 layers which have been compacted into the `compacted_to` L1 layers.
1201 50 : pub(crate) fn schedule_compaction_update(
1202 50 : self: &Arc<Self>,
1203 50 : compacted_from: &[Layer],
1204 50 : compacted_to: &[ResidentLayer],
1205 50 : ) -> Result<(), NotInitialized> {
1206 50 : let mut guard = self.upload_queue.lock().unwrap();
1207 50 : let upload_queue = guard.initialized_mut()?;
1208 :
1209 380 : for layer in compacted_to {
1210 330 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
1211 330 : }
1212 :
1213 450 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
1214 50 :
1215 50 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?;
1216 50 : self.launch_queued_tasks(upload_queue);
1217 50 :
1218 50 : Ok(())
1219 50 : }
1220 :
1221 : /// Wait for all previously scheduled uploads/deletions to complete
1222 1316 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
1223 1316 : let receiver = {
1224 1316 : let mut guard = self.upload_queue.lock().unwrap();
1225 1316 : let upload_queue = guard
1226 1316 : .initialized_mut()
1227 1316 : .map_err(WaitCompletionError::NotInitialized)?;
1228 1316 : self.schedule_barrier0(upload_queue)
1229 1316 : };
1230 1316 :
1231 1316 : Self::wait_completion0(receiver).await
1232 1316 : }
1233 :
1234 1316 : async fn wait_completion0(
1235 1316 : mut receiver: tokio::sync::watch::Receiver<()>,
1236 1316 : ) -> Result<(), WaitCompletionError> {
1237 1316 : if receiver.changed().await.is_err() {
1238 0 : return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
1239 1316 : }
1240 1316 :
1241 1316 : Ok(())
1242 1316 : }
1243 :
1244 6 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
1245 6 : let mut guard = self.upload_queue.lock().unwrap();
1246 6 : let upload_queue = guard.initialized_mut()?;
1247 6 : self.schedule_barrier0(upload_queue);
1248 6 : Ok(())
1249 6 : }
1250 :
1251 1322 : fn schedule_barrier0(
1252 1322 : self: &Arc<Self>,
1253 1322 : upload_queue: &mut UploadQueueInitialized,
1254 1322 : ) -> tokio::sync::watch::Receiver<()> {
1255 1322 : let (sender, receiver) = tokio::sync::watch::channel(());
1256 1322 : let barrier_op = UploadOp::Barrier(sender);
1257 1322 :
1258 1322 : upload_queue.queued_operations.push_back(barrier_op);
1259 1322 : // Don't count this kind of operation!
1260 1322 :
1261 1322 : // Launch the task immediately, if possible
1262 1322 : self.launch_queued_tasks(upload_queue);
1263 1322 :
1264 1322 : receiver
1265 1322 : }
1266 :
1267 : /// Wait for all previously scheduled operations to complete, and then stop.
1268 : ///
1269 : /// Not cancellation safe
1270 6 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1271 6 : // On cancellation the queue is left in an awkward state of refusing new operations but
1272 6 : // proper stop is yet to be called. On cancel the original or some later task must call
1273 6 : // `stop` or `shutdown`.
1274 6 : let sg = scopeguard::guard((), |_| {
1275 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
1276 6 : });
1277 :
1278 6 : let fut = {
1279 6 : let mut guard = self.upload_queue.lock().unwrap();
1280 6 : let upload_queue = match &mut *guard {
1281 0 : UploadQueue::Stopped(_) => return,
1282 : UploadQueue::Uninitialized => {
1283 : // transition into Stopped state
1284 0 : self.stop_impl(&mut guard);
1285 0 : return;
1286 : }
1287 6 : UploadQueue::Initialized(ref mut init) => init,
1288 6 : };
1289 6 :
1290 6 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1291 6 : // just don't add more of these as they would never complete.
1292 6 : //
1293 6 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1294 6 : // in every place we would not have to jump through this hoop, and this method could be
1295 6 : // made cancellable.
1296 6 : if !upload_queue.shutting_down {
1297 6 : upload_queue.shutting_down = true;
1298 6 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1299 6 : // this operation is not counted similar to Barrier
1300 6 :
1301 6 : self.launch_queued_tasks(upload_queue);
1302 6 : }
1303 :
1304 6 : upload_queue.shutdown_ready.clone().acquire_owned()
1305 : };
1306 :
1307 6 : let res = fut.await;
1308 :
1309 6 : scopeguard::ScopeGuard::into_inner(sg);
1310 6 :
1311 6 : match res {
1312 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
1313 6 : Err(_closed) => {
1314 6 : // expected
1315 6 : }
1316 6 : }
1317 6 :
1318 6 : self.stop();
1319 6 : }
1320 :
1321 : /// Set the deleted_at field in the remote index file.
1322 : ///
1323 : /// This fails if the upload queue has not been `stop()`ed.
1324 : ///
1325 : /// The caller is responsible for calling `stop()` AND for waiting
1326 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1327 : /// Check method [`RemoteTimelineClient::stop`] for details.
1328 0 : #[instrument(skip_all)]
1329 : pub(crate) async fn persist_index_part_with_deleted_flag(
1330 : self: &Arc<Self>,
1331 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1332 : let index_part_with_deleted_at = {
1333 : let mut locked = self.upload_queue.lock().unwrap();
1334 :
1335 : // We must be in the stopped state because otherwise
1336 : // an in-progress index part upload could overwrite the file
1337 : // without the deleted_at flag that we are going to set below
1338 : let stopped = locked.stopped_mut()?;
1339 :
1340 : match stopped.deleted_at {
1341 : SetDeletedFlagProgress::NotRunning => (), // proceed
1342 : SetDeletedFlagProgress::InProgress(at) => {
1343 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1344 : }
1345 : SetDeletedFlagProgress::Successful(at) => {
1346 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1347 : }
1348 : };
1349 : let deleted_at = Utc::now().naive_utc();
1350 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1351 :
1352 : let mut index_part = stopped.upload_queue_for_deletion.dirty.clone();
1353 : index_part.deleted_at = Some(deleted_at);
1354 : index_part
1355 : };
1356 :
1357 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1358 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1359 0 : let stopped = locked
1360 0 : .stopped_mut()
1361 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1362 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1363 0 : });
1364 :
1365 : pausable_failpoint!("persist_deleted_index_part");
1366 :
1367 : backoff::retry(
1368 0 : || {
1369 0 : upload::upload_index_part(
1370 0 : &self.storage_impl,
1371 0 : &self.tenant_shard_id,
1372 0 : &self.timeline_id,
1373 0 : self.generation,
1374 0 : &index_part_with_deleted_at,
1375 0 : &self.cancel,
1376 0 : )
1377 0 : },
1378 0 : |_e| false,
1379 : 1,
1380 : // have just a couple of attempts
1381 : // when executed as part of timeline deletion this happens in context of api call
1382 : // when executed as part of tenant deletion this happens in the background
1383 : 2,
1384 : "persist_index_part_with_deleted_flag",
1385 : &self.cancel,
1386 : )
1387 : .await
1388 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1389 0 : .and_then(|x| x)?;
1390 :
1391 : // all good, disarm the guard and mark as success
1392 : ScopeGuard::into_inner(undo_deleted_at);
1393 : {
1394 : let mut locked = self.upload_queue.lock().unwrap();
1395 :
1396 : let stopped = locked
1397 : .stopped_mut()
1398 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1399 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1400 : index_part_with_deleted_at
1401 : .deleted_at
1402 : .expect("we set it above"),
1403 : );
1404 : }
1405 :
1406 : Ok(())
1407 : }
1408 :
1409 0 : pub(crate) fn is_deleting(&self) -> bool {
1410 0 : let mut locked = self.upload_queue.lock().unwrap();
1411 0 : locked.stopped_mut().is_ok()
1412 0 : }
1413 :
1414 0 : pub(crate) async fn preserve_initdb_archive(
1415 0 : self: &Arc<Self>,
1416 0 : tenant_id: &TenantId,
1417 0 : timeline_id: &TimelineId,
1418 0 : cancel: &CancellationToken,
1419 0 : ) -> anyhow::Result<()> {
1420 0 : backoff::retry(
1421 0 : || async {
1422 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1423 0 : .await
1424 0 : },
1425 0 : TimeoutOrCancel::caused_by_cancel,
1426 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1427 0 : FAILED_REMOTE_OP_RETRIES,
1428 0 : "preserve_initdb_tar_zst",
1429 0 : &cancel.clone(),
1430 0 : )
1431 0 : .await
1432 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1433 0 : .and_then(|x| x)
1434 0 : .context("backing up initdb archive")?;
1435 0 : Ok(())
1436 0 : }
1437 :
1438 : /// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
1439 : ///
1440 : /// This is not normally needed.
1441 0 : pub(crate) async fn upload_layer_file(
1442 0 : self: &Arc<Self>,
1443 0 : uploaded: &ResidentLayer,
1444 0 : cancel: &CancellationToken,
1445 0 : ) -> anyhow::Result<()> {
1446 0 : let remote_path = remote_layer_path(
1447 0 : &self.tenant_shard_id.tenant_id,
1448 0 : &self.timeline_id,
1449 0 : self.tenant_shard_id.to_index(),
1450 0 : &uploaded.layer_desc().layer_name(),
1451 0 : uploaded.metadata().generation,
1452 0 : );
1453 0 :
1454 0 : backoff::retry(
1455 0 : || async {
1456 0 : upload::upload_timeline_layer(
1457 0 : &self.storage_impl,
1458 0 : uploaded.local_path(),
1459 0 : &remote_path,
1460 0 : uploaded.metadata().file_size,
1461 0 : cancel,
1462 0 : )
1463 0 : .await
1464 0 : },
1465 0 : TimeoutOrCancel::caused_by_cancel,
1466 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1467 0 : FAILED_REMOTE_OP_RETRIES,
1468 0 : "upload a layer without adding it to latest files",
1469 0 : cancel,
1470 0 : )
1471 0 : .await
1472 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1473 0 : .and_then(|x| x)
1474 0 : .context("upload a layer without adding it to latest files")
1475 0 : }
1476 :
1477 : /// Copies the existing remote layer `adopted` to the remote path of `adopted_as`. The layer is
1478 : /// not added to be part of a future `index_part.json` upload.
1479 0 : pub(crate) async fn copy_timeline_layer(
1480 0 : self: &Arc<Self>,
1481 0 : adopted: &Layer,
1482 0 : adopted_as: &Layer,
1483 0 : cancel: &CancellationToken,
1484 0 : ) -> anyhow::Result<()> {
1485 0 : let source_remote_path = remote_layer_path(
1486 0 : &self.tenant_shard_id.tenant_id,
1487 0 : &adopted
1488 0 : .get_timeline_id()
1489 0 : .expect("Source timeline should be alive"),
1490 0 : self.tenant_shard_id.to_index(),
1491 0 : &adopted.layer_desc().layer_name(),
1492 0 : adopted.metadata().generation,
1493 0 : );
1494 0 :
1495 0 : let target_remote_path = remote_layer_path(
1496 0 : &self.tenant_shard_id.tenant_id,
1497 0 : &self.timeline_id,
1498 0 : self.tenant_shard_id.to_index(),
1499 0 : &adopted_as.layer_desc().layer_name(),
1500 0 : adopted_as.metadata().generation,
1501 0 : );
1502 0 :
1503 0 : backoff::retry(
1504 0 : || async {
1505 0 : upload::copy_timeline_layer(
1506 0 : &self.storage_impl,
1507 0 : &source_remote_path,
1508 0 : &target_remote_path,
1509 0 : cancel,
1510 0 : )
1511 0 : .await
1512 0 : },
1513 0 : TimeoutOrCancel::caused_by_cancel,
1514 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1515 0 : FAILED_REMOTE_OP_RETRIES,
1516 0 : "copy timeline layer",
1517 0 : cancel,
1518 0 : )
1519 0 : .await
1520 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1521 0 : .and_then(|x| x)
1522 0 : .context("remote copy timeline layer")
1523 0 : }
1524 :
1525 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1526 0 : match tokio::time::timeout(
1527 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1528 0 : self.deletion_queue_client.flush_immediate(),
1529 0 : )
1530 0 : .await
1531 : {
1532 0 : Ok(result) => result,
1533 0 : Err(_timeout) => {
1534 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1535 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1536 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1537 0 : tracing::warn!(
1538 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1539 : );
1540 0 : Ok(())
1541 : }
1542 : }
1543 0 : }
1544 :
1545 : /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfully set.
1546 : /// The function deletes layer files one by one, then lists the prefix to see if we leaked something,
1547 : /// deletes leaked files if any, and proceeds with deletion of the index file at the end.
1548 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
1549 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1550 :
1551 0 : let layers: Vec<RemotePath> = {
1552 0 : let mut locked = self.upload_queue.lock().unwrap();
1553 0 : let stopped = locked.stopped_mut()?;
1554 :
1555 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1556 0 : anyhow::bail!("deleted_at is not set")
1557 0 : }
1558 0 :
1559 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1560 :
1561 0 : stopped
1562 0 : .upload_queue_for_deletion
1563 0 : .dirty
1564 0 : .layer_metadata
1565 0 : .drain()
1566 0 : .filter(|(_file_name, meta)| {
1567 0 : // Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
1568 0 : // all shards anyway, we _could_ delete these, but
1569 0 : // - it creates a potential race if other shards are still
1570 0 : // using the layers while this shard deletes them.
1571 0 : // - it means that if we rolled back the shard split, the ancestor shards would be in a state where
1572 0 : // these timelines are present but corrupt (their index exists but some layers don't)
1573 0 : //
1574 0 : // These layers will eventually be cleaned up by the scrubber when it does physical GC.
1575 0 : meta.shard.shard_number == self.tenant_shard_id.shard_number
1576 0 : && meta.shard.shard_count == self.tenant_shard_id.shard_count
1577 0 : })
1578 0 : .map(|(file_name, meta)| {
1579 0 : remote_layer_path(
1580 0 : &self.tenant_shard_id.tenant_id,
1581 0 : &self.timeline_id,
1582 0 : meta.shard,
1583 0 : &file_name,
1584 0 : meta.generation,
1585 0 : )
1586 0 : })
1587 0 : .collect()
1588 0 : };
1589 0 :
1590 0 : let layer_deletion_count = layers.len();
1591 0 : self.deletion_queue_client.push_immediate(layers).await?;
1592 :
1593 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts of
1594 : // nonexistent objects are not considered errors.
1595 0 : let initdb_path =
1596 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1597 0 : self.deletion_queue_client
1598 0 : .push_immediate(vec![initdb_path])
1599 0 : .await?;
1600 :
1601 : // Do not delete the index part yet: it is needed for a possible retry. If we removed it first
1602 : // and the retry arrived at a different pageserver, there wouldn't be any trace of it on remote storage.
1603 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1604 0 :
1605 0 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1606 0 : // taking the burden of listing all the layers that we already know we should delete.
1607 0 : self.flush_deletion_queue().await?;
1608 :
1609 0 : let cancel = shutdown_token();
1610 :
1611 0 : let remaining = download_retry(
1612 0 : || async {
1613 0 : self.storage_impl
1614 0 : .list(
1615 0 : Some(&timeline_storage_path),
1616 0 : ListingMode::NoDelimiter,
1617 0 : None,
1618 0 : &cancel,
1619 0 : )
1620 0 : .await
1621 0 : },
1622 0 : "list remaining files",
1623 0 : &cancel,
1624 0 : )
1625 0 : .await
1626 0 : .context("list remaining files")?
1627 : .keys;
1628 :
1629 : // We will delete the current index_part object last, since it acts as a deletion
1630 : // marker via its deleted_at attribute
1631 0 : let latest_index = remaining
1632 0 : .iter()
1633 0 : .filter(|o| {
1634 0 : o.key
1635 0 : .object_name()
1636 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1637 0 : .unwrap_or(false)
1638 0 : })
1639 0 : .filter_map(|o| parse_remote_index_path(o.key.clone()).map(|gen| (o.key.clone(), gen)))
1640 0 : .max_by_key(|i| i.1)
1641 0 : .map(|i| i.0.clone())
1642 0 : .unwrap_or(
1643 0 : // No generation-suffixed indices, assume we are dealing with
1644 0 : // a legacy index.
1645 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1646 0 : );
1647 0 :
1648 0 : let remaining_layers: Vec<RemotePath> = remaining
1649 0 : .into_iter()
1650 0 : .filter_map(|o| {
1651 0 : if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
1652 0 : None
1653 : } else {
1654 0 : Some(o.key)
1655 : }
1656 0 : })
1657 0 : .inspect(|path| {
1658 0 : if let Some(name) = path.object_name() {
1659 0 : info!(%name, "deleting a file not referenced from index_part.json");
1660 : } else {
1661 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1662 : }
1663 0 : })
1664 0 : .collect();
1665 0 :
1666 0 : let not_referenced_count = remaining_layers.len();
1667 0 : if !remaining_layers.is_empty() {
1668 0 : self.deletion_queue_client
1669 0 : .push_immediate(remaining_layers)
1670 0 : .await?;
1671 0 : }
1672 :
1673 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1674 0 : Err(anyhow::anyhow!(
1675 0 : "failpoint: timeline-delete-before-index-delete"
1676 0 : ))?
1677 0 : });
1678 :
1679 0 : debug!("enqueuing index part deletion");
1680 0 : self.deletion_queue_client
1681 0 : .push_immediate([latest_index].to_vec())
1682 0 : .await?;
1683 :
1684 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1685 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1686 0 : self.flush_deletion_queue().await?;
1687 :
1688 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1689 0 : Err(anyhow::anyhow!(
1690 0 : "failpoint: timeline-delete-after-index-delete"
1691 0 : ))?
1692 0 : });
1693 :
1694 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1695 :
1696 0 : Ok(())
1697 0 : }
1698 :
1699 : ///
1700 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1701 : /// the ordering constraints.
1702 : ///
1703 : /// The caller needs to already hold the `upload_queue` lock.
1704 7416 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1705 11898 : while let Some(next_op) = upload_queue.queued_operations.front() {
1706 : // Can we run this task now?
1707 8625 : let can_run_now = match next_op {
1708 : UploadOp::UploadLayer(..) => {
1709 : // Can always be scheduled.
1710 1512 : true
1711 : }
1712 : UploadOp::UploadMetadata { .. } => {
1713 : // These can only be performed after all the preceding operations
1714 : // have finished.
1715 4367 : upload_queue.inprogress_tasks.is_empty()
1716 : }
1717 : UploadOp::Delete(_) => {
1718 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1719 278 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1720 : }
1721 :
1722 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1723 2468 : upload_queue.inprogress_tasks.is_empty()
1724 : }
1725 : };
1726 :
1727 : // If we cannot launch this task, don't look any further.
1728 : //
1729 : // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
1730 : // them now, but we don't try to do that currently. For example, if the frontmost task
1731 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1732 : // could still start layer uploads that were scheduled later.
1733 8625 : if !can_run_now {
1734 4137 : break;
1735 4488 : }
1736 4488 :
1737 4488 : if let UploadOp::Shutdown = next_op {
1738 : // Leave the op in the queue but do not start more tasks; it will be dropped when
1739 : // `stop` is called.
1740 6 : upload_queue.shutdown_ready.close();
1741 6 : break;
1742 4482 : }
1743 4482 :
1744 4482 : // We can launch this task. Remove it from the queue first.
1745 4482 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1746 4482 :
1747 4482 : debug!("starting op: {}", next_op);
1748 :
1749 : // Update the counters
1750 4482 : match next_op {
1751 1512 : UploadOp::UploadLayer(_, _) => {
1752 1512 : upload_queue.num_inprogress_layer_uploads += 1;
1753 1512 : }
1754 1409 : UploadOp::UploadMetadata { .. } => {
1755 1409 : upload_queue.num_inprogress_metadata_uploads += 1;
1756 1409 : }
1757 239 : UploadOp::Delete(_) => {
1758 239 : upload_queue.num_inprogress_deletions += 1;
1759 239 : }
1760 1322 : UploadOp::Barrier(sender) => {
1761 1322 : sender.send_replace(());
1762 1322 : continue;
1763 : }
1764 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1765 : };
1766 :
1767 : // Assign unique ID to this task
1768 3160 : upload_queue.task_counter += 1;
1769 3160 : let upload_task_id = upload_queue.task_counter;
1770 3160 :
1771 3160 : // Add it to the in-progress map
1772 3160 : let task = Arc::new(UploadTask {
1773 3160 : task_id: upload_task_id,
1774 3160 : op: next_op,
1775 3160 : retries: AtomicU32::new(0),
1776 3160 : });
1777 3160 : upload_queue
1778 3160 : .inprogress_tasks
1779 3160 : .insert(task.task_id, Arc::clone(&task));
1780 3160 :
1781 3160 : // Spawn a task to perform the upload op
1782 3160 : let self_rc = Arc::clone(self);
1783 3160 : let tenant_shard_id = self.tenant_shard_id;
1784 3160 : let timeline_id = self.timeline_id;
1785 3160 : task_mgr::spawn(
1786 3160 : &self.runtime,
1787 3160 : TaskKind::RemoteUploadTask,
1788 3160 : self.tenant_shard_id,
1789 3160 : Some(self.timeline_id),
1790 3160 : "remote upload",
1791 3057 : async move {
1792 42746 : self_rc.perform_upload_task(task).await;
1793 2958 : Ok(())
1794 2958 : }
1795 3160 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1796 : );
1797 :
1798 : // Loop back to process next task
1799 : }
1800 7416 : }
1801 :
1802 : ///
1803 : /// Perform an upload task.
1804 : ///
1805 : /// The task is in the `inprogress_tasks` list. This function will try to
1806 : /// execute it, retrying forever. On successful completion, the task is
1807 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1808 : /// queue that were waiting for its completion are launched.
1809 : ///
1810 : /// The task can be shut down, however. That leads to stopping the whole
1811 : /// queue.
1812 : ///
1813 3057 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1814 3057 : let cancel = shutdown_token();
1815 : // Loop to retry until it completes.
1816 : loop {
1817 : // If we're requested to shut down, close up shop and exit.
1818 : //
1819 : // Note: We only check for the shutdown requests between retries, so
1820 : // if a shutdown request arrives while we're busy uploading, in the
1821 : // upload::upload_*() call below, we will not exit until it has
1822 : // finished. We probably could cancel the upload by simply dropping
1823 : // the Future, but we're not 100% sure if the remote storage library
1824 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1825 : // upload finishes or times out soon enough.
1826 3057 : if cancel.is_cancelled() {
1827 0 : info!("upload task cancelled by shutdown request");
1828 0 : self.stop();
1829 0 : return;
1830 3057 : }
1831 :
1832 3057 : let upload_result: anyhow::Result<()> = match &task.op {
1833 1419 : UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
1834 1419 : let local_path = layer.local_path();
1835 1419 :
1836 1419 : // We should only be uploading layers created within this `Tenant`'s lifetime, so
1837 1419 : // the metadata in the upload should always match our current generation.
1838 1419 : assert_eq!(layer_metadata.generation, self.generation);
1839 :
1840 1419 : let remote_path = remote_layer_path(
1841 1419 : &self.tenant_shard_id.tenant_id,
1842 1419 : &self.timeline_id,
1843 1419 : layer_metadata.shard,
1844 1419 : &layer.layer_desc().layer_name(),
1845 1419 : layer_metadata.generation,
1846 1419 : );
1847 1419 :
1848 1419 : upload::upload_timeline_layer(
1849 1419 : &self.storage_impl,
1850 1419 : local_path,
1851 1419 : &remote_path,
1852 1419 : layer_metadata.file_size,
1853 1419 : &self.cancel,
1854 1419 : )
1855 1419 : .measure_remote_op(
1856 1419 : RemoteOpFileKind::Layer,
1857 1419 : RemoteOpKind::Upload,
1858 1419 : Arc::clone(&self.metrics),
1859 1419 : )
1860 37417 : .await
1861 : }
1862 1399 : UploadOp::UploadMetadata { ref uploaded } => {
1863 1399 : let res = upload::upload_index_part(
1864 1399 : &self.storage_impl,
1865 1399 : &self.tenant_shard_id,
1866 1399 : &self.timeline_id,
1867 1399 : self.generation,
1868 1399 : uploaded,
1869 1399 : &self.cancel,
1870 1399 : )
1871 1399 : .measure_remote_op(
1872 1399 : RemoteOpFileKind::Index,
1873 1399 : RemoteOpKind::Upload,
1874 1399 : Arc::clone(&self.metrics),
1875 1399 : )
1876 5129 : .await;
1877 1395 : if res.is_ok() {
1878 1395 : self.update_remote_physical_size_gauge(Some(uploaded));
1879 1395 : let mention_having_future_layers = if cfg!(feature = "testing") {
1880 1395 : uploaded
1881 1395 : .layer_metadata
1882 1395 : .keys()
1883 17476 : .any(|x| x.is_in_future(uploaded.metadata.disk_consistent_lsn()))
1884 : } else {
1885 0 : false
1886 : };
1887 1395 : if mention_having_future_layers {
1888 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1889 13 : tracing::info!(
1890 0 : disk_consistent_lsn = %uploaded.metadata.disk_consistent_lsn(),
1891 0 : "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup"
1892 : );
1893 1382 : }
1894 0 : }
1895 1395 : res
1896 : }
1897 239 : UploadOp::Delete(delete) => {
1898 239 : pausable_failpoint!("before-delete-layer-pausable");
1899 239 : self.deletion_queue_client
1900 239 : .push_layers(
1901 239 : self.tenant_shard_id,
1902 239 : self.timeline_id,
1903 239 : self.generation,
1904 239 : delete.layers.clone(),
1905 239 : )
1906 0 : .await
1907 239 : .map_err(|e| anyhow::anyhow!(e))
1908 : }
1909 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
1910 : // unreachable. Barrier operations are handled synchronously in
1911 : // launch_queued_tasks
1912 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
1913 0 : break;
1914 : }
1915 : };
1916 :
1917 0 : match upload_result {
1918 : Ok(()) => {
1919 2958 : break;
1920 : }
1921 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
1922 0 : // loop around to do the proper stopping
1923 0 : continue;
1924 : }
1925 0 : Err(e) => {
1926 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1927 0 :
1928 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1929 0 : // or other external reasons. Such issues are relatively regular, so log them
1930 0 : // at info level at first, and only WARN if the operation fails repeatedly.
1931 0 : //
1932 0 : // (See similar logic for downloads in `download::download_retry`)
1933 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1934 0 : info!(
1935 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1936 0 : task.op, retries, e
1937 : );
1938 : } else {
1939 0 : warn!(
1940 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1941 0 : task.op, retries, e
1942 : );
1943 : }
1944 :
1945 : // sleep until it's time to retry, or we're cancelled
1946 0 : exponential_backoff(
1947 0 : retries,
1948 0 : DEFAULT_BASE_BACKOFF_SECONDS,
1949 0 : DEFAULT_MAX_BACKOFF_SECONDS,
1950 0 : &cancel,
1951 0 : )
1952 0 : .await;
1953 : }
1954 : }
1955 : }
1956 :
1957 2958 : let retries = task.retries.load(Ordering::SeqCst);
1958 2958 : if retries > 0 {
1959 0 : info!(
1960 0 : "remote task {} completed successfully after {} retries",
1961 0 : task.op, retries
1962 : );
1963 : } else {
1964 2958 : debug!("remote task {} completed successfully", task.op);
1965 : }
1966 :
1967 : // The task has completed successfully. Remove it from the in-progress list.
1968 2958 : let lsn_update = {
1969 2958 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1970 2958 : let upload_queue = match upload_queue_guard.deref_mut() {
1971 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1972 0 : UploadQueue::Stopped(_stopped) => {
1973 0 : None
1974 : },
1975 2958 : UploadQueue::Initialized(qi) => { Some(qi) }
1976 : };
1977 :
1978 2958 : let upload_queue = match upload_queue {
1979 2958 : Some(upload_queue) => upload_queue,
1980 : None => {
1981 0 : info!("another concurrent task already stopped the queue");
1982 0 : return;
1983 : }
1984 : };
1985 :
1986 2958 : upload_queue.inprogress_tasks.remove(&task.task_id);
1987 :
1988 2958 : let lsn_update = match task.op {
1989 : UploadOp::UploadLayer(_, _) => {
1990 1324 : upload_queue.num_inprogress_layer_uploads -= 1;
1991 1324 : None
1992 : }
1993 1395 : UploadOp::UploadMetadata { ref uploaded } => {
1994 1395 : upload_queue.num_inprogress_metadata_uploads -= 1;
1995 1395 :
1996 1395 : // the task id is reused as a monotonicity check for storing the "clean"
1997 1395 : // IndexPart.
1998 1395 : let last_updater = upload_queue.clean.1;
1999 1395 : let is_later = last_updater.is_some_and(|task_id| task_id < task.task_id);
2000 1395 : let monotone = is_later || last_updater.is_none();
2001 :
2002 1395 : assert!(monotone, "no two index uploads should be completing at the same time, prev={last_updater:?}, task.task_id={}", task.task_id);
2003 :
2004 : // not taking ownership is wasteful
2005 1395 : upload_queue.clean.0.clone_from(uploaded);
2006 1395 : upload_queue.clean.1 = Some(task.task_id);
2007 1395 :
2008 1395 : let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
2009 1395 :
2010 1395 : if self.generation.is_none() {
2011 : // Legacy mode: skip validating generation
2012 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
2013 0 : None
2014 : } else {
2015 1395 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
2016 : }
2017 : }
2018 : UploadOp::Delete(_) => {
2019 239 : upload_queue.num_inprogress_deletions -= 1;
2020 239 : None
2021 : }
2022 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
2023 : };
2024 :
2025 : // Launch any queued tasks that were unblocked by this one.
2026 2958 : self.launch_queued_tasks(upload_queue);
2027 2958 : lsn_update
2028 : };
2029 :
2030 2958 : if let Some((lsn, slot)) = lsn_update {
2031 : // Updates to the remote_consistent_lsn we advertise to pageservers
2032 : // are all routed through the DeletionQueue, to enforce important
2033 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
2034 1395 : self.deletion_queue_client
2035 1395 : .update_remote_consistent_lsn(
2036 1395 : self.tenant_shard_id,
2037 1395 : self.timeline_id,
2038 1395 : self.generation,
2039 1395 : lsn,
2040 1395 : slot,
2041 1395 : )
2042 0 : .await;
2043 1563 : }
2044 :
2045 2958 : self.metric_end(&task.op);
2046 2958 : }
2047 :
2048 6364 : fn metric_impl(
2049 6364 : &self,
2050 6364 : op: &UploadOp,
2051 6364 : ) -> Option<(
2052 6364 : RemoteOpFileKind,
2053 6364 : RemoteOpKind,
2054 6364 : RemoteTimelineClientMetricsCallTrackSize,
2055 6364 : )> {
2056 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
2057 6364 : let res = match op {
2058 2838 : UploadOp::UploadLayer(_, m) => (
2059 2838 : RemoteOpFileKind::Layer,
2060 2838 : RemoteOpKind::Upload,
2061 2838 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
2062 2838 : ),
2063 2823 : UploadOp::UploadMetadata { .. } => (
2064 2823 : RemoteOpFileKind::Index,
2065 2823 : RemoteOpKind::Upload,
2066 2823 : DontTrackSize {
2067 2823 : reason: "metadata uploads are tiny",
2068 2823 : },
2069 2823 : ),
2070 697 : UploadOp::Delete(_delete) => (
2071 697 : RemoteOpFileKind::Layer,
2072 697 : RemoteOpKind::Delete,
2073 697 : DontTrackSize {
2074 697 : reason: "should we track deletes? positive or negative sign?",
2075 697 : },
2076 697 : ),
2077 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
2078 : // we do not account these
2079 6 : return None;
2080 : }
2081 : };
2082 6358 : Some(res)
2083 6364 : }
2084 :
2085 3400 : fn metric_begin(&self, op: &UploadOp) {
2086 3400 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2087 3400 : Some(x) => x,
2088 0 : None => return,
2089 : };
2090 3400 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
2091 3400 : guard.will_decrement_manually(); // in metric_end(), see right below
2092 3400 : }
2093 :
2094 2964 : fn metric_end(&self, op: &UploadOp) {
2095 2964 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
2096 2958 : Some(x) => x,
2097 6 : None => return,
2098 : };
2099 2958 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
2100 2964 : }
2101 :
2102 : /// Close the upload queue for new operations and cancel queued operations.
2103 : ///
2104 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
2105 : ///
2106 : /// In-progress operations will still be running after this function returns.
2107 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
2108 : /// to wait for them to complete, after calling this function.
2109 14 : pub(crate) fn stop(&self) {
2110 14 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
2111 14 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
2112 14 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
2113 14 : let mut guard = self.upload_queue.lock().unwrap();
2114 14 : self.stop_impl(&mut guard);
2115 14 : }
2116 :
2117 14 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
2118 14 : match &mut **guard {
2119 : UploadQueue::Uninitialized => {
2120 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
2121 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
2122 : }
2123 : UploadQueue::Stopped(_) => {
2124 : // nothing to do
2125 6 : info!("another concurrent task already shut down the queue");
2126 : }
2127 8 : UploadQueue::Initialized(initialized) => {
2128 8 : info!("shutting down upload queue");
2129 :
2130 : // Replace the queue with the Stopped state, taking ownership of the old
2131 : // Initialized queue. We will do some checks on it, and then drop it.
2132 8 : let qi = {
2133 : // Here we preserve working version of the upload queue for possible use during deletions.
2134 : // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
2135 : // but for this use case it doesn't really make sense to bring in unsafe code only for this usage point.
2136 : // Deletion is not really perf sensitive so there shouldn't be any problems with cloning a fraction of it.
2137 8 : let upload_queue_for_deletion = UploadQueueInitialized {
2138 8 : task_counter: 0,
2139 8 : dirty: initialized.dirty.clone(),
2140 8 : clean: initialized.clean.clone(),
2141 8 : latest_files_changes_since_metadata_upload_scheduled: 0,
2142 8 : visible_remote_consistent_lsn: initialized
2143 8 : .visible_remote_consistent_lsn
2144 8 : .clone(),
2145 8 : num_inprogress_layer_uploads: 0,
2146 8 : num_inprogress_metadata_uploads: 0,
2147 8 : num_inprogress_deletions: 0,
2148 8 : inprogress_tasks: HashMap::default(),
2149 8 : queued_operations: VecDeque::default(),
2150 8 : #[cfg(feature = "testing")]
2151 8 : dangling_files: HashMap::default(),
2152 8 : shutting_down: false,
2153 8 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
2154 8 : };
2155 8 :
2156 8 : let upload_queue = std::mem::replace(
2157 8 : &mut **guard,
2158 8 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
2159 8 : UploadQueueStoppedDeletable {
2160 8 : upload_queue_for_deletion,
2161 8 : deleted_at: SetDeletedFlagProgress::NotRunning,
2162 8 : },
2163 8 : )),
2164 8 : );
2165 8 : if let UploadQueue::Initialized(qi) = upload_queue {
2166 8 : qi
2167 : } else {
2168 0 : unreachable!("we checked in the match above that it is Initialized");
2169 : }
2170 : };
2171 :
2172 : // consistency check
2173 8 : assert_eq!(
2174 8 : qi.num_inprogress_layer_uploads
2175 8 : + qi.num_inprogress_metadata_uploads
2176 8 : + qi.num_inprogress_deletions,
2177 8 : qi.inprogress_tasks.len()
2178 8 : );
2179 :
2180 : // We don't need to do anything here for in-progress tasks. They will finish
2181 : // on their own, decrement the unfinished-task counter themselves, and observe
2182 : // that the queue is Stopped.
2183 8 : drop(qi.inprogress_tasks);
2184 :
2185 : // Tear down queued ops
2186 8 : for op in qi.queued_operations.into_iter() {
2187 6 : self.metric_end(&op);
2188 6 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
2189 6 : // which is exactly what we want to happen.
2190 6 : drop(op);
2191 6 : }
2192 : }
2193 : }
2194 14 : }
2195 :
2196 : /// Returns an accessor which will hold the UploadQueue mutex for accessing the upload queue
2197 : /// externally to RemoteTimelineClient.
2198 0 : pub(crate) fn initialized_upload_queue(
2199 0 : &self,
2200 0 : ) -> Result<UploadQueueAccessor<'_>, NotInitialized> {
2201 0 : let mut inner = self.upload_queue.lock().unwrap();
2202 0 : inner.initialized_mut()?;
2203 0 : Ok(UploadQueueAccessor { inner })
2204 0 : }
2205 : }
2206 :
2207 : pub(crate) struct UploadQueueAccessor<'a> {
2208 : inner: std::sync::MutexGuard<'a, UploadQueue>,
2209 : }
2210 :
2211 : impl UploadQueueAccessor<'_> {
2212 0 : pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
2213 0 : match &*self.inner {
2214 0 : UploadQueue::Initialized(x) => &x.clean.0,
2215 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
2216 0 : unreachable!("checked before constructing")
2217 : }
2218 : }
2219 0 : }
2220 : }
2221 :
2222 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2223 0 : let path = format!("tenants/{tenant_shard_id}");
2224 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2225 0 : }
2226 :
2227 186 : pub fn remote_tenant_manifest_path(
2228 186 : tenant_shard_id: &TenantShardId,
2229 186 : generation: Generation,
2230 186 : ) -> RemotePath {
2231 186 : let path = format!(
2232 186 : "tenants/{tenant_shard_id}/tenant-manifest{}.json",
2233 186 : generation.get_suffix()
2234 186 : );
2235 186 : RemotePath::from_string(&path).expect("Failed to construct path")
2236 186 : }
2237 :
2238 216 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2239 216 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
2240 216 : RemotePath::from_string(&path).expect("Failed to construct path")
2241 216 : }
2242 :
2243 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
2244 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
2245 0 : RemotePath::from_string(&path).expect("Failed to construct path")
2246 0 : }
2247 :
2248 30 : pub fn remote_timeline_path(
2249 30 : tenant_shard_id: &TenantShardId,
2250 30 : timeline_id: &TimelineId,
2251 30 : ) -> RemotePath {
2252 30 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
2253 30 : }
2254 :
2255 : /// Obtains the path of the given Layer in the remote storage
2256 : ///
2257 : /// Note that the shard component of a remote layer path is _not_ always the same
2258 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
2259 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
2260 1437 : pub fn remote_layer_path(
2261 1437 : tenant_id: &TenantId,
2262 1437 : timeline_id: &TimelineId,
2263 1437 : shard: ShardIndex,
2264 1437 : layer_file_name: &LayerName,
2265 1437 : generation: Generation,
2266 1437 : ) -> RemotePath {
2267 1437 : // Generation-aware key format
2268 1437 : let path = format!(
2269 1437 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
2270 1437 : shard.get_suffix(),
2271 1437 : layer_file_name,
2272 1437 : generation.get_suffix()
2273 1437 : );
2274 1437 :
2275 1437 : RemotePath::from_string(&path).expect("Failed to construct path")
2276 1437 : }
2277 :
2278 4 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
2279 4 : RemotePath::from_string(&format!(
2280 4 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
2281 4 : ))
2282 4 : .expect("Failed to construct path")
2283 4 : }
2284 :
2285 2 : pub fn remote_initdb_preserved_archive_path(
2286 2 : tenant_id: &TenantId,
2287 2 : timeline_id: &TimelineId,
2288 2 : ) -> RemotePath {
2289 2 : RemotePath::from_string(&format!(
2290 2 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
2291 2 : ))
2292 2 : .expect("Failed to construct path")
2293 2 : }
2294 :
2295 1450 : pub fn remote_index_path(
2296 1450 : tenant_shard_id: &TenantShardId,
2297 1450 : timeline_id: &TimelineId,
2298 1450 : generation: Generation,
2299 1450 : ) -> RemotePath {
2300 1450 : RemotePath::from_string(&format!(
2301 1450 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
2302 1450 : IndexPart::FILE_NAME,
2303 1450 : generation.get_suffix()
2304 1450 : ))
2305 1450 : .expect("Failed to construct path")
2306 1450 : }
2307 :
2308 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
2309 0 : RemotePath::from_string(&format!(
2310 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
2311 0 : ))
2312 0 : .expect("Failed to construct path")
2313 0 : }
2314 :
2315 : /// Given the key of an index, parse out the generation part of the name
2316 18 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
2317 18 : let file_name = match path.get_path().file_name() {
2318 18 : Some(f) => f,
2319 : None => {
2320 : // Unexpected: we should be seeing index_part.json paths only
2321 0 : tracing::warn!("Malformed index key {}", path);
2322 0 : return None;
2323 : }
2324 : };
2325 :
2326 18 : match file_name.split_once('-') {
2327 12 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2328 6 : None => None,
2329 : }
2330 18 : }
2331 :
2332 : #[cfg(test)]
2333 : mod tests {
2334 : use super::*;
2335 : use crate::{
2336 : context::RequestContext,
2337 : tenant::{
2338 : harness::{TenantHarness, TIMELINE_ID},
2339 : storage_layer::layer::local_layer_path,
2340 : Tenant, Timeline,
2341 : },
2342 : DEFAULT_PG_VERSION,
2343 : };
2344 :
2345 : use std::collections::HashSet;
2346 :
2347 8 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2348 8 : format!("contents for {name}").into()
2349 8 : }
2350 :
2351 2 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2352 2 : let metadata = TimelineMetadata::new(
2353 2 : disk_consistent_lsn,
2354 2 : None,
2355 2 : None,
2356 2 : Lsn(0),
2357 2 : Lsn(0),
2358 2 : Lsn(0),
2359 2 : // Any version will do
2360 2 : // but it should be consistent with the one in the tests
2361 2 : crate::DEFAULT_PG_VERSION,
2362 2 : );
2363 2 :
2364 2 : // go through serialize + deserialize to fix the header, including checksum
2365 2 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2366 2 : }
2367 :
2368 2 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2369 6 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2370 2 : avec.sort();
2371 2 :
2372 2 : let mut bvec = b.to_vec();
2373 2 : bvec.sort_unstable();
2374 2 :
2375 2 : assert_eq!(avec, bvec);
2376 2 : }
2377 :
2378 4 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2379 4 : let mut expected: Vec<String> = expected
2380 4 : .iter()
2381 16 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2382 4 : .collect();
2383 4 : expected.sort();
2384 4 :
2385 4 : let mut found: Vec<String> = Vec::new();
2386 16 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2387 16 : let entry_name = entry.file_name();
2388 16 : let fname = entry_name.to_str().unwrap();
2389 16 : found.push(String::from(fname));
2390 16 : }
2391 4 : found.sort();
2392 4 :
2393 4 : assert_eq!(found, expected);
2394 4 : }
2395 :
2396 : struct TestSetup {
2397 : harness: TenantHarness,
2398 : tenant: Arc<Tenant>,
2399 : timeline: Arc<Timeline>,
2400 : tenant_ctx: RequestContext,
2401 : }
2402 :
2403 : impl TestSetup {
2404 8 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2405 8 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2406 8 : let harness = TenantHarness::create(test_name).await?;
2407 40 : let (tenant, ctx) = harness.load().await;
2408 :
2409 8 : let timeline = tenant
2410 8 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2411 16 : .await?;
2412 :
2413 8 : Ok(Self {
2414 8 : harness,
2415 8 : tenant,
2416 8 : timeline,
2417 8 : tenant_ctx: ctx,
2418 8 : })
2419 8 : }
2420 :
2421 : /// Construct a RemoteTimelineClient in an arbitrary generation
2422 10 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2423 10 : Arc::new(RemoteTimelineClient {
2424 10 : conf: self.harness.conf,
2425 10 : runtime: tokio::runtime::Handle::current(),
2426 10 : tenant_shard_id: self.harness.tenant_shard_id,
2427 10 : timeline_id: TIMELINE_ID,
2428 10 : generation,
2429 10 : storage_impl: self.harness.remote_storage.clone(),
2430 10 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2431 10 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2432 10 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2433 10 : &self.harness.tenant_shard_id,
2434 10 : &TIMELINE_ID,
2435 10 : )),
2436 10 : cancel: CancellationToken::new(),
2437 10 : })
2438 10 : }
2439 :
2440 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2441 : /// and timeline_id are present.
2442 6 : fn span(&self) -> tracing::Span {
2443 6 : tracing::info_span!(
2444 : "test",
2445 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2446 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2447 : timeline_id = %TIMELINE_ID
2448 : )
2449 6 : }
2450 : }
2451 :
2452 : // Test scheduling
2453 : #[tokio::test]
2454 2 : async fn upload_scheduling() {
2455 2 : // Test outline:
2456 2 : //
2457 2 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued.
2458 2 : // Schedule upload of index. Check that it is queued.
2459 2 : // Let the layer file uploads finish. Check that the index upload is now started.
2460 2 : // Let the index upload finish.
2461 2 : //
2462 2 : // Download back the index_part.json. Check that the list of files is correct.
2463 2 : //
2464 2 : // Schedule upload. Schedule deletion. Check that the deletion is queued.
2465 2 : // Let the upload finish. Check that the deletion is now started.
2466 2 : // Schedule another deletion. Check that it's launched immediately.
2467 2 : // Schedule index upload. Check that it's queued
2468 2 :
2469 14 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2470 2 : let span = test_setup.span();
2471 2 : let _guard = span.enter();
2472 2 :
2473 2 : let TestSetup {
2474 2 : harness,
2475 2 : tenant: _tenant,
2476 2 : timeline,
2477 2 : tenant_ctx: _tenant_ctx,
2478 2 : } = test_setup;
2479 2 :
2480 2 : let client = &timeline.remote_client;
2481 2 :
2482 2 : // Download the initial index_part.json and note the single layer it already lists
2483 2 : let initial_index_part = match client
2484 2 : .download_index_file(&CancellationToken::new())
2485 6 : .await
2486 2 : .unwrap()
2487 2 : {
2488 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2489 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2490 2 : };
2491 2 : let initial_layers = initial_index_part
2492 2 : .layer_metadata
2493 2 : .keys()
2494 2 : .map(|f| f.to_owned())
2495 2 : .collect::<HashSet<LayerName>>();
2496 2 : let initial_layer = {
2497 2 : assert!(initial_layers.len() == 1);
2498 2 : initial_layers.into_iter().next().unwrap()
2499 2 : };
2500 2 :
2501 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2502 2 :
2503 2 : println!("workdir: {}", harness.conf.workdir);
2504 2 :
2505 2 : let remote_timeline_dir = harness
2506 2 : .remote_fs_dir
2507 2 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2508 2 : println!("remote_timeline_dir: {remote_timeline_dir}");
2509 2 :
2510 2 : let generation = harness.generation;
2511 2 : let shard = harness.shard;
2512 2 :
2513 2 : // Create a couple of dummy files, schedule upload for them
2514 2 :
2515 2 : let layers = [
2516 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2517 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2518 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2519 2 : ]
2520 2 : .into_iter()
2521 6 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2522 6 :
2523 6 : let local_path = local_layer_path(
2524 6 : harness.conf,
2525 6 : &timeline.tenant_shard_id,
2526 6 : &timeline.timeline_id,
2527 6 : &name,
2528 6 : &generation,
2529 6 : );
2530 6 : std::fs::write(&local_path, &contents).unwrap();
2531 6 :
2532 6 : Layer::for_resident(
2533 6 : harness.conf,
2534 6 : &timeline,
2535 6 : local_path,
2536 6 : name,
2537 6 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2538 6 : )
2539 6 : }).collect::<Vec<_>>();
2540 2 :
2541 2 : client
2542 2 : .schedule_layer_file_upload(layers[0].clone())
2543 2 : .unwrap();
2544 2 : client
2545 2 : .schedule_layer_file_upload(layers[1].clone())
2546 2 : .unwrap();
2547 2 :
2548 2 : // Check that they are started immediately, not queued
2549 2 : //
2550 2 : // This works because we are running within block_on, so the spawned futures stay queued
2551 2 : // until our next await point.
2552 2 : {
2553 2 : let mut guard = client.upload_queue.lock().unwrap();
2554 2 : let upload_queue = guard.initialized_mut().unwrap();
2555 2 : assert!(upload_queue.queued_operations.is_empty());
2556 2 : assert!(upload_queue.inprogress_tasks.len() == 2);
2557 2 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2558 2 :
2559 2 : // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
2560 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2561 2 : }
2562 2 :
2563 2 : // Schedule upload of index. Check that it is queued
2564 2 : let metadata = dummy_metadata(Lsn(0x20));
2565 2 : client
2566 2 : .schedule_index_upload_for_full_metadata_update(&metadata)
2567 2 : .unwrap();
2568 2 : {
2569 2 : let mut guard = client.upload_queue.lock().unwrap();
2570 2 : let upload_queue = guard.initialized_mut().unwrap();
2571 2 : assert!(upload_queue.queued_operations.len() == 1);
2572 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2573 2 : }
2574 2 :
2575 2 : // Wait for the uploads to finish
2576 2 : client.wait_completion().await.unwrap();
2577 2 : {
2578 2 : let mut guard = client.upload_queue.lock().unwrap();
2579 2 : let upload_queue = guard.initialized_mut().unwrap();
2580 2 :
2581 2 : assert!(upload_queue.queued_operations.is_empty());
2582 2 : assert!(upload_queue.inprogress_tasks.is_empty());
2583 2 : }
2584 2 :
2585 2 : // Download back the index_part.json, and check that the list of files is correct
2586 2 : let index_part = match client
2587 2 : .download_index_file(&CancellationToken::new())
2588 6 : .await
2589 2 : .unwrap()
2590 2 : {
2591 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2592 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2593 2 : };
2594 2 :
2595 2 : assert_file_list(
2596 2 : &index_part
2597 2 : .layer_metadata
2598 2 : .keys()
2599 6 : .map(|f| f.to_owned())
2600 2 : .collect(),
2601 2 : &[
2602 2 : &initial_layer.to_string(),
2603 2 : &layers[0].layer_desc().layer_name().to_string(),
2604 2 : &layers[1].layer_desc().layer_name().to_string(),
2605 2 : ],
2606 2 : );
2607 2 : assert_eq!(index_part.metadata, metadata);
2608 2 :
2609 2 : // Schedule upload and then a deletion. Check that the deletion is queued
2610 2 : client
2611 2 : .schedule_layer_file_upload(layers[2].clone())
2612 2 : .unwrap();
2613 2 :
2614 2 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
2615 2 : // keep using schedule_layer_file_deletion because we don't have a way to wait for the
2616 2 : // spawn_blocking started by the drop.
2617 2 : client
2618 2 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
2619 2 : .unwrap();
2620 2 : {
2621 2 : let mut guard = client.upload_queue.lock().unwrap();
2622 2 : let upload_queue = guard.initialized_mut().unwrap();
2623 2 :
2624 2 : // Deletion schedules upload of the index file, and the file deletion itself
2625 2 : assert_eq!(upload_queue.queued_operations.len(), 2);
2626 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2627 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2628 2 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2629 2 : assert_eq!(
2630 2 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2631 2 : 0
2632 2 : );
2633 2 : }
2634 2 : assert_remote_files(
2635 2 : &[
2636 2 : &initial_layer.to_string(),
2637 2 : &layers[0].layer_desc().layer_name().to_string(),
2638 2 : &layers[1].layer_desc().layer_name().to_string(),
2639 2 : "index_part.json",
2640 2 : ],
2641 2 : &remote_timeline_dir,
2642 2 : generation,
2643 2 : );
2644 2 :
2645 2 : // Finish them
2646 2 : client.wait_completion().await.unwrap();
2647 2 : harness.deletion_queue.pump().await;
2648 2 :
2649 2 : assert_remote_files(
2650 2 : &[
2651 2 : &initial_layer.to_string(),
2652 2 : &layers[1].layer_desc().layer_name().to_string(),
2653 2 : &layers[2].layer_desc().layer_name().to_string(),
2654 2 : "index_part.json",
2655 2 : ],
2656 2 : &remote_timeline_dir,
2657 2 : generation,
2658 2 : );
2659 2 : }
2660 :
2661 : #[tokio::test]
2662 2 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2663 2 : // Setup
2664 2 :
2665 2 : let TestSetup {
2666 2 : harness,
2667 2 : tenant: _tenant,
2668 2 : timeline,
2669 2 : ..
2670 14 : } = TestSetup::new("metrics").await.unwrap();
2671 2 : let client = &timeline.remote_client;
2672 2 :
2673 2 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2674 2 : let local_path = local_layer_path(
2675 2 : harness.conf,
2676 2 : &timeline.tenant_shard_id,
2677 2 : &timeline.timeline_id,
2678 2 : &layer_file_name_1,
2679 2 : &harness.generation,
2680 2 : );
2681 2 : let content_1 = dummy_contents("foo");
2682 2 : std::fs::write(&local_path, &content_1).unwrap();
2683 2 :
2684 2 : let layer_file_1 = Layer::for_resident(
2685 2 : harness.conf,
2686 2 : &timeline,
2687 2 : local_path,
2688 2 : layer_file_name_1.clone(),
2689 2 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2690 2 : );
2691 2 :
2692 2 : #[derive(Debug, PartialEq, Clone, Copy)]
2693 2 : struct BytesStartedFinished {
2694 2 : started: Option<usize>,
2695 2 : finished: Option<usize>,
2696 2 : }
2697 2 : impl std::ops::Add for BytesStartedFinished {
2698 2 : type Output = Self;
2699 4 : fn add(self, rhs: Self) -> Self::Output {
2700 4 : Self {
2701 4 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2702 4 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2703 4 : }
2704 4 : }
2705 2 : }
2706 6 : let get_bytes_started_stopped = || {
2707 6 : let started = client
2708 6 : .metrics
2709 6 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2710 6 : .map(|v| v.try_into().unwrap());
2711 6 : let stopped = client
2712 6 : .metrics
2713 6 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2714 6 : .map(|v| v.try_into().unwrap());
2715 6 : BytesStartedFinished {
2716 6 : started,
2717 6 : finished: stopped,
2718 6 : }
2719 6 : };
2720 2 :
2721 2 : // Test
2722 2 : tracing::info!("now doing actual test");
2723 2 :
2724 2 : let actual_a = get_bytes_started_stopped();
2725 2 :
2726 2 : client
2727 2 : .schedule_layer_file_upload(layer_file_1.clone())
2728 2 : .unwrap();
2729 2 :
2730 2 : let actual_b = get_bytes_started_stopped();
2731 2 :
2732 2 : client.wait_completion().await.unwrap();
2733 2 :
2734 2 : let actual_c = get_bytes_started_stopped();
2735 2 :
2736 2 : // Validate
2737 2 :
2738 2 : let expected_b = actual_a
2739 2 : + BytesStartedFinished {
2740 2 : started: Some(content_1.len()),
2741 2 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
2742 2 : finished: Some(0),
2743 2 : };
2744 2 : assert_eq!(actual_b, expected_b);
2745 2 :
2746 2 : let expected_c = actual_a
2747 2 : + BytesStartedFinished {
2748 2 : started: Some(content_1.len()),
2749 2 : finished: Some(content_1.len()),
2750 2 : };
2751 2 : assert_eq!(actual_c, expected_c);
2752 2 : }
2753 :
2754 12 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
2755 12 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
2756 12 : let example_index_part = IndexPart::example();
2757 12 :
2758 12 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
2759 12 :
2760 12 : let index_path = test_state.harness.remote_fs_dir.join(
2761 12 : remote_index_path(
2762 12 : &test_state.harness.tenant_shard_id,
2763 12 : &TIMELINE_ID,
2764 12 : generation,
2765 12 : )
2766 12 : .get_path(),
2767 12 : );
2768 12 :
2769 12 : std::fs::create_dir_all(index_path.parent().unwrap())
2770 12 : .expect("creating test dir should work");
2771 12 :
2772 12 : eprintln!("Writing {index_path}");
2773 12 : std::fs::write(&index_path, index_part_bytes).unwrap();
2774 12 : example_index_part
2775 12 : }
2776 :
2777 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
2778 : /// index, the IndexPart returned is equal to `expected`
2779 10 : async fn assert_got_index_part(
2780 10 : test_state: &TestSetup,
2781 10 : get_generation: Generation,
2782 10 : expected: &IndexPart,
2783 10 : ) {
2784 10 : let client = test_state.build_client(get_generation);
2785 :
2786 10 : let download_r = client
2787 10 : .download_index_file(&CancellationToken::new())
2788 50 : .await
2789 10 : .expect("download should always succeed");
2790 10 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
2791 10 : match download_r {
2792 10 : MaybeDeletedIndexPart::IndexPart(index_part) => {
2793 10 : assert_eq!(&index_part, expected);
2794 : }
2795 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
2796 : }
2797 10 : }
2798 :
2799 : #[tokio::test]
2800 2 : async fn index_part_download_simple() -> anyhow::Result<()> {
2801 14 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
2802 2 : let span = test_state.span();
2803 2 : let _guard = span.enter();
2804 2 :
2805 2 : // Simple case: we are in generation N, load the index from generation N - 1
2806 2 : let generation_n = 5;
2807 2 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2808 2 :
2809 8 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
2810 2 :
2811 2 : Ok(())
2812 2 : }
2813 :
2814 : #[tokio::test]
2815 2 : async fn index_part_download_ordering() -> anyhow::Result<()> {
2816 2 : let test_state = TestSetup::new("index_part_download_ordering")
2817 14 : .await
2818 2 : .unwrap();
2819 2 :
2820 2 : let span = test_state.span();
2821 2 : let _guard = span.enter();
2822 2 :
2823 2 : // A generation-less IndexPart exists in the bucket, we should find it
2824 2 : let generation_n = 5;
2825 2 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
2826 12 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
2827 2 :
2828 2 : // If a more-recent-than-none generation exists, we should prefer to load that
2829 2 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
2830 12 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2831 2 :
2832 2 : // If a more-recent-than-me generation exists, we should ignore it.
2833 2 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
2834 12 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2835 2 :
2836 2 : // If a directly previous generation exists, _and_ an index exists in my own
2837 2 : // generation, I should prefer my own generation.
2838 2 : let _injected_prev =
2839 2 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2840 2 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
2841 2 : assert_got_index_part(
2842 2 : &test_state,
2843 2 : Generation::new(generation_n),
2844 2 : &injected_current,
2845 2 : )
2846 6 : .await;
2847 2 :
2848 2 : Ok(())
2849 2 : }
2850 : }
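The ordering exercised by `index_part_download_ordering` above can be summarized as: among the index generations present in remote storage, load the newest one that is not newer than the client's own generation, treating the generation-less index as the oldest candidate. Below is a minimal, self-contained sketch of that rule. `SketchGeneration` and `pick_index_generation` are hypothetical names introduced only for illustration; this is not the pageserver's actual lookup code, which may probe or list remote storage quite differently.

```rust
/// Hypothetical stand-in for a generation number (an assumption for this sketch).
/// `None` models the generation-less index. Derived `Ord` follows variant
/// declaration order, so `None` sorts before every `Valid(_)` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum SketchGeneration {
    None,
    Valid(u32),
}

/// Pick the newest available index generation that is not newer than `mine`.
/// Returns `Option::None` when every available index is from a later generation.
fn pick_index_generation(
    available: &[SketchGeneration],
    mine: SketchGeneration,
) -> Option<SketchGeneration> {
    available
        .iter()
        .copied()
        // Ignore indexes written by generations more recent than ours.
        .filter(|candidate| *candidate <= mine)
        // Of the remaining ones, prefer the most recent (our own, if present).
        .max()
}
```

Calling `pick_index_generation` with the sets of generations that the test injects step by step (generation-less only, then generation 1, then 10, then 4 and 5) and `mine = SketchGeneration::Valid(5)` mirrors the sequence of expectations in the test.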