Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to list the timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that requires preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
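The ordering rule above can be illustrated with a toy queue. This is only a sketch under simplifying assumptions, not the actual `UploadQueue` logic: `Op`, `Queue`, `launch_ready` and `complete` are hypothetical names, deletions are left out, and the only rule modelled is that layer uploads may overlap while an index upload waits for everything scheduled before it.

```rust
use std::collections::VecDeque;

// Hypothetical, simplified operation type (the real one is `UploadOp`).
#[derive(Debug, Clone, PartialEq)]
enum Op {
    UploadLayer(&'static str),
    UploadIndex,
}

#[derive(Default)]
struct Queue {
    queued: VecDeque<Op>,
    in_progress: Vec<Op>,
}

impl Queue {
    // Scheduling only appends; the caller is responsible for the order.
    fn schedule(&mut self, op: Op) {
        self.queued.push_back(op);
    }

    // Starts operations from the front of the queue while it is safe to do so
    // and returns the operations that were started by this call.
    fn launch_ready(&mut self) -> Vec<Op> {
        let mut started = Vec::new();
        while let Some(next) = self.queued.front() {
            let can_start = match next {
                // Layer uploads may overlap each other, but not an in-flight index upload.
                Op::UploadLayer(_) => !self.in_progress.contains(&Op::UploadIndex),
                // The index upload is a barrier: all earlier operations must have finished.
                Op::UploadIndex => self.in_progress.is_empty(),
            };
            if !can_start {
                break;
            }
            let op = self.queued.pop_front().unwrap();
            self.in_progress.push(op.clone());
            started.push(op);
        }
        started
    }

    // Marks one in-flight operation as finished.
    fn complete(&mut self, op: &Op) {
        if let Some(pos) = self.in_progress.iter().position(|o| o == op) {
            self.in_progress.remove(pos);
        }
    }
}
```

Scheduling two layer uploads followed by an index upload starts both layer uploads at once; the index upload starts only after `complete` has been called for both.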
72 : //!
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! the `wait_completion` function (more on that later).
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
88 : //! that that doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in
95 : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
96 : //! It is initialized based on the [`IndexPart`] that was passed during init
97 : //! and updated with every `schedule_*` function call.
98 : //! All this is necessary to compute the future [`IndexPart`]s
99 : //! when scheduling an operation while other operations that also affect the
100 : //! remote [`IndexPart`] are in flight.
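A minimal sketch of this bookkeeping, assuming a much-simplified model: `DesiredState` is a hypothetical stand-in for the relevant fields of `UploadQueueInitialized`, a layer is represented only by its name and size, and the "index" is just a snapshot of the file map.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the desired-remote-state fields of the upload queue.
#[derive(Default)]
struct DesiredState {
    // File name -> file size, as the remote should look once all queued ops finish.
    latest_files: BTreeMap<String, u64>,
    // Number of file changes scheduled since the last index upload was scheduled.
    changes_since_index_scheduled: usize,
}

impl DesiredState {
    // Scheduling an upload updates the desired state immediately,
    // long before the upload itself has completed.
    fn schedule_layer_upload(&mut self, name: &str, size: u64) {
        self.latest_files.insert(name.to_string(), size);
        self.changes_since_index_scheduled += 1;
    }

    // Unlinking a layer removes it from the desired state.
    fn schedule_layer_unlink(&mut self, name: &str) {
        if self.latest_files.remove(name).is_some() {
            self.changes_since_index_scheduled += 1;
        }
    }

    // Returns the file list for the next index upload,
    // or `None` if nothing changed since the previous one was scheduled.
    fn schedule_index_upload_for_file_changes(&mut self) -> Option<BTreeMap<String, u64>> {
        if self.changes_since_index_scheduled == 0 {
            return None;
        }
        self.changes_since_index_scheduled = 0;
        Some(self.latest_files.clone())
    }
}
```

Because each snapshot is taken from the desired state rather than from the last index that actually reached the remote, an index scheduled while earlier operations are still in flight already reflects them.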
101 : //!
102 : //! # Retries & Error Handling
103 : //!
104 : //! The client retries operations indefinitely, using exponential back-off.
105 : //! There is no way to force a retry, i.e., interrupt the back-off.
106 : //! This could be built easily.
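For illustration, a capped exponential back-off delay can be computed as follows. This is a sketch, not the `utils::backoff` implementation: `backoff_delay`, its doubling factor, and its parameters are assumptions chosen for the example.

```rust
use std::time::Duration;

// Hypothetical helper: delay before retry number `attempt` (1-based),
// doubling from `base` and never exceeding `max`. Attempt 0 means "no delay".
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    if attempt == 0 {
        return Duration::ZERO;
    }
    // Saturate instead of overflowing for very large attempt counts,
    // which matters when operations are retried indefinitely.
    let factor = 2u32.checked_pow(attempt - 1).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a 100 ms base and a 3 s cap, the delays grow as 100 ms, 200 ms, 400 ms and so on, then stay at 3 s for all later attempts.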
107 : //!
108 : //! # Cancellation
109 : //!
110 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
111 : //! the client's tenant and timeline.
112 : //! Dropping the client will drop queued operations but not executing operations.
113 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
114 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
115 : //!
116 : //! # Completion
117 : //!
118 : //! Once an operation has completed, we update
119 : //! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
120 : //! and submit a request through the DeletionQueue to update
121 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
122 : //! validated that our generation is not stale. It is this visible value
123 : //! that is advertised to safekeepers as a signal that they can
124 : //! delete the WAL up to that LSN.
125 : //!
126 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
127 : //! for all pending operations to complete. It does not prevent more
128 : //! operations from getting scheduled.
129 : //!
130 : //! # Crash Consistency
131 : //!
132 : //! We do not persist the upload queue state.
133 : //! If we drop the client, or crash, all unfinished operations are lost.
134 : //!
135 : //! To recover, the following steps need to be taken:
136 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
137 : //! consistent remote state, assuming the user scheduled the operations in
138 : //! the correct order.
139 : //! - Initialize the upload queue with that [`IndexPart`].
140 : //! - Reschedule all lost operations by comparing the local filesystem state
141 : //! and remote state as per [`IndexPart`]. This is done in
142 : //! [`Tenant::timeline_init_and_sync`].
143 : //!
144 : //! Note that if we crash during file deletion between the index update
145 : //! that removes the file from the list of files, and deleting the remote file,
146 : //! the file is leaked in the remote storage. Similarly, if a new file is created
147 : //! and uploaded, but the pageserver dies permanently before updating the
148 : //! remote index file, the new file is leaked in remote storage. We accept and
149 : //! tolerate that for now.
150 : //! Note further that we cannot easily fix this by scheduling deletes for every
151 : //! file that is present only on the remote, because we cannot distinguish the
152 : //! following two cases:
153 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
154 : //! but crashed before it finished remotely.
155 : //! - (2) We never had the file locally because we haven't on-demand downloaded
156 : //! it yet.
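The comparison step of the recovery procedure can be sketched as a set difference. This is an illustration under assumptions, not the code in `Tenant::timeline_init_and_sync`: `reconcile` and `Reconciliation` are hypothetical names, and layers are identified by plain file-name strings.

```rust
use std::collections::BTreeSet;

// Hypothetical result of comparing local files with the remote index.
struct Reconciliation {
    // Present locally but missing from the index: the upload was lost, reschedule it.
    to_upload: Vec<String>,
    // Listed in the index but absent locally: nothing to reschedule,
    // the layer may simply not have been downloaded yet.
    remote_only: Vec<String>,
}

fn reconcile(local: &BTreeSet<String>, remote_index: &BTreeSet<String>) -> Reconciliation {
    Reconciliation {
        to_upload: local.difference(remote_index).cloned().collect(),
        remote_only: remote_index.difference(local).cloned().collect(),
    }
}
```

Files that exist in remote storage but are referenced by neither side never show up in this comparison, which is why leaked files cannot be detected this way.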
157 : //!
158 : //! # Downloads
159 : //!
160 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
161 : //! downloading files from the remote storage. Downloads are performed immediately
162 : //! against the `RemoteStorage`, independently of the upload queue.
163 : //!
164 : //! When we attach a tenant, we perform the following steps:
165 : //! - create `Tenant` object in `TenantState::Attaching` state
166 : //! - List timelines that are present in remote storage, and for each:
167 : //! - download their remote [`IndexPart`]s
168 : //! - create `Timeline` struct and a `RemoteTimelineClient`
169 : //! - initialize the client's upload queue with its `IndexPart`
170 : //! - schedule uploads for layers that are only present locally.
171 : //! - After the above is done for each timeline, open the tenant for business by
172 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
173 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
174 : //!
175 : //! # Operating Without Remote Storage
176 : //!
177 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
178 : //! not created and the uploads are skipped.
179 : //!
180 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
181 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
182 :
183 : pub(crate) mod download;
184 : pub mod index;
185 : pub(crate) mod upload;
186 :
187 : use anyhow::Context;
188 : use camino::Utf8Path;
189 : use chrono::{NaiveDateTime, Utc};
190 :
191 : pub(crate) use download::download_initdb_tar_zst;
192 : use pageserver_api::shard::{ShardIndex, TenantShardId};
193 : use scopeguard::ScopeGuard;
194 : use tokio_util::sync::CancellationToken;
195 : pub(crate) use upload::upload_initdb_dir;
196 : use utils::backoff::{
197 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
198 : };
199 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
200 :
201 : use std::collections::{HashMap, VecDeque};
202 : use std::sync::atomic::{AtomicU32, Ordering};
203 : use std::sync::{Arc, Mutex};
204 : use std::time::Duration;
205 :
206 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
207 : use std::ops::DerefMut;
208 : use tracing::{debug, error, info, instrument, warn};
209 : use tracing::{info_span, Instrument};
210 : use utils::lsn::Lsn;
211 :
212 : use crate::deletion_queue::DeletionQueueClient;
213 : use crate::metrics::{
214 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
215 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
216 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
217 : };
218 : use crate::task_mgr::shutdown_token;
219 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
220 : use crate::tenant::storage_layer::AsLayerDesc;
221 : use crate::tenant::upload_queue::Delete;
222 : use crate::tenant::TIMELINES_SEGMENT_NAME;
223 : use crate::{
224 : config::PageServerConf,
225 : task_mgr,
226 : task_mgr::TaskKind,
227 : task_mgr::BACKGROUND_RUNTIME,
228 : tenant::metadata::TimelineMetadata,
229 : tenant::upload_queue::{
230 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
231 : },
232 : TENANT_HEATMAP_BASENAME,
233 : };
234 :
235 : use utils::id::{TenantId, TimelineId};
236 :
237 : use self::index::IndexPart;
238 :
239 : use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
240 : use super::upload_queue::SetDeletedFlagProgress;
241 : use super::Generation;
242 :
243 : pub(crate) use download::{is_temp_download_file, list_remote_timelines};
244 : pub(crate) use index::LayerFileMetadata;
245 :
246 : // Occasional network issues and such can cause remote operations to fail, and
247 : // that's expected. If a download fails, we log it at info-level, and retry.
248 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
249 : // level instead, as repeated failures can mean a more serious problem. If it
250 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
251 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
252 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
253 :
254 : // Similarly log failed uploads and deletions at WARN level, after this many
255 : // retries. Uploads and deletions are retried forever, though.
256 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
257 :
258 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
259 :
260 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
261 :
262 : /// Default buffer size when interfacing with [`tokio::fs::File`].
263 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
264 :
265 : pub enum MaybeDeletedIndexPart {
266 : IndexPart(IndexPart),
267 : Deleted(IndexPart),
268 : }
269 :
270 : /// Errors that can arise when calling [`RemoteTimelineClient::stop`].
271 0 : #[derive(Debug, thiserror::Error)]
272 : pub enum StopError {
273 : /// Returned if the upload queue was never initialized.
274 : /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
275 : #[error("queue is not initialized")]
276 : QueueUninitialized,
277 : }
278 :
279 0 : #[derive(Debug, thiserror::Error)]
280 : pub enum PersistIndexPartWithDeletedFlagError {
281 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
282 : AlreadyInProgress(NaiveDateTime),
283 : #[error("the deleted_flag was already set, value is {0:?}")]
284 : AlreadyDeleted(NaiveDateTime),
285 : #[error(transparent)]
286 : Other(#[from] anyhow::Error),
287 : }
288 :
289 : /// A client for accessing a timeline's data in remote storage.
290 : ///
291 : /// This takes care of managing the number of connections, and balancing them
292 : /// across tenants. This also handles retries of failed uploads.
293 : ///
294 : /// Upload and delete requests are ordered so that before a deletion is
295 : /// performed, we wait for all preceding uploads to finish. This ensures
296 : /// that if you perform a compaction operation that reshuffles data in layer
297 : /// files, we don't have a transient state where the old files have already been
298 : /// deleted, but new files have not yet been uploaded.
299 : ///
300 : /// Similarly, this enforces an order between index-file uploads, and layer
301 : /// uploads. Before an index-file upload is performed, all preceding layer
302 : /// uploads must be finished.
303 : ///
304 : /// This also maintains a list of remote files, and automatically includes that
305 : /// in the index part file, whenever timeline metadata is uploaded.
306 : ///
307 : /// Downloads are not queued, they are performed immediately.
308 : pub struct RemoteTimelineClient {
309 : conf: &'static PageServerConf,
310 :
311 : runtime: tokio::runtime::Handle,
312 :
313 : tenant_shard_id: TenantShardId,
314 : timeline_id: TimelineId,
315 : generation: Generation,
316 :
317 : upload_queue: Mutex<UploadQueue>,
318 :
319 : metrics: Arc<RemoteTimelineClientMetrics>,
320 :
321 : storage_impl: GenericRemoteStorage,
322 :
323 : deletion_queue_client: DeletionQueueClient,
324 :
325 : cancel: CancellationToken,
326 : }
327 :
328 : /// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not
329 : /// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that.
330 : const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120);
331 : const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120);
332 :
333 : /// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow.
334 : ///
335 : /// This is a convenience for the various upload functions. In future
336 : /// the anyhow::Error result should be replaced with a more structured type that
337 : /// enables callers to avoid handling shutdown as an error.
338 30786 : async fn upload_cancellable<F>(cancel: &CancellationToken, future: F) -> anyhow::Result<()>
339 30786 : where
340 30786 : F: std::future::Future<Output = anyhow::Result<()>>,
341 30786 : {
342 1101913 : match timeout_cancellable(UPLOAD_TIMEOUT, cancel, future).await {
343 28138 : Ok(Ok(())) => Ok(()),
344 2642 : Ok(Err(e)) => Err(e),
345 0 : Err(TimeoutCancellableError::Timeout) => Err(anyhow::anyhow!("Timeout")),
346 0 : Err(TimeoutCancellableError::Cancelled) => Err(anyhow::anyhow!("Shutting down")),
347 : }
348 30780 : }
349 : /// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to DownloadError.
350 13095 : async fn download_cancellable<F, R>(
351 13095 : cancel: &CancellationToken,
352 13095 : future: F,
353 13095 : ) -> Result<R, DownloadError>
354 13095 : where
355 13095 : F: std::future::Future<Output = Result<R, DownloadError>>,
356 13095 : {
357 34425 : match timeout_cancellable(DOWNLOAD_TIMEOUT, cancel, future).await {
358 12323 : Ok(Ok(r)) => Ok(r),
359 767 : Ok(Err(e)) => Err(e),
360 : Err(TimeoutCancellableError::Timeout) => {
361 0 : Err(DownloadError::Other(anyhow::anyhow!("Timed out")))
362 : }
363 5 : Err(TimeoutCancellableError::Cancelled) => Err(DownloadError::Cancelled),
364 : }
365 13095 : }
366 :
367 : impl RemoteTimelineClient {
368 : ///
369 : /// Create a remote storage client for given timeline
370 : ///
371 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
372 : /// by calling init_upload_queue or init_upload_queue_for_empty_remote.
373 : ///
374 1571 : pub fn new(
375 1571 : remote_storage: GenericRemoteStorage,
376 1571 : deletion_queue_client: DeletionQueueClient,
377 1571 : conf: &'static PageServerConf,
378 1571 : tenant_shard_id: TenantShardId,
379 1571 : timeline_id: TimelineId,
380 1571 : generation: Generation,
381 1571 : ) -> RemoteTimelineClient {
382 1571 : RemoteTimelineClient {
383 1571 : conf,
384 1571 : runtime: if cfg!(test) {
385 : // remote_timeline_client.rs tests rely on current-thread runtime
386 290 : tokio::runtime::Handle::current()
387 : } else {
388 1281 : BACKGROUND_RUNTIME.handle().clone()
389 : },
390 1571 : tenant_shard_id,
391 1571 : timeline_id,
392 1571 : generation,
393 1571 : storage_impl: remote_storage,
394 1571 : deletion_queue_client,
395 1571 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
396 1571 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
397 1571 : &tenant_shard_id,
398 1571 : &timeline_id,
399 1571 : )),
400 1571 : cancel: CancellationToken::new(),
401 1571 : }
402 1571 : }
403 :
404 : /// Initialize the upload queue for a remote storage that already received
405 : /// an index file upload, i.e., it's not empty.
406 : /// The given `index_part` must be the one on the remote.
407 412 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
408 412 : let mut upload_queue = self.upload_queue.lock().unwrap();
409 412 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
410 412 : self.update_remote_physical_size_gauge(Some(index_part));
411 412 : info!(
412 412 : "initialized upload queue from remote index with {} layer files",
413 412 : index_part.layer_metadata.len()
414 412 : );
415 412 : Ok(())
416 412 : }
417 :
418 : /// Initialize the upload queue for the case where the remote storage is empty,
419 : /// i.e., it doesn't have an `IndexPart`.
420 1144 : pub fn init_upload_queue_for_empty_remote(
421 1144 : &self,
422 1144 : local_metadata: &TimelineMetadata,
423 1144 : ) -> anyhow::Result<()> {
424 1144 : let mut upload_queue = self.upload_queue.lock().unwrap();
425 1144 : upload_queue.initialize_empty_remote(local_metadata)?;
426 1144 : self.update_remote_physical_size_gauge(None);
427 1144 : info!("initialized upload queue as empty");
428 1144 : Ok(())
429 1144 : }
430 :
431 : /// Initialize the queue in stopped state. Used in startup path
432 : /// to continue deletion operation interrupted by pageserver crash or restart.
433 12 : pub fn init_upload_queue_stopped_to_continue_deletion(
434 12 : &self,
435 12 : index_part: &IndexPart,
436 12 : ) -> anyhow::Result<()> {
437 : // FIXME: consider newtype for DeletedIndexPart.
438 12 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
439 12 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
440 12 : ))?;
441 :
442 : {
443 12 : let mut upload_queue = self.upload_queue.lock().unwrap();
444 12 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
445 12 : self.update_remote_physical_size_gauge(Some(index_part));
446 12 : }
447 12 : // also locks upload queue, without dropping the guard above it will be a deadlock
448 12 : self.stop().expect("initialized line above");
449 12 :
450 12 : let mut upload_queue = self.upload_queue.lock().unwrap();
451 12 :
452 12 : upload_queue
453 12 : .stopped_mut()
454 12 : .expect("stopped above")
455 12 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
456 12 :
457 12 : Ok(())
458 12 : }
459 :
460 3042 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
461 3042 : match &mut *self.upload_queue.lock().unwrap() {
462 0 : UploadQueue::Uninitialized => None,
463 2929 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
464 113 : UploadQueue::Stopped(q) => q
465 113 : .upload_queue_for_deletion
466 113 : .get_last_remote_consistent_lsn_projected(),
467 : }
468 3042 : }
469 :
470 798512 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
471 798512 : match &mut *self.upload_queue.lock().unwrap() {
472 0 : UploadQueue::Uninitialized => None,
473 798399 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
474 113 : UploadQueue::Stopped(q) => Some(
475 113 : q.upload_queue_for_deletion
476 113 : .get_last_remote_consistent_lsn_visible(),
477 113 : ),
478 : }
479 798512 : }
480 :
481 7629 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
482 7629 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
483 6485 : current_remote_index_part
484 6485 : .layer_metadata
485 6485 : .values()
486 6485 : // If we don't have the file size for the layer, don't account for it in the metric.
487 662471 : .map(|ilmd| ilmd.file_size)
488 6485 : .sum()
489 : } else {
490 1144 : 0
491 : };
492 7629 : self.metrics.remote_physical_size_set(size);
493 7629 : }
494 :
495 15 : pub fn get_remote_physical_size(&self) -> u64 {
496 15 : self.metrics.remote_physical_size_get()
497 15 : }
498 :
499 : //
500 : // Download operations.
501 : //
502 : // These don't use the per-timeline queue. They do use the global semaphore in
503 : // S3Bucket, to limit the total number of concurrent operations, though.
504 : //
505 :
506 : /// Download index file
507 441 : pub async fn download_index_file(
508 441 : &self,
509 441 : cancel: CancellationToken,
510 441 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
511 441 : let _unfinished_gauge_guard = self.metrics.call_begin(
512 441 : &RemoteOpFileKind::Index,
513 441 : &RemoteOpKind::Download,
514 441 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
515 441 : reason: "no need for a downloads gauge",
516 441 : },
517 441 : );
518 :
519 441 : let index_part = download::download_index_part(
520 441 : &self.storage_impl,
521 441 : &self.tenant_shard_id,
522 441 : &self.timeline_id,
523 441 : self.generation,
524 441 : cancel,
525 441 : )
526 441 : .measure_remote_op(
527 441 : RemoteOpFileKind::Index,
528 441 : RemoteOpKind::Download,
529 441 : Arc::clone(&self.metrics),
530 441 : )
531 2985 : .await?;
532 :
533 438 : if index_part.deleted_at.is_some() {
534 12 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
535 : } else {
536 426 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
537 : }
538 441 : }
539 :
540 : /// Download a (layer) file from `path`, into local filesystem.
541 : ///
542 : /// 'layer_metadata' is the metadata from the remote index file.
543 : ///
544 : /// On success, returns the size of the downloaded file.
545 9784 : pub async fn download_layer_file(
546 9784 : &self,
547 9784 : layer_file_name: &LayerFileName,
548 9784 : layer_metadata: &LayerFileMetadata,
549 9784 : cancel: &CancellationToken,
550 9784 : ) -> anyhow::Result<u64> {
551 9769 : let downloaded_size = {
552 9784 : let _unfinished_gauge_guard = self.metrics.call_begin(
553 9784 : &RemoteOpFileKind::Layer,
554 9784 : &RemoteOpKind::Download,
555 9784 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
556 9784 : reason: "no need for a downloads gauge",
557 9784 : },
558 9784 : );
559 9784 : download::download_layer_file(
560 9784 : self.conf,
561 9784 : &self.storage_impl,
562 9784 : self.tenant_shard_id,
563 9784 : self.timeline_id,
564 9784 : layer_file_name,
565 9784 : layer_metadata,
566 9784 : cancel,
567 9784 : )
568 9784 : .measure_remote_op(
569 9784 : RemoteOpFileKind::Layer,
570 9784 : RemoteOpKind::Download,
571 9784 : Arc::clone(&self.metrics),
572 9784 : )
573 406494 : .await?
574 : };
575 :
576 9769 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
577 9769 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
578 9769 :
579 9769 : Ok(downloaded_size)
580 9783 : }
581 :
582 : //
583 : // Upload operations.
584 : //
585 :
586 : ///
587 : /// Launch an index-file upload operation in the background, with
588 : /// updated metadata.
589 : ///
590 : /// The upload will be added to the queue immediately, but it
591 : /// won't be performed until all previously scheduled layer file
592 : /// upload operations have completed successfully. This is to
593 : /// ensure that when the index file claims that layers X, Y and Z
594 : /// exist in remote storage, they really do. To wait for the upload
595 : /// to complete, use `wait_completion`.
596 : ///
597 : /// If there were any changes to the list of files, i.e. if any
598 : /// layer file uploads were scheduled, since the last index file
599 : /// upload, those will be included too.
600 5751 : pub fn schedule_index_upload_for_metadata_update(
601 5751 : self: &Arc<Self>,
602 5751 : metadata: &TimelineMetadata,
603 5751 : ) -> anyhow::Result<()> {
604 5751 : let mut guard = self.upload_queue.lock().unwrap();
605 5751 : let upload_queue = guard.initialized_mut()?;
606 :
607 : // As documented in the struct definition, it's ok for latest_metadata to be
608 : // ahead of what's _actually_ on the remote during index upload.
609 5751 : upload_queue.latest_metadata = metadata.clone();
610 5751 :
611 5751 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
612 5751 :
613 5751 : Ok(())
614 5751 : }
615 :
616 : ///
617 : /// Launch an index-file upload operation in the background, if necessary.
618 : ///
619 : /// Use this function to schedule the update of the index file after
620 : /// scheduling file uploads or deletions. If no file uploads or deletions
621 : /// have been scheduled since the last index file upload, this does
622 : /// nothing.
623 : ///
624 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
625 : /// the upload to the upload queue and returns quickly.
626 1929 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
627 1929 : let mut guard = self.upload_queue.lock().unwrap();
628 1929 : let upload_queue = guard.initialized_mut()?;
629 :
630 1929 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
631 93 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
632 1836 : }
633 :
634 1929 : Ok(())
635 1929 : }
636 :
637 : /// Launch an index-file upload operation in the background (internal function)
638 6162 : fn schedule_index_upload(
639 6162 : self: &Arc<Self>,
640 6162 : upload_queue: &mut UploadQueueInitialized,
641 6162 : metadata: TimelineMetadata,
642 6162 : ) {
643 6162 : info!(
644 6162 : "scheduling metadata upload with {} files ({} changed)",
645 6162 : upload_queue.latest_files.len(),
646 6162 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
647 6162 : );
648 :
649 6162 : let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
650 6162 :
651 6162 : let index_part = IndexPart::new(
652 6162 : upload_queue.latest_files.clone(),
653 6162 : disk_consistent_lsn,
654 6162 : metadata,
655 6162 : );
656 6162 : let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
657 6162 : self.calls_unfinished_metric_begin(&op);
658 6162 : upload_queue.queued_operations.push_back(op);
659 6162 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
660 6162 :
661 6162 : // Launch the task immediately, if possible
662 6162 : self.launch_queued_tasks(upload_queue);
663 6162 : }
664 :
665 : ///
666 : /// Launch an upload operation in the background.
667 : ///
668 11220 : pub(crate) fn schedule_layer_file_upload(
669 11220 : self: &Arc<Self>,
670 11220 : layer: ResidentLayer,
671 11220 : ) -> anyhow::Result<()> {
672 11220 : let mut guard = self.upload_queue.lock().unwrap();
673 11220 : let upload_queue = guard.initialized_mut()?;
674 :
675 11220 : self.schedule_layer_file_upload0(upload_queue, layer);
676 11220 : self.launch_queued_tasks(upload_queue);
677 11220 : Ok(())
678 11220 : }
679 :
680 21990 : fn schedule_layer_file_upload0(
681 21990 : self: &Arc<Self>,
682 21990 : upload_queue: &mut UploadQueueInitialized,
683 21990 : layer: ResidentLayer,
684 21990 : ) {
685 21990 : let metadata = layer.metadata();
686 21990 :
687 21990 : upload_queue
688 21990 : .latest_files
689 21990 : .insert(layer.layer_desc().filename(), metadata.clone());
690 21990 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
691 21990 :
692 21990 : info!(
693 21990 : "scheduled layer file upload {layer} gen={:?} shard={:?}",
694 21990 : metadata.generation, metadata.shard
695 21990 : );
696 21990 : let op = UploadOp::UploadLayer(layer, metadata);
697 21990 : self.calls_unfinished_metric_begin(&op);
698 21990 : upload_queue.queued_operations.push_back(op);
699 21990 : }
700 :
701 : /// Launch a delete operation in the background.
702 : ///
703 : /// The operation does not modify local filesystem state.
704 : ///
705 : /// Note: This schedules an index file upload before the deletions. The
706 : /// deletion won't actually be performed, until all previously scheduled
707 : /// upload operations, and the index file upload, have completed
708 : /// successfully.
709 414 : pub fn schedule_layer_file_deletion(
710 414 : self: &Arc<Self>,
711 414 : names: &[LayerFileName],
712 414 : ) -> anyhow::Result<()> {
713 414 : let mut guard = self.upload_queue.lock().unwrap();
714 414 : let upload_queue = guard.initialized_mut()?;
715 :
716 414 : let with_metadata =
717 414 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
718 414 :
719 414 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
720 414 :
721 414 : // Launch the tasks immediately, if possible
722 414 : self.launch_queued_tasks(upload_queue);
723 414 : Ok(())
724 414 : }
725 :
726 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
727 : /// layer files, leaving them dangling.
728 : ///
729 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
730 : /// is invoked on them.
731 20 : pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
732 20 : let mut guard = self.upload_queue.lock().unwrap();
733 20 : let upload_queue = guard.initialized_mut()?;
734 :
735 : // just forget the return value; after uploading the next index_part.json, we can consider
736 : // the layer files as "dangling". this is fine; at worst we create work for the
737 : // scrubber.
738 :
739 1197 : let names = gc_layers.iter().map(|x| x.layer_desc().filename());
740 20 :
741 20 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
742 20 :
743 20 : self.launch_queued_tasks(upload_queue);
744 20 :
745 20 : Ok(())
746 20 : }
747 :
748 : /// Update the remote index file, removing the to-be-deleted files from the index,
749 : /// allowing scheduling of actual deletions later.
750 729 : fn schedule_unlinking_of_layers_from_index_part0<I>(
751 729 : self: &Arc<Self>,
752 729 : upload_queue: &mut UploadQueueInitialized,
753 729 : names: I,
754 729 : ) -> Vec<(LayerFileName, LayerFileMetadata)>
755 729 : where
756 729 : I: IntoIterator<Item = LayerFileName>,
757 729 : {
758 729 : // Deleting layers doesn't affect the values stored in TimelineMetadata,
759 729 : // so we don't need to update it. Just serialize it.
760 729 : let metadata = upload_queue.latest_metadata.clone();
761 729 :
762 729 : // Decorate our list of names with each name's metadata, dropping
763 729 : // names that are unexpectedly missing from our metadata. This metadata
764 729 : // is later used when physically deleting layers, to construct key paths.
765 729 : let with_metadata: Vec<_> = names
766 729 : .into_iter()
767 5414 : .filter_map(|name| {
768 5414 : let meta = upload_queue.latest_files.remove(&name);
769 :
770 5414 : if let Some(meta) = meta {
771 5413 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
772 5413 : Some((name, meta))
773 : } else {
774 : // This can only happen if we forgot to schedule the file upload
775 : // before scheduling the delete. Log it because it is a rare/strange
776 : // situation, and in case something is misbehaving, we'd like to know which
777 : // layers experienced this.
778 1 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
779 1 : None
780 : }
781 5414 : })
782 729 : .collect();
783 :
784 : #[cfg(feature = "testing")]
785 6142 : for (name, metadata) in &with_metadata {
786 5413 : let gen = metadata.generation;
787 5413 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
788 0 : if unexpected == gen {
789 0 : tracing::error!("{name} was unlinked twice with same generation");
790 : } else {
791 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
792 : }
793 5413 : }
794 : }
795 :
796 : // after unlinking files from the upload_queue.latest_files we must always schedule an
797 : // index_part update, because that needs to be uploaded before we can actually delete the
798 : // files.
799 729 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
800 318 : self.schedule_index_upload(upload_queue, metadata);
801 411 : }
802 :
803 729 : with_metadata
804 729 : }
805 :
806 : /// Schedules deletion for layer files which have previously been unlinked from the
807 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
808 5410 : pub(crate) fn schedule_deletion_of_unlinked(
809 5410 : self: &Arc<Self>,
810 5410 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
811 5410 : ) -> anyhow::Result<()> {
812 5410 : let mut guard = self.upload_queue.lock().unwrap();
813 5410 : let upload_queue = guard.initialized_mut()?;
814 :
815 5396 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
816 5396 : self.launch_queued_tasks(upload_queue);
817 5396 : Ok(())
818 5410 : }
819 :
820 5810 : fn schedule_deletion_of_unlinked0(
821 5810 : self: &Arc<Self>,
822 5810 : upload_queue: &mut UploadQueueInitialized,
823 5810 : mut with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
824 5810 : ) {
825 5810 : // Filter out any layers which were not created by this tenant shard. These are
826 5810 : // layers that originate from some ancestor shard after a split, and may still
827 5810 : // be referenced by other shards. We are free to delete them locally and remove
828 5810 : // them from our index (and would have already done so when we reach this point
829 5810 : // in the code), but we may not delete them remotely.
830 5810 : with_metadata.retain(|(name, meta)| {
831 5399 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
832 5399 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
833 5399 : if !retain {
834 0 : tracing::debug!(
835 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
836 0 : meta.shard
837 0 : );
838 5399 : }
839 5399 : retain
840 5810 : });
841 :
842 11209 : for (name, meta) in &with_metadata {
843 5399 : info!(
844 5399 : "scheduling deletion of layer {}{} (shard {})",
845 5399 : name,
846 5399 : meta.generation.get_suffix(),
847 5399 : meta.shard
848 5399 : );
849 : }
850 :
851 : #[cfg(feature = "testing")]
852 11209 : for (name, meta) in &with_metadata {
853 5399 : let gen = meta.generation;
854 5399 : match upload_queue.dangling_files.remove(name) {
855 5399 : Some(same) if same == gen => { /* expected */ }
856 0 : Some(other) => {
857 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
858 : }
859 : None => {
860 0 : tracing::error!("{name} was unlinked but was not dangling");
861 : }
862 : }
863 : }
864 :
865 : // schedule the actual deletions
866 5810 : let op = UploadOp::Delete(Delete {
867 5810 : layers: with_metadata,
868 5810 : });
869 5810 : self.calls_unfinished_metric_begin(&op);
870 5810 : upload_queue.queued_operations.push_back(op);
871 5810 : }
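// The ancestor-shard filter at the top of `schedule_deletion_of_unlinked0` can be
// illustrated in isolation. This is a minimal sketch, not the pageserver's code:
// `ShardIndex` and `retain_own_layers` are hypothetical stand-ins, and layer names are
// plain strings. It only shows the rule that a layer is eligible for remote deletion
// when both its shard number and shard count match the current shard.

```rust
/// Hypothetical, simplified stand-in for the shard identity stored in layer metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardIndex {
    shard_number: u8,
    shard_count: u8,
}

/// Keeps only the layers that were written by `own`.
///
/// Layers written by an ancestor shard (different number or count) are dropped
/// from the list, so they are never scheduled for remote deletion.
fn retain_own_layers(layers: &mut Vec<(String, ShardIndex)>, own: ShardIndex) {
    layers.retain(|(_name, shard)| {
        shard.shard_number == own.shard_number && shard.shard_count == own.shard_count
    });
}
```

// Comparing both fields matters: after a split, a child shard can share its shard
// number with its ancestor while the shard count differs.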
872 :
873 : /// Schedules a compaction update to the remote `index_part.json`.
874 : ///
875 : /// `compacted_from` are the L0 layers that have been compacted into the `compacted_to` L1 layers.
876 295 : pub(crate) fn schedule_compaction_update(
877 295 : self: &Arc<Self>,
878 295 : compacted_from: &[Layer],
879 295 : compacted_to: &[ResidentLayer],
880 295 : ) -> anyhow::Result<()> {
881 295 : let mut guard = self.upload_queue.lock().unwrap();
882 295 : let upload_queue = guard.initialized_mut()?;
883 :
884 11065 : for layer in compacted_to {
885 10770 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
886 10770 : }
887 :
888 4213 : let names = compacted_from.iter().map(|x| x.layer_desc().filename());
889 295 :
890 295 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
891 295 : self.launch_queued_tasks(upload_queue);
892 295 :
893 295 : Ok(())
894 295 : }
895 :
896 : /// Wait for all previously scheduled uploads/deletions to complete
897 1194 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
898 1194 : let mut receiver = {
899 1194 : let mut guard = self.upload_queue.lock().unwrap();
900 1194 : let upload_queue = guard.initialized_mut()?;
901 1194 : self.schedule_barrier0(upload_queue)
902 1194 : };
903 1194 :
904 1194 : if receiver.changed().await.is_err() {
905 0 : anyhow::bail!("wait_completion aborted because upload queue was stopped");
906 1190 : }
907 1190 :
908 1190 : Ok(())
909 1190 : }
910 :
911 412 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
912 412 : let mut guard = self.upload_queue.lock().unwrap();
913 412 : let upload_queue = guard.initialized_mut()?;
914 412 : self.schedule_barrier0(upload_queue);
915 412 : Ok(())
916 412 : }
917 :
918 1606 : fn schedule_barrier0(
919 1606 : self: &Arc<Self>,
920 1606 : upload_queue: &mut UploadQueueInitialized,
921 1606 : ) -> tokio::sync::watch::Receiver<()> {
922 1606 : let (sender, receiver) = tokio::sync::watch::channel(());
923 1606 : let barrier_op = UploadOp::Barrier(sender);
924 1606 :
925 1606 : upload_queue.queued_operations.push_back(barrier_op);
926 1606 : // Don't count this kind of operation!
927 1606 :
928 1606 : // Launch the task immediately, if possible
929 1606 : self.launch_queued_tasks(upload_queue);
930 1606 :
931 1606 : receiver
932 1606 : }
933 :
934 : /// Wait for all previously scheduled operations to complete, and then stop.
935 : ///
936 : /// Not cancellation safe
937 205 : pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
938 205 : // On cancellation the queue is left in an awkward state: it refuses new operations, but
939 205 : // a proper stop has not yet been called. After cancellation, the original or some later task
940 205 : // must call `stop` or `shutdown`.
941 205 : let sg = scopeguard::guard((), |_| {
942 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
943 205 : });
944 :
945 205 : let fut = {
946 205 : let mut guard = self.upload_queue.lock().unwrap();
947 205 : let upload_queue = match &mut *guard {
948 0 : UploadQueue::Stopped(_) => return Ok(()),
949 0 : UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
950 205 : UploadQueue::Initialized(ref mut init) => init,
951 205 : };
952 205 :
953 205 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
954 205 : // just don't add more of these as they would never complete.
955 205 : //
956 205 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
957 205 : // in every place we would not have to jump through this hoop, and this method could be
958 205 : // made cancellable.
959 205 : if !upload_queue.shutting_down {
960 205 : upload_queue.shutting_down = true;
961 205 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
962 205 : // this operation is not counted similar to Barrier
963 205 :
964 205 : self.launch_queued_tasks(upload_queue);
965 205 : }
966 :
967 205 : upload_queue.shutdown_ready.clone().acquire_owned()
968 : };
969 :
970 205 : let res = fut.await;
971 :
972 205 : scopeguard::ScopeGuard::into_inner(sg);
973 205 :
974 205 : match res {
975 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
976 205 : Err(_closed) => {
977 205 : // expected
978 205 : }
979 205 : }
980 205 :
981 205 : self.stop()
982 205 : }
983 :
984 : /// Set the deleted_at field in the remote index file.
985 : ///
986 : /// This fails if the upload queue has not been `stop()`ed.
987 : ///
988 : /// The caller is responsible for calling `stop()` AND for waiting
989 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
990 : /// Check method [`RemoteTimelineClient::stop`] for details.
991 0 : #[instrument(skip_all)]
992 : pub(crate) async fn persist_index_part_with_deleted_flag(
993 : self: &Arc<Self>,
994 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
995 : let index_part_with_deleted_at = {
996 : let mut locked = self.upload_queue.lock().unwrap();
997 :
998 : // We must be in the stopped state, because otherwise
999 : // an in-progress index part upload could overwrite the file
1000 : // without the deleted_at flag that we are going to set below.
1001 : let stopped = locked.stopped_mut()?;
1002 :
1003 : match stopped.deleted_at {
1004 : SetDeletedFlagProgress::NotRunning => (), // proceed
1005 : SetDeletedFlagProgress::InProgress(at) => {
1006 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1007 : }
1008 : SetDeletedFlagProgress::Successful(at) => {
1009 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1010 : }
1011 : };
1012 : let deleted_at = Utc::now().naive_utc();
1013 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1014 :
1015 : let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
1016 : .context("IndexPart serialize")?;
1017 : index_part.deleted_at = Some(deleted_at);
1018 : index_part
1019 : };
1020 :
1021 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1022 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1023 0 : let stopped = locked
1024 0 : .stopped_mut()
1025 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1026 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1027 0 : });
1028 :
1029 177 : pausable_failpoint!("persist_deleted_index_part");
1030 :
1031 : backoff::retry(
1032 232 : || {
1033 232 : upload::upload_index_part(
1034 232 : &self.storage_impl,
1035 232 : &self.tenant_shard_id,
1036 232 : &self.timeline_id,
1037 232 : self.generation,
1038 232 : &index_part_with_deleted_at,
1039 232 : &self.cancel,
1040 232 : )
1041 232 : },
1042 55 : |_e| false,
1043 : 1,
1044 : // have just a couple of attempts
1045 : // when executed as part of timeline deletion this happens in context of api call
1046 : // when executed as part of tenant deletion this happens in the background
1047 : 2,
1048 : "persist_index_part_with_deleted_flag",
1049 : &self.cancel,
1050 : )
1051 : .await
1052 0 : .ok_or_else(|| anyhow::anyhow!("Cancelled"))
1053 177 : .and_then(|x| x)?;
1054 :
1055 : // all good, disarm the guard and mark as success
1056 : ScopeGuard::into_inner(undo_deleted_at);
1057 : {
1058 : let mut locked = self.upload_queue.lock().unwrap();
1059 :
1060 : let stopped = locked
1061 : .stopped_mut()
1062 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1063 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1064 : index_part_with_deleted_at
1065 : .deleted_at
1066 : .expect("we set it above"),
1067 : );
1068 : }
1069 :
1070 : Ok(())
1071 : }
1072 :
1073 2 : pub(crate) async fn preserve_initdb_archive(
1074 2 : self: &Arc<Self>,
1075 2 : tenant_id: &TenantId,
1076 2 : timeline_id: &TimelineId,
1077 2 : cancel: &CancellationToken,
1078 2 : ) -> anyhow::Result<()> {
1079 2 : backoff::retry(
1080 2 : || async {
1081 2 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1082 2 : .await
1083 2 : },
1084 2 : |_e| false,
1085 2 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1086 2 : FAILED_REMOTE_OP_RETRIES,
1087 2 : "preserve_initdb_tar_zst",
1088 2 : &cancel.clone(),
1089 2 : )
1090 2 : .await
1091 2 : .ok_or_else(|| anyhow::anyhow!("Cancelled"))
1092 2 : .and_then(|x| x)
1093 2 : .context("backing up initdb archive")?;
1094 2 : Ok(())
1095 2 : }
1096 :
1097 : /// Prerequisites: the UploadQueue must be in the stopped state and deleted_at must have been set successfully.
1098 : /// The function deletes the layer files referenced by the index, then lists the prefix to find any leaked files,
1099 : /// deletes those too, and finally deletes the index file.
1100 189 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
1101 189 : debug_assert_current_span_has_tenant_and_timeline_id();
1102 :
1103 189 : let layers: Vec<RemotePath> = {
1104 189 : let mut locked = self.upload_queue.lock().unwrap();
1105 189 : let stopped = locked.stopped_mut()?;
1106 :
1107 189 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1108 0 : anyhow::bail!("deleted_at is not set")
1109 189 : }
1110 :
1111 189 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1112 :
1113 189 : stopped
1114 189 : .upload_queue_for_deletion
1115 189 : .latest_files
1116 189 : .drain()
1117 4528 : .map(|(file_name, meta)| {
1118 4528 : remote_layer_path(
1119 4528 : &self.tenant_shard_id.tenant_id,
1120 4528 : &self.timeline_id,
1121 4528 : meta.shard,
1122 4528 : &file_name,
1123 4528 : meta.generation,
1124 4528 : )
1125 4528 : })
1126 189 : .collect()
1127 189 : };
1128 189 :
1129 189 : let layer_deletion_count = layers.len();
1130 189 : self.deletion_queue_client.push_immediate(layers).await?;
1131 :
1132 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts of
1133 : // nonexistent objects are not considered errors.
1134 189 : let initdb_path =
1135 189 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1136 189 : self.deletion_queue_client
1137 189 : .push_immediate(vec![initdb_path])
1138 0 : .await?;
1139 :
1140 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
1141 : // and the retry arrived at a different pageserver, there would be no trace of it in remote storage.
1142 189 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1143 189 :
1144 189 : // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
1145 189 : // taking the burden of listing all the layers that we already know we should delete.
1146 189 : self.deletion_queue_client.flush_immediate().await?;
1147 :
1148 189 : let cancel = shutdown_token();
1149 :
1150 189 : let remaining = backoff::retry(
1151 247 : || async {
1152 247 : self.storage_impl
1153 247 : .list_files(Some(&timeline_storage_path))
1154 779 : .await
1155 247 : },
1156 189 : |_e| false,
1157 189 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1158 189 : FAILED_REMOTE_OP_RETRIES,
1159 189 : "list_prefixes",
1160 189 : &cancel,
1161 189 : )
1162 779 : .await
1163 189 : .ok_or_else(|| anyhow::anyhow!("Cancelled!"))
1164 189 : .and_then(|x| x)
1165 189 : .context("list prefixes")?;
1166 :
1167 : // We will delete the current index_part object last, since it acts as a deletion
1168 : // marker via its deleted_at attribute
1169 189 : let latest_index = remaining
1170 189 : .iter()
1171 2152 : .filter(|p| {
1172 2152 : p.object_name()
1173 2152 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1174 2152 : .unwrap_or(false)
1175 2152 : })
1176 224 : .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
1177 224 : .max_by_key(|i| i.1)
1178 189 : .map(|i| i.0.clone())
1179 189 : .unwrap_or(
1180 189 : // No generation-suffixed indices, assume we are dealing with
1181 189 : // a legacy index.
1182 189 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1183 189 : );
1184 189 :
1185 189 : let remaining_layers: Vec<RemotePath> = remaining
1186 189 : .into_iter()
1187 2152 : .filter(|p| {
1188 2152 : if p == &latest_index {
1189 184 : return false;
1190 1968 : }
1191 1968 : if p.object_name() == Some(INITDB_PRESERVED_PATH) {
1192 2 : return false;
1193 1966 : }
1194 1966 : true
1195 2152 : })
1196 1966 : .inspect(|path| {
1197 1966 : if let Some(name) = path.object_name() {
1198 1966 : info!(%name, "deleting a file not referenced from index_part.json");
1199 : } else {
1200 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1201 : }
1202 1966 : })
1203 189 : .collect();
1204 189 :
1205 189 : let not_referenced_count = remaining_layers.len();
1206 189 : if !remaining_layers.is_empty() {
1207 103 : self.deletion_queue_client
1208 103 : .push_immediate(remaining_layers)
1209 0 : .await?;
1210 86 : }
1211 :
1212 189 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1213 8 : Err(anyhow::anyhow!(
1214 8 : "failpoint: timeline-delete-before-index-delete"
1215 8 : ))?
1216 189 : });
1217 :
1218 0 : debug!("enqueuing index part deletion");
1219 181 : self.deletion_queue_client
1220 181 : .push_immediate([latest_index].to_vec())
1221 0 : .await?;
1222 :
1223 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1224 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1225 182 : self.deletion_queue_client.flush_immediate().await?;
1226 :
1227 181 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1228 2 : Err(anyhow::anyhow!(
1229 2 : "failpoint: timeline-delete-after-index-delete"
1230 2 : ))?
1231 181 : });
1232 :
1233 179 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1234 :
1235 179 : Ok(())
1236 189 : }
1237 :
1238 : ///
1239 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1240 : /// the ordering constraints.
1241 : ///
1242 : /// The caller needs to already hold the `upload_queue` lock.
1243 54138 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1244 87565 : while let Some(next_op) = upload_queue.queued_operations.front() {
1245 : // Can we run this task now?
1246 63805 : let can_run_now = match next_op {
1247 : UploadOp::UploadLayer(_, _) => {
1248 : // Can always be scheduled.
1249 21441 : true
1250 : }
1251 : UploadOp::UploadMetadata(_, _) => {
1252 : // These can only be performed after all the preceding operations
1253 : // have finished.
1254 33812 : upload_queue.inprogress_tasks.is_empty()
1255 : }
1256 : UploadOp::Delete(_) => {
1257 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1258 4632 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1259 : }
1260 :
1261 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1262 3920 : upload_queue.inprogress_tasks.is_empty()
1263 : }
1264 : };
1265 :
1266 : // If we cannot launch this task, don't look any further.
1267 : //
1268 : // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
1269 : // them now, but we don't try to do that currently. For example, if the frontmost task
1270 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1271 : // could still start layer uploads that were scheduled later.
1272 63805 : if !can_run_now {
1273 30173 : break;
1274 33632 : }
1275 33632 :
1276 33632 : if let UploadOp::Shutdown = next_op {
1277 : // leave the op in the queue but do not start more tasks; it will be dropped when
1278 : // the stop is called.
1279 205 : upload_queue.shutdown_ready.close();
1280 205 : break;
1281 33427 : }
1282 33427 :
1283 33427 : // We can launch this task. Remove it from the queue first.
1284 33427 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1285 33427 :
1286 33427 : debug!("starting op: {}", next_op);
1287 :
1288 : // Update the counters
1289 33427 : match next_op {
1290 21441 : UploadOp::UploadLayer(_, _) => {
1291 21441 : upload_queue.num_inprogress_layer_uploads += 1;
1292 21441 : }
1293 6072 : UploadOp::UploadMetadata(_, _) => {
1294 6072 : upload_queue.num_inprogress_metadata_uploads += 1;
1295 6072 : }
1296 4317 : UploadOp::Delete(_) => {
1297 4317 : upload_queue.num_inprogress_deletions += 1;
1298 4317 : }
1299 1597 : UploadOp::Barrier(sender) => {
1300 1597 : sender.send_replace(());
1301 1597 : continue;
1302 : }
1303 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1304 : };
1305 :
1306 : // Assign unique ID to this task
1307 31830 : upload_queue.task_counter += 1;
1308 31830 : let upload_task_id = upload_queue.task_counter;
1309 31830 :
1310 31830 : // Add it to the in-progress map
1311 31830 : let task = Arc::new(UploadTask {
1312 31830 : task_id: upload_task_id,
1313 31830 : op: next_op,
1314 31830 : retries: AtomicU32::new(0),
1315 31830 : });
1316 31830 : upload_queue
1317 31830 : .inprogress_tasks
1318 31830 : .insert(task.task_id, Arc::clone(&task));
1319 31830 :
1320 31830 : // Spawn task to perform the task
1321 31830 : let self_rc = Arc::clone(self);
1322 31830 : let tenant_shard_id = self.tenant_shard_id;
1323 31830 : let timeline_id = self.timeline_id;
1324 31830 : task_mgr::spawn(
1325 31830 : &self.runtime,
1326 31830 : TaskKind::RemoteUploadTask,
1327 31830 : Some(self.tenant_shard_id),
1328 31830 : Some(self.timeline_id),
1329 31830 : "remote upload",
1330 : false,
1331 31820 : async move {
1332 1153860 : self_rc.perform_upload_task(task).await;
1333 31809 : Ok(())
1334 31809 : }
1335 31830 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1336 : );
1337 :
1338 : // Loop back to process next task
1339 : }
1340 54138 : }
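// The ordering rules applied by `launch_queued_tasks` can be modelled without any of the
// surrounding machinery. The sketch below is an illustration under simplifying
// assumptions: `Op` and `Model` are hypothetical names, operations carry no payload,
// `Shutdown` is left out, and "in progress" is a plain `Vec` rather than a task map.

```rust
use std::collections::VecDeque;

/// Simplified stand-in for `UploadOp`; payloads are omitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    UploadLayer,
    UploadMetadata,
    Delete,
    Barrier,
}

/// Simplified stand-in for the bookkeeping of an initialized upload queue.
#[derive(Debug, Default)]
struct Model {
    queued: VecDeque<Op>,
    inprogress: Vec<Op>,
}

impl Model {
    /// Mirrors the `can_run_now` decision for the frontmost operation.
    fn can_run_now(&self, op: Op) -> bool {
        match op {
            // Layer uploads can always be started.
            Op::UploadLayer => true,
            // Index uploads and barriers wait for everything before them.
            Op::UploadMetadata | Op::Barrier => self.inprogress.is_empty(),
            // Deletions wait for uploads, but may run next to other deletions.
            Op::Delete => self.inprogress.iter().all(|o| *o == Op::Delete),
        }
    }

    /// Starts as many frontmost operations as the rules allow, in queue order,
    /// and returns the operations that were started.
    fn launch(&mut self) -> Vec<Op> {
        let mut started = Vec::new();
        while let Some(&next) = self.queued.front() {
            if !self.can_run_now(next) {
                // No queue jumping: stop at the first blocked operation.
                break;
            }
            self.queued.pop_front();
            started.push(next);
            // A barrier completes immediately and never becomes in-progress.
            if next != Op::Barrier {
                self.inprogress.push(next);
            }
        }
        started
    }
}
```

// With `[UploadLayer, UploadLayer, UploadMetadata, Delete]` queued, the first call to
// `launch` starts only the two layer uploads; the index upload stays at the front until
// the in-progress set is empty, and the deletion stays behind the index upload.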
1341 :
1342 : ///
1343 : /// Perform an upload task.
1344 : ///
1345 : /// The task is in the `inprogress_tasks` list. This function will try to
1346 : /// execute it, retrying forever. On successful completion, the task is
1347 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1348 : /// queue that were waiting for its completion are launched.
1349 : ///
1350 : /// The task can be shut down, however. That leads to stopping the whole
1351 : /// queue.
1352 : ///
1353 31820 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1354 : // Loop to retry until it completes.
1355 34346 : loop {
1356 34346 : // If we're requested to shut down, close up shop and exit.
1357 34346 : //
1358 34346 : // Note: We only check for the shutdown requests between retries, so
1359 34346 : // if a shutdown request arrives while we're busy uploading, in the
1360 34346 : // upload::upload_*() call below, we will not exit until it has
1361 34346 : // finished. We probably could cancel the upload by simply dropping
1362 34346 : // the Future, but we're not 100% sure if the remote storage library
1363 34346 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1364 34346 : // upload finishes or times out soon enough.
1365 34346 : if task_mgr::is_shutdown_requested() {
1366 103 : info!("upload task cancelled by shutdown request");
1367 103 : match self.stop() {
1368 103 : Ok(()) => {}
1369 : Err(StopError::QueueUninitialized) => {
1370 0 : unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
1371 : }
1372 : }
1373 103 : return;
1374 34243 : }
1375 :
1376 34243 : let upload_result: anyhow::Result<()> = match &task.op {
1377 23207 : UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
1378 23207 : let path = layer.local_path();
1379 23207 : upload::upload_timeline_layer(
1380 23207 : self.conf,
1381 23207 : &self.storage_impl,
1382 23207 : path,
1383 23207 : layer_metadata,
1384 23207 : self.generation,
1385 23207 : &self.cancel,
1386 23207 : )
1387 23207 : .measure_remote_op(
1388 23207 : RemoteOpFileKind::Layer,
1389 23207 : RemoteOpKind::Upload,
1390 23207 : Arc::clone(&self.metrics),
1391 23207 : )
1392 1127005 : .await
1393 : }
1394 6723 : UploadOp::UploadMetadata(ref index_part, _lsn) => {
1395 6723 : let mention_having_future_layers = if cfg!(feature = "testing") {
1396 6723 : index_part
1397 6723 : .layer_metadata
1398 6723 : .keys()
1399 610515 : .any(|x| x.is_in_future(*_lsn))
1400 : } else {
1401 0 : false
1402 : };
1403 :
1404 6723 : let res = upload::upload_index_part(
1405 6723 : &self.storage_impl,
1406 6723 : &self.tenant_shard_id,
1407 6723 : &self.timeline_id,
1408 6723 : self.generation,
1409 6723 : index_part,
1410 6723 : &self.cancel,
1411 6723 : )
1412 6723 : .measure_remote_op(
1413 6723 : RemoteOpFileKind::Index,
1414 6723 : RemoteOpKind::Upload,
1415 6723 : Arc::clone(&self.metrics),
1416 6723 : )
1417 22495 : .await;
1418 6715 : if res.is_ok() {
1419 6061 : self.update_remote_physical_size_gauge(Some(index_part));
1420 6061 : if mention_having_future_layers {
1421 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1422 14 : tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
1423 6047 : }
1424 654 : }
1425 6715 : res
1426 : }
1427 4313 : UploadOp::Delete(delete) => {
1428 4313 : pausable_failpoint!("before-delete-layer-pausable");
1429 4312 : self.deletion_queue_client
1430 4312 : .push_layers(
1431 4312 : self.tenant_shard_id,
1432 4312 : self.timeline_id,
1433 4312 : self.generation,
1434 4312 : delete.layers.clone(),
1435 4312 : )
1436 147 : .await
1437 4312 : .map_err(|e| anyhow::anyhow!(e))
1438 : }
1439 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
1440 : // unreachable. Barrier operations are handled synchronously in
1441 : // launch_queued_tasks
1442 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
1443 0 : break;
1444 : }
1445 : };
1446 :
1447 34233 : match upload_result {
1448 : Ok(()) => {
1449 31706 : break;
1450 : }
1451 2527 : Err(e) => {
1452 2527 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1453 2527 :
1454 2527 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1455 2527 : // or other external reasons. Such issues are relatively regular, so log them
1456 2527 : // at info level at first, and only WARN if the operation fails repeatedly.
1457 2527 : //
1458 2527 : // (See similar logic for downloads in `download::download_retry`)
1459 2527 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1460 2526 : info!(
1461 2526 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1462 2526 : task.op, retries, e
1463 2526 : );
1464 : } else {
1465 1 : warn!(
1466 1 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1467 1 : task.op, retries, e
1468 1 : );
1469 : }
1470 :
1471 : // sleep until it's time to retry, or we're cancelled
1472 2527 : exponential_backoff(
1473 2527 : retries,
1474 2527 : DEFAULT_BASE_BACKOFF_SECONDS,
1475 2527 : DEFAULT_MAX_BACKOFF_SECONDS,
1476 2527 : &shutdown_token(),
1477 2527 : )
1478 4 : .await;
1479 : }
1480 : }
1481 : }
1482 :
1483 31706 : let retries = task.retries.load(Ordering::SeqCst);
1484 31706 : if retries > 0 {
1485 2418 : info!(
1486 2418 : "remote task {} completed successfully after {} retries",
1487 2418 : task.op, retries
1488 2418 : );
1489 : } else {
1490 0 : debug!("remote task {} completed successfully", task.op);
1491 : }
1492 :
1493 : // The task has completed successfully. Remove it from the in-progress list.
1494 28820 : let lsn_update = {
1495 31706 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1496 31706 : let upload_queue = match upload_queue_guard.deref_mut() {
1497 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1498 2886 : UploadQueue::Stopped(_stopped) => {
1499 2886 : None
1500 : },
1501 28820 : UploadQueue::Initialized(qi) => { Some(qi) }
1502 : };
1503 :
1504 31706 : let upload_queue = match upload_queue {
1505 28820 : Some(upload_queue) => upload_queue,
1506 : None => {
1507 2886 : info!("another concurrent task already stopped the queue");
1508 2886 : return;
1509 : }
1510 : };
1511 :
1512 28820 : upload_queue.inprogress_tasks.remove(&task.task_id);
1513 :
1514 28820 : let lsn_update = match task.op {
1515 : UploadOp::UploadLayer(_, _) => {
1516 18449 : upload_queue.num_inprogress_layer_uploads -= 1;
1517 18449 : None
1518 : }
1519 6059 : UploadOp::UploadMetadata(_, lsn) => {
1520 6059 : upload_queue.num_inprogress_metadata_uploads -= 1;
1521 6059 : // XXX monotonicity check?
1522 6059 :
1523 6059 : upload_queue.projected_remote_consistent_lsn = Some(lsn);
1524 6059 : if self.generation.is_none() {
1525 : // Legacy mode: skip validating generation
1526 17 : upload_queue.visible_remote_consistent_lsn.store(lsn);
1527 17 : None
1528 : } else {
1529 6042 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
1530 : }
1531 : }
1532 : UploadOp::Delete(_) => {
1533 4312 : upload_queue.num_inprogress_deletions -= 1;
1534 4312 : None
1535 : }
1536 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
1537 : };
1538 :
1539 : // Launch any queued tasks that were unblocked by this one.
1540 28820 : self.launch_queued_tasks(upload_queue);
1541 28820 : lsn_update
1542 : };
1543 :
1544 28820 : if let Some((lsn, slot)) = lsn_update {
1545 : // Updates to the remote_consistent_lsn we advertise to pageservers
1546 : // are all routed through the DeletionQueue, to enforce important
1547 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
1548 6042 : self.deletion_queue_client
1549 6042 : .update_remote_consistent_lsn(
1550 6042 : self.tenant_shard_id,
1551 6042 : self.timeline_id,
1552 6042 : self.generation,
1553 6042 : lsn,
1554 6042 : slot,
1555 6042 : )
1556 0 : .await;
1557 22778 : }
1558 :
1559 28820 : self.calls_unfinished_metric_end(&task.op);
1560 31809 : }
1561 :
1562 65071 : fn calls_unfinished_metric_impl(
1563 65071 : &self,
1564 65071 : op: &UploadOp,
1565 65071 : ) -> Option<(
1566 65071 : RemoteOpFileKind,
1567 65071 : RemoteOpKind,
1568 65071 : RemoteTimelineClientMetricsCallTrackSize,
1569 65071 : )> {
1570 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
1571 65071 : let res = match op {
1572 40988 : UploadOp::UploadLayer(_, m) => (
1573 40988 : RemoteOpFileKind::Layer,
1574 40988 : RemoteOpKind::Upload,
1575 40988 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
1576 40988 : ),
1577 12306 : UploadOp::UploadMetadata(_, _) => (
1578 12306 : RemoteOpFileKind::Index,
1579 12306 : RemoteOpKind::Upload,
1580 12306 : DontTrackSize {
1581 12306 : reason: "metadata uploads are tiny",
1582 12306 : },
1583 12306 : ),
1584 11572 : UploadOp::Delete(_delete) => (
1585 11572 : RemoteOpFileKind::Layer,
1586 11572 : RemoteOpKind::Delete,
1587 11572 : DontTrackSize {
1588 11572 : reason: "should we track deletes? positive or negative sign?",
1589 11572 : },
1590 11572 : ),
1591 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
1592 : // we do not account these
1593 205 : return None;
1594 : }
1595 : };
1596 64866 : Some(res)
1597 65071 : }
1598 :
1599 33962 : fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
1600 33962 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1601 33962 : Some(x) => x,
1602 0 : None => return,
1603 : };
1604 33962 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
1605 33962 : guard.will_decrement_manually(); // in calls_unfinished_metric_end()
1606 33962 : }
1607 :
1608 31109 : fn calls_unfinished_metric_end(&self, op: &UploadOp) {
1609 31109 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1610 30904 : Some(x) => x,
1611 205 : None => return,
1612 : };
1613 30904 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
1614 31109 : }
1615 :
1616 : /// Close the upload queue for new operations and cancel queued operations.
1617 : ///
1618 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
1619 : ///
1620 : /// In-progress operations will still be running after this function returns.
1621 : /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
1622 : /// to wait for them to complete, after calling this function.
1623 1094 : pub(crate) fn stop(&self) -> Result<(), StopError> {
1624 1094 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
1625 1094 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
1626 1094 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
1627 1094 : let mut guard = self.upload_queue.lock().unwrap();
1628 1094 : match &mut *guard {
1629 0 : UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
1630 : UploadQueue::Stopped(_) => {
1631 : // nothing to do
1632 507 : info!("another concurrent task already shut down the queue");
1633 507 : Ok(())
1634 : }
1635 587 : UploadQueue::Initialized(initialized) => {
1636 587 : info!("shutting down upload queue");
1637 :
1638 : // Replace the queue with the Stopped state, taking ownership of the old
1639 : // Initialized queue. We will do some checks on it, and then drop it.
1640 587 : let qi = {
1641 : // Here we preserve a working version of the upload queue for possible use during deletions.
1642 : // An in-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
1643 : // but it doesn't really make sense to bring in unsafe code for this single usage point.
1644 : // Deletion is not really perf-sensitive, so there shouldn't be any problems with cloning a fraction of it.
1645 587 : let upload_queue_for_deletion = UploadQueueInitialized {
1646 587 : task_counter: 0,
1647 587 : latest_files: initialized.latest_files.clone(),
1648 587 : latest_files_changes_since_metadata_upload_scheduled: 0,
1649 587 : latest_metadata: initialized.latest_metadata.clone(),
1650 587 : projected_remote_consistent_lsn: None,
1651 587 : visible_remote_consistent_lsn: initialized
1652 587 : .visible_remote_consistent_lsn
1653 587 : .clone(),
1654 587 : num_inprogress_layer_uploads: 0,
1655 587 : num_inprogress_metadata_uploads: 0,
1656 587 : num_inprogress_deletions: 0,
1657 587 : inprogress_tasks: HashMap::default(),
1658 587 : queued_operations: VecDeque::default(),
1659 587 : #[cfg(feature = "testing")]
1660 587 : dangling_files: HashMap::default(),
1661 587 : shutting_down: false,
1662 587 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
1663 587 : };
1664 587 :
1665 587 : let upload_queue = std::mem::replace(
1666 587 : &mut *guard,
1667 587 : UploadQueue::Stopped(UploadQueueStopped {
1668 587 : upload_queue_for_deletion,
1669 587 : deleted_at: SetDeletedFlagProgress::NotRunning,
1670 587 : }),
1671 587 : );
1672 587 : if let UploadQueue::Initialized(qi) = upload_queue {
1673 587 : qi
1674 : } else {
1675 0 : unreachable!("we checked in the match above that it is Initialized");
1676 : }
1677 : };
1678 :
1679 : // consistency check
1680 587 : assert_eq!(
1681 587 : qi.num_inprogress_layer_uploads
1682 587 : + qi.num_inprogress_metadata_uploads
1683 587 : + qi.num_inprogress_deletions,
1684 587 : qi.inprogress_tasks.len()
1685 587 : );
1686 :
1687 : // We don't need to do anything here for in-progress tasks. They will finish
1688 : // on their own, decrement the unfinished-task counter themselves, and observe
1689 : // that the queue is Stopped.
1690 587 : drop(qi.inprogress_tasks);
1691 :
1692 : // Tear down queued ops
1693 2289 : for op in qi.queued_operations.into_iter() {
1694 2289 : self.calls_unfinished_metric_end(&op);
1695 2289 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
1696 2289 : // which is exactly what we want to happen.
1697 2289 : drop(op);
1698 2289 : }
1699 :
1700 : // We're done.
1701 587 : drop(guard);
1702 587 : Ok(())
1703 : }
1704 : }
1705 1094 : }
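// Illustrative sketch, not part of this module: the state transition that `stop()` performs,
// reduced to a standalone example. `Queue` and `stop` below are hypothetical stand-ins for
// `UploadQueue` and `RemoteTimelineClient::stop`, and queued ops are modelled as plain strings.
// It shows `std::mem::replace` swapping in the stopped state while taking ownership of the
// old initialized queue, under those simplifying assumptions.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical, simplified stand-in for the three queue states.
enum Queue {
    Uninitialized,
    Initialized(VecDeque<String>),
    Stopped,
}

/// Transition the queue to `Stopped` and return the ops that were still queued.
fn stop(queue: &Mutex<Queue>) -> Result<Vec<String>, &'static str> {
    let mut guard = queue.lock().unwrap();

    // Inspect the state first; the shared borrow ends with this match.
    match &*guard {
        Queue::Uninitialized => return Err("queue uninitialized"),
        // Someone else already stopped the queue: nothing left to cancel.
        Queue::Stopped => return Ok(Vec::new()),
        Queue::Initialized(_) => {}
    }

    // Swap in the new state and take ownership of the old one in a single step.
    match std::mem::replace(&mut *guard, Queue::Stopped) {
        Queue::Initialized(ops) => Ok(ops.into_iter().collect()),
        _ => unreachable!("we checked above that the queue is Initialized"),
    }
}
```

// In this sketch, the first `stop` call on an initialized queue returns the ops that never
// became tasks; a second call observes the already stopped queue and returns an empty list.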
1706 :
1707 8 : pub(crate) fn get_layers_metadata(
1708 8 : &self,
1709 8 : layers: Vec<LayerFileName>,
1710 8 : ) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
1711 8 : let q = self.upload_queue.lock().unwrap();
1712 8 : let q = match &*q {
1713 : UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
1714 0 : anyhow::bail!("queue is in state {}", q.as_str())
1715 : }
1716 8 : UploadQueue::Initialized(inner) => inner,
1717 8 : };
1718 8 :
1719 2493 : let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
1720 8 :
1721 8 : Ok(decorated.collect())
1722 8 : }
1723 : }
1724 :
1725 4790 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1726 4790 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
1727 4790 : RemotePath::from_string(&path).expect("Failed to construct path")
1728 4790 : }
1729 :
1730 1 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
1731 1 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
1732 1 : RemotePath::from_string(&path).expect("Failed to construct path")
1733 1 : }
1734 :
1735 3944 : pub fn remote_timeline_path(
1736 3944 : tenant_shard_id: &TenantShardId,
1737 3944 : timeline_id: &TimelineId,
1738 3944 : ) -> RemotePath {
1739 3944 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
1740 3944 : }
1741 :
1742 : /// Note that the shard component of a remote layer path is _not_ always the same
1743 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
1744 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
1745 19331 : pub fn remote_layer_path(
1746 19331 : tenant_id: &TenantId,
1747 19331 : timeline_id: &TimelineId,
1748 19331 : shard: ShardIndex,
1749 19331 : layer_file_name: &LayerFileName,
1750 19331 : generation: Generation,
1751 19331 : ) -> RemotePath {
1752 19331 : // Generation-aware key format
1753 19331 : let path = format!(
1754 19331 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
1755 19331 : shard.get_suffix(),
1756 19331 : layer_file_name.file_name(),
1757 19331 : generation.get_suffix()
1758 19331 : );
1759 19331 :
1760 19331 : RemotePath::from_string(&path).expect("Failed to construct path")
1761 19331 : }
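// Illustrative sketch, not part of this module: how the pieces of a generation-aware layer key
// fit together. `layer_key` is a hypothetical helper that takes already rendered suffix strings
// instead of `ShardIndex` and `Generation`, and the "timelines" literal is an assumed value for
// `TIMELINES_SEGMENT_NAME`.

```rust
/// Segment name assumed for illustration; the module itself uses `TIMELINES_SEGMENT_NAME`.
const TIMELINES_SEGMENT: &str = "timelines";

/// Hypothetical helper: the shard suffix follows the tenant id, and the
/// generation suffix follows the layer file name.
fn layer_key(
    tenant_id: &str,
    shard_suffix: &str,
    timeline_id: &str,
    layer_file_name: &str,
    generation_suffix: &str,
) -> String {
    format!(
        "tenants/{}{}/{}/{}/{}{}",
        tenant_id, shard_suffix, TIMELINES_SEGMENT, timeline_id, layer_file_name, generation_suffix
    )
}
```

// With empty suffixes this collapses to `tenants/<tenant>/timelines/<timeline>/<layer>`, which
// is why the shard used for the key has to come from the layer's metadata, not from the caller.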
1762 :
1763 829 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
1764 829 : RemotePath::from_string(&format!(
1765 829 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
1766 829 : ))
1767 829 : .expect("Failed to construct path")
1768 829 : }
1769 :
1770 6 : pub fn remote_initdb_preserved_archive_path(
1771 6 : tenant_id: &TenantId,
1772 6 : timeline_id: &TimelineId,
1773 6 : ) -> RemotePath {
1774 6 : RemotePath::from_string(&format!(
1775 6 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
1776 6 : ))
1777 6 : .expect("Failed to construct path")
1778 6 : }
1779 :
1780 8396 : pub fn remote_index_path(
1781 8396 : tenant_shard_id: &TenantShardId,
1782 8396 : timeline_id: &TimelineId,
1783 8396 : generation: Generation,
1784 8396 : ) -> RemotePath {
1785 8396 : RemotePath::from_string(&format!(
1786 8396 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1787 8396 : IndexPart::FILE_NAME,
1788 8396 : generation.get_suffix()
1789 8396 : ))
1790 8396 : .expect("Failed to construct path")
1791 8396 : }
1792 :
1793 18 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1794 18 : RemotePath::from_string(&format!(
1795 18 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
1796 18 : ))
1797 18 : .expect("Failed to construct path")
1798 18 : }
1799 :
1800 : /// Given the key of an index, parse out the generation part of the name
1801 678 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
1802 678 : let file_name = match path.get_path().file_name() {
1803 678 : Some(f) => f,
1804 : None => {
1805 : // Unexpected: we should be seeing index_part.json paths only
1806 0 : tracing::warn!("Malformed index key {}", path);
1807 0 : return None;
1808 : }
1809 : };
1810 :
1811 678 : match file_name.split_once('-') {
1812 671 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
1813 7 : None => None,
1814 : }
1815 678 : }
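// Illustrative sketch, not part of this module: the round trip between an index key and its
// generation. `Gen` is a hypothetical stand-in for `Generation`, and the suffix format used
// here ("-" followed by eight hex digits) is an assumption made for the example only.

```rust
/// Hypothetical stand-in for `Generation`; the real type may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Gen {
    None,
    Valid(u32),
}

impl Gen {
    /// Assumed suffix format: empty for `None`, otherwise "-" plus eight hex digits.
    fn get_suffix(self) -> String {
        match self {
            Gen::None => String::new(),
            Gen::Valid(v) => format!("-{:08x}", v),
        }
    }

    /// Parse the part after the "-" back into a generation.
    fn parse_suffix(suffix: &str) -> Option<Gen> {
        u32::from_str_radix(suffix, 16).ok().map(Gen::Valid)
    }
}

/// Split the file name at the first '-' and parse what follows.
/// A name without '-' carries no generation, so the result is `None`.
fn parse_index_file_name(file_name: &str) -> Option<Gen> {
    let (_, suffix) = file_name.split_once('-')?;
    Gen::parse_suffix(suffix)
}
```

// Under these assumptions, a name built with `get_suffix()` parses back to the same generation,
// while a generation-less `index_part.json` yields `None`.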
1816 :
1817 : /// Files on the remote storage are stored under paths relative to the workdir.
1818 : /// Each path includes both the tenant and timeline IDs, which makes the remote storage path unique.
1819 : ///
1820 : /// Returns an error if the provided path does not start with the pageserver's workdir.
1821 23204 : pub fn remote_path(
1822 23204 : conf: &PageServerConf,
1823 23204 : local_path: &Utf8Path,
1824 23204 : generation: Generation,
1825 23204 : ) -> anyhow::Result<RemotePath> {
1826 23204 : let stripped = local_path
1827 23204 : .strip_prefix(&conf.workdir)
1828 23204 : .context("Failed to strip workdir prefix")?;
1829 :
1830 23204 : let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
1831 23204 :
1832 23204 : RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
1833 0 : format!(
1834 0 : "to resolve remote part of path {:?} for base {:?}",
1835 0 : local_path, conf.workdir
1836 0 : )
1837 23204 : })
1838 23204 : }
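// Illustrative sketch, not part of this module: the prefix stripping that `remote_path` relies
// on, shown with `std::path` instead of `Utf8Path`. `to_remote` is a hypothetical helper that
// takes an already rendered generation suffix and returns `None` where the real function
// returns an error.

```rust
use std::path::{Path, PathBuf};

/// Strip `workdir` from `local` and append the generation suffix to what remains.
fn to_remote(workdir: &Path, local: &Path, generation_suffix: &str) -> Option<PathBuf> {
    // Fails if `local` does not live under `workdir`.
    let stripped = local.strip_prefix(workdir).ok()?;
    // Fails if the remaining path is not valid UTF-8.
    let stripped = stripped.to_str()?;
    Some(PathBuf::from(format!("{}{}", stripped, generation_suffix)))
}
```

// A path outside the workdir yields `None` here, mirroring the error case described in the
// doc comment of `remote_path`.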
1839 :
1840 : #[cfg(test)]
1841 : mod tests {
1842 : use super::*;
1843 : use crate::{
1844 : context::RequestContext,
1845 : tenant::{
1846 : harness::{TenantHarness, TIMELINE_ID},
1847 : storage_layer::Layer,
1848 : Generation, Tenant, Timeline,
1849 : },
1850 : DEFAULT_PG_VERSION,
1851 : };
1852 :
1853 : use std::collections::HashSet;
1854 : use utils::lsn::Lsn;
1855 :
1856 8 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
1857 8 : format!("contents for {name}").into()
1858 8 : }
1859 :
1860 2 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
1861 2 : let metadata = TimelineMetadata::new(
1862 2 : disk_consistent_lsn,
1863 2 : None,
1864 2 : None,
1865 2 : Lsn(0),
1866 2 : Lsn(0),
1867 2 : Lsn(0),
1868 2 : // Any version will do
1869 2 : // but it should be consistent with the one in the tests
1870 2 : crate::DEFAULT_PG_VERSION,
1871 2 : );
1872 2 :
1873 2 : // go through serialize + deserialize to fix the header, including checksum
1874 2 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
1875 2 : }
1876 :
1877 2 : fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
1878 6 : let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
1879 2 : avec.sort();
1880 2 :
1881 2 : let mut bvec = b.to_vec();
1882 2 : bvec.sort_unstable();
1883 2 :
1884 2 : assert_eq!(avec, bvec);
1885 2 : }
1886 :
1887 4 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
1888 4 : let mut expected: Vec<String> = expected
1889 4 : .iter()
1890 16 : .map(|x| format!("{}{}", x, generation.get_suffix()))
1891 4 : .collect();
1892 4 : expected.sort();
1893 4 :
1894 4 : let mut found: Vec<String> = Vec::new();
1895 16 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
1896 16 : let entry_name = entry.file_name();
1897 16 : let fname = entry_name.to_str().unwrap();
1898 16 : found.push(String::from(fname));
1899 16 : }
1900 4 : found.sort();
1901 4 :
1902 4 : assert_eq!(found, expected);
1903 4 : }
1904 :
1905 : struct TestSetup {
1906 : harness: TenantHarness,
1907 : tenant: Arc<Tenant>,
1908 : timeline: Arc<Timeline>,
1909 : tenant_ctx: RequestContext,
1910 : }
1911 :
1912 : impl TestSetup {
1913 8 : async fn new(test_name: &str) -> anyhow::Result<Self> {
1914 8 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
1915 8 : let harness = TenantHarness::create(test_name)?;
1916 8 : let (tenant, ctx) = harness.load().await;
1917 :
1918 8 : let timeline = tenant
1919 8 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1920 28 : .await?;
1921 :
1922 8 : Ok(Self {
1923 8 : harness,
1924 8 : tenant,
1925 8 : timeline,
1926 8 : tenant_ctx: ctx,
1927 8 : })
1928 8 : }
1929 :
1930 : /// Construct a RemoteTimelineClient in an arbitrary generation
1931 10 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
1932 10 : Arc::new(RemoteTimelineClient {
1933 10 : conf: self.harness.conf,
1934 10 : runtime: tokio::runtime::Handle::current(),
1935 10 : tenant_shard_id: self.harness.tenant_shard_id,
1936 10 : timeline_id: TIMELINE_ID,
1937 10 : generation,
1938 10 : storage_impl: self.harness.remote_storage.clone(),
1939 10 : deletion_queue_client: self.harness.deletion_queue.new_client(),
1940 10 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
1941 10 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
1942 10 : &self.harness.tenant_shard_id,
1943 10 : &TIMELINE_ID,
1944 10 : )),
1945 10 : cancel: CancellationToken::new(),
1946 10 : })
1947 10 : }
1948 :
1949 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
1950 : /// and timeline_id are present.
1951 6 : fn span(&self) -> tracing::Span {
1952 6 : tracing::info_span!(
1953 : "test",
1954 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
1955 6 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
1956 : timeline_id = %TIMELINE_ID
1957 : )
1958 6 : }
1959 : }
1960 :
1961 : // Test scheduling
1962 2 : #[tokio::test]
1963 2 : async fn upload_scheduling() {
1964 : // Test outline:
1965 : //
1966 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
1967 : // Schedule upload of index. Check that it is queued
1968 : // let the layer file uploads finish. Check that the index-upload is now started
1969 : // let the index-upload finish.
1970 : //
1971 : // Download the index_part.json again. Check that the list of files is correct
1972 : //
1973 : // Schedule upload. Schedule deletion. Check that the deletion is queued
1974 : // let upload finish. Check that deletion is now started
1975 : // Schedule another deletion. Check that it's launched immediately.
1976 : // Schedule index upload. Check that it's queued
1977 :
1978 10 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
1979 2 : let span = test_setup.span();
1980 2 : let _guard = span.enter();
1981 2 :
1982 2 : let TestSetup {
1983 2 : harness,
1984 2 : tenant: _tenant,
1985 2 : timeline,
1986 2 : tenant_ctx: _tenant_ctx,
1987 2 : } = test_setup;
1988 2 :
1989 2 : let client = timeline.remote_client.as_ref().unwrap();
1990 :
1991 : // Download the index_part.json again, and check that the list of files is correct
1992 2 : let initial_index_part = match client
1993 2 : .download_index_file(CancellationToken::new())
1994 6 : .await
1995 2 : .unwrap()
1996 : {
1997 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1998 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1999 : };
2000 2 : let initial_layers = initial_index_part
2001 2 : .layer_metadata
2002 2 : .keys()
2003 2 : .map(|f| f.to_owned())
2004 2 : .collect::<HashSet<LayerFileName>>();
2005 2 : let initial_layer = {
2006 2 : assert!(initial_layers.len() == 1);
2007 2 : initial_layers.into_iter().next().unwrap()
2008 2 : };
2009 2 :
2010 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2011 2 :
2012 2 : println!("workdir: {}", harness.conf.workdir);
2013 2 :
2014 2 : let remote_timeline_dir = harness
2015 2 : .remote_fs_dir
2016 2 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2017 2 : println!("remote_timeline_dir: {remote_timeline_dir}");
2018 2 :
2019 2 : let generation = harness.generation;
2020 2 : let shard = harness.shard;
2021 2 :
2022 2 : // Create a couple of dummy files, schedule upload for them
2023 2 :
2024 2 : let layers = [
2025 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2026 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2027 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2028 2 : ]
2029 2 : .into_iter()
2030 6 : .map(|(name, contents): (LayerFileName, Vec<u8>)| {
2031 6 : std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
2032 6 :
2033 6 : Layer::for_resident(
2034 6 : harness.conf,
2035 6 : &timeline,
2036 6 : name,
2037 6 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2038 6 : )
2039 6 : }).collect::<Vec<_>>();
2040 2 :
2041 2 : client
2042 2 : .schedule_layer_file_upload(layers[0].clone())
2043 2 : .unwrap();
2044 2 : client
2045 2 : .schedule_layer_file_upload(layers[1].clone())
2046 2 : .unwrap();
2047 2 :
2048 2 : // Check that they are started immediately, not queued
2049 2 : //
2050 2 : // this works because we are running within block_on, so any futures stay queued up until
2051 2 : // our next await point.
2052 2 : {
2053 2 : let mut guard = client.upload_queue.lock().unwrap();
2054 2 : let upload_queue = guard.initialized_mut().unwrap();
2055 2 : assert!(upload_queue.queued_operations.is_empty());
2056 2 : assert!(upload_queue.inprogress_tasks.len() == 2);
2057 2 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2058 :
2059 : // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
2060 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2061 : }
2062 :
2063 : // Schedule upload of index. Check that it is queued
2064 2 : let metadata = dummy_metadata(Lsn(0x20));
2065 2 : client
2066 2 : .schedule_index_upload_for_metadata_update(&metadata)
2067 2 : .unwrap();
2068 2 : {
2069 2 : let mut guard = client.upload_queue.lock().unwrap();
2070 2 : let upload_queue = guard.initialized_mut().unwrap();
2071 2 : assert!(upload_queue.queued_operations.len() == 1);
2072 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2073 : }
2074 :
2075 : // Wait for the uploads to finish
2076 2 : client.wait_completion().await.unwrap();
2077 2 : {
2078 2 : let mut guard = client.upload_queue.lock().unwrap();
2079 2 : let upload_queue = guard.initialized_mut().unwrap();
2080 :
2081 2 : assert!(upload_queue.queued_operations.is_empty());
2082 2 : assert!(upload_queue.inprogress_tasks.is_empty());
2083 : }
2084 :
2085 : // Download the index_part.json again, and check that the list of files is correct
2086 2 : let index_part = match client
2087 2 : .download_index_file(CancellationToken::new())
2088 6 : .await
2089 2 : .unwrap()
2090 : {
2091 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2092 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2093 : };
2094 :
2095 2 : assert_file_list(
2096 2 : &index_part
2097 2 : .layer_metadata
2098 2 : .keys()
2099 6 : .map(|f| f.to_owned())
2100 2 : .collect(),
2101 2 : &[
2102 2 : &initial_layer.file_name(),
2103 2 : &layers[0].layer_desc().filename().file_name(),
2104 2 : &layers[1].layer_desc().filename().file_name(),
2105 2 : ],
2106 2 : );
2107 2 : assert_eq!(index_part.metadata, metadata);
2108 :
2109 : // Schedule upload and then a deletion. Check that the deletion is queued
2110 2 : client
2111 2 : .schedule_layer_file_upload(layers[2].clone())
2112 2 : .unwrap();
2113 2 :
2114 2 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
2115 2 : // keep using schedule_layer_file_deletion because we don't have a way to wait for the
2116 2 : // spawn_blocking started by the drop.
2117 2 : client
2118 2 : .schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
2119 2 : .unwrap();
2120 2 : {
2121 2 : let mut guard = client.upload_queue.lock().unwrap();
2122 2 : let upload_queue = guard.initialized_mut().unwrap();
2123 2 :
2124 2 : // Deletion schedules upload of the index file, and the file deletion itself
2125 2 : assert_eq!(upload_queue.queued_operations.len(), 2);
2126 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2127 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2128 2 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2129 2 : assert_eq!(
2130 2 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2131 2 : 0
2132 2 : );
2133 : }
2134 2 : assert_remote_files(
2135 2 : &[
2136 2 : &initial_layer.file_name(),
2137 2 : &layers[0].layer_desc().filename().file_name(),
2138 2 : &layers[1].layer_desc().filename().file_name(),
2139 2 : "index_part.json",
2140 2 : ],
2141 2 : &remote_timeline_dir,
2142 2 : generation,
2143 2 : );
2144 2 :
2145 2 : // Finish them
2146 2 : client.wait_completion().await.unwrap();
2147 2 : harness.deletion_queue.pump().await;
2148 :
2149 2 : assert_remote_files(
2150 2 : &[
2151 2 : &initial_layer.file_name(),
2152 2 : &layers[1].layer_desc().filename().file_name(),
2153 2 : &layers[2].layer_desc().filename().file_name(),
2154 2 : "index_part.json",
2155 2 : ],
2156 2 : &remote_timeline_dir,
2157 2 : generation,
2158 2 : );
2159 : }
2160 :
2161 2 : #[tokio::test]
2162 2 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2163 : // Setup
2164 :
2165 : let TestSetup {
2166 2 : harness,
2167 2 : tenant: _tenant,
2168 2 : timeline,
2169 : ..
2170 9 : } = TestSetup::new("metrics").await.unwrap();
2171 2 : let client = timeline.remote_client.as_ref().unwrap();
2172 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2173 2 :
2174 2 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2175 2 : let content_1 = dummy_contents("foo");
2176 2 : std::fs::write(
2177 2 : timeline_path.join(layer_file_name_1.file_name()),
2178 2 : &content_1,
2179 2 : )
2180 2 : .unwrap();
2181 2 :
2182 2 : let layer_file_1 = Layer::for_resident(
2183 2 : harness.conf,
2184 2 : &timeline,
2185 2 : layer_file_name_1.clone(),
2186 2 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2187 2 : );
2188 2 :
2189 4 : #[derive(Debug, PartialEq, Clone, Copy)]
2190 2 : struct BytesStartedFinished {
2191 2 : started: Option<usize>,
2192 2 : finished: Option<usize>,
2193 2 : }
2194 2 : impl std::ops::Add for BytesStartedFinished {
2195 2 : type Output = Self;
2196 4 : fn add(self, rhs: Self) -> Self::Output {
2197 4 : Self {
2198 4 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2199 4 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2200 4 : }
2201 4 : }
2202 2 : }
2203 6 : let get_bytes_started_stopped = || {
2204 6 : let started = client
2205 6 : .metrics
2206 6 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2207 6 : .map(|v| v.try_into().unwrap());
2208 6 : let stopped = client
2209 6 : .metrics
2210 6 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2211 6 : .map(|v| v.try_into().unwrap());
2212 6 : BytesStartedFinished {
2213 6 : started,
2214 6 : finished: stopped,
2215 6 : }
2216 6 : };
2217 :
2218 : // Test
2219 2 : tracing::info!("now doing actual test");
2220 :
2221 2 : let actual_a = get_bytes_started_stopped();
2222 2 :
2223 2 : client
2224 2 : .schedule_layer_file_upload(layer_file_1.clone())
2225 2 : .unwrap();
2226 2 :
2227 2 : let actual_b = get_bytes_started_stopped();
2228 2 :
2229 2 : client.wait_completion().await.unwrap();
2230 2 :
2231 2 : let actual_c = get_bytes_started_stopped();
2232 2 :
2233 2 : // Validate
2234 2 :
2235 2 : let expected_b = actual_a
2236 2 : + BytesStartedFinished {
2237 2 : started: Some(content_1.len()),
2238 2 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
2239 2 : finished: Some(0),
2240 2 : };
2241 2 : assert_eq!(actual_b, expected_b);
2242 :
2243 2 : let expected_c = actual_a
2244 2 : + BytesStartedFinished {
2245 2 : started: Some(content_1.len()),
2246 2 : finished: Some(content_1.len()),
2247 2 : };
2248 2 : assert_eq!(actual_c, expected_c);
2249 : }
2250 :
2251 12 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
2252 12 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
2253 12 : let example_metadata = TimelineMetadata::example();
2254 12 : let example_index_part = IndexPart::new(
2255 12 : HashMap::new(),
2256 12 : example_metadata.disk_consistent_lsn(),
2257 12 : example_metadata,
2258 12 : );
2259 12 :
2260 12 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
2261 12 :
2262 12 : let index_path = test_state.harness.remote_fs_dir.join(
2263 12 : remote_index_path(
2264 12 : &test_state.harness.tenant_shard_id,
2265 12 : &TIMELINE_ID,
2266 12 : generation,
2267 12 : )
2268 12 : .get_path(),
2269 12 : );
2270 12 :
2271 12 : std::fs::create_dir_all(index_path.parent().unwrap())
2272 12 : .expect("creating test dir should work");
2273 12 :
2274 12 : eprintln!("Writing {index_path}");
2275 12 : std::fs::write(&index_path, index_part_bytes).unwrap();
2276 12 : example_index_part
2277 12 : }
2278 :
2279 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
2280 : /// index, the IndexPart returned is equal to `expected`
2281 10 : async fn assert_got_index_part(
2282 10 : test_state: &TestSetup,
2283 10 : get_generation: Generation,
2284 10 : expected: &IndexPart,
2285 10 : ) {
2286 10 : let client = test_state.build_client(get_generation);
2287 :
2288 10 : let download_r = client
2289 10 : .download_index_file(CancellationToken::new())
2290 42 : .await
2291 10 : .expect("download should always succeed");
2292 10 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
2293 10 : match download_r {
2294 10 : MaybeDeletedIndexPart::IndexPart(index_part) => {
2295 10 : assert_eq!(&index_part, expected);
2296 : }
2297 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
2298 : }
2299 10 : }
2300 :
2301 2 : #[tokio::test]
2302 2 : async fn index_part_download_simple() -> anyhow::Result<()> {
2303 8 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
2304 2 : let span = test_state.span();
2305 2 : let _guard = span.enter();
2306 2 :
2307 2 : // Simple case: we are in generation N, load the index from generation N - 1
2308 2 : let generation_n = 5;
2309 2 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2310 :
2311 6 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
2312 :
2313 2 : Ok(())
2314 : }
2315 :
2316 2 : #[tokio::test]
2317 2 : async fn index_part_download_ordering() -> anyhow::Result<()> {
2318 2 : let test_state = TestSetup::new("index_part_download_ordering")
2319 9 : .await
2320 2 : .unwrap();
2321 2 :
2322 2 : let span = test_state.span();
2323 2 : let _guard = span.enter();
2324 2 :
2325 2 : // A generation-less IndexPart exists in the bucket, we should find it
2326 2 : let generation_n = 5;
2327 2 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
2328 10 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
2329 :
2330 : // If a more-recent-than-none generation exists, we should prefer to load that
2331 2 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
2332 10 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2333 :
2334 : // If a more-recent-than-me generation exists, we should ignore it.
2335 2 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
2336 10 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2337 :
2338 : // If a directly previous generation exists, _and_ an index exists in my own
2339 : // generation, I should prefer my own generation.
2340 2 : let _injected_prev =
2341 2 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2342 2 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
2343 2 : assert_got_index_part(
2344 2 : &test_state,
2345 2 : Generation::new(generation_n),
2346 2 : &injected_current,
2347 2 : )
2348 6 : .await;
2349 :
2350 2 : Ok(())
2351 : }
2352 : }